Watching The Subprocess

If you try working with our current LineMonitor implementation you will find it has some disadvantages.

  1. There is no way to stop monitoring.

  2. In particular, if the underlying subprocess crashes, the monitor will just block forever - it is blocked trying to .readline() - but the line will never come.

Furthermore, we originally wanted the ability to have more than one callback.

Let’s improve our LineMonitor, starting by handling the underlying subprocess a little more carefully. We’ll start by checking for available data before we try to read it.

Polling the Read File Descriptor

We want to create a polling object, and register the reader’s file descriptor using its .register method.

Let’s test for it. We have to mock the select module, of course, and also change our launch_scenario().

@pytest.fixture
def override_imports(patch_module):
    patch_module(line_monitor, 'subprocess')
    patch_module(line_monitor, 'pty')
    patch_module(line_monitor, 'open')
    patch_module(line_monitor, 'select')
    Fake('select').POLLIN = select.POLLIN

def launch_scenario(s):
    s.pty.openpty() >> ('write_to_fd', 'read_from_fd')
    s.open('read_from_fd', encoding='latin-1') >> Fake('reader')
    s.select.poll() >> Fake('poller')
    s.reader.fileno() >> 'reader_descriptor'
    s.poller.register('reader_descriptor', select.POLLIN)
    s.subprocess.Popen(['my', 'command', 'line'], stdout='write_to_fd', close_fds=True) >> Fake('the_process')

There is a quirk here - after running patch_module(line_monitor, 'select'), the select object inside the tested line_monitor module is replaced by a Fake('select') fake object. Later, we want to demand that poller.register() be called with the select.POLLIN constant. As things are, this would technically also be the fake object Fake('select.POLLIN'), since Testix automatically generates fake objects whenever you look up a Fake’s attribute (unless it’s explicitly set up).

While it is possible to demand

s.poller.register('reader_descriptor', Fake('select').POLLIN)

and it will work just fine, I find it less readable. Therefore I’d rather “rescue” the POLLIN object from the real select and assign it to the fake select.
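The same auto-attribute behaviour exists in the standard library's unittest.mock, so the idea can be sketched without Testix. This is only an illustration: MagicMock stands in for Fake('select') (an assumption, not Testix's own mechanism), and the variable names are made up for the example.

```python
import select
from unittest import mock

# Stand-in for the fake 'select' module (assumption: MagicMock, not Testix's Fake).
fake_select = mock.MagicMock(name='select')

# Looking up an attribute that was never set up yields another auto-generated mock.
auto_generated = fake_select.POLLOUT

# "Rescue" the real constant and assign it to the fake, as the fixture does.
fake_select.POLLIN = select.POLLIN
```

After the assignment, fake_select.POLLIN is the real integer constant, while any attribute that was not explicitly set is still a mock object.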

You may notice another quirk - the function .fileno() returns a file descriptor, which is an integer. However, in our test we make it return a string value, 'reader_descriptor', and later test that this value is transmitted to the .register() call on the polling object.

Of course it is possible to write something like

FAKE_FILE_DESCRIPTOR = 12121212
s.reader.fileno() >> FAKE_FILE_DESCRIPTOR
s.poller.register(FAKE_FILE_DESCRIPTOR, select.POLLIN)

This is totally legitimate. However, in my opinion, when testing the logic of “this object from here should get there”, using strings (which are immutable in Python) may be more readable than using the correct data type.
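To make the “this object from here should get there” idea concrete, here is a small sketch using the standard library's unittest.mock instead of Testix. The helper register_reader and the string 'POLLIN_flag' are hypothetical names invented for this example.

```python
from unittest import mock

def register_reader(poller, reader, event_mask):
    """Hypothetical helper: pass the reader's descriptor on to the poller."""
    poller.register(reader.fileno(), event_mask)

reader = mock.Mock()
reader.fileno.return_value = 'reader_descriptor'  # a sentinel string, not a real fd
poller = mock.Mock()

register_reader(poller, reader, 'POLLIN_flag')

# Raises AssertionError if the sentinel did not arrive unchanged.
poller.register.assert_called_once_with('reader_descriptor', 'POLLIN_flag')
```

A descriptive string makes a failure message say exactly which value went missing, which a bare number such as 12121212 would not.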

Changing launch_scenario has changed our tests. Let’s run them and see if they fail:

$ python -m pytest -sv docs/line_monitor/tests/unit/11/test_line_monitor.py

...

E       Failed:
E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: select.poll()
E        actual  : subprocess.Popen(['my', 'command', 'line'], stdout = 'write_to_fd', close_fds = True)

Yay :) we have RED. Our tests expect the new .poll() logic, but our code, of course, is still not up to date. In fact, all of our tests now fail, since they all depend on launch_scenario() being followed exactly.

Let’s get to GREEN with this and then continue with testing the actual polling:

import subprocess
import pty
import select

class LineMonitor:
    def __init__(self):
        self._callback = None

    def register_callback(self, callback):
        self._callback = callback

    def launch_subprocess(self, *popen_args, **popen_kwargs):
        write_to, read_from = pty.openpty()
        popen_kwargs['stdout'] = write_to
        popen_kwargs['close_fds'] = True
        self._reader = open(read_from, encoding='latin-1')
        self._poller = select.poll()
        self._poller.register(self._reader.fileno(), select.POLLIN)
        subprocess.Popen(*popen_args, **popen_kwargs)

    def monitor(self):
        while True:
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

Now our tests pass once again. We have GREEN, but we haven’t really added the actual feature we want to develop. We want the monitor to stop monitoring once the underlying subprocess is dead, and not get blocked trying to read a line that will never come.

This will involve using the poll object to poll the read descriptor to see that there’s some data to read before calling .readline(). Since our tests already involve various scenarios calling .readline(), doing this the TDD way doesn’t mean writing new tests - it means modifying the tests that we have.

This happens sometimes in TDD, and it’s perfectly normal. Now, let’s get to RED.

Looking at an excerpt from our tests:

with Scenario() as s:
    launch_scenario(s)
    tested.launch_subprocess(['my', 'command', 'line'])

    s.reader.readline() >> 'line 1'
    s.my_callback('line 1')
    s.reader.readline() >> 'line 2'
    s.my_callback('line 2')
    s.reader.readline() >> 'line 3'
    s.my_callback('line 3')

We want to demand that every .readline() is preceded by a .poll(), and that it is only performed if there’s input available. The .poll() call returns a list of [(file_descriptor, events), ...] pairs, where events is a bitmask of flags indicating the state of the file descriptor (e.g. POLLIN | POLLOUT).
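To see what such a result looks like outside of a test, here is a minimal sketch against the real select module. It uses a plain os.pipe() instead of the pseudoterminal from our LineMonitor (an assumption made to keep the example short), and it requires a platform where select.poll is available.

```python
import os
import select

read_fd, write_fd = os.pipe()
poller = select.poll()
poller.register(read_fd, select.POLLIN)

os.write(write_fd, b'line 1\n')
results = poller.poll(1000)  # the timeout is given in milliseconds

# Each entry is a (file_descriptor, events) pair; events is a bitmask.
fd, events = results[0]
input_ready = bool(events & select.POLLIN)

os.close(read_fd)
os.close(write_fd)
```

Here fd is the descriptor we registered, and input_ready tells us whether a read would find data waiting.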

Still, the sequence of .poll() and .readline() is sort-of “the new readline”, it makes up a logical scenario, so let’s write it as a scenario function, read_line_scenario.

Here is our test_receive_output_lines_via_callback, adapted to the new situation.

def read_line_scenario(s, line):
    s.poller.poll() >> [('reader_descriptor', select.POLLIN)]
    s.reader.readline() >> line

def end_test_scenario(s):
    s.poller.poll() >> Throwing(loop_breaker.LoopBreaker)

def test_receive_output_lines_via_callback(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        s.my_callback('line 1')
        read_line_scenario(s, 'line 2')
        s.my_callback('line 2')
        read_line_scenario(s, 'line 3')
        s.my_callback('line 3')
        end_test_scenario(s)

        tested.register_callback(Fake('my_callback'))
        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

Running this, we get RED:

E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: poller.poll()
E        actual  : reader.readline()

Very good. Now let’s fix our code to pass the tests. Note that we did not yet add a test for the case where the file descriptor does not have any data to read - that comes later. Always proceed in small baby steps, and you’ll be fine. Try to do it all at once, and you’ll crash and burn.

Getting to GREEN is super easy, we add just this one line of code:

    def monitor(self):
        while True:
            self._poller.poll()
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

Well, this is GREEN, but adds little value. It’s time for a serious test that makes sure that .readline() is called if and only if POLLIN is present. Let’s get to RED.

We introduce a skip_line_scenario(), and weave it into our existing tests, so that they represent the situation where sometimes there is no data to read.

from testix import *
import pytest
import line_monitor
import select

@pytest.fixture
def override_imports(patch_module):
    patch_module(line_monitor, 'subprocess')
    patch_module(line_monitor, 'pty')
    patch_module(line_monitor, 'open')
    patch_module(line_monitor, 'select')
    Fake('select').POLLIN = select.POLLIN

def launch_scenario(s):
    s.pty.openpty() >> ('write_to_fd', 'read_from_fd')
    s.open('read_from_fd', encoding='latin-1') >> Fake('reader')
    s.select.poll() >> Fake('poller')
    s.reader.fileno() >> 'reader_descriptor'
    s.poller.register('reader_descriptor', select.POLLIN)
    s.subprocess.Popen(['my', 'command', 'line'], stdout='write_to_fd', close_fds=True) >> Fake('the_process')

def test_lauch_subprocess_with_pseudoterminal(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

def read_line_scenario(s, line):
    s.poller.poll() >> [('reader_descriptor', select.POLLIN)]
    s.reader.readline() >> line

def skip_line_scenario(s):
    s.poller.poll() >> [('reader_descriptor', 0)]

def end_test_scenario(s):
    s.poller.poll() >> Throwing(loop_breaker.LoopBreaker)

def test_receive_output_lines_via_callback(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        s.my_callback('line 1')
        read_line_scenario(s, 'line 2')
        s.my_callback('line 2')
        read_line_scenario(s, 'line 3')
        s.my_callback('line 3')
        skip_line_scenario(s)
        skip_line_scenario(s)
        read_line_scenario(s, 'line 4')
        s.my_callback('line 4')
        end_test_scenario(s)

        tested.register_callback(Fake('my_callback'))
        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

def test_monitoring_with_no_callback(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        read_line_scenario(s, 'line 2')
        skip_line_scenario(s)
        read_line_scenario(s, 'line 3')
        skip_line_scenario(s)
        end_test_scenario(s)

        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

def test_callback_registered_mid_monitoring(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        skip_line_scenario(s)
        read_line_scenario(s, 'line 2')
        read_line_scenario(s, 'line 3')
        s << Hook(tested.register_callback, Fake('my_callback')) # the hook will execute right after the 'line 3' readline finishes
        s.my_callback('line 3') # callback is now registered, so it should be called
        end_test_scenario(s)

        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

The idea here is simple - sometimes .poll() returns a result where the POLLIN flag is not set - and then we should skip the .readline().
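Checking a flag in the events bitmask is a plain bitwise AND. The following sketch shows the check in isolation; has_input is a hypothetical helper name, and the flag combinations are just sample values built from the real select constants.

```python
import select

def has_input(events):
    """Return True if the POLLIN flag is set in the events bitmask."""
    return bool(events & select.POLLIN)

only_input = select.POLLIN
input_and_hangup = select.POLLIN | select.POLLHUP
only_hangup = select.POLLHUP
```

has_input() is true for only_input and input_and_hangup, and false for only_hangup and for 0, which is the value our skip_line_scenario() uses.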

Do we have RED? Yes we do:

E       Failed:
E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: poller.poll()
E        actual  : reader.readline()

Let’s get to GREEN. This requires us to add the following to our code:

    def monitor(self):
        while True:
            poll_results = self._poller.poll()
            _, events = poll_results[0]
            if not (events & select.POLLIN):
                continue
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

This is GREEN but not the best code: the .monitor() function is becoming too long. Time for the REFACTOR step in our RED-GREEN-REFACTOR loop.

    def monitor(self):
        while True:
            if not self._data_available_to_read():
                continue
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

    def _data_available_to_read(self):
        poll_results = self._poller.poll()
        _, events = poll_results[0]
        return events & select.POLLIN

Ah, much nicer.

Solving the Blocking Problem

We are now in a position not to block forever when data does not arrive. To do that, we need to add a timeout on the .poll call - since as it is now, it may still block forever waiting for some event on the file.

Getting to RED is simple in principle. For example, if we want a timeout of 10 (note that select’s .poll() takes its timeout in milliseconds, not seconds), we just change the demands of our various scenarios, e.g.

def read_line_scenario(s, line):
    s.poller.poll(10) >> [('reader_descriptor', select.POLLIN)]
    # note the timeout of 10 milliseconds above
    s.reader.readline() >> line

# similarly for other poll scenario functions

If we do this, however, and later on discover that a timeout of 60 is more reasonable, we will have to test-drive the change from 10 to 60. This seems more annoying than it is helpful. Sometimes, tests can be too specific.

Testix has a way to specifically ignore the values of specific arguments - you specify the special value IgnoreArgument() instead of the overly specific 10.

Here’s how to use it in this case:

def read_line_scenario(s, line):
    s.poller.poll(IgnoreArgument()) >> [('reader_descriptor', select.POLLIN)]
    s.reader.readline() >> line

def skip_line_scenario(s):
    s.poller.poll(IgnoreArgument()) >> [('reader_descriptor', 0)]

def end_test_scenario(s):
    s.poller.poll(IgnoreArgument()) >> Throwing(loop_breaker.LoopBreaker)

Using this, we get to RED:

E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: poller.poll(|IGNORED|)
E        actual  : poller.poll()

Note the |IGNORED| annotation. Getting to GREEN is now a matter of adding this timeout in our code:

    def _data_available_to_read(self):
        # the timeout passed to .poll() is in milliseconds
        poll_results = self._poller.poll(10)
        _, events = poll_results[0]
        return events & select.POLLIN

And we have GREEN again.
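Readers who know the standard library's unittest.mock may find it helpful to compare IgnoreArgument() with mock.ANY, which plays a comparable role there. The sketch below is not Testix code; data_available is a hypothetical stand-in for our polling step, and 10 is an arbitrary timeout.

```python
from unittest import mock

poller = mock.Mock()
poller.poll.return_value = []

def data_available(poller):
    """Hypothetical stand-in for the polling step; 10 is an arbitrary timeout."""
    return bool(poller.poll(10))

result = data_available(poller)

# Passes for any single positional argument, so changing 10 to 60 needs no test change.
poller.poll.assert_called_once_with(mock.ANY)
```

In both libraries the test still insists that a timeout argument is passed, but it no longer pins down its value.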

Oops, a bug

If you try this code, you will find that there’s a bug: in real life .poll() may return an empty list.
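The empty list is what the real .poll() returns when the timeout expires without any event. Here is a minimal sketch that reproduces it, using an os.pipe() that nobody writes to (an assumption made to keep the example small; our LineMonitor uses a pseudoterminal) and an arbitrary timeout of 50 milliseconds.

```python
import os
import select
import time

read_fd, write_fd = os.pipe()
poller = select.poll()
poller.register(read_fd, select.POLLIN)

started = time.monotonic()
results = poller.poll(50)  # 50 milliseconds; nothing was written to the pipe
elapsed = time.monotonic() - started

os.close(read_fd)
os.close(write_fd)
```

results is an empty list, so indexing it with [0] raises an IndexError, and elapsed is roughly the timeout we asked for.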

When we find a bug, the TDD way is of course to write a test that reproduces it, and then fix the code. In our case, let’s add a skip_line_on_empty_poll_scenario and sprinkle it in our existing tests, thus covering the behaviour with and without a callback, etc.

def skip_line_on_empty_poll_scenario(s):
    s.poller.poll(IgnoreArgument()) >> []

def test_receive_output_lines_via_callback(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        s.my_callback('line 1')
        read_line_scenario(s, 'line 2')
        s.my_callback('line 2')
        read_line_scenario(s, 'line 3')
        s.my_callback('line 3')
        skip_line_scenario(s)
        skip_line_scenario(s)
        skip_line_on_empty_poll_scenario(s)
        read_line_scenario(s, 'line 4')
        s.my_callback('line 4')
        end_test_scenario(s)

        tested.register_callback(Fake('my_callback'))
        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

This gets us into RED territory

E       IndexError: list index out of range

Now let’s fix the bug.

    def _data_available_to_read(self):
        poll_results = self._poller.poll(10)
        if len(poll_results) == 0:
            return False
        _, events = poll_results[0]
        return events & select.POLLIN

We are now GREEN, and, since we are working TDD, we have a test for this bug - and it will not return in the future.

Has the Subprocess Died?

We are now ready to add functionality to stop the monitor in case the subprocess itself has died. We will want our code to use .poll() on the Popen object itself, and if .poll() returns a non-None value, stop the monitor.

If you think about it, we only need to poll the subprocess when there’s no data available. It may be that there is data to read and the process has died, but in that case, we’ll just discover it is dead when the data runs out. This way we make sure we read all the data out of the pipe when the process has died, even if it takes more than one read.
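The ordering described above can be tried against a real subprocess. The following is a rough sketch under stated assumptions, not our LineMonitor: it uses a plain stdout pipe rather than a pseudoterminal, collect_lines is a hypothetical helper name, and the 100 millisecond timeout is arbitrary.

```python
import select
import subprocess
import sys

def collect_lines(process, timeout_ms=100):
    """Read lines from process.stdout until no data is left and the process has exited."""
    reader = process.stdout
    poller = select.poll()
    poller.register(reader.fileno(), select.POLLIN)
    lines = []
    while True:
        results = poller.poll(timeout_ms)
        if results and results[0][1] & select.POLLIN:
            line = reader.readline()
            if line:  # an empty read means end-of-file, not a line
                lines.append(line)
                continue
        # No data was available: only now ask whether the process is still alive.
        if process.poll() is not None:
            return lines

child_code = "print('line 1'); print('line 2')"
# bufsize=0 keeps Python from reading ahead, so poll() reflects all unread data.
with subprocess.Popen([sys.executable, '-c', child_code],
                      stdout=subprocess.PIPE, bufsize=0) as process:
    collected = collect_lines(process)
    exit_code = process.returncode
```

Because the liveness check happens only when nothing is waiting to be read, both lines are collected even though the child exits almost immediately after writing them.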

If, however, there’s no data to read, and the process is dead - then there’s no point in continuing to monitor the pipe for more data, and we should close the reader, e.g.

def process_lives_scenario(s):
    s.the_process.poll() >> None

def test_receive_output_lines_via_callback(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        s.my_callback('line 1')
        read_line_scenario(s, 'line 2')
        s.my_callback('line 2')
        read_line_scenario(s, 'line 3')
        s.my_callback('line 3')
        skip_line_scenario(s)
        process_lives_scenario(s)
        skip_line_scenario(s)
        process_lives_scenario(s)
        skip_line_on_empty_poll_scenario(s)
        process_lives_scenario(s)
        read_line_scenario(s, 'line 4')
        s.my_callback('line 4')
        end_test_scenario(s)

        tested.register_callback(Fake('my_callback'))
        with pytest.raises(loop_breaker.LoopBreaker):
            tested.monitor()

Note that we demand process polling only after no data was ready to read: here it only comes after skip_line_scenario() or skip_line_on_empty_poll_scenario().

This brings us into RED territory. Currently we have not taken into account the case where the process dies, but as usual, we’re taking things slowly. Let’s get to GREEN.

    def launch_subprocess(self, *popen_args, **popen_kwargs):
        write_to, read_from = pty.openpty()
        popen_kwargs['stdout'] = write_to
        popen_kwargs['close_fds'] = True
        self._reader = open(read_from, encoding='latin-1')
        self._poller = select.poll()
        self._poller.register(self._reader.fileno(), select.POLLIN)
        self._process = subprocess.Popen(*popen_args, **popen_kwargs)

    def monitor(self):
        while True:
            if not self._data_available_to_read():
                self._process.poll()
                continue
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

Note that this is the first time we bothered to save the subprocess Popen object! This is another example of how TDD helps us. If the test passes without us making some move - then we simply don’t make it. This helps us write minimalistic code. Remember, code that doesn’t exist - has no bugs.

We are now in GREEN - so let’s get into RED again, and add a specific test for the “process has died scenario”:

def process_died_scenario(s):
    s.the_process.poll() >> 'some_exit_code'
    s.reader.close()

def test_receive_output_lines_via_callback__process_ends__orderly_close(override_imports):
    tested = line_monitor.LineMonitor()
    with Scenario() as s:
        launch_scenario(s)
        tested.launch_subprocess(['my', 'command', 'line'])

        read_line_scenario(s, 'line 1')
        s.my_callback('line 1')
        read_line_scenario(s, 'line 2')
        s.my_callback('line 2')
        read_line_scenario(s, 'line 3')
        s.my_callback('line 3')
        skip_line_scenario(s)
        process_lives_scenario(s)
        skip_line_scenario(s)
        process_died_scenario(s)

        tested.register_callback(Fake('my_callback'))
        tested.monitor()

Note that we no longer need the LoopBreaker trick, since we now expect the .monitor() function to simply finish and break out of its infinite loop.

Are we in RED? Yes we are:

E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: reader.close()
E        actual  : poller.poll(10)

The test wants the infinite loop to finish and close the reader, but the code just goes on.

Let’s fix our code:

    def monitor(self):
        while True:
            if not self._data_available_to_read():
                exit_code = self._process.poll()
                if exit_code is not None:
                    self._reader.close()
                    break
                continue
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

We’re GREEN, but this function has grown too long again, so let’s REFACTOR.

    def monitor(self):
        while True:
            if not self._data_available_to_read():
                if self._alive():
                    continue
                self._cleanup()
                return
            line = self._reader.readline()
            if self._callback is None:
                continue
            self._callback(line)

    def _alive(self):
        exit_code = self._process.poll()
        return exit_code is None

    def _cleanup(self):
        self._reader.close()

Not shorter, but more semantically clear, at least in my opinion. Since we have tests, we can refactor without fear, because we can always make sure we are still in the GREEN!