package main import ( "database/sql" "errors" _ "github.com/go-sql-driver/mysql" "log" "time" ) const ( DATABASE_USER = "root" DATABASE_PASS = "pass" DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) DATABASE_NAME = "netflow" MAXIMUM_ENTRIES = 10 TIMESPAN = "day" ) type RawData struct { ipSrc string ipDst string time time.Time port int packetSize int } type CleanData struct { ipbSrc string ipbDst string time time.Time port int volume string occurances int } func cleanData() (err error) { db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME) if err != nil { log.Println("Failed to connect to db") return } defer db.Close() //Fetch data that should be cleaned rDat, err := fetchRawData(db, MAXIMUM_ENTRIES) if err != nil { log.Println("Faild to fetch raw data") return } cDat, err := clean(rDat) if err != nil { log.Println("Failed to clean data") return } //Begin transaction tx, err := db.Begin() if err != nil { log.Println("Failed to initialize transaction") return } //save cleaned data for _, cd := range cDat { err = insertCleanData(tx, cd.ipbSrc, cd.ipbDst, cd.volume, cd.time, cd.port, cd.occurances) if err != nil { tx.Rollback() log.Println("Failed to save cleaned data") return } } //remove old data err = purgeRawData(tx, rDat) if err != nil { tx.Rollback() log.Println("Failed to remove old data") return } tx.Commit() return } func getVolSize(s int) string { return "medium" } func getTimespan(t time.Time) (span time.Time, err error) { loc, err := time.LoadLocation("Local") switch { case TIMESPAN == "5min": case TIMESPAN == "hour": case TIMESPAN == "day": y, m, d := t.Date() if err != nil { return } span = time.Date(y, m, d, 0, 0, 0, 0, loc) default: err = errors.New("Bad timespan") return } return } func clean(rDat []RawData) (cDat []CleanData, err error) { // collect all ips so we can query for their ip blocks var ips map[string]*asnipPair for _, rd := range rDat { ips[rd.ipSrc] = nil ips[rd.ipDst] = nil } var iplist []string for ip := range ips { iplist = append(iplist, ip) } pairs, err := findASAndIPBlock(iplist...) if err != nil { return } for _, p := range pairs { ips[p.ipAdr] = &p } for _, rd := range rDat { vol := getVolSize(rd.packetSize) var tim time.Time tim, err = getTimespan(rd.time) if err != nil { return } cDat = append(cDat, CleanData{ ipbSrc: ips[rd.ipSrc].ipBlock, ipbDst: ips[rd.ipDst].ipBlock, time: tim, port: rd.port, volume: vol, occurances: 1, }) } return } func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) { prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ") if err != nil { log.Println("Failed to prepare statement") return } rows, err := prepStmt.Query(numRows) if err != nil { log.Println("Failed to query prepared statement") return } for rows.Next() { var r RawData err = rows.Scan(&r.ipSrc, &r.ipDst, &r.time, &r.port, &r.packetSize) if err != nil { log.Println("Failed to scan result of query") return } rDat = append(rDat, r) } return } func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?") if err != nil { return } for _, r := range rDat { _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) if err != nil { return } } return } func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err } _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences) if err != nil { log.Println("Failed to execute statement") return err } return nil } func insertASNIP(db *sql.DB, asn int, ipBlock string) error { prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?") if err != nil { return err } defer prepCheck.Close() rows, err := prepCheck.Exec(ipBlock) if err != nil { return err } if rows != nil { return nil } prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") if err != nil { return err } defer prepIns.Close() _, err = prepIns.Exec(asn, ipBlock) if err != nil { return err } return nil } func removeASNIP(db *sql.DB, asn int, ipBlock string) error { prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?") if err != nil { return err } defer prepStmt.Close() _, err = prepStmt.Exec(asn, ipBlock) if err != nil { return err } return nil }