summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go82
1 files changed, 16 insertions, 66 deletions
diff --git a/main.go b/main.go
index 3c2dd4b..05bbac3 100644
--- a/main.go
+++ b/main.go
@@ -1,15 +1,10 @@
package main
import (
- "bufio"
"database/sql"
- "encoding/json"
_ "github.com/go-sql-driver/mysql"
"log"
- "os"
- "strings"
"time"
- //"strings"
)
func main() {
@@ -21,10 +16,26 @@ func main() {
}
log.Println("Done!")
+ switch cfg.DataSource {
+ case "stdin":
+ processFromStdin()
+ case "mysq":
+ pricessFromDB()
+ default:
+ log.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
+ }
+
+ log.Println("Finished processing, now exiting")
+}
+
+func processFromStdin() {
+ log.Println("Starting to process from stdin...")
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
cleanFromStdin(rDatChan, cfg)
+}
+func processFromDB() {
log.Print("Cleaning data...")
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
@@ -58,65 +69,4 @@ func main() {
}
log.Println("Done!")
}
- log.Println("Finished processing, now exiting")
-}
-
-//Starts a process that reads from stdin and
-//puts the strings read on the returned channel
-func readFromStdin() <-chan []byte {
- out := make(chan []byte)
- go func() {
- scanner := bufio.NewScanner(os.Stdin)
- for scanner.Scan() {
- out <- scanner.Bytes()
- }
- close(out)
- }()
- return out
-}
-
-func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
- out := make(chan []RawData)
- ival, err := cfg.getInterval()
- if err != nil {
- log.Println("Could not parse interval: ", err)
- }
- timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- go func() {
- rDat := make([]RawData, 0)
- for line := range in {
- if !strings.HasPrefix(string(line), "{") {
- //This should be a break in the output from pmacct
- //so we deploy our collected data and set a new timeBin
- ival, err := cfg.getInterval()
- if err != nil {
- log.Println("Could not parse interval: ", err)
- }
- timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- //Send the data if we have something to send
- if len(rDat) > 0 {
- out <- rDat
- rDat = make([]RawData, 0)
- }
- continue
- }
-
- var rd RawData
- err := json.Unmarshal(line, rd)
- if err != nil {
- log.Println("Failed in parsing json:", err)
- close(out)
- return
- }
- rd.time = timeBin
-
- rDat = append(rDat, rd)
- }
- //If there is any unsent data after in is closed we make sure to send it.
- if len(rDat) > 0 {
- out <- rDat
- }
- close(out)
- }()
- return out
}