Source code for pymodbus.datastore.remote

from pymodbus.exceptions import NotImplementedException
from pymodbus.interfaces import IModbusSlaveContext

#---------------------------------------------------------------------------#
# Logging
#---------------------------------------------------------------------------#
import logging
_logger = logging.getLogger(__name__)


#---------------------------------------------------------------------------#
# Context
#---------------------------------------------------------------------------#
[docs]class RemoteSlaveContext(IModbusSlaveContext): ''' TODO This creates a modbus data model that connects to a remote device (depending on the client used) '''
[docs] def __init__(self, client): ''' Initializes the datastores :param client: The client to retrieve values with ''' self._client = client self.__build_mapping()
[docs] def reset(self): ''' Resets all the datastores to their default values ''' raise NotImplementedException()
[docs] def validate(self, fx, address, count=1): ''' Validates the request to make sure it is in range :param fx: The function we are working with :param address: The starting address :param count: The number of values to test :returns: True if the request in within range, False otherwise ''' _logger.debug("validate[%d] %d:%d" % (fx, address, count)) result = self.__get_callbacks[self.decode(fx)](address, count) return result.function_code < 0x80
[docs] def getValues(self, fx, address, count=1): ''' Validates the request to make sure it is in range :param fx: The function we are working with :param address: The starting address :param count: The number of values to retrieve :returns: The requested values from a:a+c ''' # TODO deal with deferreds _logger.debug("get values[%d] %d:%d" % (fx, address, count)) result = self.__get_callbacks[self.decode(fx)](address, count) return self.__extract_result(self.decode(fx), result)
[docs] def setValues(self, fx, address, values): ''' Sets the datastore with the supplied values :param fx: The function we are working with :param address: The starting address :param values: The new values to be set ''' # TODO deal with deferreds _logger.debug("set values[%d] %d:%d" % (fx, address, len(values))) self.__set_callbacks[self.decode(fx)](address, values)
[docs] def __str__(self): ''' Returns a string representation of the context :returns: A string representation of the context ''' return "Remote Slave Context(%s)" % self._client
def __build_mapping(self): ''' A quick helper method to build the function code mapper. ''' self.__get_callbacks = { 'd': lambda a, c: self._client.read_discrete_inputs(a, c), 'c': lambda a, c: self._client.read_coils(a, c), 'h': lambda a, c: self._client.read_holding_registers(a, c), 'i': lambda a, c: self._client.read_input_registers(a, c), } self.__set_callbacks = { 'd': lambda a, v: self._client.write_coils(a, v), 'c': lambda a, v: self._client.write_coils(a, v), 'h': lambda a, v: self._client.write_registers(a, v), 'i': lambda a, v: self._client.write_registers(a, v), } def __extract_result(self, fx, result): ''' A helper method to extract the values out of a response. TODO make this consistent (values?) ''' if result.function_code < 0x80: if fx in ['d', 'c']: return result.bits if fx in ['h', 'i']: return result.registers else: return result