Background
Python’s multiprocessing package
One of the most useful APIs introduced by Python’s multiprocessing package is the process Pool, which “offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism).” [multiprocessing documentation]
By creating child processes, the multiprocessing package sidesteps the Global Interpreter Lock and allows full utilization of multiple processors. However, depending on the target function, the memory footprint can be significant, and getting the desired behavior when processes and pools are terminated can be tricky.
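As a minimal sketch of the Pool API described above (the inputs are arbitrary), map() spreads an iterable across worker processes, while apply_async() submits one call and returns an AsyncResult to collect later. Built-in functions serve as tasks so that they pickle by reference under any start method, and the if __name__ == "__main__": guard stops children that re-import the main module (as 'spawn' workers do) from creating pools of their own.

```python
import math
import multiprocessing as mp

if __name__ == "__main__":
    # Two worker processes; leaving the with-block calls pool.terminate().
    with mp.Pool(processes=2) as pool:
        # map() distributes the inputs across the workers and preserves order.
        factorials = pool.map(math.factorial, range(6))
        # apply_async() returns an AsyncResult at once; get() blocks for the value.
        async_result = pool.apply_async(pow, (2, 10))
        power = async_result.get(timeout=10)
    print(factorials)  # [1, 1, 2, 6, 24, 120]
    print(power)       # 1024
```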
Calling system commands in Python
This post refers collectively to binary executables (such as ls or wget) and shell builtins (such as pwd) as “system commands.” Several standard ways of calling system commands in Python are as follows:
- os.system(<command>)
  - Calls the system() C library function, which, according to the Linux man-pages, “uses fork(2) to create a child process that executes the shell command specified in command using execl(3) as follows:
      execl("/bin/sh", "sh", "-c", command, (char *) NULL);
    system() returns after the command has been completed. During execution of the command, SIGCHLD will be blocked, and SIGINT and SIGQUIT will be ignored, in the process that calls system(). (These signals will be handled according to their defaults inside the child process that executes command.)”
- os.exec*() family of functions
  - Call the corresponding exec*() system calls.
- subprocess.Popen(<command>, **kwds)
  - Implemented using the fork() and execv()/execve() system calls.
  - Nonblocking constructor that returns a subprocess.Popen object immediately, which can be waited upon using subprocess.Popen.wait().
  - The <command> can be executed directly or through a shell by passing the shell=True argument.
  - subprocess.Popen objects cannot be pickled.
- subprocess.run(<command>, **kwds)
  - Calls subprocess.Popen() and blocks until the command finishes.
  - Returns a CompletedProcess object, which can be pickled.
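The blocking and pickling differences between subprocess.run() and subprocess.Popen() can be checked directly. In this sketch, the current interpreter (sys.executable) stands in for a system command. Which exception is raised when pickling a Popen object is an implementation detail (in CPython it comes from an internal lock), so both TypeError and pickle.PicklingError are caught.

```python
import pickle
import subprocess
import sys

# A trivial stand-in "system command": a child interpreter that exits with 0.
command = [sys.executable, "-c", "pass"]

# subprocess.run() blocks until the command exits and returns a CompletedProcess,
# which survives a pickle round trip.
completed = subprocess.run(command)
restored = pickle.loads(pickle.dumps(completed))

# subprocess.Popen() returns immediately; wait() blocks until the command exits.
proc = subprocess.Popen(command)
exit_code = proc.wait()

# Popen objects hold unpicklable state, so pickling them fails.
try:
    pickle.dumps(proc)
    popen_picklable = True
except (TypeError, pickle.PicklingError):
    popen_picklable = False

print(restored.returncode, exit_code, popen_picklable)  # 0 0 False
```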
Process groups and signals
In Python, child processes created using the multiprocessing package, or using any of the methods above (with default arguments) for calling system commands, inherit the process group ID of the parent Python process. subprocess.Popen() and subprocess.run() additionally support a subprocess.CREATE_NEW_PROCESS_GROUP flag that specifies “that a new process group will be created.” [subprocess documentation] This flag is Windows-only and is passed through the creationflags argument; on POSIX systems, passing start_new_session=True instead runs the child in a new session, and therefore in a new process group. By default, Python translates SIGINT signals into KeyboardInterrupt exceptions. [signal documentation]
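Process group inheritance can be observed by having a child interpreter report its own process group ID. This is a POSIX-only sketch: start_new_session=True makes the child call setsid() before running the command, which also puts it in a new process group. It is used here as the closest POSIX counterpart to the Windows-only CREATE_NEW_PROCESS_GROUP flag.

```python
import os
import subprocess
import sys

# Child interpreter that prints the ID of the process group it belongs to.
report_pgid = [sys.executable, "-c", "import os; print(os.getpgrp())"]

# Default arguments: the child inherits this process's process group.
inherited = subprocess.run(report_pgid, stdout=subprocess.PIPE,
                           universal_newlines=True)

# start_new_session=True (POSIX only) runs setsid() in the child before exec,
# so the child becomes the leader of a brand-new process group.
separate = subprocess.run(report_pgid, stdout=subprocess.PIPE,
                          universal_newlines=True, start_new_session=True)

parent_pgid = os.getpgrp()
child_pgid_default = int(inherited.stdout)
child_pgid_new_session = int(separate.stdout)
print(child_pgid_default == parent_pgid)      # True
print(child_pgid_new_session == parent_pgid)  # False
```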
In shells (e.g., sh and bash), child processes’ process group IDs depend on whether the shell is run interactively (i.e., in a terminal) or not (e.g., with the -c <command> option). When run by an interactive shell, child processes are assigned their own unique process group IDs. When run by a non-interactive shell, child processes inherit the process group ID of the shell. In either mode, the shell catches and handles SIGINT signals such that it waits for the current command to finish and “breaks out of any executing loops.” [Bash manual]
Signals generated by keyboard interrupts (e.g., SIGINT, Ctrl+C; SIGTSTP, Ctrl+Z; SIGQUIT, Ctrl+\) are sent to the foreground process group. [Wikipedia]
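Signaling a whole process group from Python mirrors what the terminal does on Ctrl+C. In this POSIX-only sketch, a child interpreter stands in for a system command and is started in its own session, so os.killpg() reaches only that child and not the process running the example. The child resets SIGINT to its default action and reports "ready" before sleeping, which avoids racing its startup and also covers the case where SIGINT was ignored when the example was launched.

```python
import os
import signal
import subprocess
import sys

# Stand-in command: restore the default SIGINT action, report readiness, sleep.
child_code = (
    "import signal, time\n"
    "signal.signal(signal.SIGINT, signal.SIG_DFL)\n"
    "print('ready', flush=True)\n"
    "time.sleep(30)\n"
)

# start_new_session=True gives the child its own session and process group.
proc = subprocess.Popen([sys.executable, "-c", child_code],
                        stdout=subprocess.PIPE, start_new_session=True)
proc.stdout.readline()  # wait until the child's SIGINT action is in place
pgid = os.getpgid(proc.pid)

# Deliver SIGINT to every process in the group, as Ctrl+C does for a
# terminal's foreground process group.
os.killpg(pgid, signal.SIGINT)

# The default action terminates the child; Popen reports death by a signal
# as a negative return code.
returncode = proc.wait(timeout=10)
proc.stdout.close()
print(pgid == proc.pid, returncode == -signal.SIGINT)  # True True
```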
Behavior of multiprocessing.Pool
For brevity and consistency, we will use the following conventions and assumptions:
- We follow the implementation provided by CPython v3.7.3.
- mp refers to the multiprocessing package (e.g., assume that the line import multiprocessing as mp has been run).
- “Parent process” refers to the Python process that created the pool (i.e., the process in which pool = mp.Pool() was run).
Memory footprint
On Unix-based systems, the multiprocessing package defaults to the 'fork' method to start processes. Forked child processes initially share the parent’s memory pages copy-on-write, but reference-count updates and garbage collection in the children write to those pages and force them to be copied. Depending on when garbage collection routines are run, the physical memory occupied by the child processes may therefore be quite high. The memory footprint of child processes can be significantly reduced by using the 'spawn' start method (mp.set_start_method('spawn')), which launches each child process as a fresh Python interpreter that only inherits resources as necessary.
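To use 'spawn' for one pool without changing the global default, mp.get_context('spawn') returns a context object with the same API as the mp module; mp.set_start_method('spawn') instead changes the default for the whole program and should be called at most once. This is a sketch with arbitrary inputs: the tasks are built-ins so they pickle by reference, and the __main__ guard is required because each spawned child re-imports the main module.

```python
import math
import multiprocessing as mp
import os

if __name__ == "__main__":
    # A context object bound to the 'spawn' start method.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        # Each worker is a fresh interpreter started for this pool.
        worker_pid = pool.apply(os.getpid)
        results = pool.map(math.factorial, [3, 4, 5])
    print(ctx.get_start_method())     # spawn
    print(worker_pid != os.getpid())  # True
    print(results)                    # [6, 24, 120]
```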
Upon initialization (e.g., in the constructor), a pool object creates a pool of processes, each of whose target is the mp.pool.worker() function, which waits until a task (consisting of a function to run and the arguments to call it with, such as those provided to pool.apply_async()) arrives in a task queue. We consider the memory footprint with three types of task functions:
- Does not call the fork() or exec() family of system calls.
  - Example: pool.apply_async(time.sleep, (10,))
  - Process hierarchy:
    - parent process
      - pool process
  - Memory footprint depends on the process start method ('fork' or 'spawn').
- Uses the fork() system call to create a child process to run the actual command.
  - Example: pool.apply_async(os.system, ('sleep 10',))
  - Process hierarchy:
    - parent process
      - pool process
        - [shell process]
          - command process
  - Memory footprint is relatively high and depends in part on the process start method. A Python pool process, and possibly a shell process, must run alongside the desired command process.
- Uses the exec() family of system calls before (or without) calling fork() to run the actual command.
  - Example: pool.apply_async(os.execvp, ('sleep', ('sleep', '10'))) (the first element of the argument tuple becomes the command’s argv[0], so the program name must be repeated there)
  - Process hierarchy:
    - parent process
      - pool process (whose memory space, originally occupied by a Python process, has been cannibalized to run the command)
  - Memory footprint is low, since no additional Python or shell processes need to run alongside the desired command process.
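The first two task types can be exercised with a sketch like the one below, which requests the 'fork' start method explicitly and is therefore Unix-only. The pool process is a direct child of the parent process, and os.system() hands back the shell’s raw wait status, which os.WEXITSTATUS() decodes. The os.execvp() type is left out because it never returns a result.

```python
import multiprocessing as mp
import os

ctx = mp.get_context("fork")  # the Unix default start method in CPython 3.7
with ctx.Pool(processes=1) as pool:
    # Type 1: an ordinary Python call runs inside the pool process itself,
    # whose parent is the process that created the pool.
    pool_parent = pool.apply(os.getppid)
    # Type 2: os.system() forks a shell (sh -c "exit 3") beneath the pool
    # process and returns the shell's wait status once it exits.
    status = pool.apply(os.system, ("exit 3",))

print(pool_parent == os.getpid())  # True
print(os.WEXITSTATUS(status))      # 3
```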
Handling SIGINT signals
Consider the following set of processes:
- parent process (Python interpreter)
- idle pool process (idle, waiting for a task)
- active pool process (running task function)
- shell process (may or may not exist, depending on the task function)
- system command process (may or may not exist, depending on the task function)
We then consider sending a SIGINT signal to any one process. For the parent process, idle pool process, shell process, and system command process, the behavior is the same regardless of the task function:
- Parent process: The Python interpreter raises a KeyboardInterrupt exception, which, in an interactive session, usually just means that the prompt is cleared and KeyboardInterrupt is printed to the terminal. The pool is not affected, since the various threads (worker handler, task handler, and result handler) managing the pool are not stopped, in part because there is no direct way to stop a thread in Python.
- Idle pool process: The process exits with exit code 1. The pool worker handler then repopulates the pool with a new process.
  - Details: The mp.pool.worker() function does not handle any KeyboardInterrupt exceptions raised while the pool process is blocked (idling) waiting for a task, so the exception propagates up to the pool process object’s _bootstrap() method, which catches the KeyboardInterrupt, prints out a traceback, and returns exit code 1. The next stack frame up is the _launch() method of a Popen object (note: the Popen classes defined in the mp package and the subprocess.Popen class are distinct) created by the pool process object. Within _launch(), os._exit() is called with that exit code, killing the pool process.
- Shell processes: The shell waits for any currently executing system command to finish, then breaks out of any executing loops.
- System command processes: The behavior of the system command upon receiving a SIGINT signal depends on whether a signal handler was installed, either by the system command program or by the trap shell builtin.
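The idle pool process case can be reproduced with a sketch along these lines (Unix-only, 'fork' start method). A single-worker pool reports its worker’s PID, the worker is sent SIGINT while it idles, and a different PID then serves the next task. The one-second pause is an arbitrary allowance for the worker handler, which checks for exited workers periodically, to start a replacement; the killed worker also prints a KeyboardInterrupt traceback to stderr.

```python
import multiprocessing as mp
import os
import signal
import time

ctx = mp.get_context("fork")
with ctx.Pool(processes=1) as pool:
    # The only worker reports its PID, then goes back to waiting for a task.
    old_pid = pool.apply(os.getpid)

    # SIGINT raises KeyboardInterrupt in the idle worker; mp.pool.worker() does
    # not catch it, so the worker process exits with exit code 1.
    os.kill(old_pid, signal.SIGINT)
    time.sleep(1)  # allow time for the worker handler to start a replacement

    # The next task is served by the replacement process.
    new_pid = pool.apply_async(os.getpid).get(timeout=10)

print(old_pid != new_pid)  # True
```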
For the active pool process, the behavior depends on the task function:
- A Python function that does not call the fork() or exec() family of system calls
  - Unless a SIGINT handler was installed, the task function is interrupted by a KeyboardInterrupt exception. mp.pool.worker() only catches exceptions derived from Exception, and KeyboardInterrupt derives from BaseException, so the exception propagates out of mp.pool.worker() just as it does for an idle pool process: no result is placed in the pool results queue, the pool process exits with exit code 1, and the pool worker handler replaces it with a new pool process.
- os.system()
  - The pool process ignores SIGINT while system() is running (see the man-page excerpt quoted earlier), and a signal that arrives while it is ignored is discarded rather than queued. The task therefore is not interrupted: os.system() returns the command’s wait status once the command exits, the pool process puts that result in the pool results queue, and it then waits for the next task if it has not yet reached its maxtasksperchild limit, or exits otherwise (and is replaced with a new pool process by the pool worker handler).
- subprocess.run()
  - The pool process exits before placing any result in the pool results queue and is replaced with a new pool process by the pool worker handler. If subprocess.run() was called with the default shell=False argument, then the system command is killed. If the argument shell=True was set, then the shell is killed, but the system command continues to run.
    - Details: subprocess.run() blocks until the system command exits or an exception is raised. In the case of a KeyboardInterrupt exception, subprocess.run() kills the child process it started (the system command, or the shell if shell=True) and re-raises the KeyboardInterrupt exception, so subprocess.run() ends but never returns. No result is added to the pool results queue because, as with a plain Python function, mp.pool.worker() only catches exceptions derived from Exception, and KeyboardInterrupt is not one of them.
- subprocess.Popen()
  - The pool process exits and is replaced with a new pool process by the pool worker handler. Any shell and/or system command continues to run.
    - Details: Since subprocess.Popen() is nonblocking, it returns immediately, and the pool process tries to put the subprocess.Popen object into the pool results queue. However, subprocess.Popen objects cannot be pickled, so the pool process puts a multiprocessing.pool.MaybeEncodingError exception in the pool results queue and waits for the next task if it has not yet reached its maxtasksperchild limit, or exits otherwise (and is replaced with a new pool process by the pool worker handler). Any SIGINT signal is therefore likely to be received by the pool process while it is waiting for its next task, and the behavior is then analogous to that of an idle pool process receiving a SIGINT signal.
- os.exec*()
  - The pool process becomes the system command process, so the behavior is defined by the signal handler (if any) installed by the system command. Assuming the default signal handler, the system command exits. The pool is repopulated by the worker handler.
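The contrast between a task that Python can interrupt and one blocked in system() can be checked with the sketch below (Unix-only, 'fork' start method). SIGINT is sent to the pool process alone with os.kill(), not to the whole process group; the sleep durations and the half-second head start are arbitrary choices meant to ensure that each task is running when the signal arrives.

```python
import multiprocessing as mp
import os
import signal
import time

ctx = mp.get_context("fork")
with ctx.Pool(processes=1) as pool:
    # Case 1: a pure-Python task. KeyboardInterrupt derives from BaseException,
    # not Exception, so it escapes mp.pool.worker(); the pool process exits and
    # the task's AsyncResult never becomes ready.
    pid = pool.apply(os.getpid)
    lost = pool.apply_async(time.sleep, (30,))
    time.sleep(0.5)                    # let the worker start the task
    os.kill(pid, signal.SIGINT)
    lost.wait(timeout=3)
    python_task_ready = lost.ready()

    # Case 2: os.system(). The calling process ignores SIGINT while system()
    # runs, so the task completes and its wait status comes back as usual.
    pid = pool.apply_async(os.getpid).get(timeout=10)  # the replacement worker
    pending = pool.apply_async(os.system, ("sleep 2",))
    time.sleep(0.5)
    os.kill(pid, signal.SIGINT)
    status = pending.get(timeout=10)

print(python_task_ready)       # False
print(os.WEXITSTATUS(status))  # 0
```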
Note that keyboard interrupts (e.g., Ctrl+C) are sent to the entire foreground process group, and the overall behavior can be understood as if SIGINT signals were sent to each process as discussed above.
pool.terminate()
pool.terminate() calls the terminate() method of each pool process, which sends it a SIGTERM signal. As before, we consider the effect of pool.terminate() on each type of process in turn.
- Parent process: not relevant / no effect
- Idle pool process: exits and is not replaced
- Active pool process: exits without putting any result in the pool results queue, and is not replaced
  - The only exception is if the task function is subprocess.Popen(), in which case the pool process will put a result with an exception in the pool results queue unless pool.terminate() is run before the pool process finishes its nonblocking creation of a subprocess.
  - If the task function is os.exec*(), the pool process becomes the system command process, so the behavior is defined by the signal handler (if any) installed by the system command. Assuming the default signal handler (i.e., immediate termination upon receiving a SIGTERM signal), the system command process exits.
- Shell process: not affected
- System command process: not affected, unless the task function is os.exec*() (see above)
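The os.exec*() case can be sketched as follows (Unix-only, 'fork' start method, assuming a sleep executable on the PATH). Once the task has replaced the pool process with sleep, pool.terminate() sends SIGTERM to that same PID, and the command is killed and reaped; os.kill(pid, 0) serves only as an existence check.

```python
import multiprocessing as mp
import os
import time

ctx = mp.get_context("fork")
pool = ctx.Pool(processes=1)
pid = pool.apply(os.getpid)  # PID of the only pool process

# The pool process turns into `sleep 30`; this task never produces a result.
pool.apply_async(os.execvp, ("sleep", ("sleep", "30")))
time.sleep(0.5)  # give the task time to reach os.execvp()

pool.terminate()  # SIGTERM to every pool process, including the exec'd command
pool.join()       # wait for the pool's workers and handler threads to finish

try:
    os.kill(pid, 0)  # signal 0 only checks whether the PID still exists
    command_alive = True
except ProcessLookupError:
    command_alive = False

print(command_alive)  # False
```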
Summary
In many cases, the standard
pool = mp.Pool()
pool.apply_async(<target_function>)
usage is sufficient. However, the optimal use of multiprocessing pools depends on the target functions one wants to run in the pool. Target functions can be split into two types:
- Python functions that do not call the fork() or exec() family of system calls. To reduce memory utilization at the expense of speed of launching processes, you can set the process start method to 'spawn' using mp.set_start_method('spawn') at the start of your main Python program.
- System commands. If you want to be able to terminate the system commands with pool.terminate(), you have to use os.exec*() to launch them. However, this precludes getting any return value (not even an exit code) from the pool processes.
To run system commands with the ability to terminate them, check on their status, and retrieve their exit codes, you will have to create your own thread/process pool class, either by extending the existing multiprocessing.pool.Pool class or by rolling your own altogether.
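One possible shape for such a class is the hypothetical CommandPool below (its name and methods are invented for illustration and are not an existing API). It swaps in concurrent.futures.ThreadPoolExecutor for multiprocessing: worker threads start commands with subprocess.Popen() and keep the handles, so terminate() can signal the commands directly while each future still resolves to an exit code. It assumes a POSIX system with true and sleep on the PATH.

```python
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class CommandPool:
    """Hypothetical sketch: run system commands from worker threads, keep
    their Popen handles so they can be terminated, and return exit codes."""

    def __init__(self, max_workers=4):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._procs = set()
        self._lock = threading.Lock()
        self._closed = False

    def _run(self, args):
        with self._lock:
            if self._closed:  # terminate() was called before this task started
                return None
            proc = subprocess.Popen(args)
            self._procs.add(proc)
        try:
            return proc.wait()  # exit code, or -N if killed by signal N
        finally:
            with self._lock:
                self._procs.discard(proc)

    def submit(self, args):
        """Queue a command; returns a Future that resolves to its exit code."""
        return self._executor.submit(self._run, args)

    def terminate(self):
        """Send SIGTERM to every running command, then wait for the threads."""
        with self._lock:
            self._closed = True
            for proc in self._procs:
                proc.terminate()
        self._executor.shutdown(wait=True)


pool = CommandPool(max_workers=2)
quick = pool.submit(["true"])
slow = pool.submit(["sleep", "30"])
print(quick.result(timeout=10))          # 0
time.sleep(0.5)                          # let `sleep 30` start
pool.terminate()
print(slow.result() == -signal.SIGTERM)  # True
```

Threads suit this job because each one spends nearly all its time blocked in Popen.wait(), which releases the GIL; a queued task that has not started when terminate() is called resolves to None.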
Appendix
multiprocessing.Pool versus multiprocessing.pool.Pool
multiprocessing.Pool is effectively an alias of multiprocessing.pool.Pool. Upon importing the multiprocessing package (running __init__.py), the global namespace of the package is updated with names from _default_context, which is an instance of the DefaultContext class. DefaultContext inherits the BaseContext class, which defines Pool as a method that wraps the multiprocessing.pool.Pool constructor.
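The relationship can be inspected without starting any tasks. This sketch relies on the private name multiprocessing.context._default_context, so it reflects CPython’s current implementation rather than a documented API; the pool creation at the end is guarded because some start methods re-import the main module in each child.

```python
import multiprocessing as mp
import multiprocessing.context
import multiprocessing.pool

# mp.Pool is not the class itself but a bound method of the default context...
print(mp.Pool is mp.pool.Pool)                          # False
print(mp.Pool.__self__ is mp.context._default_context)  # True
print(type(mp.context._default_context).__name__)       # DefaultContext

if __name__ == "__main__":
    # ...and calling it constructs an ordinary multiprocessing.pool.Pool.
    with mp.Pool(processes=1) as pool:
        constructed_type = type(pool)
    print(constructed_type is mp.pool.Pool)  # True
```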
Life of a pool process
Below is the hierarchy of some of the key functions and methods involved in creating a pool process. Each entry refers to the function or method being called (its own source code), not to the place where it was called in the parent stack frame.
- pool._repopulate_pool()
  - w = pool.Process()
    - pool._ctx.Process()
      - On Unix-based systems, the BaseProcess subclass that is instantiated is mp.context.ForkProcess.
  - w.start()
    - _popen = self._Popen(self)
      - _popen._launch()
        - os.fork()
          - If in the child process:
            - w._bootstrap()
              - w.run()
                - w._target(), where _target is mp.pool.worker()
                  - inqueue.get(): blocks until a task is received
                  - func(*args, **kwds): func is the function one desires to run in parallel, as passed to pool.apply_async(func)
            - os._exit()
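A task can report where it sits in this hierarchy. In the sketch below (Unix-only, since it requests 'fork'; describe_worker() is a hypothetical helper), the worker sees itself as a ForkProcess, carries a name assigned by the pool, and has the pool’s creator as its parent. The numeric suffix of the name depends on how many processes were created earlier, so it is not fixed.

```python
import multiprocessing as mp
import os


def describe_worker():
    """Runs inside a pool process and reports its class, name, and parent."""
    me = mp.current_process()
    return type(me).__name__, me.name, os.getppid()


ctx = mp.get_context("fork")
with ctx.Pool(processes=1) as pool:
    process_class, worker_name, worker_parent = pool.apply(describe_worker)

print(process_class)                 # ForkProcess
print(worker_name)                   # e.g. ForkPoolWorker-1
print(worker_parent == os.getpid())  # True
```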
Obtaining process (group) IDs and sending signals
Python
- Current process ID: os.getpid()
- Current process group ID: os.getpgrp(), or os.getpgid(0)
- Current process’s parent’s process ID: os.getppid()
- Process group ID of a process with process ID <PID>: os.getpgid(<PID>)
- Sending a signal to a process: os.kill(<PID>, <SIGNAL>)
- Sending a signal to a process group: os.killpg(<PGID>, <SIGNAL>)
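A small POSIX-only sketch ties these calls together: it checks that the different ways of obtaining the current process group ID agree, and uses signal 0, which performs the existence and permission checks without delivering anything, to probe the current process and its group.

```python
import os

pid = os.getpid()
pgid = os.getpgrp()

# Three equivalent ways to obtain the current process group ID.
same_group = pgid == os.getpgid(0) == os.getpgid(pid)

# Signal 0 is never delivered; these calls raise OSError if the target is
# missing or cannot be signaled.
os.kill(pid, 0)
os.killpg(pgid, 0)

print(pid, os.getppid(), pgid, same_group)
```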
Shell
- Current process ID: echo $$
- Process group ID, parent process ID, process ID: ps -o pgid,ppid,pid,cmd <PID>
- Sending a signal to a process: kill -s <SIGNAL> <PID>
- Sending a signal to a process group: kill -s <SIGNAL> -- -<PGID>
multiprocessing.dummy module
The multiprocessing.dummy module “replicates the API of multiprocessing but is no more than a wrapper around the threading module.” [multiprocessing documentation] Specifically, “dummy processes” inherit the threading.Thread class. Even though a thread-based pool system (such as that implemented by multiprocessing.dummy) is limited to a single processor by the Global Interpreter Lock, it can still be useful in the following contexts:
- Running multiple I/O-bound tasks simultaneously
- Launching subprocesses (e.g., system commands)
The multiprocessing.dummy module has some notable limitations, however.
- Starting dummy processes with a target of os.exec*() will fail with an OSError exception: [Errno 12] Cannot allocate memory.
- multiprocessing.pool.ThreadPool.terminate() does not terminate workers.
  - Pools created using multiprocessing.dummy.Pool() are instances of the multiprocessing.pool.ThreadPool class, which inherits the multiprocessing.pool.Pool class and shares the same terminate() method, which calls _terminate(), which calls _terminate_pool(), which includes the lines
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()
    where pool is a list of process objects. Real processes have a terminate() method, so hasattr(pool[0], 'terminate') evaluates to True, and the terminate() method is called on each process object. However, threads and dummy processes do not have a terminate() method or attribute, so they are not actually stopped.
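The missing terminate() can be observed with a sketch like the following, in which slow_task() is a hypothetical stand-in for real work and the timings are arbitrary. ThreadPool.terminate() returns while the worker thread is still sleeping, and the task then runs to completion anyway.

```python
import threading
import time
from multiprocessing.dummy import Pool as ThreadPool

finished = threading.Event()


def slow_task():
    """Hypothetical task: sleeps, then records that it finished."""
    time.sleep(2)
    finished.set()


pool = ThreadPool(processes=1)
pool.apply_async(slow_task)
time.sleep(0.5)  # let the worker thread start the task

pool.terminate()  # returns without stopping the worker thread
still_running = not finished.is_set()

# The "terminated" task runs to completion regardless.
completed_later = finished.wait(timeout=10)
print(still_running, completed_later)  # True True
```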
Topics not discussed (may be added later!)
- Using os.spawn*() as a means of launching system commands.
- Exactly how virtual and physical memory are managed across fork() system calls in Python.