summaryrefslogtreecommitdiff
path: root/stdin.go
blob: 50976b71e55fbcb2cda0c4a5827e4310e7629a07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"os"
	"strings"
	"time"
)

var (
	stlogger *log.Logger
)

func init() {
	stlogger = log.New(os.Stdout, "[ stdin ]", log.LstdFlags)
}

//Starts a process that reads from stdin and
//puts the strings read on the returned channel
func readFromStdin() <-chan []byte {
	out := make(chan []byte)
	go func() {
		if VERBOSE {
			stlogger.Println("Now listening on stdin...")
		}
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			out <- []byte(scanner.Text())
		}
		if VERBOSE {
			stlogger.Println("Finished listening to stdin!")
		}
		close(out)
	}()
	return out
}

func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
	out := make(chan []RawData)
	ival, err := cfg.getInterval()
	if err != nil {
		stlogger.Println("Could not parse interval: ", err)
	}
	timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
	if VERBOSE {
		stlogger.Println("Now parsing data from stdin...")
	}
	go func() {
		rDat := make([]RawData, 0)
		for line := range in {
			if len(string(line)) > 0 && !strings.HasPrefix(string(line), "{") {
				stlogger.Println("Got message:", strings.TrimSpace(string(line)))
				//This should be a break in the output from pmacct
				//so we deploy our collected data and set a new timeBin
				ival, err := cfg.getInterval()
				if err != nil {
					stlogger.Println("Could not parse interval: ", err)
				}
				timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
				//Send the data if we have something to send
				if len(rDat) > 0 {
					out <- rDat
					rDat = make([]RawData, 0)
				}
				continue
			}

			var rd RawData
			err := json.Unmarshal(line, &rd)
			if err != nil {
				stlogger.Println("Failed in parsing json:", err)
				close(out)
				return
			}
			rd.time = timeBin

			rDat = append(rDat, rd)
		}
		//If there is any unsent data after in is closed we make sure to send it.
		if len(rDat) > 0 {
			out <- rDat
		}
		if VERBOSE {
			stlogger.Println("Finished parsing data from stdin...")
		}
		close(out)
	}()
	return out
}