summaryrefslogtreecommitdiff
path: root/sqlQueries.go
diff options
context:
space:
mode:
Diffstat (limited to 'sqlQueries.go')
-rw-r--r--sqlQueries.go34
1 files changed, 19 insertions, 15 deletions
diff --git a/sqlQueries.go b/sqlQueries.go
index 805248c..34f802a 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -17,9 +17,9 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
var prepSel *sql.Stmt
if limit > 0 {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < ? LIMIT ?")
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ? LIMIT ?")
} else {
- prepSel, err = db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < ?")
+ prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ?")
}
if err != nil {
log.Println("Failed to prepare select")
@@ -44,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
return
}
- prepUp, err := tx.Prepare("UPDATE raw_data SET process_time = ? where ip_src = ? AND ip_dst = ? AND time = ? AND port = ? and packet_size = ? AND process_time IS NULL LIMIT 1")
+ prepUp, err := tx.Prepare("UPDATE raw_data SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
log.Println("Failed to prepare update")
return
@@ -54,14 +54,18 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
for rows.Next() {
var r RawData
var tim []byte
- err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize)
- r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
+ err = rows.Scan(&rd.ipSrc, &rd.ipDst, &rd.asSrc, &rd.asDst, &rd.portSrc, &rd.portDst, &rd.packets, &rd.pktLenDist, &tim)
if err != nil {
log.Println("Failed to scan result of query")
return
}
+ r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
+ if err != nil {
+ log.Println("Failed to parse timestamp")
+ return
+ }
- _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time)
if err != nil {
log.Println("Failed to query prepared update")
tx.Rollback()
@@ -74,9 +78,9 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err
return
}
-//Removes the process_time from every entry that started being proccesed before tim
+//Removes the stamp_processed from every entry that started being proccesed before tim
func reprocess(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("UPDATE raw_data SET process_time = NULL WHERE process_time < ?")
+ stmt, err := db.Prepare("UPDATE raw_data SET stamp_processed = NULL WHERE stamp_processed < ?")
if err != nil {
return
}
@@ -86,8 +90,8 @@ func reprocess(db *sql.DB, tim time.Time) (err error) {
}
//Removes all entries in the database that started being processed before tim
-func purgeProcessed(db *sql.DB, tim time.Time) (err error) {
- stmt, err := db.Prepare("DELETE FROM raw_data WHERE process_time < ? ")
+func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
+ stmt, err := db.Prepare("DELETE FROM raw_data WHERE stamp_processed < ? ")
if err != nil {
return
}
@@ -98,13 +102,13 @@ func purgeProcessed(db *sql.DB, tim time.Time) (err error) {
//Removes all rawdata that is in rDat from the database
func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
- prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? AND process_time IS NOT NULL LIMIT 1")
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_lenDist = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
}
for _, r := range rDat {
- _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist)
if err != nil {
return
}
@@ -112,14 +116,14 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
return
}
-func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, tim time.Time, port, occurences int) error {
- prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+func insertCleanData(tx *sql.Tx, cd *CleanData) error {
+ prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurances, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
log.Println("Failed to prepare statement")
return err
}
- _, err = prepStmt.Exec(ipbSrc, ipbDst, tim, port, volume, occurences, occurences)
+ _, err = prepStmt.Exec(cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.occurances, cd.volume, cd.time)
if err != nil {
log.Println("Failed to execute statement")
return err