1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
package main
import (
"bufio"
"encoding/json"
"log"
"os"
"strings"
"time"
)
// stlogger is the logger used by the stdin reading and parsing stages. It
// writes to standard output, prefixes every line with "[ stdin ]" and uses
// the standard date/time flags.
//
// It is initialised in the declaration rather than in an init function so
// that it is already usable during package variable initialisation and in
// every init function of the package.
var stlogger = log.New(os.Stdout, "[ stdin ]", log.LstdFlags)
// readFromStdin starts a goroutine that reads os.Stdin line by line and sends
// every line, without its line terminator, on the returned channel.
//
// The channel is closed once the scanner stops, which happens at EOF, on a
// read error, or when a line exceeds the scanner's buffer. In the latter two
// cases the error is logged so that a truncated stream is not mistaken for a
// clean end of input.
func readFromStdin() <-chan []byte {
	out := make(chan []byte)
	go func() {
		// Closing out tells the consumer that no more lines will arrive.
		defer close(out)

		if VERBOSE {
			stlogger.Println("Now listening on stdin...")
		}
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			// The slice returned by Bytes may be overwritten by the next
			// call to Scan, so the receiver gets its own copy.
			raw := scanner.Bytes()
			line := make([]byte, len(raw))
			copy(line, raw)
			out <- line
		}
		// Scan returns false for errors as well as for EOF; Err is nil only
		// in the EOF case.
		if err := scanner.Err(); err != nil {
			stlogger.Println("Failed reading from stdin:", err)
		}
		if VERBOSE {
			stlogger.Println("Finished listening to stdin!")
		}
	}()
	return out
}
// parseRawData consumes lines from in and groups the JSON records found there
// into batches that are sent on the returned channel.
//
// A non-empty line that does not start with "{" is treated as a break in the
// pmacct output: it is logged, a new time bin is started and the batch
// collected so far is sent if it is not empty. Every other line is decoded
// with json.Unmarshal into a RawData whose time field is set to the current
// time bin. An empty line is not treated as a break, so it reaches
// json.Unmarshal and is rejected like any other undecodable line.
//
// When a line cannot be decoded the error is logged, the batch collected so
// far is discarded and the returned channel is closed immediately. The rest
// of in is then drained so that the goroutine feeding it is not left blocked
// on a send that nobody receives. When in is closed, any unsent batch is
// flushed before the returned channel is closed.
func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
	out := make(chan []RawData)

	// newTimeBin returns the current time moved back by half the configured
	// interval. A failure to obtain the interval is only logged; whatever
	// value getInterval returned alongside the error is still used.
	newTimeBin := func() time.Time {
		ival, err := cfg.getInterval()
		if err != nil {
			stlogger.Println("Could not parse interval: ", err)
		}
		return time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
	}

	timeBin := newTimeBin()
	if VERBOSE {
		stlogger.Println("Now parsing data from stdin...")
	}

	go func() {
		var batch []RawData
		for line := range in {
			if len(line) > 0 && line[0] != '{' {
				stlogger.Println("Got message:", strings.TrimSpace(string(line)))
				// This should be a break in the output from pmacct, so the
				// collected data is deployed and a new time bin is set.
				timeBin = newTimeBin()
				if len(batch) > 0 {
					out <- batch
					// The receiver now owns the sent slice; start a new one
					// instead of reusing its backing array.
					batch = nil
				}
				continue
			}

			var rd RawData
			if err := json.Unmarshal(line, &rd); err != nil {
				stlogger.Println("Failed in parsing json:", err)
				close(out)
				for range in {
					// Discard the remaining input so the sender can finish.
				}
				return
			}
			rd.time = timeBin
			batch = append(batch, rd)
		}

		// If there is any unsent data after in is closed, make sure to send it.
		if len(batch) > 0 {
			out <- batch
		}
		if VERBOSE {
			stlogger.Println("Finished parsing data from stdin...")
		}
		close(out)
	}()
	return out
}
|