Multiprocessing IPC with multiple Queues

surya bhusal
3 min read · Nov 26, 2021

In a multiprocessing environment, we often need to exchange data between processes using more than one queue.

In this article I'll present a toy example that uses two processes and three queues to exchange data between the processes.

Our Problem

  • One process calculates the square of each item in a queue.
  • Another process calculates the square root of the items.

Architecting The Solution

As seen in the above diagram, we're using three different queues. The first queue (Square Queue) stores the items whose square is to be calculated.

The second queue (SquareRoot Queue) stores the items whose square root is to be calculated.

The final one (Result Queue) is where we store the final results.

Solution 1: Wait between the process start times

  • Not a recommended solution.
  • Here we wait for a certain time after starting process 1 so that the SquareRoot Queue has at least one item that process 2 can consume.
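A minimal sketch of this approach, assuming process 1 squares each item and forwards it to the SquareRoot Queue while process 2 writes the square roots to the Result Queue. The function names, the `head_start` delay, and the `idle_timeout` after which a worker gives up on an empty queue are hypothetical choices for illustration, not taken from the notebook.

```python
import math
import multiprocessing as mp
import queue
import time


def square_worker(square_q, sqrt_q, idle_timeout):
    """Process 1: square each item and pass it on to the SquareRoot Queue."""
    while True:
        try:
            item = square_q.get(timeout=idle_timeout)
        except queue.Empty:
            return  # input stayed empty for idle_timeout seconds: assume done
        sqrt_q.put(item * item)


def sqrt_worker(sqrt_q, result_q, idle_timeout):
    """Process 2: take the square root of each item and store the result."""
    while True:
        try:
            item = sqrt_q.get(timeout=idle_timeout)
        except queue.Empty:
            return
        result_q.put(math.sqrt(item))


def run_with_head_start(numbers, head_start=0.5, idle_timeout=0.5):
    """Hypothetical driver: start process 2 only after a fixed delay."""
    square_q, sqrt_q, result_q = mp.Queue(), mp.Queue(), mp.Queue()
    for n in numbers:
        square_q.put(n)

    p1 = mp.Process(target=square_worker, args=(square_q, sqrt_q, idle_timeout))
    p2 = mp.Process(target=sqrt_worker, args=(sqrt_q, result_q, idle_timeout))

    p1.start()
    time.sleep(head_start)  # give process 1 time to fill the SquareRoot Queue
    p2.start()

    # Read the results before joining so the children can flush their queues.
    results = [result_q.get(timeout=5) for _ in numbers]
    p1.join()
    p2.join()
    return results


if __name__ == "__main__":
    print(run_with_head_start([1, 2, 3, 4]))
```

Both the delay and the timeout are timing guesses: if process 1 is slower than expected, process 2 can give up too early, which is why this solution is not recommended.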

Solution 2: Wait until the result queue is full

From the Docs:

Queue.full()

Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

Queue.get()

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
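One way this solution could look, as a sketch under assumptions: the Result Queue is created with a `maxsize` equal to the number of inputs, the workers loop forever on a blocking `get()`, and the parent polls `full()` before reading. The function names, the `poll_interval` parameter, and the choice to stop the idle workers with `terminate()` are illustrative assumptions, not the notebook's code.

```python
import math
import multiprocessing as mp
import time


def square_worker(square_q, sqrt_q):
    """Process 1: block on get() and square whatever arrives."""
    while True:
        item = square_q.get()  # block=True, timeout=None: wait for an item
        sqrt_q.put(item * item)


def sqrt_worker(sqrt_q, result_q):
    """Process 2: block on get() and store the square root."""
    while True:
        item = sqrt_q.get()
        result_q.put(math.sqrt(item))


def run_until_full(numbers, poll_interval=0.01):
    """Hypothetical driver: wait until the bounded Result Queue is full."""
    if not numbers:
        return []  # maxsize=0 would mean "unbounded", so full() never fires

    square_q, sqrt_q = mp.Queue(), mp.Queue()
    # Bound the Result Queue so that "full" means "every result was put".
    result_q = mp.Queue(maxsize=len(numbers))

    workers = [
        mp.Process(target=square_worker, args=(square_q, sqrt_q), daemon=True),
        mp.Process(target=sqrt_worker, args=(sqrt_q, result_q), daemon=True),
    ]
    for w in workers:
        w.start()
    for n in numbers:
        square_q.put(n)

    # Poll full(); the docs call it unreliable, so treat this as a heuristic.
    while not result_q.full():
        time.sleep(poll_interval)

    results = [result_q.get() for _ in numbers]

    # The workers never return on their own; stop them now that they are idle
    # and no queue will be used again.
    for w in workers:
        w.terminate()
        w.join()
    return results


if __name__ == "__main__":
    print(run_until_full([1, 2, 3, 4]))
```

Since `get()` already blocks until an item is available, calling it once per expected result would give the same outcome without the polling loop.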

Solution 3: Using JoinableQueue

  • Joinable Queue Docs
  • JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

Using a JoinableQueue allows us to implement a solution that blocks until the items in the queue are fully consumed, without blocking the processes themselves.

Underlying Methods

  • JoinableQueue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

  • JoinableQueue.join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
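The two methods can be combined as in the following sketch, which assumes the Square Queue and SquareRoot Queue are `JoinableQueue` instances and the Result Queue is a plain `Queue`. The function names and the use of daemon workers that are terminated once idle are assumptions made for illustration.

```python
import math
import multiprocessing as mp


def square_worker(square_q, sqrt_q):
    """Process 1: square each item, then mark it done on the Square Queue."""
    while True:
        item = square_q.get()
        sqrt_q.put(item * item)  # counted as unfinished on sqrt_q right away
        square_q.task_done()     # one task_done() per get()


def sqrt_worker(sqrt_q, result_q):
    """Process 2: store the square root, then mark the item done."""
    while True:
        item = sqrt_q.get()
        result_q.put(math.sqrt(item))
        sqrt_q.task_done()


def run_with_joinable_queues(numbers):
    """Hypothetical driver: wait on the queues instead of on timers."""
    square_q, sqrt_q = mp.JoinableQueue(), mp.JoinableQueue()
    result_q = mp.Queue()

    workers = [
        mp.Process(target=square_worker, args=(square_q, sqrt_q), daemon=True),
        mp.Process(target=sqrt_worker, args=(sqrt_q, result_q), daemon=True),
    ]
    for w in workers:
        w.start()
    for n in numbers:
        square_q.put(n)

    # join() returns once task_done() was called for every item put().
    square_q.join()  # every square has been handed to sqrt_q
    sqrt_q.join()    # every square root has been put on result_q

    results = [result_q.get() for _ in numbers]

    # The workers loop forever; stop them now that all work is finished.
    for w in workers:
        w.terminate()
        w.join()
    return results


if __name__ == "__main__":
    print(run_with_joinable_queues([1, 2, 3, 4]))
```

Calling `task_done()` only after the item has been put on the next queue is what makes the order of the two `join()` calls sufficient: when the first returns, the second queue already counts every forwarded item.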

Link To Jupyter Notebook

Thanks for taking the time to go through this toy example 😄 for getting started with IPC between processes. Please do share the story if you find it helpful. Which of the three solutions above do you prefer?
