1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package main
import (
"database/sql"
_ "github.com/Go-SQL-Driver/MySQL"
"log"
"os"
"time"
)
var (
flogger *log.Logger
VERBOSE bool
)
func init() {
flogger = log.New(os.Stdout, "[ Flow-cleaner ]", log.LstdFlags)
}
func main() {
flogger.Println("Now running Flow-cleaner")
cfg, err := readConfig()
if err != nil {
flogger.Println("Could not read config")
return
}
VERBOSE = cfg.Verbose
switch cfg.DataSource {
case "stdin":
processFromStdin(cfg)
case "mysql":
processFromDB(cfg)
default:
flogger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
}
flogger.Println("Finished processing, now exiting")
}
func processFromStdin(cfg *Config) {
if VERBOSE {
flogger.Println("Starting to process from stdin...")
}
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
err := cleanFromStdin(rDatChan, cfg)
if err != nil {
flogger.Println("Failed to clean data:", err)
}
if VERBOSE {
flogger.Println("Finished processing from stdin!")
}
}
func processFromDB(cfg *Config) {
if VERBOSE {
flogger.Println("Starting to process from db...")
}
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
flogger.Println(err)
flogger.Println("Exiting...")
return
}
if VERBOSE {
flogger.Println("Finished processing from db!")
}
// If either all rows are processed or if there is no limit for the processing
// we can safely add noise to the cleaned data
if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
if VERBOSE {
flogger.Println("Adding differential privacy noise to processed data...")
}
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
flogger.Println("Failed to connect to db:", err)
return
}
defer db.Close()
ival, err := cfg.getInterval()
if err != nil {
flogger.Println("erronous interval in conf prevents the privatization of data:", err)
return
}
err = privatizeCleaned(db, starttime.Add(-2*ival), cfg)
if err != nil {
flogger.Println("Failed to privatize data:", err)
}
if VERBOSE {
flogger.Println("Done!")
}
}
}
|