summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- cleaner.go 36
-rw-r--r-- config.go 11
-rw-r--r-- config.json 9
-rw-r--r-- main.go 19
-rw-r--r-- sqlQueries.go 47
5 files changed, 65 insertions, 57 deletions
diff --git a/cleaner.go b/cleaner.go
index 986d556..21b6842 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -9,9 +9,9 @@ import (
"time"
)
-func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft int, err error) {
+func cleanData(cfg *Config) (rowsLeft int, err error) {
- db, err := sql.Open("mysql", db_user+":"+db_pass+"@"+db_conn+"/"+db_name)
+ db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
log.Println("Failed to connect to db")
return
@@ -19,25 +19,25 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft
defer db.Close()
//Remove the processed mark on entries older than 1 hour
- err = reprocess(db, time.Now().Add(-1*time.Hour))
+ err = reprocess(db, cfg, time.Now().Add(-1*time.Hour))
if err != nil {
return
}
- interval, err := conf.getInterval()
+ interval, err := cfg.getInterval()
if err != nil {
return
}
cleanLimit := time.Now().Add(-2 * interval)
//Fetch data that should be cleaned
- rDat, err := fetchRawData(db, cleanLimit, conf.Limit)
+ rDat, err := fetchRawData(db, cfg, cleanLimit)
if err != nil {
log.Println("Faild to fetch raw data")
return
}
- cDat, err := clean(rDat, conf)
+ cDat, err := clean(rDat, cfg)
if err != nil {
log.Println("Failed to clean data")
return
@@ -51,7 +51,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft
}
//save cleaned data
- err = insertCleanData(tx, cDat)
+ err = insertCleanData(tx, cfg, cDat)
if err != nil {
tx.Rollback()
log.Println("Failed to save cleaned data")
@@ -59,13 +59,13 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft
}
//remove old data
- err = purgeRawData(tx, rDat)
+ err = purgeRawData(tx, cfg, rDat)
if err != nil {
tx.Rollback()
log.Println("Failed to remove old data")
return
}
- rowsLeft, err = availableRows(tx, cleanLimit)
+ rowsLeft, err = availableRows(tx, cfg, cleanLimit)
if err != nil {
tx.Rollback()
log.Println("Failed to fetch available rows")
@@ -76,42 +76,42 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (rowsLeft
return
}
-func getTimespan(t time.Time, conf Config) (span time.Time, err error) {
+func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) {
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
return
}
switch {
- case conf.Interval == "5min": //Round the date into 5 minutes
+ case cfg.Interval == "5min": //Round the date into 5 minutes
y, m, d := t.Date()
h := t.Hour()
min := t.Minute()
min = (min / 5) * 5
span = time.Date(y, m, d, h, min, 0, 0, loc)
- case conf.Interval == "10min": //Round the date into 10 minutes
+ case cfg.Interval == "10min": //Round the date into 10 minutes
y, m, d := t.Date()
h := t.Hour()
min := t.Minute()
min = (min / 10) * 10
span = time.Date(y, m, d, h, min, 0, 0, loc)
- case conf.Interval == "hour": //Round the date into hour
+ case cfg.Interval == "hour": //Round the date into hour
y, m, d := t.Date()
h := t.Hour()
span = time.Date(y, m, d, h, 0, 0, 0, loc)
- case conf.Interval == "day": //Round the date into day
+ case cfg.Interval == "day": //Round the date into day
y, m, d := t.Date()
span = time.Date(y, m, d, 0, 0, 0, 0, loc)
default:
- err = errors.New(fmt.Sprintf("Bad interval in config %s", conf.Interval))
+ err = errors.New(fmt.Sprintf("Bad interval in config %s", cfg.Interval))
return
}
return
}
-func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) {
+func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) {
// collect all ips so we can query for their ip blocks
ips := make(map[string]struct{})
for _, rd := range rDat {
@@ -124,14 +124,16 @@ func clean(rDat []rawData, conf Config) (cDat []cleanedData, err error) {
iplist = append(iplist, ip)
}
+ log.Println("Querying for ip-blocks...")
pairs, err := findIPBlock(iplist...)
if err != nil {
return
}
+ log.Println("ip-blocks returned")
for _, rd := range rDat {
var tim time.Time
- tim, err = getTimespan(rd.time, conf)
+ tim, err = getTimespan(rd.time, cfg)
if err != nil {
return
}
diff --git a/config.go b/config.go
index c0951f4..b6c903f 100644
--- a/config.go
+++ b/config.go
@@ -13,6 +13,13 @@ type Config struct {
Limit int `json:limit`
Interval string `json:interval`
Epsilon float64 `json:epsilon`
+
+ DBConn string `json:DBConn`
+ DBName string `json:DBName`
+ RawTable string `json:rawTable`
+ CleanTable string `json:cleanTable`
+ DBUser string `json:DBUser`
+ DBPass string `json:DBPass`
}
func (cfg *Config) getInterval() (interval time.Duration, err error) {
@@ -31,12 +38,12 @@ func (cfg *Config) getInterval() (interval time.Duration, err error) {
return
}
-func readConfig() (conf Config, err error) {
+func readConfig() (cfg *Config, err error) {
content, err := ioutil.ReadFile("config.json")
if err != nil {
log.Println(err)
}
- err = json.Unmarshal(content, &conf)
+ err = json.Unmarshal(content, &cfg)
if err != nil {
log.Println(err)
}
diff --git a/config.json b/config.json
index ebb9f81..98e8854 100644
--- a/config.json
+++ b/config.json
@@ -6,5 +6,12 @@
"limit": 100,
"comment Epsilon": "Epsilon is the epsilon value for differential privacy. epsilon < 1 high privacy, 10 < epsilon low privacy. If epsilon is set to 0, differential privacy will not be used.",
- "epsilon": 0
+ "epsilon": 0,
+
+ "DBConn": "",
+ "DBName": "test",
+ "rawTable": "test_raw",
+ "cleanTable": "test_clean",
+ "DBUser": "flowcleaner",
+ "DBPass": "nil",
}
diff --git a/main.go b/main.go
index 65b314e..5b94e07 100644
--- a/main.go
+++ b/main.go
@@ -10,16 +10,9 @@ import (
//"strings"
)
-const (
- DATABASE_USER = "flowcleaner"
- DATABASE_PASS = "nil"
- DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555)
- DATABASE_NAME = "pmacct"
-)
-
func main() {
log.Println("Reading config...")
- conf, err := readConfig()
+ cfg, err := readConfig()
if err != nil {
log.Println("Could not read config")
return
@@ -37,7 +30,7 @@ func main() {
*/
log.Print("Cleaning data...")
starttime := time.Now()
- numOfRowsNotCleaned, err := cleanData(conf, DATABASE_USER, DATABASE_PASS, DATABASE_CONNECTION, DATABASE_NAME)
+ numOfRowsNotCleaned, err := cleanData(cfg)
if err != nil {
log.Println(err)
log.Println("Exiting...")
@@ -47,22 +40,22 @@ func main() {
// If either all rows are processed or if there is no limit for the processing
// we can safely add noise to the cleaned data
- if (numOfRowsNotCleaned == 0 || conf.Limit == 0) && conf.Epsilon >= 0 {
+ if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
log.Println("Adding differential privacy noise to processed data...")
- db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@"+DATABASE_CONNECTION+"/"+DATABASE_NAME)
+ db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
log.Println("Failed to connect to db:", err)
return
}
defer db.Close()
- ival, err := conf.getInterval()
+ ival, err := cfg.getInterval()
if err != nil {
log.Println("erronous interval in conf prevents the privatization of data:", err)
return
}
- err = privatizeCleaned(db, starttime.Add(-2*ival), conf)
+ err = privatizeCleaned(db, starttime.Add(-2*ival), cfg)
if err != nil {
log.Println("Failed to privatize data:", err)
}
diff --git a/sqlQueries.go b/sqlQueries.go
index efaffba..6048b4c 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -9,18 +9,17 @@ import (
const (
TIMEZONE = "UTC"
- TABLE = "copyOfData"
)
//Retrieves limit rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
-func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) {
+func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err error) {
var prepSel *sql.Stmt
- if limit > 0 {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?")
+ if cfg.Limit > 0 {
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?")
} else {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
}
if err != nil {
log.Println("Failed to prepare select")
@@ -28,8 +27,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
}
var rows *sql.Rows
- if limit > 0 {
- rows, err = prepSel.Query(tim, limit)
+ if cfg.Limit > 0 {
+ rows, err = prepSel.Query(tim, cfg.Limit)
} else {
rows, err = prepSel.Query(tim)
}
@@ -45,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
return
}
- prepUp, err := tx.Prepare("UPDATE " + TABLE + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
+ prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
log.Println("Failed to prepare update")
return
@@ -84,8 +83,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
}
//Removes the stamp_processed from every entry that started being processed before tim
-func reprocess(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("UPDATE " + TABLE + " SET stamp_processed = NULL WHERE stamp_processed < ?")
+func reprocess(db *sql.DB, cfg *Config, tim time.Time) (err error) {
+ stmt, err := db.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = NULL WHERE stamp_processed < ?")
if err != nil {
return
}
@@ -95,8 +94,8 @@ func reprocess(db *sql.DB, tim time.Time) (err error) {
}
//Removes all entries in the database that started being processed before tim
-func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("DELETE FROM " + TABLE + " WHERE stamp_processed < ? ")
+func purgeAllProcessed(db *sql.DB, cfg *Config, tim time.Time) (err error) {
+ stmt, err := db.Prepare("DELETE FROM " + cfg.RawTable + " WHERE stamp_processed < ? ")
if err != nil {
return
}
@@ -106,8 +105,8 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
}
//Removes all rawdata that is in rDat from the database
-func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
- prepStmt, err := tx.Prepare("DELETE FROM " + TABLE + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
+func purgeRawData(tx *sql.Tx, cfg *Config, rDat []rawData) (err error) {
+ prepStmt, err := tx.Prepare("DELETE FROM " + cfg.RawTable + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
}
@@ -121,8 +120,8 @@ func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
return
}
-func insertCleanData(tx *sql.Tx, cd []cleanedData) error {
- prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
+ prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
log.Println("Failed to prepare statement")
return err
@@ -186,11 +185,11 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
// Adds differential privacy to all entries in the
// database that are older than t and haven't had
// differential privacy added to them yet.
-func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
- if conf.Epsilon <= 0 {
+func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
+ if cfg.Epsilon <= 0 {
return
}
- query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM clean_data WHERE time_added < ?")
+ query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?")
if err != nil {
log.Println("Failed to prepare query")
return
@@ -203,7 +202,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
}
defer rows.Close()
- update, err := db.Prepare("UPDATE clean_data SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
+ update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
if err != nil {
log.Println("Failed to prepare update")
return
@@ -228,7 +227,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
return
}
// Add differential privacy noise
- cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon)
+ cd.occurences = diffpriv(cd.occurences, 1, cfg.Epsilon)
// Update the entry
_, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
@@ -239,13 +238,13 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
return
}
-func availableRows(tx *sql.Tx, limit time.Time) (numRows int, err error) {
- stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + TABLE + " WHERE stamp_inserted < ? ")
+func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) {
+ stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ")
if err != nil {
log.Println("Could not prepare statement")
return
}
- row := stmt.QueryRow(limit)
+ row := stmt.QueryRow(timeLimit)
err = row.Scan(&numRows)
if err != nil {