package main import ( "database/sql" _ "github.com/go-sql-driver/mysql" "log" "time" ) const ( TIMEZONE = "UTC" ) //Retrieves all rawdata older than tim func fetchRawData(db *sql.DB, tim time.Time) (rDat []RawData, err error) { prepSel, err := db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < ?") if err != nil { log.Println("Failed to prepare select") return } rows, err := prepSel.Query(tim) if err != nil { log.Println("Failed to query prepared selection") return } defer rows.Close() tx, err := db.Begin() if err != nil { log.Println("Failed to initialize transaction") return } prepUp, err := tx.Prepare("UPDATE raw_data SET process_time = ? where ip_src = ? AND ip_dst = ? AND time = ? AND port = ? and packet_size = ? AND process_time IS NULL LIMIT 1") if err != nil { log.Println("Failed to prepare update") return } loc, err := time.LoadLocation(TIMEZONE) for rows.Next() { var r RawData var tim []byte err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize) r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) if err != nil { log.Println("Failed to scan result of query") return } _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) if err != nil { log.Println("Failed to query prepared update") tx.Rollback() return } rDat = append(rDat, r) } tx.Commit() return } //Removes the process_time from every entry that started being proccesed before tim func reprocess(db *sql.DB, tim time.Time) (err error) { stmt, err := db.Prepare("UPDATE raw_data SET process_time = NULL WHERE process_time < ?") if err != nil { return } _, err = stmt.Exec(tim) return } //Removes all entries in the database that started being processed before tim func purgeProcessed(db *sql.DB, tim time.Time) (err error) { stmt, err := db.Prepare("DELETE FROM raw_data WHERE process_time < ? ") if err != nil { return } _, err = stmt.Exec(tim) return } func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? AND process_time IS NOT NULL LIMIT 1") if err != nil { return } for _, r := range rDat { _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) if err != nil { return } } return } func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, tim time.Time, port, occurences int) error { prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err } _, err = prepStmt.Exec(ipbSrc, ipbDst, tim, port, volume, occurences, occurences) if err != nil { log.Println("Failed to execute statement") return err } return nil } func insertASNIP(db *sql.DB, asn int, ipBlock string) error { prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?") if err != nil { return err } defer prepCheck.Close() rows, err := prepCheck.Exec(ipBlock) if err != nil { return err } if rows != nil { return nil } prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") if err != nil { return err } defer prepIns.Close() _, err = prepIns.Exec(asn, ipBlock) if err != nil { return err } return nil } func removeASNIP(db *sql.DB, asn int, ipBlock string) error { prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?") if err != nil { return err } defer prepStmt.Close() _, err = prepStmt.Exec(asn, ipBlock) if err != nil { return err } return nil }