11.07.2017       Issue 186 (10.07.2017 - 16.07.2017)       Articles

Interacting with a long-running child process in Python




Experimental feature:

Below is the text of the article at the link, so you can quickly tell whether it is worth reading.

Note that the text here and the text at the link may differ.

The Python subprocess module is a powerful Swiss-army knife for launching and interacting with child processes. It comes with several high-level APIs like call, check_output and (starting with Python 3.5) run that are focused on child processes our program runs and waits to complete.

In this post I want to discuss a variation of this task that is less directly addressed - long-running child processes. Think about testing some server - for example an HTTP server. We launch it as a child process, then connect clients to it and run some testing sequence. When we're done we want to shut down the child process in an orderly way. This would be difficult to achieve with APIs that just run a child process to completion synchronously, so we'll have to look at some of the lower-level APIs.

Sure, we could launch a child process with subprocess.run in one thread and interact with it (via a known port, for example) in another thread. But this would make it tricky to cleanly terminate the child process when we're done with it. If the child process has an orderly termination sequence (such as sending some sort of "quit" command), this is doable. But most servers do not, and will just spin forever until killed. This is the use-case this post addresses.

Launch, interact, terminate and get all output when done

The first, simplest use case will be launching an HTTP server, interacting with it, terminating it cleanly and getting all the server's stdout and stderr when done. Here are the important bits of the code (all full code samples for this post are available here), tested with Python 3.6:

import subprocess
import time
import urllib.request


def main():
    proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    try:
        time.sleep(0.2)
        resp = urllib.request.urlopen('http://localhost:8070')
        assert b'Directory listing' in resp.read()
    finally:
        proc.terminate()
        try:
            outs, _ = proc.communicate(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
            print(outs.decode('utf-8'))
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')

The child process is an HTTP server using Python's own http.server module, serving contents from the directory it was launched in. We use the low-level Popen API to launch the process asynchronously (meaning that Popen returns immediately and the child process runs in the background).

Note the -u passed to Python on invocation: this is critical to avoid stdout buffering, so that we see as much of stdout as possible when the process is killed. Buffering is a serious issue when interacting with child processes, and we'll see more examples of this later on.

The meat of the sample happens in the finally block. proc.terminate() sends the child process a SIGTERM signal. Then, proc.communicate waits for the child to exit and captures all of its stdout. communicate has a very convenient timeout argument starting with Python 3.3 [1], letting us know if the child does not exit for some reason. A more sophisticated technique could be to send the child a SIGKILL (with proc.kill) if it didn't exit due to SIGTERM.

If you run this script, you'll see the output:

$ python3.6 interact-http-server.py
== subprocess exited with rc = -15
Serving HTTP on 0.0.0.0 port 8070 (http://0.0.0.0:8070/) ...
127.0.0.1 - - [05/Jul/2017 05:48:34] "GET / HTTP/1.1" 200 -

The return code of the child is -15 (negative means terminated by a signal, 15 is the numeric code for SIGTERM). The stdout was properly captured and printed out.
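The SIGKILL fallback mentioned above can be sketched as a small helper (a sketch, not from the original post; the name shutdown and the timeout value are my own choices):

```python
import subprocess


def shutdown(proc, term_timeout=2.0):
    # Ask politely with SIGTERM first...
    proc.terminate()
    try:
        outs, _ = proc.communicate(timeout=term_timeout)
    except subprocess.TimeoutExpired:
        # ...then force the issue: SIGKILL cannot be caught or ignored.
        proc.kill()
        outs, _ = proc.communicate()
    return proc.returncode, outs


# A child that ignores nothing but would otherwise run for a minute.
proc = subprocess.Popen(
    ['python3', '-c', 'import time; time.sleep(60)'],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
rc, outs = shutdown(proc)
```

On POSIX systems rc will be -15 here, since the sleeping child exits on the first SIGTERM and the kill branch is never reached.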

Launch, interact, get output in real time, terminate

A related use case is getting the stdout of a child process in "real-time" and not everything together at the end. Here we have to be really careful about buffering, because it can easily bite and deadlock the program. Linux processes are usually line-buffered in interactive mode and fully buffered otherwise. Very few processes are fully unbuffered. Therefore, reading stdout in chunks of less than a line is not recommended, in my opinion. Really, just don't do it. Standard I/O is meant to be used in a line-wise way (think of how all the Unix command-line tools work); if you need sub-line granularity, stdout is not the way to go (use a socket or something).

Anyway, to our example:

import subprocess
import threading
import time
import urllib.request


def output_reader(proc):
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        for i in range(4):
            resp = urllib.request.urlopen('http://localhost:8070')
            assert b'Directory listing' in resp.read()
            time.sleep(0.1)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

The sample is similar except for how stdout is handled; there are no more calls to communicate; instead, proc.wait just waits for the child to exit (after SIGTERM has been sent). A thread polls the child's stdout attribute, looping as long as new lines are available and printing them immediately. If you run this sample, you'll notice that the child's stdout is reported in real-time, rather than as one lump at the end.

The iter(proc.stdout.readline, b'') snippet is continuously calling proc.stdout.readline(), until this call returns an empty bytestring. This only happens when proc.stdout is closed, which occurs when the child exits. Thus, while it may seem like the reader thread might never terminate - it always will! As long as the child process is running, the thread will dutifully block on that readline; as soon as the child terminates, the readline call returns b'' and the thread exits.
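The same two-argument form of iter works with any zero-argument callable and sentinel; a BytesIO stream (my stand-in for the child's stdout) shows the mechanics in isolation:

```python
import io

# iter(callable, sentinel) keeps calling the zero-argument callable
# until it returns the sentinel value -- here, the empty bytestring
# that readline() produces at end-of-stream.
stream = io.BytesIO(b'first\nsecond\n')
lines = list(iter(stream.readline, b''))
# lines is now [b'first\n', b'second\n']
```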

If we don't want to just print the captured stdout, but rather do something with it (such as look for expected patterns), this is easy to organize with Python's thread-safe queue. The reader thread becomes:

def output_reader(proc, outq):
    for line in iter(proc.stdout.readline, b''):
        outq.put(line.decode('utf-8'))

And we launch it with:

outq = queue.Queue()
t = threading.Thread(target=output_reader, args=(proc, outq))
t.start()

Then at any point we can check if there's stuff in the queue by using its non-blocking mode (the full code sample is here):

try:
    line = outq.get(block=False)
    print('got line from outq: {0}'.format(line), end='')
except queue.Empty:
    print('could not get line from queue')
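If you'd rather collect everything available at once instead of one line per call, the non-blocking get extends naturally into a small drain helper (the name drain is mine, not from the original):

```python
import queue


def drain(outq):
    # Grab everything currently queued without blocking; stop as soon
    # as the queue reports empty.
    lines = []
    while True:
        try:
            lines.append(outq.get(block=False))
        except queue.Empty:
            return lines


outq = queue.Queue()
for s in ('line 1\n', 'line 2\n'):
    outq.put(s)
collected = drain(outq)
# collected is ['line 1\n', 'line 2\n']; the queue is now empty
```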

Direct interaction with the child's stdin and stdout

This sample is getting into dangerous waters; the subprocess module documentation warns against doing the things described here due to possible deadlocks, but sometimes there's simply no choice! Some programs like using their stdin and stdout for interaction. Alternatively, you may have a program with an interactive (interpreter) mode you'd like to test - like the Python interpreter itself. Sometimes it's OK to feed this program all its input at once and then check its output; this can, and should, be done with communicate - the perfect API for this purpose. It properly feeds stdin, closes it when done (which signals many interactive programs that the game is over), etc. But what if we really want to provide additional input based on some previous output of the child process? Here goes:

import subprocess


def main():
    proc = subprocess.Popen(['python3', '-i'],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)

    # To avoid deadlocks: careful to: add \n to output, flush output, use
    # readline() rather than read()
    proc.stdin.write(b'2+2\n')
    proc.stdin.flush()
    print(proc.stdout.readline())

    proc.stdin.write(b'len("foobar")\n')
    proc.stdin.flush()
    print(proc.stdout.readline())

    proc.stdin.close()
    proc.terminate()
    proc.wait(timeout=0.2)

Let me reiterate what the comment in this code sample is saying:

  • When sending input to a line interpreter, don't forget to send the actual newline.
  • Always flush the stream after placing data into it, since it may be buffered.
  • Use readline to get input from the line interpreter.

We have to be very careful to avoid the following situation:

  1. We send data to the child's stdin, but it doesn't get the complete input for some reason (lack of newline, buffering etc.)
  2. We then invoke readline to wait for the reply.

Since the child is still waiting for input to complete (step 1), our step 2 may hang forever. This is a classic deadlock.

At the end of the interaction, we close the child's stdin (this is optional but useful for some kinds of child processes), call terminate and then wait. It would be better to send the child process some sort of "exit" command (quit() in the case of the Python interpreter); the terminate here is to demonstrate what we have to do if the other options are unavailable. Note that we could also use communicate here instead of wait to capture the stderr output.
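The write-newline / flush / readline round-trip can be wrapped in a small helper (a sketch; the send_line name and the timeout value are my own):

```python
import subprocess


def send_line(proc, data):
    # One request/response round-trip, following the rules above:
    # append the newline, flush, then read exactly one reply line.
    proc.stdin.write(data + b'\n')
    proc.stdin.flush()
    return proc.stdout.readline()


proc = subprocess.Popen(['python3', '-i'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
reply = send_line(proc, b'2+2')   # the banner and prompts go to stderr
proc.stdin.close()
proc.terminate()
proc.wait(timeout=2)
```

Here reply should be b'4\n': the interpreter writes its banner and prompts to stderr, so stdout carries only the evaluated results.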

Interact using non-blocking reads and stoppable threads

The final sample demonstrates a slightly more advanced scenario. Suppose we're testing a long-lived socket server, and we're interested in orchestrating complex interactions with it, perhaps with multiple concurrent clients. We'll also want a clean shut-down of the whole setup of threads and child processes. The full code sample is here; what follows is a couple of representative snippets. The key ingredient is this socket reading function, meant to be run in its own thread:

def socket_reader(sockobj, outq, exit_event):
    while not exit_event.is_set():
        try:
            buf = sockobj.recv(1)
            if len(buf) < 1:
                break
            outq.put(buf)
        except socket.timeout:
            continue
        except OSError:
            break

Best used with a socket that has a timeout set on it, this function will repeatedly monitor the socket for new data and push everything it receives [2] into outq, which is a queue.Queue. The function exits when either the socket is closed (recv returns an empty bytestring), or when exit_event (a threading.Event) is set by the caller.

The caller can launch this function in a thread and occasionally try to read new items from the queue in a non-blocking way:

try:
    v = outq.get(block=False)
    print(v)
except queue.Empty:
    break

When all is done, the caller can set the exit_event to stop the thread (the thread will stop on its own if the socket it's reading from is closed, but the event lets us control this more directly).
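Putting the pieces together, here's a minimal wiring sketch; a socketpair stands in for the connection to a real server (an assumption on my part - the original uses an actual socket server):

```python
import queue
import socket
import threading


def socket_reader(sockobj, outq, exit_event):
    while not exit_event.is_set():
        try:
            buf = sockobj.recv(1)
            if len(buf) < 1:
                break
            outq.put(buf)
        except socket.timeout:
            continue
        except OSError:
            break


reader_sock, writer_sock = socket.socketpair()
reader_sock.settimeout(0.1)   # short timeout so exit_event is checked often
outq = queue.Queue()
exit_event = threading.Event()
t = threading.Thread(target=socket_reader,
                     args=(reader_sock, outq, exit_event))
t.start()

writer_sock.sendall(b'hi')
writer_sock.close()           # recv() returns b'' -> reader thread exits
t.join()

received = []
while not outq.empty():
    received.append(outq.get_nowait())
# received holds the bytes the reader pushed, one per queue item
```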

Final words

There's no single one-size-fits-all solution for the task described in this post; I presented a bunch of recipes to handle the more commonly occurring situations, but specific use cases may not be addressed by them. Please let me know if you run into an interesting use case these recipes helped (or did not help!) resolve. Any other feedback is also welcome, as usual.



