Примеры реализации TCP MODBUS
Примеры для устройств с Lua / Micropython
TCP MODBUS сервер с использованием LuaSocket
Шаблон простого tcp modbus сервера. Считыватель сохраняет последнюю поднесенную метку в формате: время в unix timestamp в регистрах 0-1 и uid в регистрах 2-5.
LuaSocket не включен по-умолчанию в сборку, его нужно установить из репозитория.
local rfid = require("rfid")
local indication = require("indication")

local PORT = 502

local wdt_on = Settings.get("sys_wdt") == "on"
local snd = indication.Sound.new()
local leds = indication.Leds.new({RGB=true})
local mtx = thread.createmutex()
local holding_registers = {0, 0, 0, 0, 0, 0}

local function pad_uid(uid)
    return string.format("%016s", uid):gsub(" ", "0")
end

local function split_into_words_32(number)
    return (number >> 16) & 0xFFFF,  number & 0xFFFF
end

local function split_uid_into_words(uid)
  return tonumber(string.sub(uid, 1, 4), 16) or 0,
         tonumber(string.sub(uid, 5, 8), 16) or 0,
         tonumber(string.sub(uid, 9, 12), 16) or 0,
         tonumber(string.sub(uid, 13, 16), 16) or 0
end

local function to_holding_registers(regs, ts, uid)
  regs[1], regs[2] = split_into_words_32(ts)
  regs[3], regs[4], regs[5], regs[6] = split_uid_into_words(uid)
end

thread.start(function()
  -- определяем классы
  reader = rfid.Reader({})
  -- стартовая индикация
  leds:start()
  snd:start()
  -- запускаем 
  reader.process({
      timeout_ms = 200, -- таймаут в цикле; всегда должен быть больше 0
      mode = rfid.MODE_LOOP,
      wdt = wdt_on,
      halt = true,
      checkfunc = function(uid) -- функция для проверки метки
        mtx:lock()
        to_holding_registers(holding_registers,  os.time(), pad_uid(uid))
        mtx:unlock()
        leds:ok() -- индикация
        snd:ok() -- звук
      end
  })
end)

--tcp modbus server
local function to_bytes(value)
  return string.pack(">I2", value)
end

local function from_bytes(b1, b2)
  return b1*256 + b2
end

local function build_mbap_header(trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, length)
  return string.pack(">BBBBH", trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, length)
end

local function send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, pdu)
  local response = string.pack(">HHHBc"..#pdu, trans_id_hi * 256 + trans_id_lo, protocol_id_hi * 256 + protocol_id_lo, #pdu + 1, unit_id, pdu)
  client:send(response)
end

local function send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, func_code, error_code)
  local error_func_code = string.char(func_code + 0x80)
  local error_pdu = error_func_code .. string.char(error_code)
  send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, error_pdu)
end

local server = assert(socket.bind("*", PORT))
--server:settimeout(0)

 cacheutils.addLog(log, "Modbus TCP устройство запущено на порту " .. PORT, cacheutils.Info)

while true do
  local client = server:accept()
  if client then
    -- client:settimeout(10)
    -- Читаем MBAP Header (7 байт)
    local header, err = client:receive(7)
    if header then
      local trans_id_hi, trans_id_lo,
      protocol_id_hi, protocol_id_lo,
      length_hi, length_lo,
      unit_id = header:byte(1, 7)
      
      local length = from_bytes(length_hi, length_lo)
      -- Длина включает в себя байты Unit ID и следующие за ним байты.
      -- Значит, нам нужно прочитать (length - 1) байт, чтобы получить PDU
      local pdu_len = length - 1
      local pdu, err = client:receive(pdu_len)
      if pdu then
        local fcode = pdu:byte(1)

        if fcode == 3 then
          -- Чтение holding регистров
          -- Формат запроса: Function Code (1 байт), Start Address Hi, Start Address Lo, Quantity Hi, Quantity Lo
          local start_hi, start_lo, qty_hi, qty_lo = pdu:byte(2, 5)
          local start_addr = from_bytes(start_hi, start_lo) + 1  -- +1, т.к. Lua-индексация с 1
          local quantity = from_bytes(qty_hi, qty_lo)

          -- Проверяем, не выходит ли за пределы массива регистров
          if start_addr + quantity - 1 <= #holding_registers then
              local byte_count = quantity * 2
              mtx:lock() -- Блокируем доступ к holding_registers для безопасного чтения
              local format = ">BB" .. string.rep("H", quantity)
              -- Упаковываем Function Code, Byte Count и данные регистров в одну строку
              local response_pdu = string.pack(format, fcode, byte_count, table.unpack(holding_registers, start_addr, start_addr + quantity -1))
              mtx:unlock()
              send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, response_pdu)
          else
            -- Если запрос выходит за пределы, возвращаем ошибку Modbus
            send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, fcode, 0x02)
          end
        else
          -- Неподдерживаемая функция
          send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, fcode, 0x01)
        end
      end
    end
    client:close()
  end

  socket.sleep(0.01)
end
Python-клиент для тестирования
from pymodbus.client import ModbusTcpClient

# Конфигурация подключения
HOST = "192.168.8.227"
PORT = 502

try:
    # Используем контекстный менеджер для автоматического закрытия соединения
    with ModbusTcpClient(HOST, port=PORT) as client:
        if not client.connect():
            raise Exception("Не удалось подключиться к устройству")

        # Считываем 6 регистров, начиная с адреса 0
        start_address = 0
        count = 6
        response = client.read_holding_registers(address=start_address, count=count)

        # Проверяем ответ
        if response is not None and not response.isError():
            print("Считанные регистры:", response.registers)
        else:
            print("Ошибка при чтении:", response if response else "Нет ответа от устройства")

except Exception as e:
    print(f"Произошла ошибка: {e}")
TCP MODBUS сервер на Micropython
Шаблон простого tcp modbus сервера. Доработайте по своим запросам.
from gc import collect
import uasyncio as asyncio
import struct
from machine import Pin
import sys
from os import uname
from time import time
from rfid_utils import RFID, BUZZ, GLED, CardFilter
from machine import WDT
from config import settings

# Коды функций Modbus
READ_HOLDING_REGISTERS = 0x03
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_REGISTERS = 0x10

# Коды ошибок
ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02
ILLEGAL_DATA_VALUE = 0x03
SERVER_DEVICE_FAILURE = 0x04

MODBUS_REGISTERS = [0, 0, 0, 0, 0, 0, 0, 0]

def uid_to_registers(uid_str, registers):
    # Проверка формата UID (должен быть 8 символов для 4 байт или 14 символов для 7 байт)
    if len(uid_str) not in [8, 14]:
        raise ValueError("UID должен быть 4 или 7 байт (8 или 14 символов в hex)")
    
    # Преобразование строки в байты вручную для MicroPython
    uid_bytes = bytearray()
    for i in range(0, len(uid_str), 2):
        if i + 1 < len(uid_str):
            byte_str = uid_str[i:i+2]
            try:
                byte_val = int(byte_str, 16)
                uid_bytes.append(byte_val)
            except ValueError:
                raise ValueError("UID должен содержать только шестнадцатеричные символы")
    
    # Если UID 7-байтный, добавляем нулевой байт для выравнивания до четного числа
    if len(uid_bytes) == 7:
        uid_bytes.append(0)
    
    # Заполняем регистры (каждый регистр - 2 байта)
    for i in range(0, len(uid_bytes), 2):
        reg_index = i // 2
        if reg_index < len(registers):
            # В формате big-endian (старший байт первый)
            registers[reg_index] = (uid_bytes[i] << 8) | uid_bytes[i + 1]
    
    return registers

# Класс для обработки Modbus запросов
class ModbusHandler:
    def __init__(self, holding_registers):
        self.holding_registers = holding_registers
            
    def toggle_led(self):
        if self.led:
            self.led.value(not self.led.value())
    
    def process_request(self, function_code, data, unit_id):
        """Обработка Modbus запроса и генерация ответа"""
        if function_code == READ_HOLDING_REGISTERS:
            return self.handle_read_holding_registers(data)
        elif function_code == WRITE_SINGLE_REGISTER:
            return self.handle_write_single_register(data)
        elif function_code == WRITE_MULTIPLE_REGISTERS:
            return self.handle_write_multiple_registers(data)
        else:
            # Неподдерживаемый код функции
            return struct.pack('BB', function_code | 0x80, ILLEGAL_FUNCTION)
    
    def handle_read_holding_registers(self, data):
        """Обработка функции Modbus 03 (Чтение регистров хранения)"""
        starting_address, quantity = struct.unpack('>HH', data)
        
        # Проверка диапазона адресов
        if starting_address + quantity > len(self.holding_registers):
            return struct.pack('BB', READ_HOLDING_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Подготовка ответа
        response = bytearray()
        response.append(READ_HOLDING_REGISTERS)  # Код функции
        response.append(quantity * 2)  # Количество байт
        
        # Добавление значений регистров
        for i in range(quantity):
            register_value = self.holding_registers[starting_address + i]
            response.extend(struct.pack('>H', register_value))
        
        return response
    
    def handle_write_single_register(self, data):
        """Обработка функции Modbus 06 (Запись в один регистр)"""
        address, value = struct.unpack('>HH', data)
        
        # Проверка адреса
        if address >= len(self.holding_registers):
            return struct.pack('BB', WRITE_SINGLE_REGISTER | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Обновление значения регистра
        self.holding_registers[address] = value
        
        # Возврат эхо запроса
        return struct.pack('BHH', WRITE_SINGLE_REGISTER, address, value)
    
    def handle_write_multiple_registers(self, data):
        """Обработка функции Modbus 16 (Запись в несколько регистров)"""
        if len(data) < 5:  # Необходимо минимум 5 байт для этой функции
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        starting_address, quantity, byte_count = struct.unpack('>HHB', data[:5])
        
        # Проверка диапазона адресов
        if starting_address + quantity > len(self.holding_registers):
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Проверка количества (1-123 регистров)
        if quantity < 1 or quantity > 123 or byte_count != quantity * 2:
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        # Проверка достаточности данных
        if len(data) < 5 + byte_count:
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        # Обновление значений регистров
        for i in range(quantity):
            value = struct.unpack('>H', data[5 + i * 2:7 + i * 2])[0]
            self.holding_registers[starting_address + i] = value
        
        # Подготовка ответа
        return struct.pack('>BHH', WRITE_MULTIPLE_REGISTERS, starting_address, quantity)


# Обработчик клиентских соединений
async def handle(reader, writer):
    modbus_handler = ModbusHandler(MODBUS_REGISTERS)
    print('Новое подключение Modbus')
    
    while True:
        try:
            # Получение заголовка Modbus TCP (7 байт)
            mbap_header = await reader.read(7)
            if not mbap_header or len(mbap_header) < 7:
                print('Клиент разорвал соединение')
                break
            
            # Разбор заголовка MBAP
            (transaction_id, protocol_id, length, unit_id) = struct.unpack('>HHHB', mbap_header)
            
            # Проверка валидности Modbus TCP пакета
            if protocol_id != 0:
                print(f"Неверный ID протокола: {protocol_id}")
                break
            
            # Получение кода функции и данных
            data = await reader.read(length - 1)
            if not data:
                break
            
            function_code = data[0]
            print(f"Получен запрос Modbus, функция: {function_code}, адрес устройства: {unit_id}")
            
            # Обработка запроса в зависимости от кода функции
            response = modbus_handler.process_request(function_code, data[1:], unit_id)
            
            # Создание заголовка MBAP для ответа
            response_length = len(response) + 1  # +1 для unit_id
            mbap_response = struct.pack('>HHHB', transaction_id, protocol_id, response_length, unit_id)
            
            # Отправка полного ответа
            await writer.awrite(mbap_response + response)

            
        except Exception as e:
            print('Ошибка сервера {} ({})'.format(e, type(e)))
            break
            
        await asyncio.sleep_ms(50)
    
    await writer.aclose()

    print('Соединение закрыто')


# Запуск сервера
async def start(host='0.0.0.0', port=502):  # Стандартный порт Modbus TCP - 502
    loop = asyncio.get_event_loop()
    loop.create_task(asyncio.start_server(handle, host, port))
    print(f"Modbus TCP сервер запущен на {host}:{port}")


async def run():
    rfid = RFID()
    BUZZ.beeps()
    dst_ip = settings.get('rfid-dest')
    dst_port = settings.get('rfid-i-dport')
    format_str = settings.get('rfid-format')
    wdt_en = settings.get('sys-i-wdt', False)
    wdt = None
   
    await start()
    
    if wdt_en:
        wdt = WDT(timeout=10000)

    while 1:
        if wdt_en:
            wdt.feed()

        uid = rfid.read(fmt=format_str)
        if uid:
            uid_to_registers(uid, MODBUS_REGISTERS)
       
        await asyncio.sleep_ms(250)