Sunday, December 22, 2013

Signals and threads in python

The task: start a set of processes and wait till they terminate, if SIGTERM is received send same signal to all child processes and again wait till they terminate. (Ok, I'd go with sending signal to the process group, but however.)
The naive solution: use subprocess and threading modules, start thread per child process and communicate(), in main thread join() with others. Why naive? This doesn't work. Signal handler is not invoked. The documentation to signal module says:
Although Python signal handlers are called asynchronously as far as the Python user is concerned, they can only occur between the "atomic" instructions of the Python interpreter. This means that signals arriving during long calculations implemented purely in C (such as regular expression matches on large bodies of text) may be delayed for an arbitrary amount of time.
That is signal handler will be processed only after current "atomic" operation finishes. Unfortunately join() is one of such atomic operations. In other works signal handler will be invoked only after thread termination.
Another way is to use coroutines and select:
proc = subprocess.Popen(cmd,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        close_fds=True,
                        preexec_fn=preexec_fn)

fdesc = proc.stdout.fileno()
flags = fcntl.fcntl(fdesc, fcntl.F_GETFL)
fcntl.fcntl(fdesc, fcntl.F_SETFL, flags | os.O_NONBLOCK)

while True:
    try:
        dt = proc.stdout.read()
        if dt == '':
            break
        yield dt
    except IOError:
        # EWOULDBLOCK
        
        yield ''
        
        try:
            select.select([proc.stdout], [], [], 1)
        except select.error:
            # select.error: (4, 'Interrupted system call') - ignore it,
            # just call select again
            pass

while True:
    try:
        proc.wait()
        break
    except OSError:
        # OSError: [Errno 4] Interrupted system call
        continue
And the supervisor (left - an array of generators from coroutines)
while left:
    new_left = []
    
    for execute in left:
        try:
            execute.next()
            new_left.append(execute)
        except StopIteration:
            pass
        except Exception, e:
            err = e

    left = new_left
Also note that EINTR in general is not processed by python standard library, it is just forwarded up the stack as C does. In most cases if you are lucky it is enough to call interrupted routine again as in C (but C guarantees that this works, python doesn't).

No comments:

Post a Comment