diff options
-rwxr-xr-x | tools/merge_fetch.py | 80 |
1 files changed, 40 insertions, 40 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index a31143e..d5a514b 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -16,6 +16,7 @@ import logging from time import sleep from multiprocessing import Process, Pipe from random import Random +from itertools import cycle from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ hexencode, hexdecode, parse_args, perm, flock_ex_or_fail, Status, \ @@ -107,15 +108,18 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe): own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + name = storagenode["name"] + address = storagenode["address"] + url = "https://%s/" % address # NOTE: We should probably verifycert.communicate(struct.pack("I",0)) # to ask the verifycert process to quit nicely. verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]], stdin=subprocess.PIPE, stdout=subprocess.PIPE) to_fetch = set() - timeout = max(3, args.mergeinterval / 10) while True: - if pipe.poll(timeout): + ## Read all messages from parent. + while pipe.poll(0): msg = pipe.recv() if len(msg) < 2: continue @@ -124,26 +128,26 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe): if cmd == 'FETCH': to_fetch.add(ehash) + ## Fetch entries from node. if to_fetch: - logging.info("%s: fetching %d entries", storagenode["name"], - len(to_fetch)) + logging.info("%s: fetching %d entries", name, len(to_fetch)) with requests.sessions.Session() as session: for chunk in chunks(list(to_fetch), 100): - entries = get_entries(storagenode["name"], - "https://%s/" % storagenode["address"], - own_key, paths, chunk, session=session) + entries = get_entries(name, url, own_key, paths, chunk, + session=session) for ehash in chunk: entry = entries[ehash] verify_entry(verifycert, entry, ehash) pipe.send(('FETCHED', ehash, entry)) to_fetch.remove(ehash) - new_entries = get_new_entries(storagenode["name"], - "https://%s/" % storagenode["address"], - own_key, paths) - for ehash in new_entries: + ## Ask node for more entries. + for ehash in get_new_entries(name, url, own_key, paths): pipe.send(('NEWENTRY', ehash)) + ## Wait some. + sleep(max(3, args.mergeinterval / 10)) + def term(signal, arg): terminate_child_procs() sys.exit(1) @@ -180,12 +184,15 @@ def merge_fetch_parallel(args, config, localconfig): entries_in_log = set(logorder) # Hashes are binary. # Entries to fetch, kept in both a set and a dict. The dict is # keyed on hashes (binary) and contains randomised lists of nodes - # to fetch from. + # to fetch from. Note that the dict keeps entries until they're + # successfully fetched while the set is temporary within one + # iteration of the loop. fetch_set = set() fetch_dict = {} + while procs: - # Poll worker processes and handle messages. - assert(not fetch_set) + ## Poll worker processes and handle messages. + assert not fetch_set newentry = [] for name, pipe, p in procs.values(): if not p.is_alive(): @@ -202,50 +209,43 @@ def merge_fetch_parallel(args, config, localconfig): ehash = msg[1] if cmd == 'NEWENTRY': logging.info("NEWENTRY at %s: %s", name, hexencode(ehash)) - fetch_set.add(ehash) + if not ehash in fetch_dict: # Don't fetch twice. + fetch_set.add(ehash) elif cmd == 'FETCHED': if len(msg) != 3: continue entry = msg[2] logging.info("FETCHED from %s: %s", name, hexencode(ehash)) - chainsdb.add(ehash, entry) # Commit later. - newentry.append(ehash) # Writing to logorderfile later. + chainsdb.add(ehash, entry) + newentry.append(ehash) # Writing to logorderfile after loop. logorder.append(hexencode(ehash)) entries_in_log.add(ehash) - if ehash in fetch_set: - fetch_set.remove(ehash) del fetch_dict[ehash] chainsdb.commit() for ehash in newentry: add_to_logorder(logorderfile, ehash) fsync_logorder(logorderfile) - # Ask workers to fetch entries. + ## Ask workers to fetch new entries. logging.debug("nof entries to fetch including entries in log: %d", len(fetch_set)) fetch_set -= entries_in_log logging.info("entries to fetch: %d", len(fetch_set)) - # Add each entry in fetch_set to fetch_dict, key bing the hash - # and value being a list of storage nodes, in randomised - # order. - for e in fetch_set: - if not e in fetch_dict: - l = procs.values() - rand.shuffle(l) - fetch_dict[e] = l - # For each entry to fetch, treat its list of nodes as a - # circular list and ask the one in the front to fetch the + # Add entries to be fetched to fetch_dict, with the hash as + # key and value being a cyclic iterator of list of storage + # nodes, in randomised order. Ask next node to fetch the # entry. while fetch_set: - ehash = fetch_set.pop() - nodes = fetch_dict[ehash] - node = nodes.pop(0) - fetch_dict[ehash] = nodes + [node] - name, pipe, p = node - logging.info("asking %s to fetch %s", name, hexencode(ehash)) - pipe.send(('FETCH', ehash)) - - # Update the 'fetched' file. + e = fetch_set.pop() + if not e in fetch_dict: + l = list(procs.values()) + rand.shuffle(l) + fetch_dict[e] = cycle(l) + name, pipe, _ = fetch_dict[e].next() + logging.info("asking %s to FETCH %s", name, hexencode(e)) + pipe.send(('FETCH', e)) + + ## Update the 'fetched' file. logsize = len(logorder) if logsize == 0: last_hash = '' @@ -257,7 +257,7 @@ def merge_fetch_parallel(args, config, localconfig): currentsizefilecontent = newcontent write_file(currentsizefile, currentsizefilecontent) - # Wait some. + ## Wait some. sleep(1) return 0 |