summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cleaner.go113
1 files changed, 100 insertions, 13 deletions
diff --git a/cleaner.go b/cleaner.go
index 012b308..e60e573 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -2,6 +2,7 @@ package main
import (
"database/sql"
+ "errors"
_ "github.com/go-sql-driver/mysql"
"time"
)
@@ -11,7 +12,8 @@ const (
DATABASE_PASS = "pass"
DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555)
DATABASE_NAME = "netflow"
- PROCESS_BATCH = 3
+ MAXIMUM_ENTRIES = 10
+ TIMESPAN = "day"
)
type RawData struct {
@@ -22,14 +24,29 @@ type RawData struct {
packetSize int
}
-func derp() (err error) {
+type CleanData struct {
+ ipbSrc string
+ ipbDst string
+ time time.Time
+ port int
+ volume string
+ occurances int
+}
+
+func cleanData() (err error) {
db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME)
if err != nil {
return
}
defer db.Close()
- rDat, err := fetchRawData(db)
+ //Fetch data that should be cleaned
+ rDat, err := fetchRawData(db, MAXIMUM_ENTRIES)
+ if err != nil {
+ return
+ }
+
+ cDat, err := clean(rDat)
if err != nil {
return
}
@@ -40,7 +57,14 @@ func derp() (err error) {
return
}
- //TODO insertCleanedData
+ //save cleaned data
+ for _, cd := range cDat {
+ err = insertCleanData(tx, cd.ipbSrc, cd.ipbDst, cd.volume, cd.time, cd.port, cd.occurances)
+ if err != nil {
+ tx.Rollback()
+ return
+ }
+ }
//remove old data
err = purgeRawData(tx, rDat)
@@ -53,17 +77,79 @@ func derp() (err error) {
return
}
-func insertCleanedData(tx *sql.DB, rDat []RawData) (err error) {
+func getVolSize(s int) string {
+ return "medium"
+}
+
+func getTimespan(t time.Time) (span time.Time, err error) {
+ loc, err := time.LoadLocation("Local")
+ switch {
+ case TIMESPAN == "5min":
+
+ case TIMESPAN == "hour":
+
+ case TIMESPAN == "day":
+ y, m, d := t.Date()
+ if err != nil {
+ return
+ }
+ span = time.Date(y, m, d, 0, 0, 0, 0, loc)
+ default:
+ err = errors.New("Bad timespan")
+ return
+ }
return
}
-func fetchRawData(db *sql.DB) (rDat []RawData, err error) {
+func clean(rDat []RawData) (cDat []CleanData, err error) {
+ // collect all ips so we can query for their ip blocks
+ var ips map[string]*asnipPair
+ for _, rd := range rDat {
+ ips[rd.ipSrc] = nil
+ ips[rd.ipDst] = nil
+ }
+
+ var iplist []string
+ for ip := range ips {
+ iplist = append(iplist, ip)
+ }
+
+ pairs, err := findASAndIPBlock(iplist...)
+ if err != nil {
+ return
+ }
+ for _, p := range pairs {
+ ips[p.ipAdr] = &p
+ }
+
+ for _, rd := range rDat {
+ vol := getVolSize(rd.packetSize)
+ var tim time.Time
+ tim, err = getTimespan(rd.time)
+ if err != nil {
+ return
+ }
+ cDat = append(cDat,
+ CleanData{
+ ipbSrc: ips[rd.ipSrc].ipBlock,
+ ipbDst: ips[rd.ipDst].ipBlock,
+ time: tim,
+ port: rd.port,
+ volume: vol,
+ occurances: 1,
+ })
+ }
+
+ return
+}
+
+func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) {
prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
if err != nil {
return
}
- rows, err := prepStmt.Query(PROCESS_BATCH)
+ rows, err := prepStmt.Query(numRows)
if err != nil {
return
}
@@ -79,11 +165,12 @@ func fetchRawData(db *sql.DB) (rDat []RawData, err error) {
return
}
func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?")
+ if err != nil {
+ return
+ }
+
for _, r := range rDat {
- prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ?")
- if err != nil {
- return
- }
_, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
if err != nil {
return
@@ -92,8 +179,8 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
return
}
-func insertCleanData(db *sql.DB, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
- prepStmt, err := db.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
+ prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurenaces) VALUES ( ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
return err
}