summaryrefslogtreecommitdiff
path: root/tools/mergetools.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-11-25 00:47:13 +0100
committerLinus Nordberg <linus@nordu.net>2016-11-25 00:47:13 +0100
commit49b6e85963ef55fb6cfa1876fe825730f95658bc (patch)
treeb3f5c605d64c984a092a34c32b34305d4a3d687a /tools/mergetools.py
parent19a2a611a839c0318f58347e2d93943c8e2401a5 (diff)
Parallelise merge_fetch.py.
NOTE: Not supporting permdb yet! We're still not passing the tests because merge_backup.py exits when the secondary merge disappears.
Diffstat (limited to 'tools/mergetools.py')
-rw-r--r--tools/mergetools.py45
1 files changed, 42 insertions, 3 deletions
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 5471fe4..9a4f6b2 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -13,6 +13,7 @@ import requests
import time
import fcntl
import errno
+import multiprocessing
import logging
try:
import permdb
@@ -289,7 +290,7 @@ def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
try:
- json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
+ json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries]
result = http_request(
baseurl + "plop/v1/merge/sendentry",
json.dumps(json_entries),
@@ -301,7 +302,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
+ print >>sys.stderr, ehash
print >>sys.stderr, "======= RESPONSE ======="
print >>sys.stderr, result
print >>sys.stderr, "========================"
@@ -435,6 +436,10 @@ def waitforfile(path):
return statinfo
def flock_ex_or_fail(path):
+ """
+ To be used at most once per process. Will otherwise leak file
+ descriptors.
+ """
try:
fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
except IOError, e:
@@ -443,6 +448,30 @@ def flock_ex_or_fail(path):
return False
return True
+def flock_ex_wait(path):
+ fd = os.open(path, os.O_CREAT)
+ logging.debug("waiting for exclusive lock on %s (%s)", fd, path)
+ fcntl.flock(fd, fcntl.LOCK_EX)
+ logging.debug("taken exclusive lock on %s", fd)
+ return fd
+
+def flock_sh_wait(path):
+ fd = os.open(path, os.O_CREAT)
+ logging.debug("waiting for shared lock on %s (%s)", fd, path)
+ fcntl.flock(fd, fcntl.LOCK_SH)
+ logging.debug("taken shared lock on %s", fd)
+ return fd
+
+def flock_release(fd):
+ logging.debug("releasing lock on %s", fd)
+ fcntl.flock(fd, fcntl.LOCK_UN)
+ os.close(fd)
+
+def terminate_child_procs():
+ for p in multiprocessing.active_children():
+ #print >>sys.stderr, "DEBUG: terminating pid", p.pid
+ p.terminate()
+
class Status:
def __init__(self, path):
self.path = path
@@ -452,12 +481,17 @@ class Status:
class FileDB:
def __init__(self, path):
self.path = path
+ self.lockfile = None
def get(self, key):
return read_chain(self.path, key)
def add(self, key, value):
return write_chain(key, value, self.path)
+ def lock_sh(self):
+ self.lockfile = flock_sh_wait(self.path + "/.lock")
+ def lock_ex(self):
+ self.lockfile = flock_ex_wait(self.path + "/.lock")
def commit(self):
- pass
+ flock_release(self.lockfile)
class PermDB:
def __init__(self, path):
@@ -466,5 +500,10 @@ class PermDB:
return permdb.getvalue(self.permdbobj, key)
def add(self, key, value):
return permdb.addvalue(self.permdbobj, key, value)
+ def lock_sh(self):
+ assert False # NYI
+ def lock_ex(self):
+ assert False # NYI
def commit(self):
permdb.committree(self.permdbobj)
+