summaryrefslogtreecommitdiff
path: root/cleaner.go
diff options
context:
space:
mode:
Diffstat (limited to 'cleaner.go')
-rw-r--r--cleaner.go151
1 files changed, 151 insertions, 0 deletions
diff --git a/cleaner.go b/cleaner.go
new file mode 100644
index 0000000..012b308
--- /dev/null
+++ b/cleaner.go
@@ -0,0 +1,151 @@
+package main
+
+import (
+ "database/sql"
+ _ "github.com/go-sql-driver/mysql"
+ "time"
+)
+
+const (
+ DATABASE_USER = "root"
+ DATABASE_PASS = "pass"
+ DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555)
+ DATABASE_NAME = "netflow"
+ PROCESS_BATCH = 3
+)
+
+type RawData struct {
+ ipSrc string
+ ipDst string
+ time time.Time
+ port int
+ packetSize int
+}
+
+func derp() (err error) {
+ db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME)
+ if err != nil {
+ return
+ }
+ defer db.Close()
+
+ rDat, err := fetchRawData(db)
+ if err != nil {
+ return
+ }
+
+ //Begin transaction
+ tx, err := db.Begin()
+ if err != nil {
+ return
+ }
+
+ //TODO insertCleanedData
+
+ //remove old data
+ err = purgeRawData(tx, rDat)
+ if err != nil {
+ tx.Rollback()
+ return
+ }
+
+ tx.Commit()
+ return
+}
+
+func insertCleanedData(tx *sql.DB, rDat []RawData) (err error) {
+ return
+}
+
+func fetchRawData(db *sql.DB) (rDat []RawData, err error) {
+ prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
+ if err != nil {
+ return
+ }
+
+ rows, err := prepStmt.Query(PROCESS_BATCH)
+ if err != nil {
+ return
+ }
+
+ for rows.Next() {
+ var r RawData
+ err = rows.Scan(&r.ipSrc, &r.ipDst, &r.time, &r.port, &r.packetSize)
+ if err != nil {
+ return
+ }
+ rDat = append(rDat, r)
+ }
+ return
+}
+func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+ for _, r := range rDat {
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?")
+ if err != nil {
+ return
+ }
+ _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ if err != nil {
+ return
+ }
+ }
+ return
+}
+
+func insertCleanData(db *sql.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
+ prepStmt, err := db.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+ if err != nil {
+ return err
+ }
+
+ _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
+ prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?")
+ if err != nil {
+ return err
+ }
+ defer prepCheck.Close()
+
+ rows, err := prepCheck.Exec(ipBlock)
+ if err != nil {
+ return err
+ }
+ if rows != nil {
+ return nil
+ }
+
+ prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )")
+ if err != nil {
+ return err
+ }
+ defer prepIns.Close()
+
+ _, err = prepIns.Exec(asn, ipBlock)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
+ prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?")
+ if err != nil {
+ return err
+ }
+ defer prepStmt.Close()
+
+ _, err = prepStmt.Exec(asn, ipBlock)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}