package main import ( "bufio" "database/sql" "fmt" _ "github.com/go-sql-driver/mysql" "io" "io/ioutil" "os" "strings" "testing" "time" ) func TestCleaningFromJSON(t *testing.T) { cfg := &Config{ Limit: 0, Interval: "5min", Epsilon: 0, DataSource: "json", DBConn: "", DBName: "test", RawTable: "test_raw", CleanTable: "test_clean", DBUser: "flowcleaner", DBPass: "nil", } fmt.Println("== Testing to process from stdin ==") prepareDB(t, cfg) testProcessFromStdin(t, cfg) time.Sleep(15 * time.Second) controlCleanDB(t, cfg) controlRawDBStdin(t, cfg) fmt.Println("== Finished testing to process from stdin ==") } func TestCleaningFromDB(t *testing.T) { cfg := &Config{ Limit: 0, Interval: "5min", Epsilon: 0, DataSource: "mysql", DBConn: "", DBName: "test", RawTable: "test_raw", CleanTable: "test_clean", DBUser: "flowcleaner", DBPass: "nil", } fmt.Println("== Testing to process from DB ==") prepareDB(t, cfg) processFromDB(cfg) controlCleanDB(t, cfg) controlRawDBMySQL(t, cfg) fmt.Println("== Finished testing to process from DB ==") } func prepareDB(t *testing.T, cfg *Config) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { t.Fatal("Failed to connect to db:", err) } defer db.Close() file, err := ioutil.ReadFile("testdata/dbTestSetup.mysql") if err != nil { t.Fatal(err) } for _, query := range strings.Split(string(file), ";") { query = strings.TrimSpace(query) if len(query) > 0 { _, err = db.Exec(query + ";") if err != nil { fmt.Println("QUERY:", query) t.Fatal(err) } } } } func testProcessFromStdin(t *testing.T, cfg *Config) { stdin := make(chan []byte) go func() { min := time.Now().Minute() binStart := ((min + 5) / 5) * 5 // Set it to the next interval wait := (binStart - min) % 5 dur, err := time.ParseDuration(fmt.Sprintf("%dm", wait)) if err != nil { t.Fatal(err) } fmt.Println("== Waiting for alignment of timebin before testing ==") fmt.Println(fmt.Sprintf("== Will start in %d minute(s) ==", wait)) time.Sleep(dur) fmt.Println("== Now starting to test ==") for i := 0; i < 3; i++ { fakeStdin(t, stdin) if i < 1 { fmt.Println("== Now waiting for next time bin ==") time.Sleep(5 * time.Minute) fmt.Println("== Now continuing testing ==") } } close(stdin) }() rDatChan := parseRawData(stdin, cfg) err := cleanFromStdin(rDatChan, cfg) if err != nil { t.Fatal(err) } } func fakeStdin(t *testing.T, wr chan<- []byte) { file, err := os.Open("testdata/jsoninput") if err != nil { t.Fatal(err) } defer file.Close() reader := bufio.NewReader(file) for { line, err := reader.ReadBytes('\n') if err == io.EOF { break } else if err != nil { t.Fatal(err) } else { wr <- line } } } func controlRawDBStdin(t *testing.T, cfg *Config) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { t.Fatal("Failed to connect to db:", err) } defer db.Close() rows, err := db.Query("SELECT * FROM " + cfg.RawTable) if err != nil { t.Fatal("Failed to select cleaned data:", err) } defer rows.Close() numRows := 0 for rows.Next() { numRows++ } if numRows != 33 { t.Fatal("Wrong number of rows found in db, should be 33 found:", numRows) } } func controlRawDBMySQL(t *testing.T, cfg *Config) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { t.Fatal("Failed to connect to db:", err) } defer db.Close() rows, err := db.Query("SELECT * FROM " + cfg.RawTable) if err != nil { t.Fatal("Failed to select cleaned data:", err) } defer rows.Close() numRows := 0 for rows.Next() { numRows++ } if numRows != 3 { t.Fatal("Wrong number of rows found in db, should be 3 found:", numRows) } } func controlCleanDB(t *testing.T, cfg *Config) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { t.Fatal("Failed to connect to db:", err) } defer db.Close() rows, err := db.Query("SELECT * FROM " + cfg.CleanTable) if err != nil { t.Fatal("Failed to select cleaned data:", err) } defer rows.Close() numRows := 0 for rows.Next() { numRows++ } if numRows != 14 { t.Fatal("Wrong number of rows found in db, should be 14 found:", numRows) } check1, err := db.Query("SELECT occurences, volume FROM " + cfg.CleanTable + " ORDER BY occurences DESC") if err != nil { t.Fatal("Failed to select occurences and volume:", err) } defer check1.Close() var oc int var vol string check1.Next() check1.Scan(&oc, &vol) if oc != 36 && vol == "0-199" { t.Fatal("Failed oc 36, vol 0-199. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 20 && vol == "200-299" { t.Fatal("Failed oc 20, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 18 && vol == "200-299" { t.Fatal("Failed oc 18, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 14 && vol == "200-299" { t.Fatal("Failed oc 14, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 12 && vol == "200-299" { t.Fatal("Failed oc 12, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 10 && vol == "200-299" { t.Fatal("Failed oc 10, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 8 && vol == "200-299" { t.Fatal("Failed oc 8, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 7 && vol == "200-299" { t.Fatal("Failed oc 7, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 6 && vol == "200-299" { t.Fatal("Failed oc 6, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 4 && vol == "200-299" { t.Fatal("Failed oc 4, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 2 && vol == "200-299" { t.Fatal("Failed oc 2, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 1 && vol == "200-299" { t.Fatal("Failed oc 1, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 0 && vol == "200-299" { t.Fatal("Failed oc 0, vol 200-299. Actually was:", oc, vol) } check1.Next() check1.Scan(&oc, &vol) if oc != 0 && vol == "200-299" { t.Fatal("Failed oc 0, vol 200-299. Actually was:", oc, vol) } }