package main import ( "database/sql" _ "github.com/go-sql-driver/mysql" "log" "time" ) const ( TIMEZONE = "UTC" ) //Retrieves limit rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) { var prepSel *sql.Stmt if limit > 0 { prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM acct WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?") } else { prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM acct WHERE stamp_processed IS NULL AND stamp_inserted < ?") } if err != nil { log.Println("Failed to prepare select") return } var rows *sql.Rows if limit > 0 { rows, err = prepSel.Query(tim, limit) } else { rows, err = prepSel.Query(tim) } if err != nil { log.Println("Failed to query prepared selection") return } defer rows.Close() tx, err := db.Begin() if err != nil { log.Println("Failed to initialize transaction") return } prepUp, err := tx.Prepare("UPDATE acct SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") if err != nil { log.Println("Failed to prepare update") return } loc, err := time.LoadLocation(TIMEZONE) for rows.Next() { var r rawData var tim []byte err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim) if err != nil { log.Println("Failed to scan result of query") return } r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) if err != nil { log.Println("Failed to parse timestamp") return } _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time) if err != nil { log.Println("Failed to query prepared update") tx.Rollback() return } rDat = append(rDat, r) } tx.Commit() return } //Removes the stamp_processed from every entry that started being proccesed before tim func reprocess(db *sql.DB, tim time.Time) (err error) { stmt, err := db.Prepare("UPDATE acct SET stamp_processed = NULL WHERE stamp_processed < ?") if err != nil { return } _, err = stmt.Exec(tim) return } //Removes all entries in the database that started being processed before tim func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { stmt, err := db.Prepare("DELETE FROM acct WHERE stamp_processed < ? ") if err != nil { return } _, err = stmt.Exec(tim) return } //Removes all rawdata that is in rDat from the database func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return } for _, r := range rDat { _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist) if err != nil { return } } return } func insertCleanData(tx *sql.Tx, cd []cleanedData) error { prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err } for ix := range cd { _, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences) if err != nil { log.Println("Failed to execute statement") return err } } return nil } func insertASNIP(db *sql.DB, asn int, ipBlock string) error { prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?") if err != nil { return err } defer prepCheck.Close() rows, err := prepCheck.Exec(ipBlock) if err != nil { return err } if rows != nil { return nil } prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") if err != nil { return err } defer prepIns.Close() _, err = prepIns.Exec(asn, ipBlock) if err != nil { return err } return nil } func removeASNIP(db *sql.DB, asn int, ipBlock string) error { prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?") if err != nil { return err } defer prepStmt.Close() _, err = prepStmt.Exec(asn, ipBlock) if err != nil { return err } return nil } // Adds differential privacy to all entries in the // database that is older than t and haven't had // differential privacy added to them yet. func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time_added < ? FOR UPDATE") if err != nil { log.Println("Failed to prepare query") return } rows, err := query.Query(t) if err != nil { log.Println("Failed to query for unprivitized rows") return } update, err := db.Prepare("UPDATE clean_data SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ") if err != nil { log.Println("Failed to prepare update") return } var cd cleanedData for rows.Next() { err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &cd.time, &cd.occurences) if err != nil { log.Println("Failed to scan row") return } // Add differential privacy noise cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon) // Update the entry _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time) if err != nil { log.Println(err) } } return } func availableRows(tx *sql.Tx, limit time.Time) (numRows int, err error) { stmt, err := tx.Prepare("SELECT COUNT(*) FROM acct WHERE stamp_inserted < ? ") if err != nil { log.Println("Could not prepare statement") return } row := stmt.QueryRow(limit) err = row.Scan(&numRows) if err != nil { log.Println("Failed to scan result") } return }