summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cleaner.go141
-rw-r--r--datastructs.go29
-rw-r--r--main.go34
-rw-r--r--sqlQueries.go104
4 files changed, 187 insertions, 121 deletions
diff --git a/cleaner.go b/cleaner.go
index b001c0a..6666e91 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -17,23 +17,6 @@ const (
TIMESPAN = "day"
)
-type RawData struct {
- ipSrc string
- ipDst string
- time time.Time
- port int
- packetSize int
-}
-
-type CleanData struct {
- ipbSrc string
- ipbDst string
- time time.Time
- port int
- volume string
- occurances int
-}
-
func cleanData() (err error) {
db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME)
if err != nil {
@@ -84,10 +67,6 @@ func cleanData() (err error) {
return
}
-func getVolSize(s int) string {
- return "medium"
-}
-
func getTimespan(t time.Time) (span time.Time, err error) {
loc, err := time.LoadLocation("Local")
switch {
@@ -130,7 +109,7 @@ func clean(rDat []RawData) (cDat []CleanData, err error) {
}
for _, rd := range rDat {
- vol := getVolSize(rd.packetSize)
+ vol := rd.getVolSize()
var tim time.Time
tim, err = getTimespan(rd.time)
if err != nil {
@@ -147,107 +126,33 @@ func clean(rDat []RawData) (cDat []CleanData, err error) {
})
}
- return
-}
+ cDat = removeDups(cDat)
-func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) {
- prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
- if err != nil {
- log.Println("Failed to prepare statement")
- return
- }
-
- rows, err := prepStmt.Query(numRows)
- if err != nil {
- log.Println("Failed to query prepared statement")
- return
- }
-
- loc, err := time.LoadLocation("Local")
- for rows.Next() {
- var r RawData
- var tim []byte
- err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize)
- r.time, err = time.ParseInLocation("2006-02-01 15:04:05", string(tim), loc)
- if err != nil {
- log.Println("Failed to scan result of query")
- return
- }
- rDat = append(rDat, r)
- }
return
}
-func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
- prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? LIMIT 1")
- if err != nil {
- return
- }
- for _, r := range rDat {
- _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
- if err != nil {
- return
+func removeDups(cDat []CleanData) []CleanData {
+ ret := make([]CleanData, 0)
+ var found bool
+ for _, d0 := range cDat {
+ found = false
+
+ //Check if an equal struct already is appended
+ for _, d1 := range ret {
+ if d1.equals(d0) {
+ //If found, increase it occurances instead of
+ //appending a new struct
+ d1.occurances += d0.occurances
+ found = true
+ break
+ }
}
- }
- return
-}
-func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
- prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
- if err != nil {
- log.Println("Failed to prepare statement")
- return err
- }
-
- _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences)
- if err != nil {
- log.Println("Failed to execute statement")
- return err
- }
-
- return nil
-}
-
-func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
- prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?")
- if err != nil {
- return err
- }
- defer prepCheck.Close()
-
- rows, err := prepCheck.Exec(ipBlock)
- if err != nil {
- return err
- }
- if rows != nil {
- return nil
- }
-
- prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )")
- if err != nil {
- return err
- }
- defer prepIns.Close()
-
- _, err = prepIns.Exec(asn, ipBlock)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
- prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?")
- if err != nil {
- return err
- }
- defer prepStmt.Close()
-
- _, err = prepStmt.Exec(asn, ipBlock)
- if err != nil {
- return err
+ if !found {
+ //if no equal struct is found
+ //append it
+ ret = append(ret, d0)
+ }
}
-
- return nil
+ return ret
}
diff --git a/datastructs.go b/datastructs.go
new file mode 100644
index 0000000..c885dea
--- /dev/null
+++ b/datastructs.go
@@ -0,0 +1,29 @@
+package main
+
+type RawData struct {
+ ipDst string
+ time time.Time
+ port int
+ packetSize int
+}
+
+func (rd *RawData) getVolSize() string {
+ return "medium"
+}
+
+type CleanData struct {
+ ipbSrc string
+ ipbDst string
+ time time.Time
+ port int
+ volume string
+ occurances int
+}
+
+func (cd *CleanData) equals(other *CleanData) bool {
+ return cd.ipdbSrc == other.ipbSrc &&
+ cd.ipbDst == other.ipbDst &&
+ cd.time == other.time &&
+ cd.port == other.port &&
+ cd.volume == other.volume
+}
diff --git a/main.go b/main.go
index f47ffef..cd19454 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,40 @@
package main
import (
+ "bufio"
"log"
+ "os"
+ "strings"
)
func main() {
- err := cleanData()
- if err != nil {
- log.Println(err)
+
+ stdin := readFromStdin()
+ for line := range stdin {
+ strs := strings.Split(line, ",")
+ for _, str := range strs {
+ log.Println(str)
+ }
}
+ /*
+ err := cleanData()
+ if err != nil {
+ log.Println(err)
+ }
+ */
+}
+
+//Starts a process that reads from stdin and
+//puts the strings read on the returned channel
+func readFromStdin() <-chan string {
+
+ out := make(chan string)
+ go func() {
+ scanner := bufio.NewScanner(os.Stdin)
+ for scanner.Scan() {
+ out <- scanner.Text()
+ }
+ close(out)
+ }()
+ return out
}
diff --git a/sqlQueries.go b/sqlQueries.go
new file mode 100644
index 0000000..4f58242
--- /dev/null
+++ b/sqlQueries.go
@@ -0,0 +1,104 @@
+package main
+
+func fetchRawData(db *sql.DB, numRows int) (rDat []RawData, err error) {
+ prepStmt, err := db.Prepare("SELECT * FROM raw_data LIMIT ? ")
+ if err != nil {
+ log.Println("Failed to prepare statement")
+ return
+ }
+
+ rows, err := prepStmt.Query(numRows)
+ if err != nil {
+ log.Println("Failed to query prepared statement")
+ return
+ }
+
+ loc, err := time.LoadLocation("Local")
+ for rows.Next() {
+ var r RawData
+ var tim []byte
+ err = rows.Scan(&r.ipSrc, &r.ipDst, &tim, &r.port, &r.packetSize)
+ r.time, err = time.ParseInLocation("2006-02-01 15:04:05", string(tim), loc)
+ if err != nil {
+ log.Println("Failed to scan result of query")
+ return
+ }
+ rDat = append(rDat, r)
+ }
+ return
+}
+
+func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
+ prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND time = ? AND port = ? AND packet_size = ? LIMIT 1")
+ if err != nil {
+ return
+ }
+
+ for _, r := range rDat {
+ _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.time, r.port, r.packetSize)
+ if err != nil {
+ return
+ }
+ }
+ return
+}
+
+func insertCleanData(tx *sql.Tx, ipbSrc, ipbDst, volume string, time time.Time, port, occurences int) error {
+ prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, time, port, volume, occurences) VALUES ( ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+ if err != nil {
+ log.Println("Failed to prepare statement")
+ return err
+ }
+
+ _, err = prepStmt.Exec(ipbSrc, ipbDst, time, port, volume, occurences, occurences)
+ if err != nil {
+ log.Println("Failed to execute statement")
+ return err
+ }
+
+ return nil
+}
+
+func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
+ prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?")
+ if err != nil {
+ return err
+ }
+ defer prepCheck.Close()
+
+ rows, err := prepCheck.Exec(ipBlock)
+ if err != nil {
+ return err
+ }
+ if rows != nil {
+ return nil
+ }
+
+ prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )")
+ if err != nil {
+ return err
+ }
+ defer prepIns.Close()
+
+ _, err = prepIns.Exec(asn, ipBlock)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
+ prepStmt, err := db.Prepare("DELETE FROM anip WHERE asn = ? AND ip_block = ?")
+ if err != nil {
+ return err
+ }
+ defer prepStmt.Close()
+
+ _, err = prepStmt.Exec(asn, ipBlock)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}