'''
Modbus Payload Builders
------------------------
A collection of utilities for building and decoding
modbus messages payloads.
'''
from struct import pack, unpack
from pymodbus.interfaces import IPayloadBuilder
from pymodbus.constants import Endian
from pymodbus.utilities import pack_bitstring
from pymodbus.utilities import unpack_bitstring
from pymodbus.utilities import make_byte_string
from pymodbus.exceptions import ParameterException
[docs]class BinaryPayloadBuilder(IPayloadBuilder):
'''
A utility that helps build payload messages to be
written with the various modbus messages. It really is just
a simple wrapper around the struct module, however it saves
time looking up the format strings. What follows is a simple
example::
builder = BinaryPayloadBuilder(endian=Endian.Little)
builder.add_8bit_uint(1)
builder.add_16bit_uint(2)
payload = builder.build()
'''
[docs] def __init__(self, payload=None, endian=Endian.Little):
''' Initialize a new instance of the payload builder
:param payload: Raw binary payload data to initialize with
:param endian: The endianess of the payload
'''
self._payload = payload or []
self._endian = endian
[docs] def to_string(self):
''' Return the payload buffer as a string
:returns: The payload buffer as a string
'''
return b''.join(self._payload)
[docs] def __str__(self):
''' Return the payload buffer as a string
:returns: The payload buffer as a string
'''
return self.to_string().decode('utf-8')
[docs] def reset(self):
''' Reset the payload buffer
'''
self._payload = []
[docs] def to_registers(self):
''' Convert the payload buffer into a register
layout that can be used as a context block.
:returns: The register layout to use as a block
'''
fstring = self._endian + 'H'
payload = self.build()
return [unpack(fstring, value)[0] for value in payload]
[docs] def build(self):
''' Return the payload buffer as a list
This list is two bytes per element and can
thus be treated as a list of registers.
:returns: The payload buffer as a list
'''
string = self.to_string()
length = len(string)
string = string + (b'\x00' * (length % 2))
return [string[i:i+2] for i in range(0, length, 2)]
[docs] def add_bits(self, values):
''' Adds a collection of bits to be encoded
If these are less than a multiple of eight,
they will be left padded with 0 bits to make
it so.
:param value: The value to add to the buffer
'''
value = pack_bitstring(values)
self._payload.append(value)
[docs] def add_8bit_uint(self, value):
''' Adds a 8 bit unsigned int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'B'
self._payload.append(pack(fstring, value))
[docs] def add_16bit_uint(self, value):
''' Adds a 16 bit unsigned int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'H'
self._payload.append(pack(fstring, value))
[docs] def add_32bit_uint(self, value):
''' Adds a 32 bit unsigned int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'I'
self._payload.append(pack(fstring, value))
[docs] def add_64bit_uint(self, value):
''' Adds a 64 bit unsigned int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'Q'
self._payload.append(pack(fstring, value))
[docs] def add_8bit_int(self, value):
''' Adds a 8 bit signed int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'b'
self._payload.append(pack(fstring, value))
[docs] def add_16bit_int(self, value):
''' Adds a 16 bit signed int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'h'
self._payload.append(pack(fstring, value))
[docs] def add_32bit_int(self, value):
''' Adds a 32 bit signed int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'i'
self._payload.append(pack(fstring, value))
[docs] def add_64bit_int(self, value):
''' Adds a 64 bit signed int to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'q'
self._payload.append(pack(fstring, value))
[docs] def add_32bit_float(self, value):
''' Adds a 32 bit float to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'f'
self._payload.append(pack(fstring, value))
[docs] def add_64bit_float(self, value):
''' Adds a 64 bit float(double) to the buffer
:param value: The value to add to the buffer
'''
fstring = self._endian + 'd'
self._payload.append(pack(fstring, value))
[docs] def add_string(self, value):
''' Adds a string to the buffer
:param value: The value to add to the buffer
'''
value = make_byte_string(value)
fstring = self._endian + str(len(value)) + 's'
self._payload.append(pack(fstring, value))
[docs]class BinaryPayloadDecoder(object):
'''
A utility that helps decode payload messages from a modbus
reponse message. It really is just a simple wrapper around
the struct module, however it saves time looking up the format
strings. What follows is a simple example::
decoder = BinaryPayloadDecoder(payload)
first = decoder.decode_8bit_uint()
second = decoder.decode_16bit_uint()
'''
[docs] def __init__(self, payload, endian=Endian.Little):
''' Initialize a new payload decoder
:param payload: The payload to decode with
:param endian: The endianess of the payload
'''
self._payload = payload
self._pointer = 0x00
self._endian = endian
@classmethod
[docs] def fromRegisters(klass, registers, endian=Endian.Little):
''' Initialize a payload decoder with the result of
reading a collection of registers from a modbus device.
The registers are treated as a list of 2 byte values.
We have to do this because of how the data has already
been decoded by the rest of the library.
:param registers: The register results to initialize with
:param endian: The endianess of the payload
:returns: An initialized PayloadDecoder
'''
if isinstance(registers, list): # repack into flat binary
payload = b''.join(pack(endian + 'H', x) for x in registers)
return klass(payload, endian)
raise ParameterException('Invalid collection of registers supplied')
@classmethod
[docs] def fromCoils(klass, coils, endian=Endian.Little):
''' Initialize a payload decoder with the result of
reading a collection of coils from a modbus device.
The coils are treated as a list of bit(boolean) values.
:param coils: The coil results to initialize with
:param endian: The endianess of the payload
:returns: An initialized PayloadDecoder
'''
if isinstance(coils, list):
payload = pack_bitstring(coils)
return klass(payload, endian)
raise ParameterException('Invalid collection of coils supplied')
[docs] def reset(self):
''' Reset the decoder pointer back to the start
'''
self._pointer = 0x00
[docs] def decode_8bit_uint(self):
''' Decodes a 8 bit unsigned int from the buffer
'''
self._pointer += 1
fstring = self._endian + 'B'
handle = self._payload[self._pointer - 1:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_bits(self):
''' Decodes a byte worth of bits from the buffer
'''
self._pointer += 1
# fstring = self._endian + 'B'
handle = self._payload[self._pointer - 1:self._pointer]
handle = make_byte_string(handle)
return unpack_bitstring(handle)
[docs] def decode_16bit_uint(self):
''' Decodes a 16 bit unsigned int from the buffer
'''
self._pointer += 2
fstring = self._endian + 'H'
handle = self._payload[self._pointer - 2:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_32bit_uint(self):
''' Decodes a 32 bit unsigned int from the buffer
'''
self._pointer += 4
fstring = self._endian + 'I'
handle = self._payload[self._pointer - 4:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_64bit_uint(self):
''' Decodes a 64 bit unsigned int from the buffer
'''
self._pointer += 8
fstring = self._endian + 'Q'
handle = self._payload[self._pointer - 8:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_8bit_int(self):
''' Decodes a 8 bit signed int from the buffer
'''
self._pointer += 1
fstring = self._endian + 'b'
handle = self._payload[self._pointer - 1:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_16bit_int(self):
''' Decodes a 16 bit signed int from the buffer
'''
self._pointer += 2
fstring = self._endian + 'h'
handle = self._payload[self._pointer - 2:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_32bit_int(self):
''' Decodes a 32 bit signed int from the buffer
'''
self._pointer += 4
fstring = self._endian + 'i'
handle = self._payload[self._pointer - 4:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_64bit_int(self):
''' Decodes a 64 bit signed int from the buffer
'''
self._pointer += 8
fstring = self._endian + 'q'
handle = self._payload[self._pointer - 8:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_32bit_float(self):
''' Decodes a 32 bit float from the buffer
'''
self._pointer += 4
fstring = self._endian + 'f'
handle = self._payload[self._pointer - 4:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_64bit_float(self):
''' Decodes a 64 bit float(double) from the buffer
'''
self._pointer += 8
fstring = self._endian + 'd'
handle = self._payload[self._pointer - 8:self._pointer]
handle = make_byte_string(handle)
return unpack(fstring, handle)[0]
[docs] def decode_string(self, size=1):
''' Decodes a string from the buffer
:param size: The size of the string to decode
'''
self._pointer += size
return self._payload[self._pointer - size:self._pointer]
#---------------------------------------------------------------------------#
# Exported Identifiers
#---------------------------------------------------------------------------#
__all__ = ["BinaryPayloadBuilder", "BinaryPayloadDecoder"]