summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go70
1 files changed, 56 insertions, 14 deletions
diff --git a/main.go b/main.go
index 5b94e07..3c2dd4b 100644
--- a/main.go
+++ b/main.go
@@ -3,9 +3,11 @@ package main
import (
"bufio"
"database/sql"
+ "encoding/json"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
+ "strings"
"time"
//"strings"
)
@@ -19,18 +21,13 @@ func main() {
}
log.Println("Done!")
- /*
- stdin := readFromStdin()
- for line := range stdin {
- strs := strings.Split(line, ",")
- for _, str := range strs {
- log.Println(str)
- }
- }
- */
+ input := readFromStdin()
+ rDatChan := parseRawData(input, cfg)
+ cleanFromStdin(rDatChan, cfg)
+
log.Print("Cleaning data...")
starttime := time.Now()
- numOfRowsNotCleaned, err := cleanData(cfg)
+ numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
log.Println(err)
log.Println("Exiting...")
@@ -66,13 +63,58 @@ func main() {
//Starts a process that reads from stdin and
//puts the strings read on the returned channel
-func readFromStdin() <-chan string {
-
- out := make(chan string)
+func readFromStdin() <-chan []byte {
+ out := make(chan []byte)
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
- out <- scanner.Text()
+ out <- scanner.Bytes()
+ }
+ close(out)
+ }()
+ return out
+}
+
+func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
+ out := make(chan []RawData)
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ go func() {
+ rDat := make([]RawData, 0)
+ for line := range in {
+ if !strings.HasPrefix(string(line), "{") {
+ //This should be a break in the output from pmacct
+ //so we deploy our collected data and set a new timeBin
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ //Send the data if we have something to send
+ if len(rDat) > 0 {
+ out <- rDat
+ rDat = make([]RawData, 0)
+ }
+ continue
+ }
+
+ var rd RawData
+ err := json.Unmarshal(line, rd)
+ if err != nil {
+ log.Println("Failed in parsing json:", err)
+ close(out)
+ return
+ }
+ rd.time = timeBin
+
+ rDat = append(rDat, rd)
+ }
+ //If there is any unsent data after in is closed we make sure to send it.
+ if len(rDat) > 0 {
+ out <- rDat
}
close(out)
}()