diff options
-rw-r--r-- | cleaner.go | 40 | ||||
-rw-r--r-- | datastructs.go | 18 | ||||
-rw-r--r-- | main.go | 70 | ||||
-rw-r--r-- | sqlQueries.go | 40 |
4 files changed, 125 insertions, 43 deletions
@@ -9,7 +9,7 @@ import ( "time" ) -func cleanData(cfg *Config) (rowsLeft int, err error) { +func cleanFromDB(cfg *Config) (rowsLeft int, err error) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { @@ -39,7 +39,7 @@ func cleanData(cfg *Config) (rowsLeft int, err error) { cDat, err := clean(rDat, cfg) if err != nil { - log.Println("Failed to clean data") + log.Println("Failed to clean data from db:", err) return } @@ -76,6 +76,22 @@ func cleanData(cfg *Config) (rowsLeft int, err error) { return } +func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error { + for rDat := range rDatChan { + cDat, err := clean(rDat, cfg) + if err != nil { + log.Println("Failed to clean data from stdin:", err) + return err + } + err = insertCleanDataToDB(cfg, cDat) + if err != nil { + log.Println("Failed to insert clean data from stdin:", err) + return err + } + } + return nil +} + func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) { loc, err := time.LoadLocation(TIMEZONE) if err != nil { @@ -111,12 +127,12 @@ func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) { return } -func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) { +func clean(rDat []RawData, cfg *Config) (cDat []cleanedData, err error) { // collect all ips so we can query for their ip blocks ips := make(map[string]struct{}) for _, rd := range rDat { - ips[rd.ipSrc] = struct{}{} - ips[rd.ipDst] = struct{}{} + ips[rd.IpSrc] = struct{}{} + ips[rd.IpDst] = struct{}{} } var iplist []string @@ -139,14 +155,14 @@ func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) { } cDat = append(cDat, cleanedData{ - ipbSrc: pairs[rd.ipSrc], - ipbDst: pairs[rd.ipDst], - asSrc: rd.asSrc, - asDst: rd.asDst, - portSrc: rd.portSrc, - portDst: rd.portDst, + ipbSrc: pairs[rd.IpSrc], + ipbDst: pairs[rd.IpDst], + asSrc: rd.AsSrc, + asDst: rd.AsDst, + portSrc: rd.PortSrc, + portDst: rd.PortDst, occurences: 1, - volume: rd.pktLenDist, + volume: rd.PktLenDist, time: tim, }) } diff --git a/datastructs.go b/datastructs.go index 8220c62..f8887ef 100644 --- a/datastructs.go +++ b/datastructs.go @@ -4,15 +4,15 @@ import ( "time" ) -type rawData struct { - ipSrc string - ipDst string - asSrc int - asDst int - portSrc int - portDst int - packets int - pktLenDist string +type RawData struct { + IpSrc string `json:ip_src` + IpDst string `json:ip_dst` + AsSrc int `json:as_src` + AsDst int `json:as_dst` + PortSrc int `json:port_src` + PortDst int `json:port_dst` + Packets int `json:packets` + PktLenDist string `json:pkt_len_distrib` time time.Time } @@ -3,9 +3,11 @@ package main import ( "bufio" "database/sql" + "encoding/json" _ "github.com/go-sql-driver/mysql" "log" "os" + "strings" "time" //"strings" ) @@ -19,18 +21,13 @@ func main() { } log.Println("Done!") - /* - stdin := readFromStdin() - for line := range stdin { - strs := strings.Split(line, ",") - for _, str := range strs { - log.Println(str) - } - } - */ + input := readFromStdin() + rDatChan := parseRawData(input, cfg) + cleanFromStdin(rDatChan, cfg) + log.Print("Cleaning data...") starttime := time.Now() - numOfRowsNotCleaned, err := cleanData(cfg) + numOfRowsNotCleaned, err := cleanFromDB(cfg) if err != nil { log.Println(err) log.Println("Exiting...") @@ -66,13 +63,58 @@ func main() { //Starts a process that reads from stdin and //puts the strings read on the returned channel -func readFromStdin() <-chan string { - - out := make(chan string) +func readFromStdin() <-chan []byte { + out := make(chan []byte) go func() { scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { - out <- scanner.Text() + out <- scanner.Bytes() + } + close(out) + }() + return out +} + +func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { + out := make(chan []RawData) + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + go func() { + rDat := make([]RawData, 0) + for line := range in { + if !strings.HasPrefix(string(line), "{") { + //This should be a break in the output from pmacct + //so we deploy our collected data and set a new timeBin + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + //Send the data if we have something to send + if len(rDat) > 0 { + out <- rDat + rDat = make([]RawData, 0) + } + continue + } + + var rd RawData + err := json.Unmarshal(line, rd) + if err != nil { + log.Println("Failed in parsing json:", err) + close(out) + return + } + rd.time = timeBin + + rDat = append(rDat, rd) + } + //If there is any unsent data after in is closed we make sure to send it. + if len(rDat) > 0 { + out <- rDat } close(out) }() diff --git a/sqlQueries.go b/sqlQueries.go index 6048b4c..03f87fb 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -11,9 +11,9 @@ const ( TIMEZONE = "UTC" ) -//Retrieves limit rawdata entries that are older than tim +//Retrieves limit Rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim -func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err error) { +func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err error) { var prepSel *sql.Stmt if cfg.Limit > 0 { @@ -56,9 +56,9 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err e return } for rows.Next() { - var r rawData + var r RawData var tim []byte - err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim) + err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim) if err != nil { log.Println("Failed to scan result of query") return @@ -69,7 +69,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err e return } - _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time) + _, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time) if err != nil { log.Println("Failed to query prepared update") tx.Rollback() @@ -104,15 +104,15 @@ func purgeAllProcessed(db *sql.DB, cfg *Config, tim time.Time) (err error) { return } -//Removes all rawdata that is in rDat from the database -func purgeRawData(tx *sql.Tx, cfg *Config, rDat []rawData) (err error) { +//Removes all Rawdata that is in rDat from the database +func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) { prepStmt, err := tx.Prepare("DELETE FROM " + cfg.RawTable + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1") if err != nil { return } for _, r := range rDat { - _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist) + _, err = prepStmt.Exec(r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist) if err != nil { return } @@ -135,6 +135,30 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error { } } + return err +} + +func insertCleanDataToDB(cfg *Config, cd []cleanedData) error { + db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) + if err != nil { + log.Println("Failed to connect to db") + return err + } + defer db.Close() + + prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") + if err != nil { + log.Println("Failed to prepare statement") + return err + } + + for ix := range cd { + _, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences) + if err != nil { + log.Println("Failed to execute statement") + return err + } + } return nil } |