summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Daniel Langesten <daniel.langest@gmail.com> 2015-03-26 11:09:48 +0100
committer Daniel Langesten <daniel.langest@gmail.com> 2015-03-26 11:09:48 +0100
commit 986a05c215723b9227db20aa11a62e4d924281cb (patch)
tree 6401602986bb32827b9defe43faa5e5e93c527f1
parent 54f6128240711a93551d85b000766a9b57cc96cf (diff)
added verbose option to enable/disable extra output from the program
-rw-r--r-- cleaner.go 16
-rw-r--r-- config.go 2
-rw-r--r-- config.json 4
-rw-r--r-- flow-cleaner.go 27
-rw-r--r-- flow-cleaner_test.go 30
-rw-r--r-- stdin.go 16
-rw-r--r-- whois.go 8
7 files changed, 73 insertions, 30 deletions
diff --git a/cleaner.go b/cleaner.go
index 5e0e7d0..1342798 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -19,7 +19,9 @@ func init() {
}
func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
- cllogger.Println("Starting to clean from db...")
+ if VERBOSE {
+ cllogger.Println("Starting to clean from db...")
+ }
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
@@ -83,12 +85,16 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
}
tx.Commit()
- cllogger.Println("Finished cleaning from db!")
+ if VERBOSE {
+ cllogger.Println("Finished cleaning from db!")
+ }
return
}
func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error {
- cllogger.Println("Starting to clean from stdin...")
+ if VERBOSE {
+ cllogger.Println("Starting to clean from stdin...")
+ }
for rDat := range rDatChan {
cDat, err := clean(rDat, cfg)
if err != nil {
@@ -103,7 +109,9 @@ func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error {
continue
}
}
- cllogger.Println("Finished cleaning from stdin!")
+ if VERBOSE {
+ cllogger.Println("Finished cleaning from stdin!")
+ }
return nil
}
diff --git a/config.go b/config.go
index 8fdfc92..97bc87e 100644
--- a/config.go
+++ b/config.go
@@ -23,6 +23,8 @@ type Config struct {
Interval string `json:interval`
Epsilon float64 `json:epsilon`
+ Verbose bool `json:verbose`
+
DataSource string `json:dataSource`
DBConn string `json:DBConn`
diff --git a/config.json b/config.json
index c321869..c2f8b00 100644
--- a/config.json
+++ b/config.json
@@ -8,8 +8,10 @@
"comment Epsilon": "Epsilon is the epsilon value for differential privacy. epsilon < 1 high privacy, 10 < epsilon low privacy. If epsilon is set to 0, differential privacy will not be used.",
"epsilon": 0,
+ "comment Verbose": "Enables extra output. Primarily used for debugging",
+ "verbose": true,
- "comment dataSource": "dataSource is from where the program should read and process data. Currently only mydql and stdin is supported.",
+ "comment dataSource": "dataSource is from where the program should read and process data. Currently only mysql and stdin is supported.",
"dataSource": "stdin",
"DBConn": "",
diff --git a/flow-cleaner.go b/flow-cleaner.go
index 9e038cf..eaa5559 100644
--- a/flow-cleaner.go
+++ b/flow-cleaner.go
@@ -10,6 +10,7 @@ import (
var (
flogger *log.Logger
+ VERBOSE bool
)
func init() {
@@ -17,7 +18,9 @@ func init() {
}
func main() {
+ flogger.Println("Now running Flow-cleaner")
cfg, err := readConfig()
+ VERBOSE = cfg.Verbose
if err != nil {
flogger.Println("Could not read config")
return
@@ -36,18 +39,24 @@ func main() {
}
func processFromStdin(cfg *Config) {
- flogger.Println("Starting to process from stdin...")
+ if VERBOSE {
+ flogger.Println("Starting to process from stdin...")
+ }
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
err := cleanFromStdin(rDatChan, cfg)
if err != nil {
flogger.Println("Failed to clean data:", err)
}
- flogger.Println("Finished processing from stdin!")
+ if VERBOSE {
+ flogger.Println("Finished processing from stdin!")
+ }
}
func processFromDB(cfg *Config) {
- flogger.Print("Starting to process from db...")
+ if VERBOSE {
+ flogger.Println("Starting to process from db...")
+ }
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
@@ -55,12 +64,16 @@ func processFromDB(cfg *Config) {
flogger.Println("Exiting...")
return
}
- flogger.Println("Finished processing from db!")
+ if VERBOSE {
+ flogger.Println("Finished processing from db!")
+ }
// If either all rows are processed or if there is no limit for the processing
// we can safely add noise to the cleaned data
if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
- flogger.Println("Adding differential privacy noise to processed data...")
+ if VERBOSE {
+ flogger.Println("Adding differential privacy noise to processed data...")
+ }
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
flogger.Println("Failed to connect to db:", err)
@@ -78,6 +91,8 @@ func processFromDB(cfg *Config) {
if err != nil {
flogger.Println("Failed to privatize data:", err)
}
- flogger.Println("Done!")
+ if VERBOSE {
+ flogger.Println("Done!")
+ }
}
}
diff --git a/flow-cleaner_test.go b/flow-cleaner_test.go
index 9be5d69..7c9673d 100644
--- a/flow-cleaner_test.go
+++ b/flow-cleaner_test.go
@@ -13,12 +13,16 @@ import (
"time"
)
-func TestCleaningFromJSON(t *testing.T) {
+func init() {
+ VERBOSE = true
+}
+
+func TestCleaningFromDB(t *testing.T) {
cfg := &Config{
Limit: 0,
Interval: "5min",
Epsilon: 0,
- DataSource: "json",
+ DataSource: "mysql",
DBConn: "",
DBName: "test",
@@ -28,21 +32,20 @@ func TestCleaningFromJSON(t *testing.T) {
DBPass: "nil",
}
- fmt.Println("== Testing to process from stdin ==")
+ fmt.Println("== Testing to process from DB ==")
prepareDB(t, cfg)
- testProcessFromStdin(t, cfg)
- time.Sleep(15 * time.Second)
+ processFromDB(cfg)
controlCleanDB(t, cfg)
- controlRawDBStdin(t, cfg)
- fmt.Println("== Finished testing to process from stdin ==")
+ controlRawDBMySQL(t, cfg)
+ fmt.Println("== Finished testing to process from DB ==")
}
-func TestCleaningFromDB(t *testing.T) {
+func TestCleaningFromJSON(t *testing.T) {
cfg := &Config{
Limit: 0,
Interval: "5min",
Epsilon: 0,
- DataSource: "mysql",
+ DataSource: "json",
DBConn: "",
DBName: "test",
@@ -52,12 +55,13 @@ func TestCleaningFromDB(t *testing.T) {
DBPass: "nil",
}
- fmt.Println("== Testing to process from DB ==")
+ fmt.Println("== Testing to process from stdin ==")
prepareDB(t, cfg)
- processFromDB(cfg)
+ testProcessFromStdin(t, cfg)
+ time.Sleep(15 * time.Second)
controlCleanDB(t, cfg)
- controlRawDBMySQL(t, cfg)
- fmt.Println("== Finished testing to process from DB ==")
+ controlRawDBStdin(t, cfg)
+ fmt.Println("== Finished testing to process from stdin ==")
}
func prepareDB(t *testing.T, cfg *Config) {
diff --git a/stdin.go b/stdin.go
index dd89234..50976b7 100644
--- a/stdin.go
+++ b/stdin.go
@@ -22,12 +22,16 @@ func init() {
func readFromStdin() <-chan []byte {
out := make(chan []byte)
go func() {
- stlogger.Println("Now listening on stdin...")
+ if VERBOSE {
+ stlogger.Println("Now listening on stdin...")
+ }
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
out <- []byte(scanner.Text())
}
- stlogger.Println("Finished listening to stdin!")
+ if VERBOSE {
+ stlogger.Println("Finished listening to stdin!")
+ }
close(out)
}()
return out
@@ -40,7 +44,9 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
stlogger.Println("Could not parse interval: ", err)
}
timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- stlogger.Println("Now parsing data from stdin...")
+ if VERBOSE {
+ stlogger.Println("Now parsing data from stdin...")
+ }
go func() {
rDat := make([]RawData, 0)
for line := range in {
@@ -76,7 +82,9 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
if len(rDat) > 0 {
out <- rDat
}
- stlogger.Println("Finished parsing data from stdin...")
+ if VERBOSE {
+ stlogger.Println("Finished parsing data from stdin...")
+ }
close(out)
}()
return out
diff --git a/whois.go b/whois.go
index 486d978..7e0a50e 100644
--- a/whois.go
+++ b/whois.go
@@ -35,7 +35,9 @@ func main() {
*/
func findIPBlock(domains ...string) (pairs map[string]string, err error) {
- wlogger.Println("Querying for ip-blocks...")
+ if VERBOSE {
+ wlogger.Println("Querying for ip-blocks...")
+ }
if len(domains) == 0 {
return
}
@@ -61,7 +63,9 @@ func findIPBlock(domains ...string) (pairs map[string]string, err error) {
ipb := strings.TrimSpace(content[2])
pairs[ipaddr] = ipb
}
- wlogger.Println("ip-blocks returned")
+ if VERBOSE {
+ wlogger.Println("ip-blocks returned")
+ }
return
}