summaryrefslogtreecommitdiff
path: root/sqlQueries.go
diff options
context:
space:
mode:
Diffstat (limited to 'sqlQueries.go')
-rw-r--r--sqlQueries.go53
1 files changed, 49 insertions, 4 deletions
diff --git a/sqlQueries.go b/sqlQueries.go
index 05c08c1..65b0a7a 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -13,7 +13,7 @@ const (
//Retrieves limit rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
-func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err error) {
+func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) {
var prepSel *sql.Stmt
if limit > 0 {
@@ -52,7 +52,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
loc, err := time.LoadLocation(TIMEZONE)
for rows.Next() {
- var r RawData
+ var r rawData
var tim []byte
err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim)
if err != nil {
@@ -101,7 +101,7 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
}
//Removes all rawdata that is in rDat from the database
-func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
@@ -116,7 +116,7 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
return
}
-func insertCleanData(tx *sql.Tx, cd *CleanData) error {
+func insertCleanData(tx *sql.Tx, cd *cleanedData) error {
prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
log.Println("Failed to prepare statement")
@@ -175,3 +175,48 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
return nil
}
+
+// Adds differential privacy to all entries in the
+// database that is older than t and haven't had
+// differential privacy added to them yet.
+// If epsilon == 0 in conf. Then nothing is done.
+func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
+ if conf.Epsilon == 0 {
+ return
+ }
+ query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time < ? FOR UPDATE")
+ if err != nil {
+ log.Println("Failed to prepare query")
+ return
+ }
+
+ rows, err := query.Query(t)
+ if err != nil {
+ log.Println("Failed to query for unprivitized rows")
+ return
+ }
+
+ update, err := db.Prepare("UPDATE clean_data SET occurences = ? , privitazied = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time = ? ")
+ if err != nil {
+ log.Println("Failed to prepare update")
+ return
+ }
+
+ var cd cleanedData
+ for rows.Next() {
+ err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &cd.time, &cd.occurences)
+ if err != nil {
+ log.Println("Failed to scan row")
+ return
+ }
+ // Add differential privacy noise
+ cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon)
+
+ // Update the entry
+ _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
+ if err != nil {
+ log.Println(err)
+ }
+ }
+ return
+}