diff options
-rw-r--r-- | cleaner.go | 10 | ||||
-rw-r--r-- | datastructs.go | 2 | ||||
-rw-r--r-- | main.go | 2 | ||||
-rw-r--r-- | sqlQueries.go | 16 | ||||
-rw-r--r-- | whois.go | 16 |
5 files changed, 21 insertions, 25 deletions
@@ -53,7 +53,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro //Add noise for differential privacy for i := range cDat { - cDat[i].occurances = diffpriv(cDat[i].occurances, 1, conf.Epsilon) + cDat[i].occurences = diffpriv(cDat[i].occurences, 1, conf.Epsilon) } //Begin transaction @@ -133,7 +133,7 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { iplist = append(iplist, ip) } - pairs, err := findASAndIPBlock(iplist...) + pairs, err := findIPBlock(iplist...) if err != nil { return } @@ -152,7 +152,7 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { asDst: rd.asDst, portSrc: rd.portSrc, portDst: rd.portDst, - occurances: 1, + occurences: 1, volume: rd.pktLenDist, time: tim, }) @@ -172,9 +172,9 @@ func removeDups(cDat []CleanData) []CleanData { //Check if an equal struct already is appended for ri := range ret { if ret[ri].equals(&cDat[ci]) { - //If found, increase it occurances instead of + //If found, increase it occurences instead of //appending a new struct - ret[ri].occurances += cDat[ci].occurances + ret[ri].occurences += cDat[ci].occurences found = true break } diff --git a/datastructs.go b/datastructs.go index f937d35..4a30da7 100644 --- a/datastructs.go +++ b/datastructs.go @@ -23,7 +23,7 @@ type CleanData struct { asDst int portSrc int portDst int - occurances int + occurences int volume string time time.Time } @@ -11,7 +11,7 @@ const ( DATABASE_USER = "flowcleaner" DATABASE_PASS = "nil" DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) - DATABASE_NAME = "netflow" + DATABASE_NAME = "pmacct" ) func main() { diff --git a/sqlQueries.go b/sqlQueries.go index 7683427..05c08c1 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -17,9 +17,9 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err var prepSel *sql.Stmt if limit > 0 { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ? LIMIT ?") + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM acct WHERE stamp_inserted < ? LIMIT ?") } else { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ?") + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM acct WHERE stamp_inserted < ?") } if err != nil { log.Println("Failed to prepare select") @@ -44,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err return } - prepUp, err := tx.Prepare("UPDATE raw_data SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") + prepUp, err := tx.Prepare("UPDATE acct SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") if err != nil { log.Println("Failed to prepare update") return @@ -80,7 +80,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err //Removes the stamp_processed from every entry that started being proccesed before tim func reprocess(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("UPDATE raw_data SET stamp_processed = NULL WHERE stamp_processed < ?") + stmt, err := db.Prepare("UPDATE acct SET stamp_processed = NULL WHERE stamp_processed < ?") if err != nil { return } @@ -91,7 +91,7 @@ func reprocess(db *sql.DB, tim time.Time) (err error) { //Removes all entries in the database that started being processed before tim func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("DELETE FROM raw_data WHERE stamp_processed < ? ") + stmt, err := db.Prepare("DELETE FROM acct WHERE stamp_processed < ? ") if err != nil { return } @@ -102,7 +102,7 @@ func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { //Removes all rawdata that is in rDat from the database func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { - prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_lenDist = ? AND stamp_processed IS NOT NULL LIMIT 1") + prepStmt, err := tx.Prepare("DELETE FROM acct WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return } @@ -117,13 +117,13 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { } func insertCleanData(tx *sql.Tx, cd *CleanData) error { - prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurances, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") + prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err } - _, err = prepStmt.Exec(cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.occurances, cd.volume, cd.time) + _, err = prepStmt.Exec(cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.occurences, cd.volume, cd.time, cd.occurences) if err != nil { log.Println("Failed to execute statement") return err @@ -16,20 +16,15 @@ const ( /* func main() { - pairs, err := findASAndIPBlock("109.105.104.100", "123.123.123.123") + pairs, err := findIPBlock("130.229.137.105", "192.88.99.1") if err != nil { panic(err) } - fmt.Println(pairs) - pairs, err = findASAndIPBlock("123.123.123.123") - if err != nil { - panic(err) - } - fmt.Println(pairs) + log.Println(pairs) } */ -func findASAndIPBlock(domains ...string) (pairs map[string]string, err error) { +func findIPBlock(domains ...string) (pairs map[string]string, err error) { if len(domains) == 0 { return } @@ -49,10 +44,11 @@ func findASAndIPBlock(domains ...string) (pairs map[string]string, err error) { lines := strings.Split(res, "\n") pairs = make(map[string]string, len(lines)-2) - for ix, line := range lines[1 : len(lines)-1] { + for _, line := range lines[1 : len(lines)-1] { content := strings.Split(line, "|") + ipaddr := strings.TrimSpace(content[1]) ipb := strings.TrimSpace(content[2]) - pairs[domains[ix]] = ipb + pairs[ipaddr] = ipb } return } |