diff options
Diffstat (limited to 'sqlQueries.go')
-rw-r--r-- | sqlQueries.go | 53 |
1 files changed, 49 insertions, 4 deletions
diff --git a/sqlQueries.go b/sqlQueries.go index 05c08c1..65b0a7a 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -13,7 +13,7 @@ const ( //Retrieves limit rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim -func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err error) { +func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) { var prepSel *sql.Stmt if limit > 0 { @@ -52,7 +52,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err loc, err := time.LoadLocation(TIMEZONE) for rows.Next() { - var r RawData + var r rawData var tim []byte err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim) if err != nil { @@ -101,7 +101,7 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { } //Removes all rawdata that is in rDat from the database -func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { +func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return @@ -116,7 +116,7 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { return } -func insertCleanData(tx *sql.Tx, cd *CleanData) error { +func insertCleanData(tx *sql.Tx, cd *cleanedData) error { prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") @@ -175,3 +175,48 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error { return nil } + +// Adds differential privacy to all entries in the +// database that is older than t and haven't had +// differential privacy added to them yet. +// If epsilon == 0 in conf. Then nothing is done. +func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) { + if conf.Epsilon == 0 { + return + } + query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time < ? FOR UPDATE") + if err != nil { + log.Println("Failed to prepare query") + return + } + + rows, err := query.Query(t) + if err != nil { + log.Println("Failed to query for unprivitized rows") + return + } + + update, err := db.Prepare("UPDATE clean_data SET occurences = ? , privitazied = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time = ? ") + if err != nil { + log.Println("Failed to prepare update") + return + } + + var cd cleanedData + for rows.Next() { + err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &cd.time, &cd.occurences) + if err != nil { + log.Println("Failed to scan row") + return + } + // Add differential privacy noise + cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon) + + // Update the entry + _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time) + if err != nil { + log.Println(err) + } + } + return +} |