multithreading - Python process synchronization
I am trying to set up a number of 'worker' threads/processes, prime them with a list of 'commands' to work through, and then have them step through the commands one-by-one, in step with each other.
Update: I have had a few questions about why I am taking this approach, so here's a bit of context: I'm using this approach for an automated testing script. I am simulating a multi-user environment, with different users running the application against a shared resource. I want to carry out a sequence of operations simultaneously on the API from multiple clients, and I want to be able to command each worker so as to produce a repeatable set of tests. Without synchronisation I have no guarantee that the operations are carried out in the order I expect. A requirement I perhaps haven't mentioned is that I want certain commands to be executed simultaneously, e.g. writing a large amount of data to the db.
I am using the multiprocessing module in Python 2.7.5b3 on Windows 7. So far, I have the following example working, which is illustrative of what I am trying to do.
This example gets the workers to write their results to a shared queue, so I can see the order in which the commands are executed.
The worker (worker.py):
from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):
    def __init__(self, execute, q_out):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()   # private command queue for this worker
        self.q_out = q_out    # shared results queue
        self.execute = execute

    def run(self):
        print self.name, 'running'
        self.execute.wait()   # block until the manager fires the start event
        while not self.q_in.empty():
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))
The manager:
from multiprocessing import Event, Queue
from worker import Worker

if __name__ == '__main__':
    workers = []
    syncevent = Event()
    shared_q = Queue()

    for i in range(0, 2):
        worker = Worker(syncevent, shared_q)
        map(worker.q_in.put, ['a', 'b', 'c'])  # preload the commands
        workers.append(worker)
        worker.start()

    syncevent.set()  # release all workers at once

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()
This gives me output as follows:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'a')
('Worker-1', 'b')
('Worker-1', 'c')
('Worker-2', 'a')
('Worker-2', 'b')
('Worker-2', 'c')
What I am trying to achieve is output like this:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'a')
('Worker-2', 'a')
('Worker-1', 'b')
('Worker-2', 'b')
('Worker-1', 'c')
('Worker-2', 'c')
I have looked at Lock and RLock, but they don't seem to fit the bill, as I am trying to keep the threads running at the same time, but have each one stop and wait until the others have finished before executing the next command.
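What I'm describing is essentially barrier synchronisation: every worker runs freely up to a checkpoint, then waits for the rest before anyone continues. For what it's worth, Python 3.3+ ships multiprocessing.Barrier, which does exactly this out of the box; it doesn't help me on 2.7, but a minimal sketch of the idea (my own, assuming Python 3.3 or later) would look like:

from multiprocessing import Process, Barrier, Queue

def worker(name, commands, barrier, q_out):
    for cmd in commands:
        barrier.wait()          # nobody proceeds until all workers arrive
        q_out.put((name, cmd))  # then everyone executes the command "in step"

if __name__ == '__main__':
    shared_q = Queue()
    barrier = Barrier(2)        # 2 parties; the barrier resets after each round
    workers = [Process(target=worker,
                       args=('Worker-%d' % (i + 1), ['a', 'b', 'c'],
                             barrier, shared_q))
               for i in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    while not shared_q.empty():
        print(shared_q.get())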
I am sure there is a nice, simple way to do this, but I can't quite get my head around what it is. Does anyone have any suggestions on how to proceed?
This class should work to synchronise your processes. It holds the processes on a shared Condition: when the last worker is done, it notifies the others, so all of them are woken and able to continue together.
worker.py
from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):
    def __init__(self, execute, q_out, syncher):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()
        self.q_out = q_out
        self.execute = execute
        self.syncher = syncher

    def run(self):
        print self.name, 'running'
        self.execute.wait()
        while not self.q_in.empty():
            self.syncher.check()  # barrier: wait here until every worker arrives
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))
manager.py
from multiprocessing import Event, Queue, Condition, Lock, Value
from worker import Worker

class Synchroniser(object):
    def __init__(self, workers):
        self.workers_locked = Value('i', 0)  # how many workers are waiting
        self.workers = workers
        self.condition = Condition(Lock())

    def check(self):
        with self.condition:
            self.workers_locked.value += 1
            if self.workers_locked.value >= self.workers:
                # last worker to arrive: wake everyone up
                self.condition.notify_all()
            else:
                # not everyone is here yet: sleep until notified
                self.condition.wait()
            self.workers_locked.value -= 1

if __name__ == '__main__':
    workers = []
    syncevent = Event()
    shared_q = Queue()
    worker_num = 2
    syncher = Synchroniser(worker_num)

    for i in range(0, worker_num):
        worker = Worker(syncevent, shared_q, syncher)
        map(worker.q_in.put, ['a', 'b', 'c'])
        workers.append(worker)
        worker.start()

    syncevent.set()

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()
> python manager.py
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'a')
('Worker-2', 'a')
('Worker-2', 'b')
('Worker-1', 'b')
('Worker-1', 'c')
('Worker-2', 'c')
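One caveat: a single-counter barrier like this can in principle race on reuse. A fast worker that loops around and re-enters check() before the slower ones have woken and decremented the counter can push the count back up to the threshold and slip through a second time. The standard fix is a generation counter; here is a minimal sketch of that variant (the names here are my own, not part of the code above):

from multiprocessing import Condition, Lock, Value

class ReusableBarrier(object):
    def __init__(self, parties):
        self.parties = parties
        self.count = Value('i', 0)       # workers waiting in this round
        self.generation = Value('i', 0)  # which round we are in
        self.cond = Condition(Lock())

    def wait(self):
        with self.cond:
            gen = self.generation.value
            self.count.value += 1
            if self.count.value == self.parties:
                # last arrival: reset for the next round and release everyone
                self.count.value = 0
                self.generation.value += 1
                self.cond.notify_all()
            else:
                # sleep until this round is over; the loop guards against
                # spurious wakeups and fast re-entrants from the next round
                while gen == self.generation.value:
                    self.cond.wait()

A fast worker that re-enters wait() sees the new generation number and blocks until the next round completes, so the lockstep ordering is preserved even under unlucky scheduling.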