Шаблон простого tcp modbus сервера. Доработайте по своим запросам.
from gc import collect
import uasyncio as asyncio
import struct
from machine import Pin
import sys
from os import uname
from time import time
from rfid_utils import RFID, BUZZ, GLED, CardFilter
from machine import WDT
from config import settings
# Коды функций Modbus
READ_HOLDING_REGISTERS = 0x03
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_REGISTERS = 0x10
# Коды ошибок
ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02
ILLEGAL_DATA_VALUE = 0x03
SERVER_DEVICE_FAILURE = 0x04
MODBUS_REGISTERS = [0, 0, 0, 0, 0, 0, 0, 0]
def uid_to_registers(uid_str, registers):
# Проверка формата UID (должен быть 8 символов для 4 байт или 14 символов для 7 байт)
if len(uid_str) not in [8, 14]:
raise ValueError("UID должен быть 4 или 7 байт (8 или 14 символов в hex)")
# Преобразование строки в байты вручную для MicroPython
uid_bytes = bytearray()
for i in range(0, len(uid_str), 2):
if i + 1 < len(uid_str):
byte_str = uid_str[i:i+2]
try:
byte_val = int(byte_str, 16)
uid_bytes.append(byte_val)
except ValueError:
raise ValueError("UID должен содержать только шестнадцатеричные символы")
# Если UID 7-байтный, добавляем нулевой байт для выравнивания до четного числа
if len(uid_bytes) == 7:
uid_bytes.append(0)
# Заполняем регистры (каждый регистр - 2 байта)
for i in range(0, len(uid_bytes), 2):
reg_index = i // 2
if reg_index < len(registers):
# В формате big-endian (старший байт первый)
registers[reg_index] = (uid_bytes[i] << 8) | uid_bytes[i + 1]
return registers
# Класс для обработки Modbus запросов
class ModbusHandler:
def __init__(self, holding_registers):
self.holding_registers = holding_registers
def toggle_led(self):
if self.led:
self.led.value(not self.led.value())
def process_request(self, function_code, data, unit_id):
"""Обработка Modbus запроса и генерация ответа"""
if function_code == READ_HOLDING_REGISTERS:
return self.handle_read_holding_registers(data)
elif function_code == WRITE_SINGLE_REGISTER:
return self.handle_write_single_register(data)
elif function_code == WRITE_MULTIPLE_REGISTERS:
return self.handle_write_multiple_registers(data)
else:
# Неподдерживаемый код функции
return struct.pack('BB', function_code | 0x80, ILLEGAL_FUNCTION)
def handle_read_holding_registers(self, data):
"""Обработка функции Modbus 03 (Чтение регистров хранения)"""
starting_address, quantity = struct.unpack('>HH', data)
# Проверка диапазона адресов
if starting_address + quantity > len(self.holding_registers):
return struct.pack('BB', READ_HOLDING_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
# Подготовка ответа
response = bytearray()
response.append(READ_HOLDING_REGISTERS) # Код функции
response.append(quantity * 2) # Количество байт
# Добавление значений регистров
for i in range(quantity):
register_value = self.holding_registers[starting_address + i]
response.extend(struct.pack('>H', register_value))
return response
def handle_write_single_register(self, data):
"""Обработка функции Modbus 06 (Запись в один регистр)"""
address, value = struct.unpack('>HH', data)
# Проверка адреса
if address >= len(self.holding_registers):
return struct.pack('BB', WRITE_SINGLE_REGISTER | 0x80, ILLEGAL_DATA_ADDRESS)
# Обновление значения регистра
self.holding_registers[address] = value
# Возврат эхо запроса
return struct.pack('BHH', WRITE_SINGLE_REGISTER, address, value)
def handle_write_multiple_registers(self, data):
"""Обработка функции Modbus 16 (Запись в несколько регистров)"""
if len(data) < 5: # Необходимо минимум 5 байт для этой функции
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
starting_address, quantity, byte_count = struct.unpack('>HHB', data[:5])
# Проверка диапазона адресов
if starting_address + quantity > len(self.holding_registers):
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
# Проверка количества (1-123 регистров)
if quantity < 1 or quantity > 123 or byte_count != quantity * 2:
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
# Проверка достаточности данных
if len(data) < 5 + byte_count:
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
# Обновление значений регистров
for i in range(quantity):
value = struct.unpack('>H', data[5 + i * 2:7 + i * 2])[0]
self.holding_registers[starting_address + i] = value
# Подготовка ответа
return struct.pack('>BHH', WRITE_MULTIPLE_REGISTERS, starting_address, quantity)
# Обработчик клиентских соединений
async def handle(reader, writer):
modbus_handler = ModbusHandler(MODBUS_REGISTERS)
print('Новое подключение Modbus')
while True:
try:
# Получение заголовка Modbus TCP (7 байт)
mbap_header = await reader.read(7)
if not mbap_header or len(mbap_header) < 7:
print('Клиент разорвал соединение')
break
# Разбор заголовка MBAP
(transaction_id, protocol_id, length, unit_id) = struct.unpack('>HHHB', mbap_header)
# Проверка валидности Modbus TCP пакета
if protocol_id != 0:
print(f"Неверный ID протокола: {protocol_id}")
break
# Получение кода функции и данных
data = await reader.read(length - 1)
if not data:
break
function_code = data[0]
print(f"Получен запрос Modbus, функция: {function_code}, адрес устройства: {unit_id}")
# Обработка запроса в зависимости от кода функции
response = modbus_handler.process_request(function_code, data[1:], unit_id)
# Создание заголовка MBAP для ответа
response_length = len(response) + 1 # +1 для unit_id
mbap_response = struct.pack('>HHHB', transaction_id, protocol_id, response_length, unit_id)
# Отправка полного ответа
await writer.awrite(mbap_response + response)
except Exception as e:
print('Ошибка сервера {} ({})'.format(e, type(e)))
break
await asyncio.sleep_ms(50)
await writer.aclose()
print('Соединение закрыто')
# Запуск сервера
async def start(host='0.0.0.0', port=502): # Стандартный порт Modbus TCP - 502
loop = asyncio.get_event_loop()
loop.create_task(asyncio.start_server(handle, host, port))
print(f"Modbus TCP сервер запущен на {host}:{port}")
async def run():
rfid = RFID()
BUZZ.beeps()
dst_ip = settings.get('rfid-dest')
dst_port = settings.get('rfid-i-dport')
format_str = settings.get('rfid-format')
wdt_en = settings.get('sys-i-wdt', False)
wdt = None
await start()
if wdt_en:
wdt = WDT(timeout=10000)
while 1:
if wdt_en:
wdt.feed()
uid = rfid.read(fmt=format_str)
if uid:
uid_to_registers(uid, MODBUS_REGISTERS)
await asyncio.sleep_ms(250)