diff options
author | josef <josef.gson@gmail.com> | 2015-10-20 11:40:07 +0200 |
---|---|---|
committer | josef <josef.gson@gmail.com> | 2015-10-20 11:40:07 +0200 |
commit | 62035bd9ccb0efb21a234418127e20cc56cd7c0a (patch) | |
tree | 6e30fe54ee2d830fb344611cd134f67ebe958b31 /monitor/josef_mover.py | |
parent | b07897de7fe8be60f99b3570e7453ca8ce9b92d1 (diff) |
parallelizing inclusion checker
Diffstat (limited to 'monitor/josef_mover.py')
-rwxr-xr-x | monitor/josef_mover.py | 62 |
1 file changed, 47 insertions, 15 deletions
diff --git a/monitor/josef_mover.py b/monitor/josef_mover.py index 5bf5a7a..63a155f 100755 --- a/monitor/josef_mover.py +++ b/monitor/josef_mover.py @@ -6,6 +6,7 @@ import time import datetime import os import json +import multiprocessing from precerttools import cleanprecert from monitor_conf import * @@ -123,7 +124,7 @@ def move_entry(first, last, source, dest): print "FAILED!\n" print s_log["name"] + "[" + str(first + i) + "] found in " + str(len(inclusions)) + " logs: ", inclusions -def check_inclusion_by_submission(first, last, source, dest): +def check_inclusion_by_submission(first, last, source, dest, logfile): # print entries for s_log in source: try: @@ -135,13 +136,10 @@ def check_inclusion_by_submission(first, last, source, dest): except: print "Failed to get entries from " + s_log["name"] - # print "\n\nSource: " + s_log["name"] + "\n" for i in range(len(entries)): - # for item in entries: item = entries[i] inclusions = [] for d_log in dests: - # print "Log: " + d_log["name"] try: entry = extract_original_entry(item) if entry[2]: @@ -162,17 +160,18 @@ def check_inclusion_by_submission(first, last, source, dest): if not is_new_timestamp(res["timestamp"]): inclusions.append(d_log["name"]) - # time.sleep(5) except KeyboardInterrupt: sys.exit() except: - # print "FAILED!\n" pass s = s_log["name"] + "[" + str(first + i) + "] found in " + str(len(inclusions)) + " logs: " + str(inclusions) print s - logfile = OUTPUT_DIR + s_log["name"] + "_overlap.log" + # logfile = OUTPUT_DIR + s_log["name"] + "_overlap.log" log(logfile, s) + + + def log(fn, string): logfile = fn s = time_str() + " " + string @@ -180,21 +179,54 @@ def log(fn, string): f.write(s + "\n") f.close() + if __name__ == "__main__": - source = [CTLOGS[7]] + source = [CTLOGS[3]] dests = CTLOGS - # source = ctlogs - # dests = ctlogs + process_count = 4 + processes = [] for tmp_log in source: sth = get_sth(tmp_log["url"]) - first = 1654 - last = int(sth["tree_size"]) - 1 - # print last + first = 0 + last = int(sth["tree_size"])
+ last = 8 + print "last:",last + + # split into tasks + chunk_size = last/process_count + print "chunk_size:", chunk_size + for i in range(process_count): + if i + 1 == process_count: + tmp_start = i * chunk_size + tmp_last = last - 1 + else: + tmp_start = i * chunk_size + tmp_last = (i+1) * chunk_size - 1 + + # execute in parallell + filename = OUTPUT_DIR + tmp_log["name"] + "_overlap_" + str(i) + ".tmp" + p = multiprocessing.Process(target=check_inclusion_by_submission, \ + args=(tmp_start, tmp_last, [tmp_log], dests, filename)) + p.start() + processes.append(p) + + for p in processes: + p.join() + # print "Done!" + + # merge results + with open(OUTPUT_DIR + tmp_log["name"] + "_overlap.log", "wb") as outfile: + for i in range(process_count): + filename = OUTPUT_DIR + tmp_log["name"] + "_overlap_" + str(i) + ".tmp" + # print filename + with open(filename, "rb") as infile: + outfile.write(infile.read()) + infile.close() + os.remove(filename) + outfile.close() - check_inclusion_by_submission(first, last, [tmp_log], dests) - # check_inclusion_all(first,last,source, dests) |