diff options
Diffstat (limited to 'cleaner.go')
-rw-r--r-- | cleaner.go | 113 |
1 files changed, 100 insertions, 13 deletions
@@ -2,6 +2,7 @@ package main import ( "database/sql" + "errors" _ "github.com/go-sql-driver/mysql" "time" ) @@ -11,7 +12,8 @@ const ( DATABASE_PASS = "pass" DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) DATABASE_NAME = "netflow" - PROCESS_BATCH = 3 + MAXIMUM_ENTRIES = 10 + TIMESPAN = "day" ) type RawData struct { @@ -22,14 +24,29 @@ type RawData struct { packetSize int } -func derp() (err error) { +type CleanData struct { + ipbSrc string + ipbDst string + time time.Time + port int + volume string + occurances int +} + +func cleanData() (err error) { db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME) if err != nil { return } defer db.Close() - rDat, err := fetchRawData(db) + //Fetch data that should be cleaned + rDat, err := fetchRawData(db, MAXIMUM_ENTRIES) + if err != nil { + return + } + + cDat, err := clean(rDat) if err != nil { return } @@ -40,7 +57,14 @@ func derp() (err error) { return } - //TODO insertCleanedData + //save cleaned data + for _, cd := range cDat { + err = insertCleanData(tx, cd.ipbSrc, cd.ipbDst, cd.volume, cd.time, cd.port, cd.occurances) + if err != nil { + tx.Rollback() + return + } + } //remove old data err = purgeRawData(tx, rDat) @@ -53,17 +77,79 @@ func derp() (err error) { return } -func insertCleanedData(tx *sql.DB, rDat []RawData) (err error) { +func getVolSize(s int) string { + return "medium" +} + +func getTimespan(t time.Time) (span time.Time, err error) { + loc, err := time.LoadLocation("Local") + switch { + case TIMESPAN == "5min": + + case TIMESPAN == "hour": + + case TIMESPAN == "day": + y, m, d := t.Date() + if err != nil { + return + } + span = time.Date(y, m, d, 0, 0, 0, 0, loc) + default: + err = errors.New("Bad timespan") + return + } return } -func fetchRawData(db *sql.DB) (rDat []RawData, err error) { +func clean(rDat []RawData) (cDat []CleanData, err error) { + // collect all ips so we can query for their ip blocks + var ips map[string]*asnipPair + for _, rd := range rDat { + ips[rd.ipSrc] = nil + ips[rd.ipDst] = nil + } + + var iplist []string + for ip := range ips { + iplist = append(iplist, ip) + } + + pairs, err := findASAndIPBlock(iplist...) + if err != nil { + return + } + for _, p := range pairs { + ips[p.ipAdr] = &p + } + + for _, rd := range rDat { + vol := getVolSize(rd.packetSize) + var tim time.Time + tim, err = getTimespan(rd.time) + if err != nil { + return + } + cDat = append(cDat, + CleanData{ + ipbSrc: ips[rd.ipSrc].ipBlock, + ipbDst: ips[rd.ipDst].ipBlock, + time: tim, + port: rd.port, + volume: vol, + occurances: 1, + }) + } + + return +} + +func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) { prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ") if err != nil { return } - rows, err := prepStmt.Query(PROCESS_BATCH) + rows, err := prepStmt.Query(numRows) if err != nil { return } @@ -79,11 +165,12 @@ func fetchRawData(db *sql.DB) (rDat []RawData, err error) { return } func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { + prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?") + if err != nil { + return + } + for _, r := range rDat { - prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?") - if err != nil { - return - } _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) if err != nil { return @@ -92,8 +179,8 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { return } -func insertCleanData(db *sql.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { - prepStmt, err := db.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") +func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { + prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { return err } |