package main import ( "database/sql" "errors" "fmt" _ "github.com/go-sql-driver/mysql" "log" "os" "time" ) var ( cllogger *log.Logger ) func init() { cllogger = log.New(os.Stdout, "[ Cleaner ]", log.LstdFlags) } func cleanFromDB(cfg *Config) (rowsLeft int, err error) { cllogger.Println("Starting to clean from db...") db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { cllogger.Println("Failed to connect to db") return } defer db.Close() //Remove the processed mark on entries older than 6 hours err = reprocess(db, cfg, time.Now().Add(-1*time.Hour)) if err != nil { return } interval, err := cfg.getInterval() if err != nil { return } cleanLimit := time.Now().Add(-2 * interval) //Fetch data that should be cleaned rDat, err := fetchRawData(db, cfg, cleanLimit) if err != nil { cllogger.Println("Faild to fetch raw data") return } cDat, err := clean(rDat, cfg) if err != nil { cllogger.Println("Failed to clean data from db:", err) return } //Begin transaction tx, err := db.Begin() if err != nil { cllogger.Println("Failed to initialize transaction") return } //save cleaned data err = insertCleanData(tx, cfg, cDat) if err != nil { tx.Rollback() cllogger.Println("Failed to save cleaned data") return } //remove old data err = purgeRawData(tx, cfg, rDat) if err != nil { tx.Rollback() cllogger.Println("Failed to remove old data") return } rowsLeft, err = availableRows(tx, cfg, cleanLimit) if err != nil { tx.Rollback() cllogger.Println("Failed to fetch available rows") return } tx.Commit() cllogger.Println("Finished cleaning from db!") return } func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error { cllogger.Println("Starting to clean from stdin...") for rDat := range rDatChan { cDat, err := clean(rDat, cfg) if err != nil { cllogger.Println("Failed to clean data from stdin:", err) return err } err = insertCleanDataToDB(cfg, cDat) if err != nil { cllogger.Println("Failed to insert clean data from stdin:", err) return err } } cllogger.Println("Finished cleaning from stdin!") return nil } func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) { loc, err := time.LoadLocation(TIMEZONE) if err != nil { return } switch { case cfg.Interval == "5min": //Round the date into 5 minutes y, m, d := t.Date() h := t.Hour() min := t.Minute() min = (min / 5) * 5 span = time.Date(y, m, d, h, min, 0, 0, loc) case cfg.Interval == "10min": //Round the date into 10 minutes y, m, d := t.Date() h := t.Hour() min := t.Minute() min = (min / 10) * 10 span = time.Date(y, m, d, h, min, 0, 0, loc) case cfg.Interval == "30min": //Round the date into 10 minutes y, m, d := t.Date() h := t.Hour() min := t.Minute() min = (min / 30) * 30 span = time.Date(y, m, d, h, min, 0, 0, loc) case cfg.Interval == "hour": //Round the date into hour y, m, d := t.Date() h := t.Hour() span = time.Date(y, m, d, h, 0, 0, 0, loc) case cfg.Interval == "day": //Round the date into day y, m, d := t.Date() span = time.Date(y, m, d, 0, 0, 0, 0, loc) default: err = errors.New(fmt.Sprintf("Bad interval in config %s", cfg.Interval)) return } return } func clean(rDat []RawData, cfg *Config) (cDat []cleanedData, err error) { // collect all ips so we can query for their ip blocks ips := make(map[string]struct{}) for _, rd := range rDat { ips[rd.Ip_src] = struct{}{} ips[rd.Ip_dst] = struct{}{} } var iplist []string for ip := range ips { iplist = append(iplist, ip) } pairs, err := findIPBlock(iplist...) if err != nil { return } for _, rd := range rDat { var tim time.Time tim, err = getTimespan(rd.time, cfg) if err != nil { return } cDat = append(cDat, cleanedData{ ipbSrc: pairs[rd.Ip_src], ipbDst: pairs[rd.Ip_dst], asSrc: rd.As_src, asDst: rd.As_dst, portSrc: rd.Port_src, portDst: rd.Port_dst, occurences: rd.Packets, volume: rd.Pkt_len_distrib, time: tim, }) } cDat = removeDups(cDat) return } func removeDups(cDat []cleanedData) []cleanedData { ret := make([]cleanedData, 0) var found bool for ci := range cDat { found = false //Check if an equal struct already is appended for ri := range ret { if ret[ri].equals(&cDat[ci]) { //If found, increase it occurences instead of //appending a new struct ret[ri].occurences += cDat[ci].occurences found = true break } } if !found { //if no equal struct is found //append it ret = append(ret, cDat[ci]) } } return ret }