
synchronize with signals and fix tensors presented #345

Closed · wants to merge 5 commits

Conversation

StevenSong (Collaborator) commented Jun 25, 2020

resolves #323
resolves #346

this PR aims to use more reliable and portable python functions (no longer using qsize), to fix the number of tensors presented (`(i + 1) % ...`), and to fix 2 possible concurrency edge cases:

  1. the consumer dequeues more items than were produced at the time it decided to consume. the current code only blocks producers when qsize == num_workers, and the consumer consumes until the queue is empty (see appendix for the current code). this opens the door to the following scenario:
let's say workers = 4
1. 4 workers put items into the queue and block while the queue is full
2. the consumer begins to dequeue and pops 1 item off the queue
3. workers begin putting more items into the queue because the queue is no longer "full"
4. the consumer pops from the queue until the queue is empty
at this point, the consumer has consumed more than the 4 items it originally wanted.

the fix is to have producers wait until the consumer gives the "all clear" to begin producing again. the consumer waits for all the producers to have enqueued, processes all the items, and then gives the all clear. in the meantime, the producers wait before producing their next item.

  2. the other edge case is a single producer enqueuing more than once before the consumer dequeues. with the current code this can happen:
let's say workers = 2
1. worker 0 enqueues an item
2. worker 1 is lazy
3. worker 0 enqueues another item
4. the consumer detects 2 items in the queue and starts to dequeue
since each worker gets a distinct set of paths, worker 0 has just reported stats for the same set of paths twice

the fix is again to have the producers wait until the "all clear" is given before enqueuing the next set of items.
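
a minimal sketch of this hand-off for one produce/consume round, assuming multiprocessing Events; the names here (work_q, worker_done, all_clear, NUM_WORKERS) are illustrative and not the identifiers used in this PR:

```python
import multiprocessing as mp

NUM_WORKERS = 4

def producer(idx, work_q, worker_done, all_clear):
    work_q.put(f"stats from worker {idx}")  # enqueue exactly one item this round
    worker_done[idx].set()                  # signal "I have enqueued"
    all_clear.wait()                        # do not enqueue again until the consumer says so

def consume_round(work_q, worker_done, all_clear):
    for done in worker_done:                # wait for every producer, not for a qsize() guess
        done.wait()
    items = [work_q.get() for _ in range(NUM_WORKERS)]  # take exactly NUM_WORKERS items
    for done in worker_done:
        done.clear()
    all_clear.set()                         # the "all clear": producers may start the next round
    return items

if __name__ == "__main__":
    work_q = mp.Queue()
    worker_done = [mp.Event() for _ in range(NUM_WORKERS)]
    all_clear = mp.Event()
    procs = [mp.Process(target=producer, args=(i, work_q, worker_done, all_clear))
             for i in range(NUM_WORKERS)]
    for p in procs:
        p.start()
    print(consume_round(work_q, worker_done, all_clear))
    for p in procs:
        p.join()
```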

Appendix:

current consumer code:
https://github.com/broadinstitute/ml/blob/dd13b4518b3547ebdcad13698f0c71a4abaaafb8/ml4cvd/tensor_generators.py#L171-L186

current producer code:
https://github.com/broadinstitute/ml/blob/dd13b4518b3547ebdcad13698f0c71a4abaaafb8/ml4cvd/tensor_generators.py#L426-L437

StevenSong (Collaborator, Author) commented Jun 26, 2020

this change might currently be causing deadlock, investigating

StevenSong (Collaborator, Author) commented Jun 26, 2020

found the deadlock...

let's say num workers = 4

  • 4 producers all put items in the queue, then signal that they have all enqueued
  • the consumer begins consuming until the queue is empty - the consumer reports only 3 items dequeued, and the counts of tensors presented are off because the last item in the queue was never reported
  • since the queue was initialized with max size num_workers, the next time the producers try to enqueue, the last producer finds a queue that already has 4 items in it (1 left over from the last round). it gets stuck because the queue is full, so it can't signal the consumer, the consumer never sees the signal, and never dequeues - deadlock

the implication here is that Queue.put is non-atomic and that there is a lag between an item being put into the queue in a producer process and the item being visible in the queue to the consumer process

there's some documentation of non-atomicity here:
https://bugs.python.org/issue14976
and here:
https://codewithoutrules.com/2017/08/16/concurrency-python/ (not exactly the same bug, but it implies put is non-atomic)
and here:
https://stackoverflow.com/questions/34641807/why-does-pythons-multiprocessing-queue-have-a-buffer-and-a-pipe

the worst part is that apparently this issue was fixed in python 3.7 (see the python bug report link). we're on 3.6 (let's upgrade eventually! @lucidtronix)

the fix is hopefully to enforce that the consumer consumes exactly num_workers items - currently testing (fixed)
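
as a small aside, the lag described above can be seen with a toy script like this (behavior is timing dependent; multiprocessing.Queue.put hands the item to a background feeder thread, so a non-blocking read right after put may or may not see it):

```python
import multiprocessing as mp
import queue

q = mp.Queue()
q.put("item")  # returns before the feeder thread has necessarily flushed the item to the pipe
try:
    print("visible immediately:", q.get_nowait())
except queue.Empty:
    # empty()/qsize()/get_nowait() can lag behind put(), even within one process
    print("not visible yet; a blocking get() is the reliable way to read")
    print("visible after waiting:", q.get(timeout=1))
```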

```diff
@@ -161,7 +161,7 @@ def set_worker_paths(self, paths: List[Path]):
     def __next__(self) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Optional[List[str]]]:
         if not self._started:
             self._init_workers()
-        if self.stats_q.qsize() == self.num_workers:
+        if all(worker.signal.is_set() for worker in self.worker_instances):
```
Contributor:

Can we abstract this to a function, e.g. def epoch_just_finished() or something? I want to use it to know when to stop inference with multiprocessing

Collaborator (Author):

sure, though aggregate_and_print_stats will reset this condition - just something to be aware of
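
a possible shape for that helper, sketched under the assumption that the generator keeps its workers on self.worker_instances as in the diff above (not code from this PR):

```python
def epoch_just_finished(self) -> bool:
    """True once every worker has signaled the end of its current cycle.

    Note: aggregate_and_print_stats clears the worker signals, so check this
    before stats are aggregated.
    """
    return all(worker.signal.is_set() for worker in self.worker_instances)
```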

```diff
@@ -161,7 +161,7 @@ def set_worker_paths(self, paths: List[Path]):
     def __next__(self) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray], Optional[List[str]]]:
```
Contributor:

Can you write a test that runs one epoch worth of batches (with multiprocessing) and checks that every path is visited exactly once?
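
a sketch of what such a test might look like; TensorGenerator and make_test_paths are placeholder names for illustration, and __next__ is assumed to return paths as its third element per the signature above:

```python
from collections import Counter

def test_one_epoch_visits_every_path_exactly_once():
    paths = make_test_paths(200)                                 # placeholder fixture
    gen = TensorGenerator(paths, batch_size=10, num_workers=6)   # placeholder constructor
    seen = Counter()
    for _ in range(len(paths) // 10):                            # one epoch worth of batches
        _, _, batch_paths = next(gen)                            # (input, output, paths)
        seen.update(batch_paths)
    assert set(seen) == set(paths)                               # every path visited
    assert all(count == 1 for count in seen.values())            # exactly once (relaxed below to "at least once")
```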

StevenSong (Collaborator, Author), Jun 30, 2020:

oh, this was a really good idea. during development of this test, it seems it's possible to see a path twice before a true epoch is reached (i.e. before all the paths have been seen at least once)

Collaborator (Author):

screenshot from a notebook, batch size = 10, validation steps = 20

[screenshot]

Contributor:

Just realized this may be inevitable if some workers finish before others. Maybe the test should just check that each path is visited at least once. 200 vs 121 feels extreme though

Collaborator (Author):

200 vs 121 might be because of the small batch size; the worker that is started first can probably get through 20 tensors (10 per batch * 2 batches) before the last worker is even started

Collaborator (Author):

I can get this closer, but fixing the issue in such a way that tensors are repeated correctly when the number of tensors is not a multiple of the batch size is proving very difficult

[screenshot]

see: #345 (comment)

StevenSong (Collaborator, Author) commented Jul 1, 2020

the intermediate implementation is to prevent workers from completing cycle N+1 before other workers complete cycle N. this led to this discovery:

using the old example: batch size = 10, validation steps = 20, validation tensors = 200, num workers = 6. each worker has 33-34 tensors - i.e. to visit each tensor at least once, all workers must complete 4 cycles

validation workers 1-6 complete cycle 1 and yield 60 tensors (batch size 10) (6 validation steps)
validation workers 1-6 complete cycle 2 and yield 60 tensors, 120 total (12 validation steps)
validation workers 1-6 complete cycle 3 and yield 60 tensors, 180 total (18 validation steps)
2 more validation workers complete cycle 4 and yield 20 tensors - these include 6-8 more unique tensors and 12-14 non-unique tensors (depending on whether those workers have 33 or 34 tensors each). this completes the 20 validation steps. it is not possible to reach all 200 validation tensors in 20 validation steps with 6 validation workers.

the fix is to mess with num workers again to allocate according to batch size, number of steps and number of tensors.
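
working the arithmetic above through as a quick script (numbers from the example; variable names are just for illustration):

```python
import math

num_tensors, batch_size, num_workers, num_steps = 200, 10, 6, 20
per_worker = math.ceil(num_tensors / num_workers)        # 34 (some workers get 33)
full_rounds = num_steps // num_workers                   # 3 complete rounds of all 6 workers
leftover = num_steps % num_workers                       # 2 extra worker-cycles
unique_after_rounds = num_workers * min(full_rounds * batch_size, per_worker)  # 6 * 30 = 180
extra_unique = leftover * max(per_worker - full_rounds * batch_size, 0)        # at most 2 * 4 = 8
print(unique_after_rounds + extra_unique, "unique tensors of", num_tensors)    # at most 188 of 200
```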

StevenSong (Collaborator, Author):

> the fix is to mess with num workers again to allocate according to batch size, number of steps and number of tensors.

it's not as simple as changing the number of workers. changing the number of workers is okay when the number of tensors is a multiple of the batch size, because then you can reach a true epoch by requesting N batches. if the number of tensors in a true epoch is not a multiple of the batch size, then every true epoch after the first will require a varying number of batches, depending on which workers finish first

StevenSong (Collaborator, Author) commented Jul 1, 2020

> … the fix is to mess with num workers again to allocate according to batch size, number of steps and number of tensors.

A proposal for this is to set the number of steps large enough that all tensors are seen by the model at least once.

Probably num_steps = int(2 * num_tensors / batch_size) + 1 would be good
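
plugging the example numbers into that heuristic as a sanity check (not code from this PR):

```python
import math

num_tensors, batch_size, num_workers = 200, 10, 6
num_steps = int(2 * num_tensors / batch_size) + 1    # proposed heuristic: 41 steps

per_worker = math.ceil(num_tensors / num_workers)    # 34 tensors on the largest worker
cycles_needed = math.ceil(per_worker / batch_size)   # 4 cycles per worker to see all of its tensors
steps_needed = cycles_needed * num_workers           # 24 steps, assuming the round-robin enforced above
print(num_steps, ">=", steps_needed, "->", num_steps >= steps_needed)  # 41 >= 24 -> True
```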

StevenSong (Collaborator, Author):

rewriting tensor generator #353

Successfully merging this pull request may close these issues: "not all tensors properly used by train", "refactor concurrency to not use qsize"