summary refs log tree commit diff
diff options
context:
space:
mode:
author Daniel Langesten <daniel.langest@gmail.com> 2015-03-18 14:53:49 +0100
committer Daniel Langesten <daniel.langest@gmail.com> 2015-03-18 14:53:49 +0100
commit af80532aa7ee6c313a63a0a83af43e6e64128027 (patch)
tree 27e0473faec90d2e5f7f6b65d17fee7bcc31e055
parent 76c6a956aef78457309122a14c10541ffe2e7434 (diff)
added support for cleaning json data from stdin
-rw-r--r-- cleaner.go 40
-rw-r--r-- datastructs.go 18
-rw-r--r-- main.go 70
-rw-r--r-- sqlQueries.go 40
4 files changed, 125 insertions, 43 deletions
diff --git a/cleaner.go b/cleaner.go
index 21b6842..186cd44 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -9,7 +9,7 @@ import (
"time"
)
-func cleanData(cfg *Config) (rowsLeft int, err error) {
+func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
@@ -39,7 +39,7 @@ func cleanData(cfg *Config) (rowsLeft int, err error) {
cDat, err := clean(rDat, cfg)
if err != nil {
- log.Println("Failed to clean data")
+ log.Println("Failed to clean data from db:", err)
return
}
@@ -76,6 +76,22 @@ func cleanData(cfg *Config) (rowsLeft int, err error) {
return
}
+func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error { // cleans and stores every batch received on rDatChan
+ for rDat := range rDatChan { // loop ends once rDatChan is closed
+ cDat, err := clean(rDat, cfg)
+ if err != nil {
+ log.Println("Failed to clean data from stdin:", err)
+ return err // stop at the first batch that cannot be cleaned
+ }
+ err = insertCleanDataToDB(cfg, cDat)
+ if err != nil {
+ log.Println("Failed to insert clean data from stdin:", err)
+ return err // stop at the first batch that cannot be stored
+ }
+ }
+ return nil
+}
+
func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) {
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
@@ -111,12 +127,12 @@ func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) {
return
}
-func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) {
+func clean(rDat []RawData, cfg *Config) (cDat []cleanedData, err error) {
// collect all ips so we can query for their ip blocks
ips := make(map[string]struct{})
for _, rd := range rDat {
- ips[rd.ipSrc] = struct{}{}
- ips[rd.ipDst] = struct{}{}
+ ips[rd.IpSrc] = struct{}{}
+ ips[rd.IpDst] = struct{}{}
}
var iplist []string
@@ -139,14 +155,14 @@ func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) {
}
cDat = append(cDat,
cleanedData{
- ipbSrc: pairs[rd.ipSrc],
- ipbDst: pairs[rd.ipDst],
- asSrc: rd.asSrc,
- asDst: rd.asDst,
- portSrc: rd.portSrc,
- portDst: rd.portDst,
+ ipbSrc: pairs[rd.IpSrc],
+ ipbDst: pairs[rd.IpDst],
+ asSrc: rd.AsSrc,
+ asDst: rd.AsDst,
+ portSrc: rd.PortSrc,
+ portDst: rd.PortDst,
occurences: 1,
- volume: rd.pktLenDist,
+ volume: rd.PktLenDist,
time: tim,
})
}
diff --git a/datastructs.go b/datastructs.go
index 8220c62..f8887ef 100644
--- a/datastructs.go
+++ b/datastructs.go
@@ -4,15 +4,15 @@ import (
"time"
)
-type rawData struct {
- ipSrc string
- ipDst string
- asSrc int
- asDst int
- portSrc int
- portDst int
- packets int
- pktLenDist string
+type RawData struct { // one raw flow record; exported fields are filled from JSON or from a DB row
+ IpSrc string `json:"ip_src"` // tag values must be double-quoted or encoding/json ignores them
+ IpDst string `json:"ip_dst"`
+ AsSrc int `json:"as_src"`
+ AsDst int `json:"as_dst"`
+ PortSrc int `json:"port_src"`
+ PortDst int `json:"port_dst"`
+ Packets int `json:"packets"`
+ PktLenDist string `json:"pkt_len_distrib"`
time time.Time
}
diff --git a/main.go b/main.go
index 5b94e07..3c2dd4b 100644
--- a/main.go
+++ b/main.go
@@ -3,9 +3,11 @@ package main
import (
"bufio"
"database/sql"
+ "encoding/json"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
+ "strings"
"time"
//"strings"
)
@@ -19,18 +21,13 @@ func main() {
}
log.Println("Done!")
- /*
- stdin := readFromStdin()
- for line := range stdin {
- strs := strings.Split(line, ",")
- for _, str := range strs {
- log.Println(str)
- }
- }
- */
+ input := readFromStdin()
+ rDatChan := parseRawData(input, cfg)
+ cleanFromStdin(rDatChan, cfg)
+
log.Print("Cleaning data...")
starttime := time.Now()
- numOfRowsNotCleaned, err := cleanData(cfg)
+ numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
log.Println(err)
log.Println("Exiting...")
@@ -66,13 +63,58 @@ func main() {
//Starts a process that reads from stdin and
//puts the strings read on the returned channel
-func readFromStdin() <-chan string {
-
- out := make(chan string)
+func readFromStdin() <-chan []byte {
+ out := make(chan []byte)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
- out <- scanner.Text()
+ out <- append([]byte(nil), scanner.Bytes()...) // copy: the next Scan may overwrite the slice Bytes returns
+ }
+ close(out)
+ }()
+ return out
+}
+
+func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
+ out := make(chan []RawData)
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ go func() {
+ rDat := make([]RawData, 0)
+ for line := range in {
+ if !strings.HasPrefix(string(line), "{") {
+ //This should be a break in the output from pmacct
+ //so we deploy our collected data and set a new timeBin
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ //Send the data if we have something to send
+ if len(rDat) > 0 {
+ out <- rDat
+ rDat = make([]RawData, 0)
+ }
+ continue
+ }
+
+ var rd RawData
+ err := json.Unmarshal(line, &rd) // Unmarshal needs a pointer; a non-pointer target always returns an error
+ if err != nil {
+ log.Println("Failed in parsing json:", err)
+ close(out)
+ return
+ }
+ rd.time = timeBin
+
+ rDat = append(rDat, rd)
+ }
+ //If there is any unsent data after in is closed we make sure to send it.
+ if len(rDat) > 0 {
+ out <- rDat
}
close(out)
}()
diff --git a/sqlQueries.go b/sqlQueries.go
index 6048b4c..03f87fb 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -11,9 +11,9 @@ const (
TIMEZONE = "UTC"
)
-//Retrieves limit rawdata entries that are older than tim
+//Retrieves limit Rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
-func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err error) {
+func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err error) {
var prepSel *sql.Stmt
if cfg.Limit > 0 {
@@ -56,9 +56,9 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err e
return
}
for rows.Next() {
- var r rawData
+ var r RawData
var tim []byte
- err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &tim)
+ err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim)
if err != nil {
log.Println("Failed to scan result of query")
return
@@ -69,7 +69,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []rawData, err e
return
}
- _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time)
+ _, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time)
if err != nil {
log.Println("Failed to query prepared update")
tx.Rollback()
@@ -104,15 +104,15 @@ func purgeAllProcessed(db *sql.DB, cfg *Config, tim time.Time) (err error) {
return
}
-//Removes all rawdata that is in rDat from the database
-func purgeRawData(tx *sql.Tx, cfg *Config, rDat []rawData) (err error) {
+//Removes all Rawdata that is in rDat from the database
+func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) {
prepStmt, err := tx.Prepare("DELETE FROM " + cfg.RawTable + " WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
if err != nil {
return
}
for _, r := range rDat {
- _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist)
+ _, err = prepStmt.Exec(r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist)
if err != nil {
return
}
@@ -135,6 +135,30 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
}
}
+ return err
+}
+
+func insertCleanDataToDB(cfg *Config, cd []cleanedData) error { // opens its own DB handle and upserts every entry of cd into cfg.CleanTable
+ db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
+ if err != nil {
+ log.Println("Failed to connect to db")
+ return err
+ }
+ defer db.Close()
+
+ prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
+ if err != nil {
+ log.Println("Failed to prepare statement")
+ return err
+ }
+ defer prepStmt.Close() // release the prepared statement; runs before the deferred db.Close
+ for ix := range cd {
+ _, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
+ if err != nil {
+ log.Println("Failed to execute statement")
+ return err // stop at the first row that fails
+ }
+ }
return nil
}