summaryrefslogtreecommitdiff
path: root/sqlQueries.go
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-03-09 15:39:09 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-03-09 15:39:09 +0100
commit083ff24409534cc46a07d9d7bd724b8be300f843 (patch)
tree725d95d3fa080611442f8f466af0776c6fd3a6d4 /sqlQueries.go
parentc5d17b84bf54ece938c6aa712cff6c9c9ab17da0 (diff)
added processing of timestamp in raw data
Diffstat (limited to 'sqlQueries.go')
-rw-r--r--sqlQueries.go57
1 files changed, 51 insertions, 6 deletions
diff --git a/sqlQueries.go b/sqlQueries.go
index e333c58..a0c2012 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -7,16 +7,30 @@ import (
"time"
)
-func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) {
- prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
+//Retrieves all rawdata older than tim
+func fetchRawData(db *sql.DB, tim time.Time) (rDat []RawData, err error) {
+
+ prepSel, err := db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < tim")
if err != nil {
- log.Println("Failed to prepare statement")
+ log.Println("Failed to prepare select")
+ return
+ }
+
+ rows, err := prepSel.Query(numRows)
+ if err != nil {
+ log.Println("Failed to query prepared selection")
return
}
- rows, err := prepStmt.Query(numRows)
+ tx, err := db.Begin()
if err != nil {
- log.Println("Failed to query prepared statement")
+ log.Println("Failed to initialize transaction")
+ return
+ }
+
+ prepUp, err := tx.Prepare("UPDATE raw_data SET process_time ? where ip_src = ? AND ip_dst = ? AND time = ? AND port = ? and packet_size = ? LIMIT 1")
+ if err != nil {
+ log.Println("Failed to prepare update")
return
}
@@ -30,13 +44,44 @@ func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) {
log.Println("Failed to scan result of query")
return
}
+
+ prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ if err != nil {
+ log.Println("Failed to query prepared update")
+ tx.Rollback()
+ return
+ }
+
rDat = append(rDat, r)
}
+ tx.Commit()
+ return
+}
+
+//Removes the process_time from every entry that started being proccesed before tim
+func reprocess(db *sq.DB, tim time.Time) (err error) {
+ stmt, err := db.Prepare("UPDATE raw_data SET process_time = NULL WHERE process_time < ?")
+ if err != nil {
+ return
+ }
+ _, err = stmt.Execute(tim)
+
+ return
+}
+
+//Removes all entries in the database that started being processed before tim
+func purgeProcessed(db *sql.DB, tim time.Time) (err error) {
+ stmt, err := db.Prepare("DELETE FROM raw_data WHERE process_time < ? ")
+ if err != nil {
+ return
+ }
+ _, err = stmt.Execute(tim)
+
return
}
func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
- prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? LIMIT 1")
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? AND process_time IS NOT NULL LIMIT 1")
if err != nil {
return
}