summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-02-27 14:58:27 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-02-27 14:58:27 +0100
commit9fe8a6e44cc53c2b96db45c8e605d9b7820252f8 (patch)
tree1fd4093cf7f6495b5372b5de060f87298fab6a60
parent43326cc2e8e595e4ed456751326465f1937c935a (diff)
added more functions that will be needed
-rw-r--r--mysql.go87
1 files changed, 81 insertions, 6 deletions
diff --git a/mysql.go b/mysql.go
index d8dc39a..06d9c2f 100644
--- a/mysql.go
+++ b/mysql.go
@@ -2,7 +2,6 @@ package main
import (
"database/sql"
- "fmt"
_ "github.com/go-sql-driver/mysql"
"time"
)
@@ -12,28 +11,100 @@ const (
DATABASE_PASS = "pass"
DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555)
DATABASE_NAME = "netflow"
+ PROCESS_BATCH = 3
)
-func main() {
+type RawData struct {
+ ipSrc string
+ ipDst string
+ time time.Time
+ port int
+ packetSize int
+}
+func derp() (err error) {
db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME)
if err != nil {
- panic(err)
+ return
}
defer db.Close()
+ rDat, err := fetchRawData(db)
+ if err != nil {
+ return
+ }
+
+ //Begin transaction
+ tx, err := db.Begin()
+ if err != nil {
+ return
+ }
+
+ //TODO insertCleanedData
+
+ //remove old data
+ err = purgeRawData(tx, rDat)
+ if err != nil {
+ tx.Rollback()
+ return
+ }
+
+ tx.Commit()
+ return
+}
+
+func fetchRawData(db *sql.DB) (rDat []RawData, err error) {
+ prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
+ if err != nil {
+ return
+ }
+
+ rows, err := prepStmt.Query(PROCESS_BATCH)
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var r RawData
+ err = rows.Scan(&r.ipSrc, &r.ipDst, &r.time, &r.port, &r.packetSize)
+ if err != nil {
+ return
+ }
+ rDat = append(rDat, r)
+ }
+ return
+}
+
+func insertCleanedData(tx *sql.Tx, rDat []RawData) (err error) {
+
+}
+
+func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+ for _, r := range rDat {
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?")
+ if err != nil {
+ return
+ }
+ _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ if err != nil {
+ return
+ }
+ }
+ return
}
-func insertCleanData(db *slq.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
+func insertCleanData(db *sql.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
prepStmt, err := db.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
return err
}
- _, err := prepStmt.Execute(ipbSrc, ipb_dst, time, port, volume, occurences, occurences)
+ _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences)
if err != nil {
return err
}
+
+ return nil
}
func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
@@ -48,7 +119,7 @@ func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
return err
}
if rows != nil {
- return
+ return nil
}
prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )")
@@ -61,6 +132,8 @@ func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
if err != nil {
return err
}
+
+ return nil
}
func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
@@ -74,4 +147,6 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
if err != nil {
return err
}
+
+ return nil
}