package main import ( "bufio" "database/sql" "encoding/json" _ "github.com/go-sql-driver/mysql" "log" "os" "strings" "time" //"strings" ) func main() { log.Println("Reading config...") cfg, err := readConfig() if err != nil { log.Println("Could not read config") return } log.Println("Done!") input := readFromStdin() rDatChan := parseRawData(input, cfg) cleanFromStdin(rDatChan, cfg) log.Print("Cleaning data...") starttime := time.Now() numOfRowsNotCleaned, err := cleanFromDB(cfg) if err != nil { log.Println(err) log.Println("Exiting...") return } log.Println("Done!") // If either all rows are processed or if there is no limit for the processing // we can safely add noise to the cleaned data if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 { log.Println("Adding differential privacy noise to processed data...") db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { log.Println("Failed to connect to db:", err) return } defer db.Close() ival, err := cfg.getInterval() if err != nil { log.Println("erronous interval in conf prevents the privatization of data:", err) return } err = privatizeCleaned(db, starttime.Add(-2*ival), cfg) if err != nil { log.Println("Failed to privatize data:", err) } log.Println("Done!") } log.Println("Finished processing, now exiting") } //Starts a process that reads from stdin and //puts the strings read on the returned channel func readFromStdin() <-chan []byte { out := make(chan []byte) go func() { scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { out <- scanner.Bytes() } close(out) }() return out } func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { out := make(chan []RawData) ival, err := cfg.getInterval() if err != nil { log.Println("Could not parse interval: ", err) } timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin go func() { rDat := make([]RawData, 0) for line := range in { if !strings.HasPrefix(string(line), "{") { //This should be a break in the output from pmacct //so we deploy our collected data and set a new timeBin ival, err := cfg.getInterval() if err != nil { log.Println("Could not parse interval: ", err) } timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin //Send the data if we have something to send if len(rDat) > 0 { out <- rDat rDat = make([]RawData, 0) } continue } var rd RawData err := json.Unmarshal(line, rd) if err != nil { log.Println("Failed in parsing json:", err) close(out) return } rd.time = timeBin rDat = append(rDat, rd) } //If there is any unsent data after in is closed we make sure to send it. if len(rDat) > 0 { out <- rDat } close(out) }() return out }