summaryrefslogtreecommitdiff
path: root/sqlQueries.go
diff options
context:
space:
mode:
Diffstat (limited to 'sqlQueries.go')
-rw-r--r--sqlQueries.go47
1 files changed, 23 insertions, 24 deletions
diff --git a/sqlQueries.go b/sqlQueries.go
index efaffba..6048b4c 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -9,18 +9,17 @@ import (
const (
TIMEZONE = "UTC"
- TABLE = "copyOfData"
)
//Retrieves limit rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
-func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err error) {
+func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err error) {
var prepSel *sql.Stmt
- if limit > 0 {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?")
+ if cfg.Limit > 0 {
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ? LIMIT ?")
} else {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + TABLE + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
}
if err != nil {
log.Println("Failed to prepare select")
@@ -28,8 +27,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
}
var rows *sql.Rows
- if limit > 0 {
- rows, err = prepSel.Query(tim, limit)
+ if cfg.Limit > 0 {
+ rows, err = prepSel.Query(tim, cfg.Limit)
} else {
rows, err = prepSel.Query(tim)
}
@@ -45,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
return
}
- prepUp, err := tx.Prepare("UPDATE " + TABLE + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
+ prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
log.Println("Failed to prepare update")
return
@@ -84,8 +83,8 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []rawData, err err
}
//Removes the stamp_processed from every entry that started being proccesed before tim
-func reprocess(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("UPDATE " + TABLE + " SET stamp_processed = NULL WHERE stamp_processed < ?")
+func reprocess(db *sql.DB, cfg *Config, tim time.Time) (err error) {
+ stmt, err := db.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = NULL WHERE stamp_processed < ?")
if err != nil {
return
}
@@ -95,8 +94,8 @@ func reprocess(db *sql.DB, tim time.Time) (err error) {
}
//Removes all entries in the database that started being processed before tim
-func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("DELETE FROM " + TABLE + " WHERE stamp_processed < ? ")
+func purgeAllProcessed(db *sql.DB, cfg *Config, tim time.Time) (err error) {
+ stmt, err := db.Prepare("DELETE FROM " + cfg.RawTable + " WHERE stamp_processed < ? ")
if err != nil {
return
}
@@ -106,8 +105,8 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
}
//Removes all rawdata that is in rDat from the database
-func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
- prepStmt, err := tx.Prepare("DELETE FROM " + TABLE + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
+func purgeRawData(tx *sql.Tx, cfg *Config, rDat []rawData) (err error) {
+ prepStmt, err := tx.Prepare("DELETE FROM " + cfg.RawTable + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
}
@@ -121,8 +120,8 @@ func purgeRawData(tx *sql.Tx, rDat []rawData) (err error) {
return
}
-func insertCleanData(tx *sql.Tx, cd []cleanedData) error {
- prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
+ prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
log.Println("Failed to prepare statement")
return err
@@ -186,11 +185,11 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
// Adds differential privacy to all entries in the
// database that is older than t and haven't had
// differential privacy added to them yet.
-func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
- if conf.Epsilon <= 0 {
+func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
+ if cfg.Epsilon <= 0 {
return
}
- query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM clean_data WHERE time_added < ?")
+ query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?")
if err != nil {
log.Println("Failed to prepare query")
return
@@ -203,7 +202,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
}
defer rows.Close()
- update, err := db.Prepare("UPDATE clean_data SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
+ update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
if err != nil {
log.Println("Failed to prepare update")
return
@@ -228,7 +227,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
return
}
// Add differential privacy noise
- cd.occurences = diffpriv(cd.occurences, 1, conf.Epsilon)
+ cd.occurences = diffpriv(cd.occurences, 1, cfg.Epsilon)
// Update the entry
_, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
@@ -239,13 +238,13 @@ func privatizeCleaned(db *sql.DB, t time.Time, conf Config) (err error) {
return
}
-func availableRows(tx *sql.Tx, limit time.Time) (numRows int, err error) {
- stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + TABLE + " WHERE stamp_inserted < ? ")
+func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) {
+ stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ")
if err != nil {
log.Println("Could not prepare statement")
return
}
- row := stmt.QueryRow(limit)
+ row := stmt.QueryRow(timeLimit)
err = row.Scan(&numRows)
if err != nil {