diff options
author | Linus Nordberg <linus@nordu.net> | 2016-11-25 00:47:13 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-11-25 00:47:13 +0100 |
commit | 49b6e85963ef55fb6cfa1876fe825730f95658bc (patch) | |
tree | b3f5c605d64c984a092a34c32b34305d4a3d687a /tools/mergetools.py | |
parent | 19a2a611a839c0318f58347e2d93943c8e2401a5 (diff) |
Parallelise merge_fetch.py.
NOTE: Not supporting permdb yet!
We're still not passing the tests because merge_backup.py exits when
the secondary merge disappears.
Diffstat (limited to 'tools/mergetools.py')
-rw-r--r-- | tools/mergetools.py | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/tools/mergetools.py b/tools/mergetools.py index 5471fe4..9a4f6b2 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -13,6 +13,7 @@ import requests import time import fcntl import errno +import multiprocessing import logging try: import permdb @@ -289,7 +290,7 @@ def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): try: - json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries] + json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries] result = http_request( baseurl + "plop/v1/merge/sendentry", json.dumps(json_entries), @@ -301,7 +302,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, hash + print >>sys.stderr, ehash print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" @@ -435,6 +436,10 @@ def waitforfile(path): return statinfo def flock_ex_or_fail(path): + """ + To be used at most once per process. Will otherwise leak file + descriptors. + """ try: fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) except IOError, e: @@ -443,6 +448,30 @@ def flock_ex_or_fail(path): return False return True +def flock_ex_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for exclusive lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_EX) + logging.debug("taken exclusive lock on %s", fd) + return fd + +def flock_sh_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for shared lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_SH) + logging.debug("taken shared lock on %s", fd) + return fd + +def flock_release(fd): + logging.debug("releasing lock on %s", fd) + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + +def terminate_child_procs(): + for p in multiprocessing.active_children(): + #print >>sys.stderr, "DEBUG: terminating pid", p.pid + p.terminate() + class Status: def __init__(self, path): self.path = path @@ -452,12 +481,17 @@ class Status: class FileDB: def __init__(self, path): self.path = path + self.lockfile = None def get(self, key): return read_chain(self.path, key) def add(self, key, value): return write_chain(key, value, self.path) + def lock_sh(self): + self.lockfile = flock_sh_wait(self.path + "/.lock") + def lock_ex(self): + self.lockfile = flock_ex_wait(self.path + "/.lock") def commit(self): - pass + flock_release(self.lockfile) class PermDB: def __init__(self, path): @@ -466,5 +500,10 @@ class PermDB: return permdb.getvalue(self.permdbobj, key) def add(self, key, value): return permdb.addvalue(self.permdbobj, key, value) + def lock_sh(self): + assert False # NYI + def lock_ex(self): + assert False # NYI def commit(self): permdb.committree(self.permdbobj) + |