Writing a Binding in Python
Thanks to the afb-libpython module, we can write a small Binding in the Python programming language.
This part details what the Binding does and how it works.
Python script
The Python script, named anemometer_industrial_demo.py, is available here. Copy it to your gitsources repository.
The script uses libafb, which is exposed through the Python module of the same name.
The logic of the script is to:
- load the Modbus binding with a dedicated configuration,
- load the Redis TSDB binding,
- subscribe to value updates on the Modbus device,
- compute an angle and a speed from these values,
- and publish them to a Redis time series.
The script starts an infinite event loop from the Application Framework Binder library and waits for events.
Note that redpesk’s Modbus binding supports “plugins” for encoding and decoding raw values. The angle and speed computation part could thus also be done thanks to a Modbus binding plugin written in a compiled language (C/C++/Rust) if speed matters.
Initialization
The first call to the libafb library happens in the main class constructor. It declares a Binder and gives it some important configuration keys. In particular, the set key lets you define the configuration of each binding that will be loaded later.
binder = libafb.binder(
    {
        "uid": "py-binder",
        "verbose": 255,  # be verbose
        "port": 9998,  # http port for debugging
        "roothttp": ".",
        "rootdir": ".",
        "set": {
            # Configuration of the Redis TSDB binding
            "redis-binding.so": {
                "$schema": ...,
                "metadata": ...
            },
            # Configuration of the Modbus binding
            "modbus-binding.so": {
                "$schema": ...,
                "metadata": ...,
                "modbus": [
                    {
                        "uid": "King-Pigeon-M100T",
                        "info": "King Pigeon 'M100T' Modbus TCP Remote I/O Module",
                        "uri": "...",
                        "slaveid": 1,
                        "timeout": 250,
                        "autostart": 1,
                        "prefix": "m100t",
                        "hertz": 1,
                        "sensors": [...]
                    }
                ]
            }
        }
    }
)
The bindings are then loaded with calls to libafb.binding:
modbus_binding = libafb.binding(
    {
        "uid": "modbus",
        "path": "/usr/redpesk/modbus-binding/lib/modbus-binding.so",
    }
)
redis_binding = libafb.binding(
    {
        "uid": "redis",
        "path": "/usr/redpesk/redis-tsdb-binding/lib/redis-binding.so",
    }
)
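In the snippets above, each key of the set section has the same name as the library file given in the corresponding path. The following is a minimal sketch of how that mapping could be built programmatically, assuming this naming correspondence holds. The helpers binding_set_key and build_set_section are hypothetical and are not part of libafb, and the configuration contents are placeholders.

```python
from pathlib import PurePosixPath

# Paths taken from the libafb.binding calls shown above.
MODBUS_PATH = "/usr/redpesk/modbus-binding/lib/modbus-binding.so"
REDIS_PATH = "/usr/redpesk/redis-tsdb-binding/lib/redis-binding.so"


def binding_set_key(path):
    """Hypothetical helper: return the library file name of a binding path."""
    return PurePosixPath(path).name


def build_set_section(configs_by_path):
    """Hypothetical helper: key each binding configuration by its file name.

    Assumption: the "set" keys match the .so file names, as in the example
    configuration above.
    """
    return {binding_set_key(path): config for path, config in configs_by_path.items()}


# Placeholder configurations; the real ones carry "$schema", "metadata", etc.
set_section = build_set_section(
    {
        REDIS_PATH: {"metadata": {}},
        MODBUS_PATH: {"metadata": {}, "modbus": []},
    }
)
print(sorted(set_section))
```

The resulting dictionary could then be passed as the value of the set key when declaring the Binder.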
Event subscription
Finally, the event loop is started with libafb.loopstart(binder, self._on_binder_init), where self._on_binder_init is a method called upon initialization. This is where event subscription can be done.
def _on_binder_init(self, binder, _):
    # the event handlers must live as long as the event callbacks
    # so we attach the return value of `evthandler` to self
    self._sin_handler = libafb.evthandler(
        binder,
        {"uid": "modbus", "pattern": "*/sin", "callback": self.on_sin_value},
    )
    libafb.callsync(binder, "modbus", "m100t/sin", {"action": "subscribe"})

    self._cos_handler = libafb.evthandler(
        binder,
        {"uid": "modbus", "pattern": "*/cos", "callback": self.on_cos_value},
    )
    libafb.callsync(binder, "modbus", "m100t/cos", {"action": "subscribe"})
    return 0
The logic is, for each Modbus value whose changes we want to be notified of:
- call the verb subscribe on it
- use libafb.evthandler to register a callback that will be called anytime an event matching a given pattern is received
Pay attention to the comments: at the time of writing, the object returned by libafb.evthandler must live as long as we want the event to be received. This is why we attach the return value to the class instance, which lives as long as the event loop.
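The lifetime constraint can be illustrated on the Python side alone. The sketch below does not use libafb: FakeHandler is a hypothetical stand-in for the object returned by libafb.evthandler, and weak references are used only to observe when that object is destroyed. It shows that an object held only in a local variable disappears when the method returns, whereas one attached to self survives.

```python
import gc
import weakref


class FakeHandler:
    """Hypothetical stand-in for the object returned by libafb.evthandler."""

    def __init__(self, pattern):
        self.pattern = pattern


class Demo:
    def subscribe_and_drop(self):
        # The handler is only referenced by a local variable, so nothing
        # keeps it alive once this method returns.
        handler = FakeHandler("*/sin")
        return weakref.ref(handler)

    def subscribe_and_keep(self):
        # The handler is attached to the instance, so it lives as long
        # as the instance does.
        self._sin_handler = FakeHandler("*/sin")
        return weakref.ref(self._sin_handler)


demo = Demo()
dropped = demo.subscribe_and_drop()
kept = demo.subscribe_and_keep()
gc.collect()  # make collection explicit for interpreters without refcounting

print("dropped handler still alive:", dropped() is not None)
print("kept handler still alive:", kept() is not None)
```

This only demonstrates general Python object lifetime. How libafb reacts when the handler object is destroyed is not shown here.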
Computation
The last pieces needed are callback methods that will update the wind direction and speed and publish them in Redis.
def update_direction(self, binder):
    ts_ms = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
    if ts_ms - self._last_ts < 1:
        return

    if self._sin is not None and self._cos is not None:
        angle_deg = math.atan2(self._sin, self._cos) / math.pi * 180.0
        libafb.callasync(
            binder,
            "redis",
            "add",
            None,  # update callback
            None,  # context
            {
                "key": "wind_direction_deg",
                "timestamp": f"{ts_ms}",
                "value": angle_deg,
            },
        )
        self._last_ts = ts_ms

def on_sin_value(self, binder, event_name, user_data, value):
    self._sin = value
    self.update_direction(binder)

def on_cos_value(self, binder, event_name, user_data, value):
    self._cos = value
    self.update_direction(binder)
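The angle computation in update_direction can be tried on its own, without a Binder. The following is a minimal sketch using only the standard math module. The function names wind_direction_deg and normalize_deg are hypothetical, and the normalization to the range [0, 360) is an optional addition that the script above does not perform.

```python
import math


def wind_direction_deg(sin_value, cos_value):
    """Return the angle in degrees, in (-180, 180], from its sine and cosine.

    Equivalent to math.atan2(sin, cos) / math.pi * 180.0 as used above.
    """
    return math.degrees(math.atan2(sin_value, cos_value))


def normalize_deg(angle_deg):
    """Hypothetical extra step: map an angle in degrees to the range [0, 360)."""
    return angle_deg % 360.0


# sin = 1, cos = 0 corresponds to a quarter turn.
print(wind_direction_deg(1.0, 0.0))
# sin = -1, cos = 0 gives a negative angle, which can be normalized.
print(normalize_deg(wind_direction_deg(-1.0, 0.0)))
```

Because atan2 takes both components, it resolves the quadrant correctly, which a plain arcsine or arccosine of a single value could not do.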
The provided script handles command line argument parsing so that a .json file containing a Binding configuration can be specified and loaded during initialization. This makes it possible to have one configuration where the Modbus hardware device is simulated and another that corresponds to the actual piece of hardware.
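As an illustration of this kind of argument handling, here is a minimal sketch based on argparse and json from the standard library. It is not the code of the provided script: the --config option name, the function names and the file name simulated.json are assumptions made for the example.

```python
import argparse
import json


def parse_args(argv=None):
    """Parse the command line; `--config` is a hypothetical option name."""
    parser = argparse.ArgumentParser(description="Anemometer demo binding")
    parser.add_argument(
        "--config",
        default=None,
        help="path to a .json file containing a Binding configuration",
    )
    return parser.parse_args(argv)


def load_binding_config(path):
    """Return the configuration read from `path`, or None if no path is given."""
    if path is None:
        return None
    with open(path, encoding="utf-8") as config_file:
        return json.load(config_file)


# Explicit argument list, as if the script had been started with
# `--config simulated.json` (hypothetical file name).
args = parse_args(["--config", "simulated.json"])
print(args.config)
```

The dictionary returned by load_binding_config could then be placed under the relevant key of the set section before the Binder is declared, so that switching between the simulated and the real device only requires pointing to a different file.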