diff options
-rw-r--r-- | cleaner.go | 141 | ||||
-rw-r--r-- | datastructs.go | 29 | ||||
-rw-r--r-- | main.go | 34 | ||||
-rw-r--r-- | sqlQueries.go | 104 |
4 files changed, 187 insertions, 121 deletions
@@ -17,23 +17,6 @@ const ( TIMESPAN = "day" ) -type RawData struct { - ipSrc string - ipDst string - time time.Time - port int - packetSize int -} - -type CleanData struct { - ipbSrc string - ipbDst string - time time.Time - port int - volume string - occurances int -} - func cleanData() (err error) { db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME) if err != nil { @@ -84,10 +67,6 @@ func cleanData() (err error) { return } -func getVolSize(s int) string { - return "medium" -} - func getTimespan(t time.Time) (span time.Time, err error) { loc, err := time.LoadLocation("Local") switch { @@ -130,7 +109,7 @@ func clean(rDat []RawData) (cDat []CleanData, err error) { } for _, rd := range rDat { - vol := getVolSize(rd.packetSize) + vol := rd.getVolSize() var tim time.Time tim, err = getTimespan(rd.time) if err != nil { @@ -147,107 +126,33 @@ func clean(rDat []RawData) (cDat []CleanData, err error) { }) } - return -} + cDat = removeDups(cDat) -func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) { - prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ") - if err != nil { - log.Println("Failed to prepare statement") - return - } - - rows, err := prepStmt.Query(numRows) - if err != nil { - log.Println("Failed to query prepared statement") - return - } - - loc, err := time.LoadLocation("Local") - for rows.Next() { - var r RawData - var tim []byte - err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize) - r.time, err = time.ParseInLocation("2006-02-01 15:04:05", string(tim), loc) - if err != nil { - log.Println("Failed to scan result of query") - return - } - rDat = append(rDat, r) - } return } -func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { - prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? LIMIT 1") - if err != nil { - return - } - for _, r := range rDat { - _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) - if err != nil { - return +func removeDups(cDat []CleanData) []CleanData { + ret := make([]CleanData, 0) + var found bool + for _, d0 := range cDat { + found = false + + //Check if an equal struct already is appended + for _, d1 := range ret { + if d1.equals(d0) { + //If found, increase it occurances instead of + //appending a new struct + d1.occurances += d0.occurances + found = true + break + } } - } - return -} -func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { - prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") - if err != nil { - log.Println("Failed to prepare statement") - return err - } - - _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences) - if err != nil { - log.Println("Failed to execute statement") - return err - } - - return nil -} - -func insertASNIP(db *sql.DB, asn int, ipBlock string) error { - prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?") - if err != nil { - return err - } - defer prepCheck.Close() - - rows, err := prepCheck.Exec(ipBlock) - if err != nil { - return err - } - if rows != nil { - return nil - } - - prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") - if err != nil { - return err - } - defer prepIns.Close() - - _, err = prepIns.Exec(asn, ipBlock) - if err != nil { - return err - } - - return nil -} - -func removeASNIP(db *sql.DB, asn int, ipBlock string) error { - prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?") - if err != nil { - return err - } - defer prepStmt.Close() - - _, err = prepStmt.Exec(asn, ipBlock) - if err != nil { - return err + if !found { + //if no equal struct is found + //append it + ret = append(ret, d0) + } } - - return nil + return ret } diff --git a/datastructs.go b/datastructs.go new file mode 100644 index 0000000..c885dea --- /dev/null +++ b/datastructs.go @@ -0,0 +1,29 @@ +package main + +type RawData struct { + ipDst string + time time.Time + port int + packetSize int +} + +func (rd *RawData) getVolSize() string { + return "medium" +} + +type CleanData struct { + ipbSrc string + ipbDst string + time time.Time + port int + volume string + occurances int +} + +func (cd *CleanData) equals(other *CleanData) bool { + return cd.ipdbSrc == other.ipbSrc && + cd.ipbDst == other.ipbDst && + cd.time == other.time && + cd.port == other.port && + cd.volume == other.volume +} @@ -1,12 +1,40 @@ package main import ( + "bufio" "log" + "os" + "strings" ) func main() { - err := cleanData() - if err != nil { - log.Println(err) + + stdin := readFromStdin() + for line := range stdin { + strs := strings.Split(line, ",") + for _, str := range strs { + log.Println(str) + } } + /* + err := cleanData() + if err != nil { + log.Println(err) + } + */ +} + +//Starts a process that reads from stdin and +//puts the strings read on the returned channel +func readFromStdin() <-chan string { + + out := make(chan string) + go func() { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + out <- scanner.Text() + } + close(out) + }() + return out } diff --git a/sqlQueries.go b/sqlQueries.go new file mode 100644 index 0000000..4f58242 --- /dev/null +++ b/sqlQueries.go @@ -0,0 +1,104 @@ +package main + +func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) { + prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ") + if err != nil { + log.Println("Failed to prepare statement") + return + } + + rows, err := prepStmt.Query(numRows) + if err != nil { + log.Println("Failed to query prepared statement") + return + } + + loc, err := time.LoadLocation("Local") + for rows.Next() { + var r RawData + var tim []byte + err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize) + r.time, err = time.ParseInLocation("2006-02-01 15:04:05", string(tim), loc) + if err != nil { + log.Println("Failed to scan result of query") + return + } + rDat = append(rDat, r) + } + return +} + +func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) { + prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? LIMIT 1") + if err != nil { + return + } + + for _, r := range rDat { + _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize) + if err != nil { + return + } + } + return +} + +func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error { + prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?") + if err != nil { + log.Println("Failed to prepare statement") + return err + } + + _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences) + if err != nil { + log.Println("Failed to execute statement") + return err + } + + return nil +} + +func insertASNIP(db *sql.DB, asn int, ipBlock string) error { + prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?") + if err != nil { + return err + } + defer prepCheck.Close() + + rows, err := prepCheck.Exec(ipBlock) + if err != nil { + return err + } + if rows != nil { + return nil + } + + prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )") + if err != nil { + return err + } + defer prepIns.Close() + + _, err = prepIns.Exec(asn, ipBlock) + if err != nil { + return err + } + + return nil +} + +func removeASNIP(db *sql.DB, asn int, ipBlock string) error { + prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?") + if err != nil { + return err + } + defer prepStmt.Close() + + _, err = prepStmt.Exec(asn, ipBlock) + if err != nil { + return err + } + + return nil +} |