diff options
Diffstat (limited to 'main.go')
-rw-r--r-- | main.go | 70 |
1 files changed, 56 insertions, 14 deletions
@@ -3,9 +3,11 @@ package main import ( "bufio" "database/sql" + "encoding/json" _ "github.com/go-sql-driver/mysql" "log" "os" + "strings" "time" //"strings" ) @@ -19,18 +21,13 @@ func main() { } log.Println("Done!") - /* - stdin := readFromStdin() - for line := range stdin { - strs := strings.Split(line, ",") - for _, str := range strs { - log.Println(str) - } - } - */ + input := readFromStdin() + rDatChan := parseRawData(input, cfg) + cleanFromStdin(rDatChan, cfg) + log.Print("Cleaning data...") starttime := time.Now() - numOfRowsNotCleaned, err := cleanData(cfg) + numOfRowsNotCleaned, err := cleanFromDB(cfg) if err != nil { log.Println(err) log.Println("Exiting...") @@ -66,13 +63,58 @@ func main() { //Starts a process that reads from stdin and //puts the strings read on the returned channel -func readFromStdin() <-chan string { - - out := make(chan string) +func readFromStdin() <-chan []byte { + out := make(chan []byte) go func() { scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { - out <- scanner.Text() + out <- scanner.Bytes() + } + close(out) + }() + return out +} + +func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { + out := make(chan []RawData) + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + go func() { + rDat := make([]RawData, 0) + for line := range in { + if !strings.HasPrefix(string(line), "{") { + //This should be a break in the output from pmacct + //so we deploy our collected data and set a new timeBin + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + //Send the data if we have something to send + if len(rDat) > 0 { + out <- rDat + rDat = make([]RawData, 0) + } + continue + } + + var rd RawData + err := json.Unmarshal(line, rd) + if err != nil { + log.Println("Failed in parsing json:", err) + close(out) + return + } + rd.time = timeBin + + rDat = append(rDat, rd) + } + //If there is any unsent data after in is closed we make sure to send it. + if len(rDat) > 0 { + out <- rDat } close(out) }() |