Multiprocessing using Pool in Python

June 24, 2020 | Python

In the last tutorial, we introduced multiprocessing and the Process class of the multiprocessing module. Today, we are going to go through the Pool class.

In the Process class, we had to create processes explicitly. The Pool class is more convenient because it creates and manages a pool of worker processes for you. The syntax to create a pool object is multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild, context). All the arguments are optional. processes is the number of worker processes you want to create; its default value is obtained from os.cpu_count(). initializer is a function that each worker process calls when it starts, and initargs are the arguments passed to it. maxtasksperchild is the number of tasks a worker process can complete before it exits and is replaced by a fresh worker process. The advantage of specifying it is that resources the worker has accumulated are released periodically. If it is not provided, the worker processes live as long as the pool does. context specifies the context used for starting the worker processes.
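The constructor arguments can be sketched as follows. This is only an illustration under stated assumptions: the names init_worker, tag, and _prefix are hypothetical, and the values passed for processes and maxtasksperchild are arbitrary.

```python
from multiprocessing import Pool

_prefix = None  # per-worker state, set by the initializer (hypothetical name)


def init_worker(prefix):
    # Runs once in every worker process when that worker starts.
    global _prefix
    _prefix = prefix


def tag(x):
    # Uses the state that init_worker prepared in this worker.
    return f"{_prefix}-{x}"


def run():
    # Two workers; each worker is replaced after completing three tasks.
    # A replacement worker runs init_worker again when it starts.
    with Pool(processes=2, initializer=init_worker, initargs=("job",),
              maxtasksperchild=3) as pool:
        return pool.map(tag, range(6))


if __name__ == "__main__":
    print(run())
```

The initializer is a convenient place for per-worker setup, since it runs in each worker rather than once per task.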

Consider the following example, in which each task calculates the square of a number and then sleeps for 1 second.

import time
from multiprocessing import Pool


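def square(x):
    print(f"start process:{x}")
    result = x * x  # avoid reusing the function's own name
    print(f"square {x}:{result}")
    time.sleep(1)
    print(f"end process:{x}")


if __name__ == "__main__":
    starttime = time.time()
    pool = Pool()
    pool.map(square, range(0, 5))  # blocks until every task has finished
    pool.close()  # no more tasks can be submitted
    pool.join()  # wait for the worker processes to exit
    endtime = time.time()
    print(f"Time taken {endtime-starttime} seconds")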

Output

start process:0
start process:1
square 1:1
square 0:0
end process:1
start process:2
end process:0
start process:3
square 2:4
square 3:9
end process:3
end process:2
start process:4
square 4:16
end process:4
Time taken 3.0474610328674316 seconds

Here, we import the Pool class from the multiprocessing module. In the main block, we create an object of the Pool class. pool.map() takes the function that we want to parallelize and an iterable as arguments, and it runs the function on every item of the iterable. It also takes an optional chunksize argument, which splits the iterable into chunks of approximately the given size and submits each chunk as a separate task. pool.close() prevents any more tasks from being submitted to the pool.
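To make the chunksize argument more concrete, here is a small sketch. The helper square_with_pid is hypothetical; it returns the worker's process ID alongside the result so that we can see which items were handled together. Because a chunk is submitted as a single task, all items of a chunk are processed by the same worker.

```python
import os
from multiprocessing import Pool


def square_with_pid(x):
    # Return the result together with the ID of the worker that computed it.
    return x * x, os.getpid()


def run(chunksize):
    with Pool(processes=2) as pool:
        return pool.map(square_with_pid, range(10), chunksize=chunksize)


if __name__ == "__main__":
    results = run(chunksize=5)
    print([value for value, _ in results])
    # Items 0-4 form one chunk and items 5-9 another, so each group
    # of results carries a single worker PID.
    print({pid for _, pid in results[:5]}, {pid for _, pid in results[5:]})
```

Larger chunks mean fewer tasks to hand out, which can help when the iterable is long and each call is cheap.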

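We can see that the time taken is approximately 3 seconds. The output shows two tasks running at a time, so the five one-second tasks finish in three rounds, whereas running them one after another would take about 5 seconds.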

pool.imap() is almost the same as pool.map(). The difference is that imap() returns an iterator that yields each result, in input order, as soon as it is ready, instead of waiting for all of them to finish. Moreover, map() converts the iterable into a list (if it is not one already), whereas imap() does not.

Consider the following example.

import time
from multiprocessing import Pool


def square(x):
    print(f"start process {x}")
    squared = x * x  # avoid reusing the function's own name
    time.sleep(1)
    print(f"end process {x}")
    return squared


if __name__ == "__main__":
    pool = Pool()
    a = pool.map(square, range(0, 5))
    print(a)

Output

start process 0
start process 1
end process 0
start process 2
end process 1
start process 3
end process 2
start process 4
end process 3
end process 4
[0, 1, 4, 9, 16]

Let’s now do the same example using the imap() method.

import time
from multiprocessing import Pool


def square(x):
    print(f"start process {x}")
    squared = x * x  # avoid reusing the function's own name
    time.sleep(1)
    print(f"end process {x}")
    return squared


if __name__ == "__main__":
    pool = Pool()
    a = pool.imap(square, range(0, 5))
    for i in a:
        print(f"showing the result as it is ready {i}")

Output

start process 0
start process 1
end process 0
start process 2
end process 1
start process 3
showing the result as it is ready 0
showing the result as it is ready 1
end process 2
start process 4
end process 3
showing the result as it is ready 4
showing the result as it is ready 9
end process 4
showing the result as it is ready 16
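One detail that is easy to miss is that imap() yields results in the order of the input items, even when a later item finishes first. The sketch below illustrates this; the function wait_and_return and the generator delays are hypothetical names, and the delays are arbitrary. It also passes a generator to imap(), which takes items from it without building a list first.

```python
import time
from multiprocessing import Pool


def wait_and_return(delay):
    # Sleep for the given number of seconds, then hand the value back.
    time.sleep(delay)
    return delay


def delays():
    # A generator rather than a list.
    yield 0.3
    yield 0.1
    yield 0.2


def run():
    with Pool(processes=3) as pool:
        # The 0.1 s task finishes first, but the results still come back
        # in input order.
        return list(pool.imap(wait_and_return, delays()))


if __name__ == "__main__":
    print(run())  # [0.3, 0.1, 0.2]
```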

While pool.map() blocks the main program until the result is ready, pool.map_async() does not block; it returns a result object immediately. The syntax is pool.map_async(function, iterable, chunksize, callback, error_callback). The chunksize, callback, and error_callback arguments are optional.

Let’s see an example.

import time
from multiprocessing import Pool


def square(x):
    squared = x * x  # avoid reusing the function's own name
    print("start process")
    time.sleep(1)
    return squared


if __name__ == "__main__":
    pool = Pool()
    result = pool.map_async(square, range(0, 5))
    print("main script")
    print(result.get())
    print("end main script")

Output

start process
start process
main script
start process
start process
start process
[0, 1, 4, 9, 16]
end main script

As you can see in the output above, map_async() does not block the main script: "main script" is printed while the workers are still running. The result.get() method is used to obtain the return values of the square() function. Note that result.get() blocks the main program until the result is ready. It also takes an optional timeout argument, the number of seconds to wait for the result. If the result does not arrive by that time, multiprocessing.TimeoutError is raised.
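The optional arguments of map_async() and the timeout of get() can be sketched as follows. The functions slow_square and fail are hypothetical, and the sleep and timeout values are arbitrary. Note that the callback of map_async() receives the complete list of results in one call.

```python
import multiprocessing
import time
from multiprocessing import Pool


def slow_square(x):
    time.sleep(0.5)
    return x * x


def fail(x):
    raise ValueError(f"bad input {x}")


def run():
    collected, errors = [], []
    with Pool(processes=2) as pool:
        result = pool.map_async(slow_square, range(2),
                                callback=collected.append)
        try:
            # The tasks need about 0.5 s, so this short wait expires.
            result.get(timeout=0.05)
            timed_out = False
        except multiprocessing.TimeoutError:
            timed_out = True
        values = result.get()  # no timeout: block until the result is ready

        # error_callback receives the exception raised in a worker.
        failing = pool.map_async(fail, range(2), error_callback=errors.append)
        failing.wait()
    return timed_out, values, collected, errors


if __name__ == "__main__":
    timed_out, values, collected, errors = run()
    print(timed_out, values, collected)
    print([type(e).__name__ for e in errors])
```

A timeout on get() does not cancel the tasks; calling get() again later still returns the result once it is ready.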

The pool.apply() method calls the given function with the given arguments in one of the worker processes. The syntax is pool.apply(function, args, kwds). Just like pool.map(), it blocks the main program until the result is ready. It also has an asynchronous variant, pool.apply_async(function, args, kwds, callback, error_callback).

Let’s see.

import time
from multiprocessing import Pool


def test(x):
    print("start process")
    time.sleep(x)
    print("end process")


if __name__ == "__main__":
    pool = Pool()
    pool.apply(test, (2,))
    pool.apply(test, (2,))
    print("main script")
    print("end main script")

Output

start process
end process
start process
end process
main script
end main script

Let’s do the same example with the asynchronous variant.

import time
from multiprocessing import Pool


def test(x):
    print("start process")
    time.sleep(x)
    print("end process")


if __name__ == "__main__":
    pool = Pool()
    a = pool.apply_async(test, (2,))
    b = pool.apply_async(test, (2,))
    print("main script")
    print("end main script")
    a.wait()
    b.wait()

Output

start process
start process
main script
end main script
end process
end process

As you can observe, pool.apply() blocks the main script, while pool.apply_async() doesn't. The wait() method waits until the result is available; like get(), it accepts an optional timeout argument.
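The examples above pass only positional arguments and ignore return values. As a rough sketch of the remaining options, the snippet below passes a keyword argument to pool.apply() and a callback to pool.apply_async(). The function power and the list seen are hypothetical names chosen for illustration.

```python
from multiprocessing import Pool


def power(base, exponent=2):
    return base ** exponent


def run():
    seen = []
    with Pool(processes=2) as pool:
        # Blocking call with a positional and a keyword argument.
        sync_value = pool.apply(power, (3,), {"exponent": 3})
        # Non-blocking call; the callback receives the return value.
        async_result = pool.apply_async(power, (4,), callback=seen.append)
        async_value = async_result.get(timeout=10)
    return sync_value, async_value, seen


if __name__ == "__main__":
    print(run())  # (27, 16, [16])
```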

You can also use the ready() and successful() methods on the result object returned by the async methods. The ready() method returns True if the call has completed and False otherwise. The successful() method returns True if the call completed without raising an exception. If the result is not ready, it raises a ValueError (in Python 3.7 and later) or an AssertionError (in earlier versions).
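A short sketch of ready() and successful() follows. The functions nap and boom are hypothetical, and the 0.5-second sleep is arbitrary; it only needs to be long enough that the first checks happen before the task finishes.

```python
import time
from multiprocessing import Pool


def nap(seconds):
    time.sleep(seconds)
    return seconds


def boom():
    raise RuntimeError("failed on purpose")


def run():
    with Pool(processes=2) as pool:
        pending = pool.apply_async(nap, (0.5,))
        ready_before = pending.ready()  # the task is still sleeping
        try:
            pending.successful()  # not ready yet, so this raises
            early_error = None
        except (ValueError, AssertionError) as exc:
            early_error = type(exc).__name__
        pending.wait()

        failed = pool.apply_async(boom)
        failed.wait()  # wait() does not re-raise the worker's exception
        return (ready_before, early_error,
                pending.ready(), pending.successful(),
                failed.ready(), failed.successful())


if __name__ == "__main__":
    print(run())
```

A call that raised an exception still counts as ready; only successful() distinguishes it from a call that returned normally.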

