Tutorial: Writing a pyhard2 driver

Serial drivers always consist of a list of commands (often bytes or mnemonics) and some type of framing. In short, framing is a way to tell the hardware controller where the command starts and ends and whether we want to read or write a value. They are generally described in different parts of the hardware manual and are actually independent. pyhard2 drivers reflect this in the way they are organized. Subsystem instances organize lists of Command that declare properties like the mnemonic, whether the command is read-only, the minimum value allowed, functions to use for pretty-printing, etc. whereas framing is defined in the read() and write() methods of classes deriving from Protocol.

Minimum required toward a working temperature controller driver

This first tutorial shows how to implement a simple serial driver using a concrete example: the driver to a Series 982 Watlow temperature controller.

Declaring commands

We start by declaring the list of commands as it generally does not required any understanding of the protocol itself but simply to translate the instructions given in the manual to pyhard2. In the present case, the manual describes the protocol in form of large tables. We provide an extract of such a table here, limiting ourselves to the most relevant elements.

Commands as given in the manual
Name Description Access Range
DE1 Derivative Output 1 PID read-write 0.00 to 9.99 min
C1 Input 1 Value read-only RL1 to RH1
C2 Input 2 Value read-only RL2 to RH2
IT1 Integral for Output 1 read-write 0.00 to 99.99
PB1 Proportional Band Output 1 read-write ...
PWR Percent Power Present Output read-only 0 to 100
SP1 Set Point 1 read-write RL1 to RH1

There are actually over 60 commands given so that, even if we limit ourselves to the 7 commands given above now, we should organize the driver in a meaningful manner. That will allow us add other useful commands later as the needs emerge. We simply organize the commands in the same way as the menus on the hardware controller.

The DE1, IT1 and PB1 commands that set PID values in the controller are in the operation pid menu. C1, C2, PWR, and SP1 are not in any menu. In pyhard2, commands are organized in a Subsystem and Subsystems can be nested. The hardware controller menus can thus be represented by pyhard2‘s Subsystems very straightforwardly. After the necessary imports:

import unittest
import pyhard2.driver as drv
# We define shortcuts.
Cmd, Access = drv.Command, drv.Access
# Access.RW for read-write commands (default),
# Access.RO for read-only, and Access.WO for write-only.

here is the skeleton of the driver with the most important menus:

class Series982(drv.Subsystem):

   """Driver to Watlow Series 982 controller."""

   def __init__(self, socket):
      self.operation = drv.Subsystem(self)
      self.operation.pid = drv.Subsystem(self.operation)
      # For the sake of the example:
      self.operation.system = drv.Subsystem(self.operation)
      self.setup = drv.Subsystem(self)
      self.setup.output = drv.Subsystem(self.setup)
      self.setup.global_ = drv.Subsystem(self.setup)
      self.factory = drv.Subsystem(self)
      self.factory.lockout = drv.Subsystem(self.factory)
      self.factory.diagnostic = drv.Subsystem(self.factory)
      self.factory.calibration = drv.Subsystem(self.factory)

Note

The parameter passed to drv.Subsystem is its parent Subsystem and the library relies on properly setting it.

The commands themselves are declared as Command and take the command name given in the manual (C1, C2, etc.) and a series of optional arguments like access (read-only, read-write, or write-only), minimum and maximum values, conversion functions, etc., such that we can now complete the driver, now limiting ourselves to the most relevant commands:

class Series982(drv.Subsystem):

    """Driver to Watlow Series 982 controller."""

    def __init__(self, socket, parent=None):
        super(Series982, self).__init__(parent)
        self.setProtocol(XonXoffProtocol(socket))
        self.temperature1 = Cmd("C1", access=Access.RO)
        self.temperature2 = Cmd("C2", access=Access.RO)
        self.setpoint = Cmd("SP1", minimum=-250, maximum=9999)  # assume deg C
        self.power = Cmd("PWR", access=Access.RO, minimum=0, maximum=100)
        self.operation = drv.Subsystem(self)
        self.operation.pid = drv.Subsystem(self.operation)
        self.operation.pid.proportional = Cmd("PB1")
        self.operation.pid.integral = Cmd("IT1", minimum=0.00, maximum=99.99)
        self.operation.pid.derivative = Cmd("DE1", minimum=0.00, maximum=9.99)

Protocol

We should now implement the communication protocol used to read and write commands to the hardware. In the present case, we implement the XON/XOFF protocol described in the manual. Write commands start with ? and end with a carriage return \r and read commands start with = and end with \r. Adapted from the manual:

“=” Command Example
  • Master: = SP1 50\r (Set the setpoint prompt value SP1 to 50.)
  • Remote: XOFF (byte \x13) (This will be returned once the device starts processing. The master must stay offline.)
  • Remote: XON (byte \x11) (Processing is done. Do not send another message until this character is received.)
”?” Command Example
  • Master: ? SP1\r (Request the SP1 prompt value.)
  • Remote: XOFF (The remote is preparing the response. The master must stay offline.)
  • Remote: XON 50\r (The value is returned and the master may send another message once the \r is received.)

that can be translated to the short UML sequence diagram:

group Write
User     ->  Hardware: "= {mnemonic} {value}\r"
User     <-- Hardware: XOFF
User     <-- Hardware: XON
end

group Read
User     ->  Hardware: "? {mnemonic}\r"
User     <-- Hardware: XOFF
User     <-- Hardware: XON "{value}\r"
end

Unit test

Now that we understand the protocol, we can implement unit tests so that (1) we can test the driver offline (i.e., without the hardware controller) and (2) we know when the driver is working. Canned responses using TesterSocket should be enough. We also do not have to test every command but just enough to be sure that the protocol is implemented correctly:

class TestSeries982(unittest.TestCase):

    def setUp(self):
        socket = drv.TesterSocket()
        # socket.msg = {} such as: dict(send message: receive message)
        socket.msg = {"? PB1\r": "\x13\x1111\r",  # send lhs, receive rhs
                      "= DE1 3\r": "\x13\x11",
                      "? C1\r": "\x13\x1150\r",
                      "= SP1 25\r": "\x13\x11"}
        self.i = Series982(socket)

    def test_read(self):
        self.assertEqual(self.i.temperature1.read(), 50)

    def test_write(self):
        self.i.setpoint.write(25)

    def test_read_operation_pid(self):
        self.assertEqual(self.i.operation.pid.proportional.read(), 11)

    def test_write_operation_pid(self):
        self.i.operation.pid.derivative.write(3)

Actual implementation

In pyhard2, all the communication with the socket is handled in the read() and write() methods of a class deriving Protocol. Both methods receive a Context argument that contains all the necessary information. In this particular simple example, we derive CommunicationProtocol. The serial socket receives and returns text. We use python’s format minilanguage for its readability.

class XonXoffProtocol(drv.CommunicationProtocol):

    """Communication using the XON/XOFF protocol."""

    def _xonxoff(self):
        xonxoff = self._socket.read(2)
        if not xonxoff == "\x13\x11":
            raise drv.DriverError("Expected XON/XOFF (%r) got %r instead."
                                  % ("\x13\x11", xonxoff))

    def read(self, context):
        line = "? {mnemonic}\r".format(mnemonic=context.reader)
        self._socket.write(line)
        self._xonxoff()
        return float(self._socket.readline())  # convert unicode to float

    def write(self, context):
        line = "= {mnemonic} {value}\r".format(mnemonic=context.writer,
                                               value=context.value)
        self._socket.write(line)
        self._xonxoff()

Using the driver in the GUI

We could use this driver directly but installing it in the GUI provided by pyhard2 gives us a lot more: threaded communication, data saved periodically, the ability to program the hardware, and more. It is also about as trivial as setting headers in a table!

We need to import the other part of the library:

import pyhard2.ctrlr as ctrlr
import pyhard2.driver.virtual as virtual

and to actually fill the driver table where every column contains a command from the driver definition and every row contains the reference to a node (one piece of hardware connected to a serial port.)

Note

If only a single instrument is connected to the serial port, the node may be set to None or 0.

We declare this in a function:

def createController():
    # Parse the commandline arguments:
    args = ctrlr.Config("watlow")
    # Create an driver instance:
    driver = Series982(drv.Serial(args.port))
    # Create an interface instance:
    iface = ctrlr.Controller(driver, u"Watlow")
    # Add commands, create new columns in the `driver table`:
    iface.addCommand(driver.temperature1, "TC sample", poll=True, log=True)
    iface.addCommand(driver.temperature2, "TC heater", poll=True, log=True)
    iface.addCommand(driver.setpoint, "setpoint", log=True,
                     specialColumn="programmable")
    iface.addCommand(driver.power, "output", poll=True, log=True)
    iface.addCommand(driver.operation.pid.a1.gain, "PID P", hide=True,
                     specialColumn="pidp")
    iface.addCommand(driver.operation.pid.a1.integral, "PID I", hide=True,
                     specialColumn="pidi")
    iface.addCommand(driver.operation.pid.a1.derivative, "PID D", hide=True,
                     specialColumn="pidd")
    # Add at least one node:
    iface.addNode(0, u"Watlow")
    # Fill the table with a call to `populate`
    iface.populate()
    return iface

and start it using standard Qt4:

def main():
    import sys
    import PyQt4.QtGui as QtGui
    app = QtGui.QApplication(sys.argv)
    app.lastWindowClosed.connect(app.quit)
    iface = createController()
    iface.show()
    sys.exit(app.exec_())