summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-12-01 15:06:50 +0100
committerLinus Nordberg <linus@nordu.net>2016-12-01 15:06:50 +0100
commitb6e98f32655b906b4c525ae314439cbae56c1ccb (patch)
treeacca72eb5d846a918ad305519f24fe4105d4430c
parentdac28bf5822d5ee1ed02f79e6f735ba3f4b448e3 (diff)
Drain pipe between fetch_send control process and worker processes.
The pipe is limited so send blocks. Drain after _every_ send in _both_ directions.
-rwxr-xr-xtools/merge_fetch.py87
1 files changed, 55 insertions, 32 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index 5fdb7d0..c66dc03 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -103,6 +103,12 @@ def merge_fetch_sequenced(args, config, localconfig):
else:
return (tree_size, logorder[tree_size-1])
+def read_parent_messages(pipe, to_fetch):
+ while pipe.poll():
+ cmd, ehash = pipe.recv()
+ if cmd == 'FETCH':
+ to_fetch.add(ehash)
+
def merge_fetch_worker(args, localconfig, storagenode, pipe):
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
@@ -119,10 +125,7 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe):
to_fetch = set()
while True:
## Read all messages from parent.
- while pipe.poll():
- cmd, ehash = pipe.recv()
- if cmd == 'FETCH':
- to_fetch.add(ehash)
+ read_parent_messages(pipe, to_fetch)
## Fetch entries from node.
if to_fetch:
@@ -136,13 +139,16 @@ def merge_fetch_worker(args, localconfig, storagenode, pipe):
verify_entry(verifycert, entry, ehash)
pipe.send(('FETCHED', ehash, entry))
to_fetch.remove(ehash)
+ read_parent_messages(pipe, to_fetch) # Drain pipe.
## Ask node for more entries.
for ehash in get_new_entries(name, url, own_key, paths):
pipe.send(('NEWENTRY', ehash))
+ read_parent_messages(pipe, to_fetch) # Drain pipe.
- ## Wait some.
- sleep(max(3, args.mergeinterval / 10))
+ ## Wait some if nothing to do.
+ if not to_fetch:
+ sleep(max(3, args.mergeinterval / 10))
def term(signal, arg):
terminate_child_procs()
@@ -158,6 +164,34 @@ def newworker(name, args):
logging.debug("%s started, pid %d", name, p.pid)
return (name, my_conn, p)
+def read_worker_messages(procs, messages, args, localconfig):
+ for name, pipe, p, storagenode in procs.values():
+ if not p.is_alive():
+ logging.warning("%s is gone, restarting", name)
+ procs[name] = \
+ newworker(name, [args, localconfig, storagenode]) + (storagenode,)
+ continue
+ while pipe.poll():
+ messages.append((name, pipe.recv()))
+
+def process_worker_message(name, msg, fetch_dict, fetch_set, chainsdb, newentry,
+ logorder, entries_in_log):
+ cmd = msg[0]
+ ehash = msg[1]
+ if cmd == 'NEWENTRY':
+ logging.info("NEWENTRY at %s: %s", name, hexencode(ehash))
+ if not ehash in fetch_dict: # Don't fetch twice.
+ fetch_set.add(ehash)
+ elif cmd == 'FETCHED':
+ entry = msg[2]
+ logging.info("FETCHED from %s: %s", name, hexencode(ehash))
+ chainsdb.add(ehash, entry)
+ newentry.append(ehash) # Writing to logorderfile after loop.
+ logorder.append(hexencode(ehash))
+ entries_in_log.add(ehash)
+ if ehash in fetch_dict:
+ del fetch_dict[ehash]
+
def merge_fetch_parallel(args, config, localconfig):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
@@ -187,34 +221,21 @@ def merge_fetch_parallel(args, config, localconfig):
fetch_set = set()
fetch_dict = {}
+ messages = []
+
while procs:
## Poll worker processes and handle messages.
assert not fetch_set
newentry = []
- for name, pipe, p, storagenode in procs.values():
- if not p.is_alive():
- logging.warning("%s is gone, restarting", name)
- procs[name] = \
- newworker(name,
- [args, localconfig, storagenode]) + (storagenode,)
- continue
- logging.info("polling %s", name)
- while pipe.poll():
- msg = pipe.recv()
- cmd = msg[0]
- ehash = msg[1]
- if cmd == 'NEWENTRY':
- logging.info("NEWENTRY at %s: %s", name, hexencode(ehash))
- if not ehash in fetch_dict: # Don't fetch twice.
- fetch_set.add(ehash)
- elif cmd == 'FETCHED':
- entry = msg[2]
- logging.info("FETCHED from %s: %s", name, hexencode(ehash))
- chainsdb.add(ehash, entry)
- newentry.append(ehash) # Writing to logorderfile after loop.
- logorder.append(hexencode(ehash))
- entries_in_log.add(ehash)
- del fetch_dict[ehash]
+
+ # Drain pipe, then process messages.
+ read_worker_messages(procs, messages, args, localconfig)
+ for name, msg in messages:
+ process_worker_message(name, msg, fetch_dict, fetch_set, chainsdb,
+ newentry, logorder, entries_in_log)
+ messages = []
+
+ # Commit to chains database and update 'logorder' file.
chainsdb.commit()
for ehash in newentry:
add_to_logorder(logorderfile, ehash)
@@ -238,6 +259,7 @@ def merge_fetch_parallel(args, config, localconfig):
name, pipe, _, _ = fetch_dict[e].next()
logging.info("asking %s to FETCH %s", name, hexencode(e))
pipe.send(('FETCH', e))
+ read_worker_messages(procs, messages, args, localconfig) # Drain pipe.
## Update the 'fetched' file.
logsize = len(logorder)
@@ -251,8 +273,9 @@ def merge_fetch_parallel(args, config, localconfig):
currentsizefilecontent = newcontent
write_file(currentsizefile, currentsizefilecontent)
- ## Wait some.
- sleep(1)
+ ## Wait some if nothing to do.
+ if not messages:
+ sleep(1)
return 0