summaryrefslogtreecommitdiff
path: root/sqlQueries.go
blob: 768342774c6466c4c30f72122ec7a3ee74195eda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main

import (
	"database/sql"
	_ "github.com/go-sql-driver/mysql"
	"log"
	"time"
)

const (
	// TIMEZONE is the location name handed to time.LoadLocation in
	// fetchRawData; stamp_inserted values read from raw_data are parsed as
	// wall-clock times in this location.
	TIMEZONE = "UTC"
)

// fetchRawData returns the raw_data rows whose stamp_inserted is older than
// tim and stamps each returned row by setting its stamp_processed to the
// current time.
//
// limit caps the number of rows selected; limit <= 0 selects every matching
// row.
//
// All UPDATEs run inside a single transaction. If any step fails, the
// transaction is rolled back and (nil, err) is returned, so a caller never
// receives rows whose stamp_processed was not committed.
func fetchRawData(db *sql.DB, tim time.Time, limit int) (rDat []RawData, err error) {
	// Resolve the location before touching the database: a nil location would
	// make time.ParseInLocation panic further down.
	loc, err := time.LoadLocation(TIMEZONE)
	if err != nil {
		log.Println("Failed to load timezone")
		return nil, err
	}

	query := "SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM raw_data WHERE stamp_inserted < ?"
	args := []interface{}{tim}
	if limit > 0 {
		query += " LIMIT ?"
		args = append(args, limit)
	}

	prepSel, err := db.Prepare(query)
	if err != nil {
		log.Println("Failed to prepare select")
		return nil, err
	}
	defer prepSel.Close()

	rows, err := prepSel.Query(args...)
	if err != nil {
		log.Println("Failed to query prepared selection")
		return nil, err
	}
	defer rows.Close()

	tx, err := db.Begin()
	if err != nil {
		log.Println("Failed to initialize transaction")
		return nil, err
	}
	// Covers every early return below. Once Commit has run, Rollback only
	// returns sql.ErrTxDone, which is safe to ignore.
	defer tx.Rollback()

	prepUp, err := tx.Prepare("UPDATE raw_data SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
	if err != nil {
		log.Println("Failed to prepare update")
		return nil, err
	}
	defer prepUp.Close()

	for rows.Next() {
		var r RawData
		// stamp_inserted is read as raw bytes and parsed by hand below.
		var stamp []byte
		if err = rows.Scan(&r.ipSrc, &r.ipDst, &r.asSrc, &r.asDst, &r.portSrc, &r.portDst, &r.packets, &r.pktLenDist, &stamp); err != nil {
			log.Println("Failed to scan result of query")
			return nil, err
		}
		if r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(stamp), loc); err != nil {
			log.Println("Failed to parse timestamp")
			return nil, err
		}

		if _, err = prepUp.Exec(time.Now(), r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist, r.time); err != nil {
			log.Println("Failed to query prepared update")
			return nil, err
		}

		rDat = append(rDat, r)
	}
	// rows.Next returns false both at the end of the result set and on a
	// failure; only rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		log.Println("Failed to iterate over result of query")
		return nil, err
	}

	if err = tx.Commit(); err != nil {
		log.Println("Failed to commit transaction")
		return nil, err
	}
	return rDat, nil
}

// reprocess sets stamp_processed back to NULL on every raw_data row whose
// stamp_processed is older than tim.
//
// It returns the first error from preparing or executing the UPDATE.
func reprocess(db *sql.DB, tim time.Time) (err error) {
	stmt, err := db.Prepare("UPDATE raw_data SET stamp_processed = NULL WHERE stamp_processed < ?")
	if err != nil {
		return err
	}
	// The statement is used once; close it so it is not kept open on the
	// connection for the lifetime of db.
	defer stmt.Close()

	_, err = stmt.Exec(tim)
	return err
}

// purgeAllProcessed deletes every raw_data row whose stamp_processed is older
// than tim.
//
// It returns the first error from preparing or executing the DELETE.
func purgeAllProcessed(db *sql.DB, tim time.Time) (err error) {
	stmt, err := db.Prepare("DELETE FROM raw_data WHERE stamp_processed < ? ")
	if err != nil {
		return err
	}
	// The statement is used once; close it so it is not kept open on the
	// connection for the lifetime of db.
	defer stmt.Close()

	_, err = stmt.Exec(tim)
	return err
}

// purgeRawData deletes the rows described by rDat from raw_data.
//
// For each element it deletes at most one row (LIMIT 1) that matches all of
// the element's address, AS, port, packet and length-distribution fields and
// that has a non-NULL stamp_processed. The deletes run on the caller's
// transaction tx; committing or rolling back is left to the caller.
//
// It stops at, and returns, the first error.
func purgeRawData(tx *sql.Tx, rDat []RawData) (err error) {
	// The column is pkt_len_distrib, matching the SELECT and UPDATE statements
	// that fetchRawData issues against the same table.
	prepStmt, err := tx.Prepare("DELETE FROM raw_data WHERE ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_processed IS NOT NULL LIMIT 1")
	if err != nil {
		return err
	}
	defer prepStmt.Close()

	for _, r := range rDat {
		if _, err = prepStmt.Exec(r.ipSrc, r.ipDst, r.asSrc, r.asDst, r.portSrc, r.portDst, r.packets, r.pktLenDist); err != nil {
			return err
		}
	}
	return nil
}

// insertCleanData inserts cd into clean_data on the caller's transaction tx.
// When the insert hits a duplicate key, the existing row's occurrence counter
// is increased by cd.occurances instead.
//
// Committing or rolling back tx is left to the caller. The first error from
// preparing or executing the statement is logged and returned.
func insertCleanData(tx *sql.Tx, cd *CleanData) error {
	// NOTE(review): the counter column is spelled "occurances" here to match
	// the insert column list and the CleanData field — confirm against the
	// clean_data schema.
	prepStmt, err := tx.Prepare("INSERT INTO clean_data (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurances, volume, time) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ?) ON DUPLICATE KEY UPDATE occurances = occurances + ?")
	if err != nil {
		log.Println("Failed to prepare statement")
		return err
	}
	defer prepStmt.Close()

	// The statement has ten placeholders: nine inserted values plus the
	// increment used by the ON DUPLICATE KEY UPDATE clause.
	_, err = prepStmt.Exec(cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.occurances, cd.volume, cd.time, cd.occurances)
	if err != nil {
		log.Println("Failed to execute statement")
		return err
	}

	return nil
}

// insertASNIP records that ipBlock belongs to asn in the asnip table, unless
// a row for ipBlock already exists, in which case it does nothing.
//
// It returns the first error from the existence check or from the insert.
//
// NOTE(review): the check and the insert are two separate statements on db,
// so two concurrent callers can both see "no row" and both insert.
func insertASNIP(db *sql.DB, asn int, ipBlock string) error {
	prepCheck, err := db.Prepare("SELECT asn FROM asnip WHERE ip_block = ?")
	if err != nil {
		return err
	}
	defer prepCheck.Close()

	// Use Query rather than Exec: Exec returns a non-nil sql.Result whenever
	// the statement succeeds, so it cannot show whether a row exists.
	rows, err := prepCheck.Query(ipBlock)
	if err != nil {
		return err
	}
	exists := rows.Next()
	iterErr := rows.Err()
	// Close now so the connection is released before the INSERT below.
	closeErr := rows.Close()
	if iterErr != nil {
		return iterErr
	}
	if closeErr != nil {
		return closeErr
	}
	if exists {
		return nil
	}

	prepIns, err := db.Prepare("INSERT INTO asnip VALUES ( ? , ? )")
	if err != nil {
		return err
	}
	defer prepIns.Close()

	_, err = prepIns.Exec(asn, ipBlock)
	return err
}

// removeASNIP deletes the asnip rows that map ipBlock to asn.
//
// It returns the first error from preparing or executing the DELETE.
func removeASNIP(db *sql.DB, asn int, ipBlock string) error {
	// The table is asnip, the same one insertASNIP reads and writes.
	prepStmt, err := db.Prepare("DELETE FROM asnip WHERE asn = ? AND ip_block = ?")
	if err != nil {
		return err
	}
	defer prepStmt.Close()

	_, err = prepStmt.Exec(asn, ipBlock)
	return err
}