summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cleaner.go15
-rw-r--r--main.go28
-rw-r--r--sqlQueries.go19
3 files changed, 42 insertions, 20 deletions
diff --git a/cleaner.go b/cleaner.go
index 3d47f39..bdf4cae 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -9,7 +9,7 @@ import (
"time"
)
-func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err error) {
+func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft int, err error) {
db, err := sql.Open("mysql", db_user+":"+db_pass+"@"+db_conn+"/"+db_name)
if err != nil {
@@ -26,10 +26,12 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro
interval, err := conf.getInterval()
if err != nil {
- return err
+ return
}
+
+ cleanLimit := time.Now().Add(-2 * interval)
//Fetch data that should be cleaned
- rDat, err := fetchRawData(db, time.Now().Add(-2*interval), conf.Limit)
+ rDat, err := fetchRawData(db, cleanLimit, conf.Limit)
if err != nil {
log.Println("Faild to fetch raw data")
return
@@ -65,9 +67,14 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro
log.Println("Failed to remove old data")
return
}
+ rowsLeft, err = availableRows(tx, cleanLimit)
+ if err != nil {
+ tx.Rollback()
+ log.Println("Failed to fetch available rows")
+ return
+ }
tx.Commit()
-
return
}
diff --git a/main.go b/main.go
index 84a3559..027867a 100644
--- a/main.go
+++ b/main.go
@@ -34,23 +34,27 @@ func main() {
}
*/
starttime := time.Now()
- err = cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME)
+ numOfRowsNotCleaned, err := cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME)
if err != nil {
log.Println(err)
}
- db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME)
- if err != nil {
- log.Println("Failed to connect to db")
- return
- }
- defer db.Close()
- ival, err := conf.getInterval()
- if err != nil {
- log.Println("erronous interval in conf prevents the privatization of data")
- return
+ // If either all rows are processed or if there is no limit for the processing
+ // we can safely add noise to the cleaned data
+ if numOfRowsNotCleaned == 0 || conf.Limit == 0 {
+ db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME)
+ if err != nil {
+ log.Println("Failed to connect to db")
+ return
+ }
+ defer db.Close()
+ ival, err := conf.getInterval()
+ if err != nil {
+ log.Println("erronous interval in conf prevents the privatization of data")
+ return
+ }
+ privatizeCleaned(db, starttime.Add(-2*ival), conf)
}
- privatizeCleaned(db, starttime.Add(-2*ival), conf)
}
//Starts a process that reads from stdin and
diff --git a/sqlQueries.go b/sqlQueries.go
index 65b0a7a..b814d2e 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -179,11 +179,7 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
// Adds differential privacy to all entries in the
// database that is older than t and haven't had
// differential privacy added to them yet.
-// If epsilon == 0 in conf. Then nothing is done.
func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
- if conf.Epsilon == 0 {
- return
- }
query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time < ? FOR UPDATE")
if err != nil {
log.Println("Failed to prepare query")
@@ -220,3 +216,18 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
}
return
}
+
+func availableRows(tx *sql.Tx, limit time.Time) (numRows int, err error) {
+ stmt, err := tx.Prepare("SELECT COUNT(*) FROM acct WHERE stamp_inserted < ? ")
+ if err != nil {
+ log.Println("Could not prepare statement")
+ return
+ }
+ row := stmt.QueryRow(limit)
+
+ err = row.Scan(&numRows)
+ if err != nil {
+ log.Println("Failed to scan result")
+ }
+ return
+}