diff options
-rw-r--r-- | cleaner.go | 32 | ||||
-rw-r--r-- | config.go | 19 | ||||
-rw-r--r-- | datastructs.go | 6 | ||||
-rw-r--r-- | diffpriv.go | 2 | ||||
-rw-r--r-- | main.go | 17 | ||||
-rw-r--r-- | sqlQueries.go | 53 |
6 files changed, 98 insertions, 31 deletions
@@ -19,24 +19,14 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro defer db.Close() //Remove the processed mark on entries older than 6 hours - err = reprocess(db, time.Now().Add(-6*time.Hour)) + err = reprocess(db, time.Now().Add(-1*time.Hour)) if err != nil { return } - var interval time.Duration - switch conf.Interval { - case "5min": - interval = time.Minute * 5 - case "10min": - interval = time.Minute * 10 - case "hour": - interval = time.Hour - case "day": - interval = time.Hour * 24 - default: - err = errors.New(fmt.Sprintf("Invalid interval: %s", conf.Interval)) - return + interval, err := conf.getInterval() + if err != nil { + return err } //Fetch data that should be cleaned rDat, err := fetchRawData(db, time.Now().Add(-2*interval), conf.Limit) @@ -51,11 +41,6 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro return } - //Add noise for differential privacy - for i := range cDat { - cDat[i].occurences = diffpriv(cDat[i].occurences, 1, conf.Epsilon) - } - //Begin transaction tx, err := db.Begin() if err != nil { @@ -82,6 +67,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro } tx.Commit() + return } @@ -120,7 +106,7 @@ func getTimespan(t time.Time, conf Config) (span time.Time, err error) { return } -func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { +func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) { // collect all ips so we can query for their ip blocks ips := make(map[string]struct{}) for _, rd := range rDat { @@ -145,7 +131,7 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { return } cDat = append(cDat, - CleanData{ + cleanedData{ ipbSrc: pairs[rd.ipSrc], ipbDst: pairs[rd.ipDst], asSrc: rd.asSrc, @@ -163,8 +149,8 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { return } -func removeDups(cDat []CleanData) []CleanData { - ret := make([]CleanData, 0) +func removeDups(cDat []cleanedData) []cleanedData { + ret := make([]cleanedData, 0) var found bool for ci := range cDat { found = false @@ -2,8 +2,11 @@ package main import ( "encoding/json" + "errors" + "fmt" "io/ioutil" "log" + "time" ) type Config struct { @@ -12,6 +15,22 @@ type Config struct { Epsilon float64 `json:epsilon` } +func (cfg *Config) getInterval() (interval time.Duration, err error) { + switch cfg.Interval { + case "5min": + interval = time.Minute * 5 + case "10min": + interval = time.Minute * 10 + case "hour": + interval = time.Hour + case "day": + interval = time.Hour * 24 + default: + err = errors.New(fmt.Sprintf("Invalid interval: %s", cfg.Interval)) + } + return +} + func readConfig() (conf Config, err error) { content, err := ioutil.ReadFile("config.json") if err != nil { diff --git a/datastructs.go b/datastructs.go index 4a30da7..8220c62 100644 --- a/datastructs.go +++ b/datastructs.go @@ -4,7 +4,7 @@ import ( "time" ) -type RawData struct { +type rawData struct { ipSrc string ipDst string asSrc int @@ -16,7 +16,7 @@ type RawData struct { time time.Time } -type CleanData struct { +type cleanedData struct { ipbSrc string ipbDst string asSrc int @@ -28,7 +28,7 @@ type CleanData struct { time time.Time } -func (cd *CleanData) equals(other *CleanData) bool { +func (cd *cleanedData) equals(other *cleanedData) bool { return cd.ipbSrc == other.ipbSrc && cd.ipbDst == other.ipbDst && cd.asSrc == other.asSrc && diff --git a/diffpriv.go b/diffpriv.go index 382ef37..fb6d2fc 100644 --- a/diffpriv.go +++ b/diffpriv.go @@ -19,7 +19,7 @@ func diffpriv(value int, sensitivity, epsilon float64) int { return 0 } noise := laplaceDist(0, sensitivity/epsilon) - return round(float64(value) + noise) + return round(math.Abs(float64(value) + noise)) } // Returns a random value from a laplace @@ -2,8 +2,11 @@ package main import ( "bufio" + "database/sql" + _ "github.com/go-sql-driver/mysql" "log" "os" + "time" //"strings" ) @@ -30,10 +33,24 @@ func main() { } } */ + starttime := time.Now() err = cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME) if err != nil { log.Println(err) } + + db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME) + if err != nil { + log.Println("Failed to connect to db") + return + } + defer db.Close() + ival, err := conf.getInterval() + if err != nil { + log.Println("erronous interval in conf prevents the privatization of data") + return + } + privatizeCleaned(db, starttime.Add(-2*ival), conf) } //Starts a process that reads from stdin and diff --git a/sqlQueries.go b/sqlQueries.go index 05c08c1..65b0a7a 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -13,7 +13,7 @@ const ( //Retrieves limit rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim -func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err error) { +func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) { var prepSel *sql.Stmt if limit > 0 { @@ -52,7 +52,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err loc, err := time.LoadLocation(TIMEZONE) for rows.Next() { - var r RawData + var r rawData var tim []byte err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim) if err != nil { @@ -101,7 +101,7 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { } //Removes all rawdata that is in rDat from the database -func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { +func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return @@ -116,7 +116,7 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { return } -func insertCleanData(tx *sql.Tx, cd *CleanData) error { +func insertCleanData(tx *sql.Tx, cd *cleanedData) error { prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") @@ -175,3 +175,48 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error { return nil } + +// Adds differential privacy to all entries in the +// database that is older than t and haven't had +// differential privacy added to them yet. +// If epsilon == 0 in conf. Then nothing is done. +func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { + if conf.Epsilon == 0 { + return + } + query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time < ? FOR UPDATE") + if err != nil { + log.Println("Failed to prepare query") + return + } + + rows, err := query.Query(t) + if err != nil { + log.Println("Failed to query for unprivitized rows") + return + } + + update, err := db.Prepare("UPDATE clean_data SET occurences = ? , privitazied = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time = ? ") + if err != nil { + log.Println("Failed to prepare update") + return + } + + var cd cleanedData + for rows.Next() { + err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &cd.time, &cd.occurences) + if err != nil { + log.Println("Failed to scan row") + return + } + // Add differential privacy noise + cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon) + + // Update the entry + _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time) + if err != nil { + log.Println(err) + } + } + return +} |