Monitoring The Output

Next we want to test the following behaviour: we register a callback with our LineMonitor object using its .register_callback() method, and it calls our callback with each line of output it reads from the pseudo-terminal.

Python streams have a useful .readline() method, so let’s wrap the read file-descriptor of the pseudo-terminal with a stream. It turns out that you can wrap a file descriptor with a simple call to the built-on open() function, so we’ll use that.

Note that we add a new test, leaving the previous one intact. This means that we keep everything we already have working, while we add a test for this new behaviour.

Let’s start by describing a scenario where we read several lines from the pseudo-terminal and demand that they are transferred to our callback.

 1
 2def test_receive_output_lines_via_callback(override_imports):
 3    tested = line_monitor.LineMonitor()
 4    with Scenario() as s:
 5        s.pty.openpty() >> ('write_to_fd', 'read_from_fd')
 6        s.open('read_from_fd', encoding='latin-1') >> Fake('reader') # wrapping a binary file descriptor with a text-oriented stream requires an encoding
 7        s.subprocess.Popen(['my', 'command', 'line'], stdout='write_to_fd', close_fds=True)
 8
 9        tested.launch_subprocess(['my', 'command', 'line'])
10
11        s.reader.readline() >> 'line 1'
12        s.my_callback('line 1')
13        s.reader.readline() >> 'line 2'
14        s.my_callback('line 2')
15        s.reader.readline() >> 'line 3'

What’s going on here?

  1. We add a demand that our code create a Python stream from the pseudo-terminal’s read-descriptor before launching the subprocess.

  2. We then call .launch_subprocess() to meet those demands.

  3. We describe the “read-from-pseudo-terminal-forwared-to-callback” data flow for 3 consecutive lines.

  4. We register a Fake('my_callback') object as our callback - this way, when the code calls the callback, it will be meeting our demands in this test. It’s important that 'my_callback' is used as this Fake’s name, since we refer to it in the Scenario.

  5. We then call the .monitor() method - this method should do all the reading and forwarding.

We must also remember to mock the built-in open:

1@pytest.fixture
2def override_imports(patch_module):
3    patch_module(line_monitor, 'subprocess') # this replaces the subprocess object inside line_monitor with a Fake("subprocess") object
4    patch_module(line_monitor, 'pty') # does the same for the pty module
5    patch_module(line_monitor, 'open')

We can already see a problem: the scenario is actually built out of two parts - the part which tests .launch_subprocess(), and the part which tests .monitor().

Furthermore, since we have our previous test in test_lauch_subprocess_with_pseudoterminal, which doesn’t expect the call to open(), the two tests are in contradiction.

The way to handle this is to refactor our test a bit:

 1from testix import *
 2import pytest
 3import line_monitor
 4
 5@pytest.fixture
 6def override_imports(patch_module):
 7    patch_module(line_monitor, 'subprocess')
 8    patch_module(line_monitor, 'pty')
 9    patch_module(line_monitor, 'open')
10
11def launch_scenario(s):
12    s.pty.openpty() >> ('write_to_fd', 'read_from_fd')
13    s.open('read_from_fd', encoding='latin-1') >> Fake('reader')
14    s.subprocess.Popen(['my', 'command', 'line'], stdout='write_to_fd', close_fds=True)
15
16def test_lauch_subprocess_with_pseudoterminal(override_imports):
17    tested = line_monitor.LineMonitor()
18    with Scenario() as s:
19        launch_scenario(s)
20        tested.launch_subprocess(['my', 'command', 'line'])
21
22def test_receive_output_lines_via_callback(override_imports):
23    tested = line_monitor.LineMonitor()
24    with Scenario() as s:
25        launch_scenario(s)
26        tested.launch_subprocess(['my', 'command', 'line'])
27
28        s.reader.readline() >> 'line 1'
29        s.my_callback('line 1')
30        s.reader.readline() >> 'line 2'
31        s.my_callback('line 2')
32        s.reader.readline() >> 'line 3'
33        s.my_callback('line 3')
34
35        tested.register_callback(Fake('my_callback'))
36        tested.monitor()

By convention, helper functions that help us modify scenarios end with _scenario.

OK this seems reasonable, let’s get some RED! Running this both our tests fail:

E       Failed:
E       testix: ExpectationException
E       testix details:
E       === Scenario (no title) ===
E        expected: open('read_from_fd', encoding = 'latin-1')
E        actual  : subprocess.Popen(['my', 'command', 'line'], stdout = 'write_to_fd', close_fds = True)

We changed our expectations from .launch_subprocess() to call open(), but we did not change the implementation yet, so Testix is surprised to find that we actually call subprocess.Popen - and makes our test fail.

Good, let’s fix it and get to GREEN. We introduce the following to our code:

 1import subprocess
 2import pty
 3
 4class LineMonitor:
 5    def __init__(self):
 6        self._callback = None
 7
 8    def register_callback(self, callback):
 9        self._callback = callback
10
11    def launch_subprocess(self, *popen_args, **popen_kwargs):
12        write_to, read_from = pty.openpty()
13        popen_kwargs['stdout'] = write_to
14        popen_kwargs['close_fds'] = True
15        self._reader = open(read_from, encoding='latin-1')
16        subprocess.Popen(*popen_args, **popen_kwargs)
17
18    def monitor(self):
19        for _ in range(3):
20            line = self._reader.readline()
21            self._callback(line)

This passes the test, but that’s not really what we meant - right? Obviously we would like a while True to replace the for _ in range(3) here.

However, if we write a while True, then Testix will fail us for the 4th call to .readline(), since it only expects 3 calls.

Testing infinite, while True loops is a problem, but we can get around it by injecting an exception that will terminate the loop. Just as we can determine what calls to Fake objects return, we can make them raise exceptions.

Testix even comes with an exception class just for this use case, LoopBreaker. Let’s introduce another .readline() expectation into our test, using Testix’s Throwing construct:

 1def test_receive_output_lines_via_callback(override_imports):
 2    tested = line_monitor.LineMonitor()
 3    with Scenario() as s:
 4        launch_scenario(s)
 5        tested.launch_subprocess(['my', 'command', 'line'])
 6
 7        s.reader.readline() >> 'line 1'
 8        s.my_callback('line 1')
 9        s.reader.readline() >> 'line 2'
10        s.my_callback('line 2')
11        s.reader.readline() >> 'line 3'
12        s.my_callback('line 3')
13        s.reader.readline() >> Throwing(loop_breaker.LoopBreaker) # this tells the Fake('reader') to raise an instance of TestixLoopBreaker()
14
15        tested.register_callback(Fake('my_callback'))
16        with pytest.raises(loop_breaker.LoopBreaker):
17            tested.monitor()

NOTE - we once more change the test first. Also note that we can use Throwing to raise any type of exception we want, not just LoopBreaker.

This gets us back into the RED.

E           Failed: DID NOT RAISE <class 'testix.loop_breaker.LoopBreaker'>

Since our code calls .readline() 3 times exactly, the fourth call, which would have resulted in LoopBreaker being raised, did not happen.

Let’s fix our code:

1    def monitor(self):
2        while True:
3            line = self._reader.readline()
4            self._callback(line)

And we’re back in GREEN.

Edge Case Test: When There is no Callback

What happens if .monitor() is called, but no callback has been registered? We can of course implement all kinds of behaviour, for example, we can make it “illegal”, and raise an Exception from .monitor() in such a case.

However, let’s do something else. Let’s just define things such that output collected from the subprocess when no callback has been registered is discarded.

 1def test_monitoring_with_no_callback(override_imports):
 2    tested = line_monitor.LineMonitor()
 3    with Scenario() as s:
 4        launch_scenario(s)
 5        tested.launch_subprocess(['my', 'command', 'line'])
 6
 7        s.reader.readline() >> 'line 1'
 8        s.reader.readline() >> 'line 2'
 9        s.reader.readline() >> 'line 3'
10        s.reader.readline() >> Throwing(loop_breaker.LoopBreaker) # this tells the Fake('reader') to raise an instance of TestixLoopBreaker()
11
12        with pytest.raises(loop_breaker.LoopBreaker):
13            tested.monitor()

Notice there’s no .register_callback() here. We demand that .readline() be called, but we don’t demand anything else.

Running this fails with a RED

    def monitor(self):
        while True:
            line = self._reader.readline()
>           self._callback(line)
E           TypeError: 'NoneType' object is not callable

Which reveals that we in fact, did not handle this edge case very well.

Let’s add code that fixes this.

1    def monitor(self):
2        while True:
3            line = self._reader.readline()
4            if self._callback is None:
5                continue
6            self._callback(line)

Our test passes - back to GREEN.

Edge Case Test: Asynchronous Callback Registration

What happens if we start monitoring without a callback, wait a while, and only then register a callback?

This allows a use case where we call the .monitor() (which blocks) in one thread, and register a callback in another thread.

Let’s decide that in this case, the callback will receive output which is captured only after the callback has been registered.

Our test will be similar to test_receive_output_lines_via_callback(), however, we need to somehow make tested.register_callback() happen somewhere between one .readline() and the next. This is not so easy to do because of the same while True that gave us some trouble before.

Testix allows us to simulate asynchronous events like this using its Hook construct. Essentially Hook(function, *args, **kawrgs) can be injected into the middle of a Scenario, and it will call function(*args, **kwargs) at the point in which it’s injected.

Here’s how to write such a test:

 1def test_callback_registered_mid_monitoring(override_imports):
 2    tested = line_monitor.LineMonitor()
 3    with Scenario() as s:
 4        launch_scenario(s)
 5        tested.launch_subprocess(['my', 'command', 'line'])
 6
 7        s.reader.readline() >> 'line 1'
 8        s.reader.readline() >> 'line 2'
 9        s.reader.readline() >> 'line 3'
10        s << Hook(tested.register_callback, Fake('my_callback')) # the hook will execute right after the 'line 3' readline finishes
11        s.my_callback('line 3') # callback is now registered, so it should be called
12        s.reader.readline() >> Throwing(loop_breaker.LoopBreaker)
13
14        with pytest.raises(loop_breaker.LoopBreaker):
15            tested.monitor()

When we run it, we discover it’s already GREEN! Oh no!

Turns out our previous change already solved this problem. This happens sometimes in TDD, so to deal with it, we revert our previous change and make sure this test becomes RED - and carefully check that it failed properly. Happily, this is the case for this particular test.

Let’s Recap

We now have our first implementation of the LineMonitor. It essentially works, but it’s still has its problems. We’ll tackle these problems later in this tutorial, but first, let’s do a short recap.