package main import ( "bufio" "database/sql" _ "github.com/go-sql-driver/mysql" "io" "os" "testing" "time" ) func TestCleaningFromJSON(t *testing.T) { cfg := &Config{ Limit: 0, Interval: "5min", Epsilon: 0, DataSource: "json", DBConn: "", DBName: "test", CleanTable: "test_clean", DBUser: "flowcleaner", DBPass: "nil", } testProcessFromStdin(t, cfg) time.Sleep(15 * time.Second) controlDB(t, cfg) } func testProcessFromStdin(t *testing.T, cfg *Config) { stdin := make(chan []byte) go func() { for i := 0; i < 3; i++ { fakeStdin(t, stdin) if i < 2 { time.Sleep(3 * time.Minute) } } close(stdin) }() rDatChan := parseRawData(stdin, cfg) err := cleanFromStdin(rDatChan, cfg) if err != nil { t.Fatal(err) } } func fakeStdin(t *testing.T, wr chan<- []byte) { file, err := os.Open("testdata/jsoninput") if err != nil { t.Fatal(err) } defer file.Close() reader := bufio.NewReader(file) for { line, err := reader.ReadBytes('\n') if err == io.EOF { break } else if err != nil { t.Fatal(err) } else { wr <- line } } } func controlDB(t *testing.T, cfg *Config) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { t.Fatal("Failed to connect to db:", err) } defer db.Close() rows, err := db.Query("SELECT * FROM " + cfg.CleanTable) if err != nil { t.Fatal("Failed to select cleaned data:", err) } defer rows.Close() numRows := 0 for rows.Next() { numRows++ } if numRows != 14 { t.Fatal("Wrong number of rows found in db") } check1, err := db.Query("SELECT occurences, volume FROM " + cfg.CleanTable + " ORDER BY occurences DESC") if err != nil { t.Fatal("Failed to select occurences and volume:", err) } defer check1.Close() var oc int var vol string check1.Next() check1.Scan(&oc, &vol) if oc != 36 && vol == "0-199" { t.Fatal("Failed oc 36, vol 0-199. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 20 && vol == "200-299" { t.Fatal("Failed oc 20, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 18 && vol == "200-299" { t.Fatal("Failed oc 18, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 14 && vol == "200-299" { t.Fatal("Failed oc 14, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 12 && vol == "200-299" { t.Fatal("Failed oc 12, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 10 && vol == "200-299" { t.Fatal("Failed oc 10, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 8 && vol == "200-299" { t.Fatal("Failed oc 8, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 7 && vol == "200-299" { t.Fatal("Failed oc 7, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 6 && vol == "200-299" { t.Fatal("Failed oc 6, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 4 && vol == "200-299" { t.Fatal("Failed oc 4, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 2 && vol == "200-299" { t.Fatal("Failed oc 2, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 1 && vol == "200-299" { t.Fatal("Failed oc 1, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 0 && vol == "200-299" { t.Fatal("Failed oc 0, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 0 && vol == "200-299" { t.Fatal("Failed oc 0, vol 200-299. Actually was:", oc, vol) } }