diff options
-rw-r--r-- | cleaner.go | 36 | ||||
-rw-r--r-- | config.go | 11 | ||||
-rw-r--r-- | config.json | 9 | ||||
-rw-r--r-- | main.go | 19 | ||||
-rw-r--r-- | sqlQueries.go | 47 |
5 files changed, 65 insertions, 57 deletions
@@ -9,9 +9,9 @@ import ( "time" ) -func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft int, err error) { +func cleanData(cfg *Config) (rowsLeft int, err error) { - db, err := sql.Open("mysql", db_user+":"+db_pass+"@"+db_conn+"/"+db_name) + db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { log.Println("Failed to connect to db") return @@ -19,25 +19,25 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft defer db.Close() //Remove the processed mark on entries older than 6 hours - err = reprocess(db, time.Now().Add(-1*time.Hour)) + err = reprocess(db, cfg, time.Now().Add(-1*time.Hour)) if err != nil { return } - interval, err := conf.getInterval() + interval, err := cfg.getInterval() if err != nil { return } cleanLimit := time.Now().Add(-2 * interval) //Fetch data that should be cleaned - rDat, err := fetchRawData(db, cleanLimit, conf.Limit) + rDat, err := fetchRawData(db, cfg, cleanLimit) if err != nil { log.Println("Faild to fetch raw data") return } - cDat, err := clean(rDat, conf) + cDat, err := clean(rDat, cfg) if err != nil { log.Println("Failed to clean data") return @@ -51,7 +51,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft } //save cleaned data - err = insertCleanData(tx, cDat) + err = insertCleanData(tx, cfg, cDat) if err != nil { tx.Rollback() log.Println("Failed to save cleaned data") @@ -59,13 +59,13 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft } //remove old data - err = purgeRawData(tx, rDat) + err = purgeRawData(tx, cfg, rDat) if err != nil { tx.Rollback() log.Println("Failed to remove old data") return } - rowsLeft, err = availableRows(tx, cleanLimit) + rowsLeft, err = availableRows(tx, cfg, cleanLimit) if err != nil { tx.Rollback() log.Println("Failed to fetch available rows") @@ -76,42 +76,42 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft return } -func getTimespan(t time.Time, conf Config) (span time.Time, err error) { +func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) { loc, err := time.LoadLocation(TIMEZONE) if err != nil { return } switch { - case conf.Interval == "5min": //Round the date into 5 minutes + case cfg.Interval == "5min": //Round the date into 5 minutes y, m, d := t.Date() h := t.Hour() min := t.Minute() min = (min / 5) * 5 span = time.Date(y, m, d, h, min, 0, 0, loc) - case conf.Interval == "10min": //Round the date into 10 minutes + case cfg.Interval == "10min": //Round the date into 10 minutes y, m, d := t.Date() h := t.Hour() min := t.Minute() min = (min / 10) * 10 span = time.Date(y, m, d, h, min, 0, 0, loc) - case conf.Interval == "hour": //Round the date into hour + case cfg.Interval == "hour": //Round the date into hour y, m, d := t.Date() h := t.Hour() span = time.Date(y, m, d, h, 0, 0, 0, loc) - case conf.Interval == "day": //Round the date into day + case cfg.Interval == "day": //Round the date into day y, m, d := t.Date() span = time.Date(y, m, d, 0, 0, 0, 0, loc) default: - err = errors.New(fmt.Sprintf("Bad interval in config %s", conf.Interval)) + err = errors.New(fmt.Sprintf("Bad interval in config %s", cfg.Interval)) return } return } -func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) { +func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) { // collect all ips so we can query for their ip blocks ips := make(map[string]struct{}) for _, rd := range rDat { @@ -124,14 +124,16 @@ func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) { iplist = append(iplist, ip) } + log.Println("Querying for ip-blocks...") pairs, err := findIPBlock(iplist...) if err != nil { return } + log.Println("ip-blocks returned") for _, rd := range rDat { var tim time.Time - tim, err = getTimespan(rd.time, conf) + tim, err = getTimespan(rd.time, cfg) if err != nil { return } @@ -13,6 +13,13 @@ type Config struct { Limit int `json:limit` Interval string `json:interval` Epsilon float64 `json:epsilon` + + DBConn string `json:DBConn` + DBName string `json:DBName` + RawTable string `json:rawTable` + CleanTable string `json:cleanTable` + DBUser string `json:DBUser` + DBPass string `json:DBPass` } func (cfg *Config) getInterval() (interval time.Duration, err error) { @@ -31,12 +38,12 @@ func (cfg *Config) getInterval() (interval time.Duration, err error) { return } -func readConfig() (conf Config, err error) { +func readConfig() (cfg *Config, err error) { content, err := ioutil.ReadFile("config.json") if err != nil { log.Println(err) } - err = json.Unmarshal(content, &conf) + err = json.Unmarshal(content, &cfg) if err != nil { log.Println(err) } diff --git a/config.json b/config.json index ebb9f81..98e8854 100644 --- a/config.json +++ b/config.json @@ -6,5 +6,12 @@ "limit": 100, "comment Epsilon": "Epsilon is the epsilon value for differential privacy. epsilon < 1 high privacy, 10 < epsilon low privacy. If epsilon is set to 0, differential privacy will not be used.", - "epsilon": 0 + "epsilon": 0, + + "DBConn": "", + "DBName": "test", + "rawTable": "test_raw", + "cleanTable": "test_clean", + "DBUser": "flowcleaner", + "DBPass": "nil", } @@ -10,16 +10,9 @@ import ( //"strings" ) -const ( - DATABASE_USER = "flowcleaner" - DATABASE_PASS = "nil" - DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) - DATABASE_NAME = "pmacct" -) - func main() { log.Println("Reading config...") - conf, err := readConfig() + cfg, err := readConfig() if err != nil { log.Println("Could not read config") return @@ -37,7 +30,7 @@ func main() { */ log.Print("Cleaning data...") starttime := time.Now() - numOfRowsNotCleaned, err := cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME) + numOfRowsNotCleaned, err := cleanData(cfg) if err != nil { log.Println(err) log.Println("Exiting...") @@ -47,22 +40,22 @@ func main() { // If either all rows are processed or if there is no limit for the processing // we can safely add noise to the cleaned data - if (numOfRowsNotCleaned == 0 || conf.Limit == 0) && conf.Epsilon >= 0 { + if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 { log.Println("Adding differential privacy noise to processed data...") - db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME) + db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { log.Println("Failed to connect to db:", err) return } defer db.Close() - ival, err := conf.getInterval() + ival, err := cfg.getInterval() if err != nil { log.Println("erronous interval in conf prevents the privatization of data:", err) return } - err = privatizeCleaned(db, starttime.Add(-2*ival), conf) + err = privatizeCleaned(db, starttime.Add(-2*ival), cfg) if err != nil { log.Println("Failed to privatize data:", err) } diff --git a/sqlQueries.go b/sqlQueries.go index efaffba..6048b4c 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -9,18 +9,17 @@ import ( const ( TIMEZONE = "UTC" - TABLE = "copyOfData" ) //Retrieves limit rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim -func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) { +func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err error) { var prepSel *sql.Stmt - if limit > 0 { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?") + if cfg.Limit > 0 { + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?") } else { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ?") + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?") } if err != nil { log.Println("Failed to prepare select") @@ -28,8 +27,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err } var rows *sql.Rows - if limit > 0 { - rows, err = prepSel.Query(tim, limit) + if cfg.Limit > 0 { + rows, err = prepSel.Query(tim, cfg.Limit) } else { rows, err = prepSel.Query(tim) } @@ -45,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err return } - prepUp, err := tx.Prepare("UPDATE " + TABLE + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") + prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") if err != nil { log.Println("Failed to prepare update") return @@ -84,8 +83,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err } //Removes the stamp_processed from every entry that started being proccesed before tim -func reprocess(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("UPDATE " + TABLE + " SET stamp_processed = NULL WHERE stamp_processed < ?") +func reprocess(db *sql.DB, cfg *Config, tim time.Time) (err error) { + stmt, err := db.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = NULL WHERE stamp_processed < ?") if err != nil { return } @@ -95,8 +94,8 @@ func reprocess(db *sql.DB, tim time.Time) (err error) { } //Removes all entries in the database that started being processed before tim -func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("DELETE FROM " + TABLE + " WHERE stamp_processed < ? ") +func purgeAllProcessed(db *sql.DB, cfg *Config, tim time.Time) (err error) { + stmt, err := db.Prepare("DELETE FROM " + cfg.RawTable + " WHERE stamp_processed < ? ") if err != nil { return } @@ -106,8 +105,8 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { } //Removes all rawdata that is in rDat from the database -func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) { - prepStmt, err := tx.Prepare("DELETE FROM " + TABLE + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") +func purgeRawData(tx *sql.Tx, cfg *Config, rDat []rawData) (err error) { + prepStmt, err := tx.Prepare("DELETE FROM " + cfg.RawTable + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return } @@ -121,8 +120,8 @@ func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) { return } -func insertCleanData(tx *sql.Tx, cd []cleanedData) error { - prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") +func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error { + prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err @@ -186,11 +185,11 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error { // Adds differential privacy to all entries in the // database that is older than t and haven't had // differential privacy added to them yet. -func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { - if conf.Epsilon <= 0 { +func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) { + if cfg.Epsilon <= 0 { return } - query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM clean_data WHERE time_added < ?") + query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?") if err != nil { log.Println("Failed to prepare query") return @@ -203,7 +202,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { } defer rows.Close() - update, err := db.Prepare("UPDATE clean_data SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ") + update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ") if err != nil { log.Println("Failed to prepare update") return @@ -228,7 +227,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { return } // Add differential privacy noise - cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon) + cd.occurences = diffpriv(cd.occurences, 1, cfg.Epsilon) // Update the entry _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time) @@ -239,13 +238,13 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { return } -func availableRows(tx *sql.Tx, limit time.Time) (numRows int, err error) { - stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + TABLE + " WHERE stamp_inserted < ? ") +func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) { + stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ") if err != nil { log.Println("Could not prepare statement") return } - row := stmt.QueryRow(limit) + row := stmt.QueryRow(timeLimit) err = row.Scan(&numRows) if err != nil { |