diff options
-rw-r--r-- | cleaner.go | 21 | ||||
-rw-r--r-- | datastructs.go | 36 | ||||
-rw-r--r-- | sqlQueries.go | 34 | ||||
-rw-r--r-- | whois.go | 17 |
4 files changed, 48 insertions, 60 deletions
@@ -65,7 +65,7 @@ func cleanData(conf Config, db_user, db_pass, db_conn, db_name string) (err erro //save cleaned data for _, cd := range cDat { - err = insertCleanData(tx, cd.ipbSrc, cd.ipbDst, cd.volume, cd.time, cd.port, cd.occurances) + err = insertCleanData(tx, cd) if err != nil { tx.Rollback() log.Println("Failed to save cleaned data") @@ -122,7 +122,7 @@ func getTimespan(t time.Time, conf Config) (span time.Time, err error) { func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { // collect all ips so we can query for their ip blocks - ips := make(map[string]*asnipPair) + ips := make(map[string]struct{}) for _, rd := range rDat { ips[rd.ipSrc] = nil ips[rd.ipDst] = nil @@ -137,16 +137,8 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { if err != nil { return } - for ix, p := range pairs { - ips[p.ipAdr] = &pairs[ix] - } - var vol string for _, rd := range rDat { - vol, err = rd.getVolSize(conf) - if err != nil { - return - } var tim time.Time tim, err = getTimespan(rd.time, conf) if err != nil { @@ -156,10 +148,13 @@ func clean(rDat []RawData, conf Config) (cDat []CleanData, err error) { CleanData{ ipbSrc: ips[rd.ipSrc].ipBlock, ipbDst: ips[rd.ipDst].ipBlock, - time: tim, - port: rd.port, - volume: vol, + asSrc: rd.asSrc, + asDst: rd.asDst, + portSrc: rd.portSrc, + portDst: rd.portDst, occurances: 1, + volume: rd.pktLenDist, + time: tim, }) } diff --git a/datastructs.go b/datastructs.go index a4ac45c..b970cec 100644 --- a/datastructs.go +++ b/datastructs.go @@ -8,34 +8,34 @@ import ( type RawData struct { ipSrc string ipDst string + asSrc int + asDst int + portSrc int + portDst int + packets int + pktLenDist string time time.Time - port int - packetSize int -} - -func (rd *RawData) getVolSize(conf Config) (string, error) { - for _, volume := range conf.Volumes { - if volume.Lower <= rd.packetSize && - (rd.packetSize < volume.Upper || volume.Upper == 0) { - return volume.Size, nil - } - } - return "N/A", errors.New("Could not find a fitting size volume") } type CleanData struct { ipbSrc string ipbDst string - time time.Time - port int - volume string + asSrc int + asDst int + portSrc int + portDst int occurances int + volume string + time time.Time } func (cd *CleanData) equals(other *CleanData) bool { return cd.ipbSrc == other.ipbSrc && cd.ipbDst == other.ipbDst && - cd.time == other.time && - cd.port == other.port && - cd.volume == other.volume + cd.asSrc == other.asSrc && + cd.asDst == othr.asDst && + cd.portSrc == other.portSrc && + cd.portDst == other.portDst && + cd.volume == other.volume && + cd.time == other.time } diff --git a/sqlQueries.go b/sqlQueries.go index 805248c..34f802a 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -17,9 +17,9 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err var prepSel *sql.Stmt if limit > 0 { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < ? LIMIT ?") + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ? LIMIT ?") } else { - prepSel, err = db.Prepare("SELECT ip_src,ip_dst,time,port,packet_size FROM raw_data WHERE time < ?") + prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ?") } if err != nil { log.Println("Failed to prepare select") @@ -44,7 +44,7 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err return } - prepUp, err := tx.Prepare("UPDATE raw_data SET process_time = ? where ip_src = ? AND ip_dst = ? AND time = ? AND port = ? and packet_size = ? AND process_time IS NULL LIMIT 1") + prepUp, err := tx.Prepare("UPDATE raw_data SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") if err != nil { log.Println("Failed to prepare update") return @@ -54,14 +54,18 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err for rows.Next() { var r RawData var tim []byte - err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize) - r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) + err = rows.Scan(&rd.ipSrc, &rd.ipDst, &rd.asSrc, &rd.asDst, &rd.portSrc, &rd.portDst, &rd.packets, &rd.pktLenDist, &tim) if err != nil { log.Println("Failed to scan result of query") return } + r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) + if err != nil { + log.Println("Failed to parse timestamp") + return + } - _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) + _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time) if err != nil { log.Println("Failed to query prepared update") tx.Rollback() @@ -74,9 +78,9 @@ func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err err return } -//Removes the process_time from every entry that started being proccesed before tim +//Removes the stamp_processed from every entry that started being proccesed before tim func reprocess(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("UPDATE raw_data SET process_time = NULL WHERE process_time < ?") + stmt, err := db.Prepare("UPDATE raw_data SET stamp_processed = NULL WHERE stamp_processed < ?") if err != nil { return } @@ -86,8 +90,8 @@ func reprocess(db *sql.DB, tim time.Time) (err error) { } //Removes all entries in the database that started being processed before tim -func purgeProcessed(db *sql.DB, tim time.Time) (err error) { - stmt, err := db.Prepare("DELETE FROM raw_data WHERE process_time < ? ") +func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) { + stmt, err := db.Prepare("DELETE FROM raw_data WHERE stamp_processed < ? ") if err != nil { return } @@ -98,13 +102,13 @@ func purgeProcessed(db *sql.DB, tim time.Time) (err error) { //Removes all rawdata that is in rDat from the database func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { - prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? AND process_time IS NOT NULL LIMIT 1") + prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_lenDist = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return } for _, r := range rDat { - _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) + _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist) if err != nil { return } @@ -112,14 +116,14 @@ func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { return } -func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, tim time.Time, port, occurences int) error { - prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") +func insertCleanData(tx *sql.Tx, cd *CleanData) error { + prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurances, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { log.Println("Failed to prepare statement") return err } - _, err = prepStmt.Exec(ipbSrc, ipbDst, tim, port, volume, occurences, occurences) + _, err = prepStmt.Exec(cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.occurances, cd.volume, cd.time) if err != nil { log.Println("Failed to execute statement") return err @@ -14,12 +14,6 @@ const ( RETRIES = 1 ) -type asnipPair struct { - asNum string - ipAdr string - ipBlock string -} - /* func main() { pairs, err := findASAndIPBlock("109.105.104.100", "123.123.123.123") @@ -35,7 +29,7 @@ func main() { } */ -func findASAndIPBlock(domains ...string) (pairs []asnipPair, err error) { +func findASAndIPBlock(domains ...string) (pairs map[string]string, err error) { if len(domains) == 0 { return } @@ -54,16 +48,11 @@ func findASAndIPBlock(domains ...string) (pairs []asnipPair, err error) { } lines := strings.Split(res, "\n") - pairs = make([]asnipPair, 0, len(lines)-2) + pairs = make(map[string]string, 0, len(lines)-2) for ix, line := range lines[1 : len(lines)-1] { content := strings.Split(line, "|") - as := strings.TrimSpace(content[0]) ipb := strings.TrimSpace(content[2]) - pairs = append(pairs, asnipPair{ - asNum: as, - ipBlock: ipb, - ipAdr: domains[ix], - }) + pairs[domains[ix]] = ipb } return } |