summaryrefslogtreecommitdiff
path: root/monitor/josef_mover.py
diff options
context:
space:
mode:
authorjosef <josef.gson@gmail.com>2015-10-20 11:40:07 +0200
committerjosef <josef.gson@gmail.com>2015-10-20 11:40:07 +0200
commit62035bd9ccb0efb21a234418127e20cc56cd7c0a (patch)
tree6e30fe54ee2d830fb344611cd134f67ebe958b31 /monitor/josef_mover.py
parentb07897de7fe8be60f99b3570e7453ca8ce9b92d1 (diff)
parallelizing inclusion checker
Diffstat (limited to 'monitor/josef_mover.py')
-rwxr-xr-xmonitor/josef_mover.py62
1 files changed, 47 insertions, 15 deletions
diff --git a/monitor/josef_mover.py b/monitor/josef_mover.py
index 5bf5a7a..63a155f 100755
--- a/monitor/josef_mover.py
+++ b/monitor/josef_mover.py
@@ -6,6 +6,7 @@ import time
import datetime
import os
import json
+import multiprocessing
from precerttools import cleanprecert
from monitor_conf import *
@@ -123,7 +124,7 @@ def move_entry(first, last, source, dest):
print "FAILED!\n"
print s_log["name"] + "[" + str(first + i) + "] found in " + str(len(inclusions)) + " logs: ", inclusions
-def check_inclusion_by_submission(first, last, source, dest):
+def check_inclusion_by_submission(first, last, source, dest, logfile):
# print entries
for s_log in source:
try:
@@ -135,13 +136,10 @@ def check_inclusion_by_submission(first, last, source, dest):
except:
print "Failed to get entries from " + s_log["name"]
- # print "\n\nSource: " + s_log["name"] + "\n"
for i in range(len(entries)):
- # for item in entries:
item = entries[i]
inclusions = []
for d_log in dests:
- # print "Log: " + d_log["name"]
try:
entry = extract_original_entry(item)
if entry[2]:
@@ -162,17 +160,18 @@ def check_inclusion_by_submission(first, last, source, dest):
if not is_new_timestamp(res["timestamp"]):
inclusions.append(d_log["name"])
- # time.sleep(5)
except KeyboardInterrupt:
sys.exit()
except:
- # print "FAILED!\n"
pass
s = s_log["name"] + "[" + str(first + i) + "] found in " + str(len(inclusions)) + " logs: " + str(inclusions)
print s
- logfile = OUTPUT_DIR + s_log["name"] + "_overlap.log"
+ # logfile = OUTPUT_DIR + s_log["name"] + "_overlap.log"
log(logfile, s)
+
+
+
def log(fn, string):
logfile = fn
s = time_str() + " " + string
@@ -180,21 +179,54 @@ def log(fn, string):
f.write(s + "\n")
f.close()
+
if __name__ == "__main__":
- source = [CTLOGS[7]]
+ source = [CTLOGS[3]]
dests = CTLOGS
- # source = ctlogs
- # dests = ctlogs
+ process_count = 4
+ processes = []
for tmp_log in source:
sth = get_sth(tmp_log["url"])
- first = 1654
- last = int(sth["tree_size"]) - 1
- # print last
+ first = 0
+ last = int(sth["tree_size"])
+ last = 8
+ print "last:",last
+
+ # split into tasks
+ chunk_size = last/process_count
+ print "chunk_size:", chunk_size
+ for i in range(process_count):
+ if i + 1 == process_count:
+ tmp_start = i * chunk_size
+ tmp_last = last - 1
+ else:
+ tmp_start = i * chunk_size
+ tmp_last = (i+1) * chunk_size - 1
+
+ # execute in parallell
+ filename = OUTPUT_DIR + tmp_log["name"] + "_overlap_" + str(i) + ".tmp"
+ p = multiprocessing.Process(target=check_inclusion_by_submission, \
+ args=(tmp_start, tmp_last, [tmp_log], dests, filename))
+ p.start()
+ processes.append(p)
+
+ for p in processes:
+ p.join()
+ # print "Done!"
+
+ # merge results
+ with open(OUTPUT_DIR + tmp_log["name"] + "_overlap.log", "wb") as outfile:
+ for i in range(process_count):
+ filename = OUTPUT_DIR + tmp_log["name"] + "_overlap_" + str(i) + ".tmp"
+ # print filename
+ with open(filename, "rb") as infile:
+ outfile.write(infile.read())
+ infile.close()
+ os.remove(filename)
+ outfile.close()
- check_inclusion_by_submission(first, last, [tmp_log], dests)
- # check_inclusion_all(first,last,source, dests)