diff options
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x | tools/merge_fetch.py | 147 |
1 files changed, 74 insertions, 73 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index 7e0dfd8..633ee67 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -104,59 +104,46 @@ def merge_fetch_sequenced(args, config, localconfig): def merge_fetch_worker(args, localconfig, storagenode, pipe): paths = localconfig["paths"] - mergedb = paths["mergedb"] - chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + + # NOTE: We should probably verifycert.communicate(struct.pack("I",0)) + # to ask the verifycert process to quit nicely. + verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) to_fetch = set() timeout = max(3, args.mergeinterval / 10) while True: if pipe.poll(timeout): - msg = pipe.recv().split() + msg = pipe.recv() if len(msg) < 2: continue cmd = msg[0] ehash = msg[1] if cmd == 'FETCH': - to_fetch.add(hexdecode(ehash)) - else: - logging.warning("%s: unknown command from parent: %s", - storagenode["name"], msg) + to_fetch.add(ehash) - if len(to_fetch) > 0: + if to_fetch: logging.info("%s: fetching %d entries", storagenode["name"], len(to_fetch)) - # TODO: Consider running the verifycert process longer. - verifycert = subprocess.Popen( - [paths["verifycert_bin"], paths["known_roots"]], - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - # Chunking for letting other workers take the chainsdb lock. - for chunk in chunks(list(to_fetch), 100): - chainsdb.lock_ex() - with requests.sessions.Session() as session: - entries = get_entries(storagenode["name"], - "https://%s/" % storagenode["address"], - own_key, paths, chunk, session=session) - for ehash in chunk: - entry = entries[ehash] - verify_entry(verifycert, entry, ehash) - chainsdb.add(ehash, entry) - chainsdb.commit() - chainsdb.release_lock() - for ehash in chunk: - pipe.send('FETCHED %s' % hexencode(ehash)) - to_fetch.remove(ehash) - verifycert.communicate(struct.pack("I", 0)) + fetchlist = list(to_fetch) + with requests.sessions.Session() as session: + entries = get_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths, fetchlist, + session=session) + for ehash in fetchlist: + entry = entries[ehash] + verify_entry(verifycert, entry, ehash) + pipe.send(('FETCHED', ehash, entry)) + to_fetch.remove(ehash) new_entries = get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"], own_key, paths) - if len(new_entries) > 0: - logging.info("%s: got %d new entries", storagenode["name"], - len(new_entries)) - for ehash in new_entries: - pipe.send('NEWENTRY %s' % hexencode(ehash)) + for ehash in new_entries: + pipe.send(('NEWENTRY', ehash)) def term(signal, arg): terminate_child_procs() @@ -176,6 +163,7 @@ def merge_fetch_parallel(args, config, localconfig): paths = localconfig["paths"] storagenodes = config["storagenodes"] mergedb = paths["mergedb"] + chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" currentsizefile = mergedb + "/fetched" @@ -187,13 +175,15 @@ def merge_fetch_parallel(args, config, localconfig): name = storagenode['name'] procs[name] = newworker(name, [args, localconfig, storagenode]) - logorder = get_logorder(logorderfile) # List of entries in log. - entries_in_log = set(logorder) # Set of entries in log. - entries_to_fetch = set() # Set of entries to fetch. - fetch = {} # Dict with entries to fetch. + currentsizefilecontent = "" + logorder = get_logorder(logorderfile) # List of hashes in log, hexencoded. + entries_in_log = set(logorder) # Set of hashes in log, binary. + fetch_set = set() + fetch_dict = {} while procs: - assert(not entries_to_fetch) - # Poll worker processes. + # Poll worker processes and handle messages. + assert(not fetch_set) + newentry = [] for name, pipe, p in procs.values(): if not p.is_alive(): logging.warning("%s is gone, restarting", name) @@ -201,52 +191,59 @@ def merge_fetch_parallel(args, config, localconfig): storagenodes[name]]) continue logging.info("polling %s", name) - if pipe.poll(1): - msg = pipe.recv().split() + while pipe.poll(0): + msg = pipe.recv() if len(msg) < 2: - logging.warning("unknown command from %s: %s", name, msg) continue cmd = msg[0] ehash = msg[1] if cmd == 'NEWENTRY': - logging.info("NEWENTRY at %s: %s", name, ehash) - entries_to_fetch.add(ehash) - logging.debug("entries_to_fetch: %s", entries_to_fetch) + logging.info("NEWENTRY at %s: %s", name, hexencode(ehash)) + fetch_set.add(ehash) elif cmd == 'FETCHED': - logging.info("FETCHED from %s: %s", name, ehash) - logorder.append(ehash) - add_to_logorder(logorderfile, hexdecode(ehash)) - fsync_logorder(logorderfile) + if len(msg) != 3: + continue + entry = msg[2] + logging.info("FETCHED from %s: %s", name, hexencode(ehash)) + chainsdb.add(ehash, entry) # Commit later. + ehash_enc = hexencode(ehash) + newentry.append(ehash_enc) # Writing to logorderfile later. + logorder.append(ehash_enc) entries_in_log.add(ehash) - if ehash in entries_to_fetch: - entries_to_fetch.remove(ehash) - del fetch[ehash] - else: - logging.warning("unknown command from %s: %s", name, msg) + if ehash in fetch_set: + fetch_set.remove(ehash) + del fetch_dict[ehash] + chainsdb.commit() + for ehash_enc in newentry: + add_to_logorder(logorderfile, ehash) + fsync_logorder(logorderfile) # Ask workers to fetch entries. logging.debug("nof entries to fetch including entries in log: %d", - len(entries_to_fetch)) - entries_to_fetch -= entries_in_log - logging.info("entries to fetch: %d", len(entries_to_fetch)) - # Add entries in entries_to_fetch as keys in dictionary fetch, - # values being a list of storage nodes, in randomised order. - for e in entries_to_fetch: - if not e in fetch: + len(fetch_set)) + fetch_set -= entries_in_log + logging.info("entries to fetch: %d", len(fetch_set)) + # Add each entry in fetch_set to fetch_dict, key bing the hash + # and value being a list of storage nodes, in randomised + # order. + for e in fetch_set: + if not e in fetch_dict: + save = procs.values() l = procs.values() rand.shuffle(l) - fetch[e] = l + assert save == procs.values() + fetch_dict[e] = l # For each entry to fetch, treat its list of nodes as a # circular list and ask the one in the front to fetch the # entry. - while entries_to_fetch: - ehash = entries_to_fetch.pop() - nodes = fetch[ehash] + while fetch_set: + ehash = fetch_set.pop() + nodes = fetch_dict[ehash] node = nodes.pop(0) - fetch[ehash] = nodes.append(node) + fetch_dict[ehash] = nodes.append(node) name, pipe, p = node - logging.info("asking %s to FETCH %s", name, ehash) - pipe.send("FETCH %s" % ehash) + logging.info("asking %s to fetch %s", name, hexencode(ehash)) + pipe.send(('FETCH', ehash)) # Update the 'fetched' file. logsize = len(logorder) @@ -254,10 +251,14 @@ def merge_fetch_parallel(args, config, localconfig): last_hash = '' else: last_hash = logorder[logsize - 1] - logging.info("updating 'fetched' file: %d %s", logsize-1, last_hash) - currentsize = {"index": logsize - 1, "hash": last_hash} - logging.debug("writing to %s: %s", currentsizefile, currentsize) - write_file(currentsizefile, currentsize) + newcontent = {"index": logsize - 1, "hash": last_hash} + if newcontent != currentsizefilecontent: + logging.info("updating 'fetched' file: %d %s", logsize - 1, last_hash) + currentsizefilecontent = newcontent + write_file(currentsizefile, currentsizefilecontent) + + # Wait some. + sleep(1) return 0 |