package main import ( "bufio" "encoding/json" "log" "os" "strings" "time" ) var ( stlogger *log.Logger ) func init() { stlogger = log.New(os.Stdout, "[ stdin ]", log.LstdFlags) } //Starts a process that reads from stdin and //puts the strings read on the returned channel func readFromStdin() <-chan []byte { out := make(chan []byte) go func() { if VERBOSE { stlogger.Println("Now listening on stdin...") } scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { out <- []byte(scanner.Text()) } if VERBOSE { stlogger.Println("Finished listening to stdin!") } close(out) }() return out } func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { out := make(chan []RawData) ival, err := cfg.getInterval() if err != nil { stlogger.Println("Could not parse interval: ", err) } timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin if VERBOSE { stlogger.Println("Now parsing data from stdin...") } go func() { rDat := make([]RawData, 0) for line := range in { if len(string(line)) > 0 && !strings.HasPrefix(string(line), "{") { stlogger.Println("Got message:", strings.TrimSpace(string(line))) //This should be a break in the output from pmacct //so we deploy our collected data and set a new timeBin ival, err := cfg.getInterval() if err != nil { stlogger.Println("Could not parse interval: ", err) } timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin //Send the data if we have something to send if len(rDat) > 0 { out <- rDat rDat = make([]RawData, 0) } continue } var rd RawData err := json.Unmarshal(line, &rd) if err != nil { stlogger.Println("Failed in parsing json:", err) close(out) return } rd.time = timeBin rDat = append(rDat, rd) } //If there is any unsent data after in is closed we make sure to send it. if len(rDat) > 0 { out <- rDat } if VERBOSE { stlogger.Println("Finished parsing data from stdin...") } close(out) }() return out }