Примеры реализации TCP MODBUS
Примеры для устройств с Lua / Micropython
TCP MODBUS сервер Lua
Шаблон простого tcp modbus сервера. Считыватель сохраняет последнюю поднесенную метку в формате: время в unix timestamp в регистрах 0-1 и uid в регистрах 2-5.
Код актуален для устройств с прошивкой 1.6.0 и далее.
local rfid = require("rfid")
local indication = require("indication")

local PORT = 502
local UNIT_ID = 1

local wdt_on = Settings.get("sys_wdt") == "on"
local snd = indication.Sound.new()
local leds = indication.Leds.new({RGB=true})

-- Функция дополнения UID нулями до 16 символов
local function pad_uid(uid)
    return string.format("%016s", uid):gsub(" ", "0")
end

-- Обновление holding регистров: timestamp (4 байта) + UID (8 байт)
local function update_registers(uid)
    -- Получаем timestamp как строку байтов (Big Endian для Modbus)
    local ts_bytes = os.timestamp("s", "be", true)
    
    -- Преобразуем hex UID в строку байтов
    local uid_bytes = cutils.unhexlify(uid, true)
    
    -- Записываем все 12 байт (6 регистров) одной командой
    modbus.setregs8(0, ts_bytes .. uid_bytes)
end

-- Запуск Modbus TCP сервера
modbus.tcp.init(PORT, UNIT_ID)
iprint("Modbus TCP сервер запущен на порту " .. PORT)

-- Проверка статуса
local running, port, unit_id = modbus.tcp.status()
if running then
    iprint(string.format("Статус: работает, порт %d, unit_id %d", port, unit_id))
end

-- Поток для работы с RFID reader
thread.start(function()
    -- Инициализация RFID reader
    reader = rfid.Reader({})
    
    -- Стартовая индикация
    leds:start()
    snd:start()
    
    -- Запуск обработки RFID меток
    reader.process({
        timeout_ms = 200,
        mode = rfid.MODE_LOOP,
        wdt = wdt_on,
        halt = true,
        checkfunc = function(uid)
            -- Обновляем регистры при обнаружении метки
            update_registers(pad_uid(uid))
            
            -- Индикация успешного чтения
            leds:ok()
            snd:ok()
            
        end
    })
end)
Python-клиент для тестирования
from pymodbus.client import ModbusTcpClient

# Конфигурация подключения
HOST = "192.168.8.227"
PORT = 502

try:
    # Используем контекстный менеджер для автоматического закрытия соединения
    with ModbusTcpClient(HOST, port=PORT) as client:
        if not client.connect():
            raise Exception("Не удалось подключиться к устройству")

        # Считываем 6 регистров, начиная с адреса 0
        start_address = 0
        count = 6
        response = client.read_holding_registers(address=start_address, count=count)

        # Проверяем ответ
        if response is not None and not response.isError():
            print("Считанные регистры:", response.registers)
        else:
            print("Ошибка при чтении:", response if response else "Нет ответа от устройства")

except Exception as e:
    print(f"Произошла ошибка: {e}")
TCP MODBUS сервер на Micropython
Шаблон простого tcp modbus сервера. Доработайте по своим запросам.
from gc import collect
import uasyncio as asyncio
import struct
from machine import Pin
import sys
from os import uname
from time import time
from rfid_utils import RFID, BUZZ, GLED, CardFilter
from machine import WDT
from config import settings

# Коды функций Modbus
READ_HOLDING_REGISTERS = 0x03
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_REGISTERS = 0x10

# Коды ошибок
ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02
ILLEGAL_DATA_VALUE = 0x03
SERVER_DEVICE_FAILURE = 0x04

MODBUS_REGISTERS = [0, 0, 0, 0, 0, 0, 0, 0]

def uid_to_registers(uid_str, registers):
    # Проверка формата UID (должен быть 8 символов для 4 байт или 14 символов для 7 байт)
    if len(uid_str) not in [8, 14]:
        raise ValueError("UID должен быть 4 или 7 байт (8 или 14 символов в hex)")
    
    # Преобразование строки в байты вручную для MicroPython
    uid_bytes = bytearray()
    for i in range(0, len(uid_str), 2):
        if i + 1 < len(uid_str):
            byte_str = uid_str[i:i+2]
            try:
                byte_val = int(byte_str, 16)
                uid_bytes.append(byte_val)
            except ValueError:
                raise ValueError("UID должен содержать только шестнадцатеричные символы")
    
    # Если UID 7-байтный, добавляем нулевой байт для выравнивания до четного числа
    if len(uid_bytes) == 7:
        uid_bytes.append(0)
    
    # Заполняем регистры (каждый регистр - 2 байта)
    for i in range(0, len(uid_bytes), 2):
        reg_index = i // 2
        if reg_index < len(registers):
            # В формате big-endian (старший байт первый)
            registers[reg_index] = (uid_bytes[i] << 8) | uid_bytes[i + 1]
    
    return registers

# Класс для обработки Modbus запросов
class ModbusHandler:
    def __init__(self, holding_registers):
        self.holding_registers = holding_registers
            
    def toggle_led(self):
        if self.led:
            self.led.value(not self.led.value())
    
    def process_request(self, function_code, data, unit_id):
        """Обработка Modbus запроса и генерация ответа"""
        if function_code == READ_HOLDING_REGISTERS:
            return self.handle_read_holding_registers(data)
        elif function_code == WRITE_SINGLE_REGISTER:
            return self.handle_write_single_register(data)
        elif function_code == WRITE_MULTIPLE_REGISTERS:
            return self.handle_write_multiple_registers(data)
        else:
            # Неподдерживаемый код функции
            return struct.pack('BB', function_code | 0x80, ILLEGAL_FUNCTION)
    
    def handle_read_holding_registers(self, data):
        """Обработка функции Modbus 03 (Чтение регистров хранения)"""
        starting_address, quantity = struct.unpack('>HH', data)
        
        # Проверка диапазона адресов
        if starting_address + quantity > len(self.holding_registers):
            return struct.pack('BB', READ_HOLDING_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Подготовка ответа
        response = bytearray()
        response.append(READ_HOLDING_REGISTERS)  # Код функции
        response.append(quantity * 2)  # Количество байт
        
        # Добавление значений регистров
        for i in range(quantity):
            register_value = self.holding_registers[starting_address + i]
            response.extend(struct.pack('>H', register_value))
        
        return response
    
    def handle_write_single_register(self, data):
        """Обработка функции Modbus 06 (Запись в один регистр)"""
        address, value = struct.unpack('>HH', data)
        
        # Проверка адреса
        if address >= len(self.holding_registers):
            return struct.pack('BB', WRITE_SINGLE_REGISTER | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Обновление значения регистра
        self.holding_registers[address] = value
        
        # Возврат эхо запроса
        return struct.pack('BHH', WRITE_SINGLE_REGISTER, address, value)
    
    def handle_write_multiple_registers(self, data):
        """Обработка функции Modbus 16 (Запись в несколько регистров)"""
        if len(data) < 5:  # Необходимо минимум 5 байт для этой функции
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        starting_address, quantity, byte_count = struct.unpack('>HHB', data[:5])
        
        # Проверка диапазона адресов
        if starting_address + quantity > len(self.holding_registers):
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
        
        # Проверка количества (1-123 регистров)
        if quantity < 1 or quantity > 123 or byte_count != quantity * 2:
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        # Проверка достаточности данных
        if len(data) < 5 + byte_count:
            return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
        
        # Обновление значений регистров
        for i in range(quantity):
            value = struct.unpack('>H', data[5 + i * 2:7 + i * 2])[0]
            self.holding_registers[starting_address + i] = value
        
        # Подготовка ответа
        return struct.pack('>BHH', WRITE_MULTIPLE_REGISTERS, starting_address, quantity)


# Обработчик клиентских соединений
async def handle(reader, writer):
    modbus_handler = ModbusHandler(MODBUS_REGISTERS)
    print('Новое подключение Modbus')
    
    while True:
        try:
            # Получение заголовка Modbus TCP (7 байт)
            mbap_header = await reader.read(7)
            if not mbap_header or len(mbap_header) < 7:
                print('Клиент разорвал соединение')
                break
            
            # Разбор заголовка MBAP
            (transaction_id, protocol_id, length, unit_id) = struct.unpack('>HHHB', mbap_header)
            
            # Проверка валидности Modbus TCP пакета
            if protocol_id != 0:
                print(f"Неверный ID протокола: {protocol_id}")
                break
            
            # Получение кода функции и данных
            data = await reader.read(length - 1)
            if not data:
                break
            
            function_code = data[0]
            print(f"Получен запрос Modbus, функция: {function_code}, адрес устройства: {unit_id}")
            
            # Обработка запроса в зависимости от кода функции
            response = modbus_handler.process_request(function_code, data[1:], unit_id)
            
            # Создание заголовка MBAP для ответа
            response_length = len(response) + 1  # +1 для unit_id
            mbap_response = struct.pack('>HHHB', transaction_id, protocol_id, response_length, unit_id)
            
            # Отправка полного ответа
            await writer.awrite(mbap_response + response)

            
        except Exception as e:
            print('Ошибка сервера {} ({})'.format(e, type(e)))
            break
            
        await asyncio.sleep_ms(50)
    
    await writer.aclose()

    print('Соединение закрыто')


# Запуск сервера
async def start(host='0.0.0.0', port=502):  # Стандартный порт Modbus TCP - 502
    loop = asyncio.get_event_loop()
    loop.create_task(asyncio.start_server(handle, host, port))
    print(f"Modbus TCP сервер запущен на {host}:{port}")


async def run():
    rfid = RFID()
    BUZZ.beeps()
    dst_ip = settings.get('rfid-dest')
    dst_port = settings.get('rfid-i-dport')
    format_str = settings.get('rfid-format')
    wdt_en = settings.get('sys-i-wdt', False)
    wdt = None
   
    await start()
    
    if wdt_en:
        wdt = WDT(timeout=10000)

    while 1:
        if wdt_en:
            wdt.feed()

        uid = rfid.read(fmt=format_str)
        if uid:
            uid_to_registers(uid, MODBUS_REGISTERS)
       
        await asyncio.sleep_ms(250)