diff options
author | Daniel Langesten <daniel.langest@gmail.com> | 2015-02-27 14:58:27 +0100 |
---|---|---|
committer | Daniel Langesten <daniel.langest@gmail.com> | 2015-02-27 14:58:27 +0100 |
commit | 9fe8a6e44cc53c2b96db45c8e605d9b7820252f8 (patch) | |
tree | 1fd4093cf7f6495b5372b5de060f87298fab6a60 | |
parent | 43326cc2e8e595e4ed456751326465f1937c935a (diff) |
added more functions that will be needed
-rw-r--r-- | mysql.go | 87 |
1 files changed, 81 insertions, 6 deletions
@@ -2,7 +2,6 @@ package main import ( "database/sql" - "fmt" _ "github.com/go-sql-driver/mysql" "time" ) @@ -12,28 +11,100 @@ const ( DATABASE_PASS = "pass" DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) DATABASE_NAME = "netflow" + PROCESS_BATCH = 3 ) -func main() { +type RawData struct { + ipSrc string + ipDst string + time time.Time + port int + packetSize int +} +func derp() (err error) { db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME) if err != nil { - panic(err) + return } defer db.Close() + rDat, err := fetchRawData(db) + if err != nil { + return + } + + //Begin transaction + tx, err := db.Begin() + if err != nil { + return + } + + //TODO insertCleanedData + + //remove old data + err = purgeRawData(tx, rDat) + if err != nil { + tx.Rollback() + return + } + + tx.Commit() + return +} + +func fetchRawData(db *sql.DB) (rDat []RawData, err error) { + prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ") + if err != nil { + return + } + + rows, err := prepStmt.Query(PROCESS_BATCH) + if err != nil { + return + } + + for rows.Next() { + var r RawData + err = rows.Scan(&r.ipSrc, &r.ipDst, &r.time, &r.port, &r.packetSize) + if err != nil { + return + } + rDat = append(rDat, r) + } + return +} + +func insertCleanedData(tx *sql.Tx, rDat []RawData) (err error) { + +} + +func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { + for _, r := range rDat { + prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?") + if err != nil { + return + } + _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) + if err != nil { + return + } + } + return } -func insertCleanData(db *slq.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { +func insertCleanData(db *sql.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { prepStmt, err := db.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { return err } - _, err := prepStmt.Execute(ipbSrc, ipb_dst, time, port, volume, occurences, occurences) + _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences) if err != nil { return err } + + return nil } func insertASNIP(db *sql.DB, asn int, ipBlock string) error { @@ -48,7 +119,7 @@ func insertASNIP(db *sql.DB, asn int, ipBlock string) error { return err } if rows != nil { - return + return nil } prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") @@ -61,6 +132,8 @@ func insertASNIP(db *sql.DB, asn int, ipBlock string) error { if err != nil { return err } + + return nil } func removeASNIP(db *sql.DB, asn int, ipBlock string) error { @@ -74,4 +147,6 @@ func removeASNIP(db *sql.DB, asn int, ipBlock string) error { if err != nil { return err } + + return nil } |