summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cleaner.go32
-rw-r--r--config.go19
-rw-r--r--datastructs.go6
-rw-r--r--diffpriv.go2
-rw-r--r--main.go17
-rw-r--r--sqlQueries.go53
6 files changed, 98 insertions, 31 deletions
diff --git a/cleaner.go b/cleaner.go
index d79e462..3d47f39 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -19,24 +19,14 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro
defer db.Close()
//Remove the processed mark on entries older than 6 hours
- err = reprocess(db, time.Now().Add(-6*time.Hour))
+ err = reprocess(db, time.Now().Add(-1*time.Hour))
if err != nil {
return
}
- var interval time.Duration
- switch conf.Interval {
- case "5min":
- interval = time.Minute * 5
- case "10min":
- interval = time.Minute * 10
- case "hour":
- interval = time.Hour
- case "day":
- interval = time.Hour * 24
- default:
- err = errors.New(fmt.Sprintf("Invalid interval: %s", conf.Interval))
- return
+ interval, err := conf.getInterval()
+ if err != nil {
+ return err
}
//Fetch data that should be cleaned
rDat, err := fetchRawData(db, time.Now().Add(-2*interval), conf.Limit)
@@ -51,11 +41,6 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro
return
}
- //Add noise for differential privacy
- for i := range cDat {
- cDat[i].occurences = diffpriv(cDat[i].occurences, 1, conf.Epsilon)
- }
-
//Begin transaction
tx, err := db.Begin()
if err != nil {
@@ -82,6 +67,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro
}
tx.Commit()
+
return
}
@@ -120,7 +106,7 @@ func getTimespan(t time.Time, conf Config) (span time.Time, err error) {
return
}
-func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) {
+func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) {
// collect all ips so we can query for their ip blocks
ips := make(map[string]struct{})
for _, rd := range rDat {
@@ -145,7 +131,7 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) {
return
}
cDat = append(cDat,
- CleanData{
+ cleanedData{
ipbSrc: pairs[rd.ipSrc],
ipbDst: pairs[rd.ipDst],
asSrc: rd.asSrc,
@@ -163,8 +149,8 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) {
return
}
-func removeDups(cDat []CleanData) []CleanData {
- ret := make([]CleanData, 0)
+func removeDups(cDat []cleanedData) []cleanedData {
+ ret := make([]cleanedData, 0)
var found bool
for ci := range cDat {
found = false
diff --git a/config.go b/config.go
index fd80d87..c0951f4 100644
--- a/config.go
+++ b/config.go
@@ -2,8 +2,11 @@ package main
import (
"encoding/json"
+ "errors"
+ "fmt"
"io/ioutil"
"log"
+ "time"
)
type Config struct {
@@ -12,6 +15,22 @@ type Config struct {
Epsilon float64 `json:epsilon`
}
+func (cfg *Config) getInterval() (interval time.Duration, err error) {
+ switch cfg.Interval {
+ case "5min":
+ interval = time.Minute * 5
+ case "10min":
+ interval = time.Minute * 10
+ case "hour":
+ interval = time.Hour
+ case "day":
+ interval = time.Hour * 24
+ default:
+ err = errors.New(fmt.Sprintf("Invalid interval: %s", cfg.Interval))
+ }
+ return
+}
+
func readConfig() (conf Config, err error) {
content, err := ioutil.ReadFile("config.json")
if err != nil {
diff --git a/datastructs.go b/datastructs.go
index 4a30da7..8220c62 100644
--- a/datastructs.go
+++ b/datastructs.go
@@ -4,7 +4,7 @@ import (
"time"
)
-type RawData struct {
+type rawData struct {
ipSrc string
ipDst string
asSrc int
@@ -16,7 +16,7 @@ type RawData struct {
time time.Time
}
-type CleanData struct {
+type cleanedData struct {
ipbSrc string
ipbDst string
asSrc int
@@ -28,7 +28,7 @@ type CleanData struct {
time time.Time
}
-func (cd *CleanData) equals(other *CleanData) bool {
+func (cd *cleanedData) equals(other *cleanedData) bool {
return cd.ipbSrc == other.ipbSrc &&
cd.ipbDst == other.ipbDst &&
cd.asSrc == other.asSrc &&
diff --git a/diffpriv.go b/diffpriv.go
index 382ef37..fb6d2fc 100644
--- a/diffpriv.go
+++ b/diffpriv.go
@@ -19,7 +19,7 @@ func diffpriv(value int, sensitivity, epsilon float64) int {
return 0
}
noise := laplaceDist(0, sensitivity/epsilon)
- return round(float64(value) + noise)
+ return round(math.Abs(float64(value) + noise))
}
// Returns a random value from a laplace
diff --git a/main.go b/main.go
index dcf42b1..84a3559 100644
--- a/main.go
+++ b/main.go
@@ -2,8 +2,11 @@ package main
import (
"bufio"
+ "database/sql"
+ _ "github.com/go-sql-driver/mysql"
"log"
"os"
+ "time"
//"strings"
)
@@ -30,10 +33,24 @@ func main() {
}
}
*/
+ starttime := time.Now()
err = cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME)
if err != nil {
log.Println(err)
}
+
+ db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME)
+ if err != nil {
+ log.Println("Failed to connect to db")
+ return
+ }
+ defer db.Close()
+ ival, err := conf.getInterval()
+ if err != nil {
+ log.Println("erronous interval in conf prevents the privatization of data")
+ return
+ }
+ privatizeCleaned(db, starttime.Add(-2*ival), conf)
}
//Starts a process that reads from stdin and
diff --git a/sqlQueries.go b/sqlQueries.go
index 05c08c1..65b0a7a 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -13,7 +13,7 @@ const (
//Retrieves limit rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
-func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err error) {
+func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) {
var prepSel *sql.Stmt
if limit > 0 {
@@ -52,7 +52,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
loc, err := time.LoadLocation(TIMEZONE)
for rows.Next() {
- var r RawData
+ var r rawData
var tim []byte
err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim)
if err != nil {
@@ -101,7 +101,7 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
}
//Removes all rawdata that is in rDat from the database
-func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
@@ -116,7 +116,7 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
return
}
-func insertCleanData(tx *sql.Tx, cd *CleanData) error {
+func insertCleanData(tx *sql.Tx, cd *cleanedData) error {
prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
log.Println("Failed to prepare statement")
@@ -175,3 +175,48 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
return nil
}
+
+// Adds differential privacy to all entries in the
+// database that is older than t and haven't had
+// differential privacy added to them yet.
+// If epsilon == 0 in conf. Then nothing is done.
+func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
+ if conf.Epsilon == 0 {
+ return
+ }
+ query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time,occurences FROM clean_data WHERE time < ? FOR UPDATE")
+ if err != nil {
+ log.Println("Failed to prepare query")
+ return
+ }
+
+ rows, err := query.Query(t)
+ if err != nil {
+ log.Println("Failed to query for unprivitized rows")
+ return
+ }
+
+ update, err := db.Prepare("UPDATE clean_data SET occurences = ? , privitazied = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time = ? ")
+ if err != nil {
+ log.Println("Failed to prepare update")
+ return
+ }
+
+ var cd cleanedData
+ for rows.Next() {
+ err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &cd.time, &cd.occurences)
+ if err != nil {
+ log.Println("Failed to scan row")
+ return
+ }
+ // Add differential privacy noise
+ cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon)
+
+ // Update the entry
+ _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
+ if err != nil {
+ log.Println(err)
+ }
+ }
+ return
+}