summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtools/merge_fetch.py147
-rw-r--r--tools/mergetools.py31
2 files changed, 74 insertions, 104 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index 7e0dfd8..633ee67 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -104,59 +104,46 @@ def merge_fetch_sequenced(args, config, localconfig):
def merge_fetch_worker(args, localconfig, storagenode, pipe):
paths = localconfig["paths"]
- mergedb = paths["mergedb"]
- chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
+
+ # NOTE: We should probably verifycert.communicate(struct.pack("I",0))
+ # to ask the verifycert process to quit nicely.
+ verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
to_fetch = set()
timeout = max(3, args.mergeinterval / 10)
while True:
if pipe.poll(timeout):
- msg = pipe.recv().split()
+ msg = pipe.recv()
if len(msg) < 2:
continue
cmd = msg[0]
ehash = msg[1]
if cmd == 'FETCH':
- to_fetch.add(hexdecode(ehash))
- else:
- logging.warning("%s: unknown command from parent: %s",
- storagenode["name"], msg)
+ to_fetch.add(ehash)
- if len(to_fetch) > 0:
+ if to_fetch:
logging.info("%s: fetching %d entries", storagenode["name"],
len(to_fetch))
- # TODO: Consider running the verifycert process longer.
- verifycert = subprocess.Popen(
- [paths["verifycert_bin"], paths["known_roots"]],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- # Chunking for letting other workers take the chainsdb lock.
- for chunk in chunks(list(to_fetch), 100):
- chainsdb.lock_ex()
- with requests.sessions.Session() as session:
- entries = get_entries(storagenode["name"],
- "https://%s/" % storagenode["address"],
- own_key, paths, chunk, session=session)
- for ehash in chunk:
- entry = entries[ehash]
- verify_entry(verifycert, entry, ehash)
- chainsdb.add(ehash, entry)
- chainsdb.commit()
- chainsdb.release_lock()
- for ehash in chunk:
- pipe.send('FETCHED %s' % hexencode(ehash))
- to_fetch.remove(ehash)
- verifycert.communicate(struct.pack("I", 0))
+ fetchlist = list(to_fetch)
+ with requests.sessions.Session() as session:
+ entries = get_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths, fetchlist,
+ session=session)
+ for ehash in fetchlist:
+ entry = entries[ehash]
+ verify_entry(verifycert, entry, ehash)
+ pipe.send(('FETCHED', ehash, entry))
+ to_fetch.remove(ehash)
new_entries = get_new_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
own_key, paths)
- if len(new_entries) > 0:
- logging.info("%s: got %d new entries", storagenode["name"],
- len(new_entries))
- for ehash in new_entries:
- pipe.send('NEWENTRY %s' % hexencode(ehash))
+ for ehash in new_entries:
+ pipe.send(('NEWENTRY', ehash))
def term(signal, arg):
terminate_child_procs()
@@ -176,6 +163,7 @@ def merge_fetch_parallel(args, config, localconfig):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
+ chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
currentsizefile = mergedb + "/fetched"
@@ -187,13 +175,15 @@ def merge_fetch_parallel(args, config, localconfig):
name = storagenode['name']
procs[name] = newworker(name, [args, localconfig, storagenode])
- logorder = get_logorder(logorderfile) # List of entries in log.
- entries_in_log = set(logorder) # Set of entries in log.
- entries_to_fetch = set() # Set of entries to fetch.
- fetch = {} # Dict with entries to fetch.
+ currentsizefilecontent = ""
+ logorder = get_logorder(logorderfile) # List of hashes in log, hexencoded.
+ entries_in_log = set(logorder) # Set of hashes in log, binary.
+ fetch_set = set()
+ fetch_dict = {}
while procs:
- assert(not entries_to_fetch)
- # Poll worker processes.
+ # Poll worker processes and handle messages.
+ assert(not fetch_set)
+ newentry = []
for name, pipe, p in procs.values():
if not p.is_alive():
logging.warning("%s is gone, restarting", name)
@@ -201,52 +191,59 @@ def merge_fetch_parallel(args, config, localconfig):
storagenodes[name]])
continue
logging.info("polling %s", name)
- if pipe.poll(1):
- msg = pipe.recv().split()
+ while pipe.poll(0):
+ msg = pipe.recv()
if len(msg) < 2:
- logging.warning("unknown command from %s: %s", name, msg)
continue
cmd = msg[0]
ehash = msg[1]
if cmd == 'NEWENTRY':
- logging.info("NEWENTRY at %s: %s", name, ehash)
- entries_to_fetch.add(ehash)
- logging.debug("entries_to_fetch: %s", entries_to_fetch)
+ logging.info("NEWENTRY at %s: %s", name, hexencode(ehash))
+ fetch_set.add(ehash)
elif cmd == 'FETCHED':
- logging.info("FETCHED from %s: %s", name, ehash)
- logorder.append(ehash)
- add_to_logorder(logorderfile, hexdecode(ehash))
- fsync_logorder(logorderfile)
+ if len(msg) != 3:
+ continue
+ entry = msg[2]
+ logging.info("FETCHED from %s: %s", name, hexencode(ehash))
+ chainsdb.add(ehash, entry) # Commit later.
+ ehash_enc = hexencode(ehash)
+ newentry.append(ehash_enc) # Writing to logorderfile later.
+ logorder.append(ehash_enc)
entries_in_log.add(ehash)
- if ehash in entries_to_fetch:
- entries_to_fetch.remove(ehash)
- del fetch[ehash]
- else:
- logging.warning("unknown command from %s: %s", name, msg)
+ if ehash in fetch_set:
+ fetch_set.remove(ehash)
+ del fetch_dict[ehash]
+ chainsdb.commit()
+ for ehash_enc in newentry:
+ add_to_logorder(logorderfile, ehash)
+ fsync_logorder(logorderfile)
# Ask workers to fetch entries.
logging.debug("nof entries to fetch including entries in log: %d",
- len(entries_to_fetch))
- entries_to_fetch -= entries_in_log
- logging.info("entries to fetch: %d", len(entries_to_fetch))
- # Add entries in entries_to_fetch as keys in dictionary fetch,
- # values being a list of storage nodes, in randomised order.
- for e in entries_to_fetch:
- if not e in fetch:
+ len(fetch_set))
+ fetch_set -= entries_in_log
+ logging.info("entries to fetch: %d", len(fetch_set))
+ # Add each entry in fetch_set to fetch_dict, key bing the hash
+ # and value being a list of storage nodes, in randomised
+ # order.
+ for e in fetch_set:
+ if not e in fetch_dict:
+ save = procs.values()
l = procs.values()
rand.shuffle(l)
- fetch[e] = l
+ assert save == procs.values()
+ fetch_dict[e] = l
# For each entry to fetch, treat its list of nodes as a
# circular list and ask the one in the front to fetch the
# entry.
- while entries_to_fetch:
- ehash = entries_to_fetch.pop()
- nodes = fetch[ehash]
+ while fetch_set:
+ ehash = fetch_set.pop()
+ nodes = fetch_dict[ehash]
node = nodes.pop(0)
- fetch[ehash] = nodes.append(node)
+ fetch_dict[ehash] = nodes.append(node)
name, pipe, p = node
- logging.info("asking %s to FETCH %s", name, ehash)
- pipe.send("FETCH %s" % ehash)
+ logging.info("asking %s to fetch %s", name, hexencode(ehash))
+ pipe.send(('FETCH', ehash))
# Update the 'fetched' file.
logsize = len(logorder)
@@ -254,10 +251,14 @@ def merge_fetch_parallel(args, config, localconfig):
last_hash = ''
else:
last_hash = logorder[logsize - 1]
- logging.info("updating 'fetched' file: %d %s", logsize-1, last_hash)
- currentsize = {"index": logsize - 1, "hash": last_hash}
- logging.debug("writing to %s: %s", currentsizefile, currentsize)
- write_file(currentsizefile, currentsize)
+ newcontent = {"index": logsize - 1, "hash": last_hash}
+ if newcontent != currentsizefilecontent:
+ logging.info("updating 'fetched' file: %d %s", logsize - 1, last_hash)
+ currentsizefilecontent = newcontent
+ write_file(currentsizefile, currentsizefilecontent)
+
+ # Wait some.
+ sleep(1)
return 0
diff --git a/tools/mergetools.py b/tools/mergetools.py
index d5d5f75..1334186 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -484,25 +484,6 @@ def flock_ex_or_fail(path):
return False
return True
-def flock_ex_wait(path):
- fd = os.open(path, os.O_CREAT)
- logging.debug("waiting for exclusive lock on %s (%s)", fd, path)
- fcntl.flock(fd, fcntl.LOCK_EX)
- logging.debug("taken exclusive lock on %s", fd)
- return fd
-
-def flock_sh_wait(path):
- fd = os.open(path, os.O_CREAT)
- logging.debug("waiting for shared lock on %s (%s)", fd, path)
- fcntl.flock(fd, fcntl.LOCK_SH)
- logging.debug("taken shared lock on %s", fd)
- return fd
-
-def flock_release(fd):
- logging.debug("releasing lock on %s", fd)
- fcntl.flock(fd, fcntl.LOCK_UN)
- os.close(fd)
-
def terminate_child_procs():
for p in multiprocessing.active_children():
#print >>sys.stderr, "DEBUG: terminating pid", p.pid
@@ -522,12 +503,6 @@ class FileDB:
return read_chain(self.path, key)
def add(self, key, value):
return write_chain(key, value, self.path)
- def lock_sh(self):
- self.lockfile = flock_sh_wait(self.path + "/.lock")
- def lock_ex(self):
- self.lockfile = flock_ex_wait(self.path + "/.lock")
- def release_lock(self):
- flock_release(self.lockfile)
def commit(self):
pass
@@ -538,12 +513,6 @@ class PermDB:
return permdb.getvalue(self.permdbobj, key)
def add(self, key, value):
return permdb.addvalue(self.permdbobj, key, value)
- def lock_sh(self):
- assert False # NYI
- def lock_ex(self):
- assert False # NYI
- def release_lock(self):
- assert False # NYI
def commit(self):
permdb.committree(self.permdbobj)