'''
Register Writing Request/Response Messages
-------------------------------------------
'''
import struct
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.pdu import ModbusExceptions as merror
class WriteSingleRegisterRequest(ModbusRequest):
    '''
    Request to write a single holding register in a remote device
    (function code 6).

    The request PDU carries the address of the register followed by the
    value to store. Registers are addressed starting at zero, so the
    register numbered 1 is addressed as 0.
    '''
    function_code = 6
    _rtu_frame_size = 8

    def __init__(self, address=None, value=None, **kwargs):
        ''' Initializes a new instance

        :param address: The address of the register to write
        :param value: The value to write to that register
        :param kwargs: Forwarded unchanged to ``ModbusRequest.__init__``
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.address = address
        self.value = value

    def encode(self):
        ''' Encodes the request body: address followed by value

        :returns: The encoded packet
        '''
        header = struct.pack('>H', self.address)
        if self.skip_encode:
            # The value is appended untouched; presumably the caller has
            # already supplied it as raw bytes -- confirm against callers.
            return header + self.value
        return header + struct.pack('>H', self.value)

    def decode(self, data):
        ''' Decodes a request body into ``address`` and ``value``

        :param data: The request body to decode (two big-endian
            unsigned 16-bit fields)
        '''
        fields = struct.unpack('>HH', data)
        self.address, self.value = fields

    def execute(self, context):
        ''' Runs the write against a datastore and echoes the result

        :param context: The datastore to write to
        :returns: A ``WriteSingleRegisterResponse`` holding the value
            read back from the datastore, or the exception response
            built by ``doException`` when the value or address is
            rejected
        '''
        value_in_range = 0 <= self.value <= 0xffff
        if not value_in_range:
            return self.doException(merror.IllegalValue)

        address_ok = context.validate(self.function_code, self.address, 1)
        if not address_ok:
            return self.doException(merror.IllegalAddress)

        context.setValues(self.function_code, self.address, [self.value])
        # Read the register back so the response reports what was stored.
        stored = context.getValues(self.function_code, self.address, 1)
        return WriteSingleRegisterResponse(self.address, stored[0])

    def get_response_pdu_size(self):
        """
        Size of the expected response PDU:
        function code (1 byte) + register address (2 bytes) +
        register value (2 bytes)

        :return: The response PDU size in bytes
        """
        function_code_size = 1
        address_size = 2
        value_size = 2
        return function_code_size + address_size + value_size

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string naming the request and its register address
        '''
        template = "WriteRegisterRequest %d"
        return template % self.address
class WriteSingleRegisterResponse(ModbusResponse):
    '''
    Normal response to a write single register request (function code
    6): an echo of the request, returned after the register contents
    have been written.
    '''
    function_code = 6
    _rtu_frame_size = 8

    def __init__(self, address=None, value=None, **kwargs):
        ''' Initializes a new instance

        :param address: The address of the register that was written
        :param value: The value reported for that register
        :param kwargs: Forwarded unchanged to ``ModbusResponse.__init__``
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.address = address
        self.value = value

    def encode(self):
        ''' Encodes the response body: address followed by value

        :returns: The encoded packet
        '''
        fields = (self.address, self.value)
        return struct.pack('>HH', *fields)

    def decode(self, data):
        ''' Decodes a response body into ``address`` and ``value``

        :param data: The response body to decode (two big-endian
            unsigned 16-bit fields)
        '''
        fields = struct.unpack('>HH', data)
        self.address, self.value = fields

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string naming the response, its address and value
        '''
        return "WriteRegisterResponse %d => %d" % (self.address, self.value)
#---------------------------------------------------------------------------#
# Write Multiple Registers
#---------------------------------------------------------------------------#
class WriteMultipleRegistersRequest(ModbusRequest):
    '''
    Request to write a block of contiguous registers in a remote device
    (function code 16). ``execute`` accepts between 1 and 123 (0x7b)
    registers per request.

    The values to write are carried in the request data field, packed
    as two bytes per register.
    '''
    function_code = 16
    _rtu_byte_count_pos = 6
    _pdu_length = 5  # func + address1 + address2 + outputQuant1 + outputQuant2

    def __init__(self, address=None, values=None, **kwargs):
        ''' Initializes a new instance

        :param address: The address of the first register to write
        :param values: The values to write; a single non-iterable value
            is wrapped in a list and ``None`` becomes an empty list
        :param kwargs: Forwarded unchanged to ``ModbusRequest.__init__``
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.address = address

        if values is None:
            registers = []
        elif hasattr(values, '__iter__'):
            registers = values
        else:
            registers = [values]

        self.values = registers
        self.count = len(registers)
        # Every register occupies two bytes in the data field.
        self.byte_count = 2 * self.count

    def encode(self):
        ''' Encodes the request body: address, register count, byte
        count, then the register values

        :returns: The encoded packet
        '''
        header = struct.pack('>HHB', self.address, self.count,
                             self.byte_count)
        if self.skip_encode:
            # Values are joined untouched; presumably they are already
            # encoded byte strings -- confirm against callers.
            return header + b''.join(self.values)
        payload = b''.join(struct.pack('>H', register)
                           for register in self.values)
        return header + payload

    def decode(self, data):
        ''' Decodes a request body into address, count, byte count and
        the list of register values

        :param data: The request body to decode
        '''
        self.address, self.count, self.byte_count = struct.unpack(
            '>HHB', data[:5])
        self.values = []  # reset before collecting the decoded registers
        # Register words start right after the 5-byte header; the number
        # read is driven by the decoded register count.
        for start in range(5, 5 + 2 * self.count, 2):
            (register,) = struct.unpack('>H', data[start:start + 2])
            self.values.append(register)

    def execute(self, context):
        ''' Runs the write against a datastore

        :param context: The datastore to write to
        :returns: A ``WriteMultipleRegistersResponse`` on success, or
            the exception response built by ``doException`` when the
            count, byte count or address range is rejected
        '''
        count_in_range = 1 <= self.count <= 0x07b
        if not count_in_range:
            return self.doException(merror.IllegalValue)

        expected_byte_count = self.count * 2
        if self.byte_count != expected_byte_count:
            return self.doException(merror.IllegalValue)

        if not context.validate(self.function_code, self.address,
                                self.count):
            return self.doException(merror.IllegalAddress)

        context.setValues(self.function_code, self.address, self.values)
        return WriteMultipleRegistersResponse(self.address, self.count)

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string naming the request, its start address and
            register count
        '''
        return "WriteMultipleRegisterRequest %d => %d" % (self.address,
                                                          self.count)
class WriteMultipleRegistersResponse(ModbusResponse):
    '''
    Normal response to a write multiple registers request (function
    code 16): it returns the function code, the starting address and
    the quantity of registers written.
    '''
    function_code = 16
    _rtu_frame_size = 8

    def __init__(self, address=None, count=None, **kwargs):
        ''' Initializes a new instance

        :param address: The address of the first register written
        :param count: The number of registers written
        :param kwargs: Forwarded unchanged to ``ModbusResponse.__init__``
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.address = address
        self.count = count

    def encode(self):
        ''' Encodes the response body: address followed by count

        :returns: The encoded packet
        '''
        fields = (self.address, self.count)
        return struct.pack('>HH', *fields)

    def decode(self, data):
        ''' Decodes a response body into ``address`` and ``count``

        :param data: The response body to decode (two big-endian
            unsigned 16-bit fields)
        '''
        fields = struct.unpack('>HH', data)
        self.address, self.count = fields

    def __str__(self):
        ''' Returns a string representation of the instance

        :returns: A string naming the response, its start address and
            register count
        '''
        return "WriteMultipleRegisterResponse (%d,%d)" % (self.address,
                                                          self.count)
class MaskWriteRegisterRequest(ModbusRequest):
    '''
    This function code is used to modify the contents of a specified holding
    register using a combination of an AND mask, an OR mask, and the
    register's current contents. The function can be used to set or clear
    individual bits in the register.

    The value stored by ``execute`` is::

        (current AND and_mask) OR (or_mask AND (NOT and_mask))

    so a bit of the OR mask only takes effect where the AND mask bit is
    clear.
    '''
    function_code = 0x16
    _rtu_frame_size = 10

    def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000,
                 **kwargs):
        ''' Initializes a new instance

        :param address: The mask pointer address (0x0000 to 0xffff)
        :param and_mask: The and bitmask to apply to the register address
        :param or_mask: The or bitmask to apply to the register address
        :param kwargs: Forwarded unchanged to ``ModbusRequest.__init__``
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.address = address
        self.and_mask = and_mask
        self.or_mask = or_mask

    def encode(self):
        ''' Encodes the request packet

        :returns: The byte encoded packet (address, and mask, or mask as
            big-endian unsigned 16-bit fields)
        '''
        return struct.pack('>HHH', self.address, self.and_mask,
                           self.or_mask)

    def decode(self, data):
        ''' Decodes the incoming request

        :param data: The data to decode into the address and both masks
        '''
        self.address, self.and_mask, self.or_mask = struct.unpack('>HHH',
                                                                  data)

    def execute(self, context):
        ''' Run a mask write register request against the store

        :param context: The datastore to request from
        :returns: The populated ``MaskWriteRegisterResponse``, or the
            exception response built by ``doException`` when a mask or
            the address is rejected
        '''
        if not (0x0000 <= self.and_mask <= 0xffff):
            return self.doException(merror.IllegalValue)
        if not (0x0000 <= self.or_mask <= 0xffff):
            return self.doException(merror.IllegalValue)
        if not context.validate(self.function_code, self.address, 1):
            return self.doException(merror.IllegalAddress)

        current = context.getValues(self.function_code, self.address, 1)[0]
        # Per the Modbus definition of function 0x16 the OR mask must be
        # gated by the inverted AND mask; OR-ing it in unconditionally
        # would also set bits that the AND mask asked to preserve.
        # Both masks are validated to 16 bits above, so the ``&`` with
        # ``~and_mask`` yields a value in 0x0000..0xffff.
        result = (current & self.and_mask) | (self.or_mask & ~self.and_mask)
        context.setValues(self.function_code, self.address, [result])
        return MaskWriteRegisterResponse(self.address, self.and_mask,
                                         self.or_mask)
class MaskWriteRegisterResponse(ModbusResponse):
    '''
    Normal response to a mask write register request (function code
    0x16): an echo of the request, returned after the register has been
    written.
    '''
    function_code = 0x16
    _rtu_frame_size = 10

    def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000,
                 **kwargs):
        ''' Initializes a new instance

        :param address: The mask pointer address (0x0000 to 0xffff)
        :param and_mask: The and bitmask applied to the register address
        :param or_mask: The or bitmask applied to the register address
        :param kwargs: Forwarded unchanged to ``ModbusResponse.__init__``
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.address = address
        self.and_mask = and_mask
        self.or_mask = or_mask

    def encode(self):
        ''' Encodes the response body: address, and mask, or mask

        :returns: The byte encoded message
        '''
        fields = (self.address, self.and_mask, self.or_mask)
        return struct.pack('>HHH', *fields)

    def decode(self, data):
        ''' Decodes the response body into address and both masks

        :param data: The packet data to decode (three big-endian
            unsigned 16-bit fields)
        '''
        fields = struct.unpack('>HHH', data)
        self.address, self.and_mask, self.or_mask = fields
#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
# Public names of this module, listed as request/response pairs.
__all__ = [
    "WriteSingleRegisterRequest",
    "WriteSingleRegisterResponse",
    "WriteMultipleRegistersRequest",
    "WriteMultipleRegistersResponse",
    "MaskWriteRegisterRequest",
    "MaskWriteRegisterResponse",
]