summaryrefslogtreecommitdiff
path: root/sqlQueries.go
diff options
context:
space:
mode:
Diffstat (limited to 'sqlQueries.go')
-rw-r--r--sqlQueries.go49
1 files changed, 25 insertions, 24 deletions
diff --git a/sqlQueries.go b/sqlQueries.go
index 241f4e8..e7eb06f 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -4,6 +4,7 @@ import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"log"
+ "os"
"time"
)
@@ -12,11 +13,11 @@ const (
)
var (
- logger *log.Logger
+ slogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags)
+ slogger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags)
}
//Retrieves limit Rawdata entries that are older than tim
@@ -30,7 +31,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
}
if err != nil {
- logger.Println("Failed to prepare select")
+ slogger.Println("Failed to prepare select")
return
}
@@ -41,26 +42,26 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
rows, err = prepSel.Query(tim)
}
if err != nil {
- logger.Println("Failed to query prepared selection")
+ slogger.Println("Failed to query prepared selection")
return
}
defer rows.Close()
tx, err := db.Begin()
if err != nil {
- logger.Println("Failed to initialize transaction")
+ slogger.Println("Failed to initialize transaction")
return
}
prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
- logger.Println("Failed to prepare update")
+ slogger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- logger.Println("Couldn't load timezone")
+ slogger.Println("Couldn't load timezone")
return
}
for rows.Next() {
@@ -68,18 +69,18 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
var tim []byte
err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim)
if err != nil {
- logger.Println("Failed to scan result of query")
+ slogger.Println("Failed to scan result of query")
return
}
r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- logger.Println("Failed to parse timestamp")
+ slogger.Println("Failed to parse timestamp")
return
}
_, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time)
if err != nil {
- logger.Println("Failed to query prepared update")
+ slogger.Println("Failed to query prepared update")
tx.Rollback()
return
}
@@ -131,14 +132,14 @@ func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) {
func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- logger.Println("Failed to prepare statement")
+ slogger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- logger.Println("Failed to execute statement")
+ slogger.Println("Failed to execute statement")
return err
}
}
@@ -149,21 +150,21 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
func insertCleanDataToDB(cfg *Config, cd []cleanedData) error {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- logger.Println("Failed to connect to db")
+ slogger.Println("Failed to connect to db")
return err
}
defer db.Close()
prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- logger.Println("Failed to prepare statement")
+ slogger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- logger.Println("Failed to execute statement")
+ slogger.Println("Failed to execute statement")
return err
}
}
@@ -223,26 +224,26 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
}
query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?")
if err != nil {
- logger.Println("Failed to prepare query")
+ slogger.Println("Failed to prepare query")
return
}
rows, err := query.Query(t)
if err != nil {
- logger.Println("Failed to query for unprivitized rows")
+ slogger.Println("Failed to query for unprivitized rows")
return
}
defer rows.Close()
update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
if err != nil {
- logger.Println("Failed to prepare update")
+ slogger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- logger.Println("Couldn't load timezone")
+ slogger.Println("Couldn't load timezone")
return
}
var cd cleanedData
@@ -250,12 +251,12 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
var tim []byte
err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &tim, &cd.occurences)
if err != nil {
- logger.Println("Failed to scan row")
+ slogger.Println("Failed to scan row")
return
}
cd.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- logger.Println("Failed to parse timestamp")
+ slogger.Println("Failed to parse timestamp")
return
}
// Add differential privacy noise
@@ -264,7 +265,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
// Update the entry
_, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
if err != nil {
- logger.Println("Failed to update an entry:", err)
+ slogger.Println("Failed to update an entry:", err)
}
}
return
@@ -273,14 +274,14 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) {
stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ")
if err != nil {
- logger.Println("Could not prepare statement")
+ slogger.Println("Could not prepare statement")
return
}
row := stmt.QueryRow(timeLimit)
err = row.Scan(&numRows)
if err != nil {
- logger.Println("Failed to scan result")
+ slogger.Println("Failed to scan result")
}
return
}