local rfid = require("rfid")
local indication = require("indication")
local PORT = 502
local wdt_on = Settings.get("sys_wdt") == "on"
local snd = indication.Sound.new()
local leds = indication.Leds.new({RGB=true})
local mtx = thread.createmutex()
local holding_registers = {0, 0, 0, 0, 0, 0}
local function pad_uid(uid)
return string.format("%016s", uid):gsub(" ", "0")
end
local function split_into_words_32(number)
return (number >> 16) & 0xFFFF, number & 0xFFFF
end
local function split_uid_into_words(uid)
return tonumber(string.sub(uid, 1, 4), 16) or 0,
tonumber(string.sub(uid, 5, 8), 16) or 0,
tonumber(string.sub(uid, 9, 12), 16) or 0,
tonumber(string.sub(uid, 13, 16), 16) or 0
end
local function to_holding_registers(regs, ts, uid)
regs[1], regs[2] = split_into_words_32(ts)
regs[3], regs[4], regs[5], regs[6] = split_uid_into_words(uid)
end
thread.start(function()
-- определяем классы
reader = rfid.Reader({})
-- стартовая индикация
leds:start()
snd:start()
-- запускаем
reader.process({
timeout_ms = 200, -- таймаут в цикле; всегда должен быть больше 0
mode = rfid.MODE_LOOP,
wdt = wdt_on,
halt = true,
checkfunc = function(uid) -- функция для проверки метки
mtx:lock()
to_holding_registers(holding_registers, os.time(), pad_uid(uid))
mtx:unlock()
leds:ok() -- индикация
snd:ok() -- звук
end
})
end)
--tcp modbus server
local function to_bytes(value)
return string.pack(">I2", value)
end
local function from_bytes(b1, b2)
return b1*256 + b2
end
local function build_mbap_header(trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, length)
return string.pack(">BBBBH", trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, length)
end
local function send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, pdu)
local response = string.pack(">HHHBc"..#pdu, trans_id_hi * 256 + trans_id_lo, protocol_id_hi * 256 + protocol_id_lo, #pdu + 1, unit_id, pdu)
client:send(response)
end
local function send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, func_code, error_code)
local error_func_code = string.char(func_code + 0x80)
local error_pdu = error_func_code .. string.char(error_code)
send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, error_pdu)
end
local server = assert(socket.bind("*", PORT))
--server:settimeout(0)
cacheutils.addLog(log, "Modbus TCP устройство запущено на порту " .. PORT, cacheutils.Info)
while true do
local client = server:accept()
if client then
-- client:settimeout(10)
-- Читаем MBAP Header (7 байт)
local header, err = client:receive(7)
if header then
local trans_id_hi, trans_id_lo,
protocol_id_hi, protocol_id_lo,
length_hi, length_lo,
unit_id = header:byte(1, 7)
local length = from_bytes(length_hi, length_lo)
-- Длина включает в себя байты Unit ID и следующие за ним байты.
-- Значит, нам нужно прочитать (length - 1) байт, чтобы получить PDU
local pdu_len = length - 1
local pdu, err = client:receive(pdu_len)
if pdu then
local fcode = pdu:byte(1)
if fcode == 3 then
-- Чтение holding регистров
-- Формат запроса: Function Code (1 байт), Start Address Hi, Start Address Lo, Quantity Hi, Quantity Lo
local start_hi, start_lo, qty_hi, qty_lo = pdu:byte(2, 5)
local start_addr = from_bytes(start_hi, start_lo) + 1 -- +1, т.к. Lua-индексация с 1
local quantity = from_bytes(qty_hi, qty_lo)
-- Проверяем, не выходит ли за пределы массива регистров
if start_addr + quantity - 1 <= #holding_registers then
local byte_count = quantity * 2
mtx:lock() -- Блокируем доступ к holding_registers для безопасного чтения
local format = ">BB" .. string.rep("H", quantity)
-- Упаковываем Function Code, Byte Count и данные регистров в одну строку
local response_pdu = string.pack(format, fcode, byte_count, table.unpack(holding_registers, start_addr, start_addr + quantity -1))
mtx:unlock()
send_response(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, response_pdu)
else
-- Если запрос выходит за пределы, возвращаем ошибку Modbus
send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, fcode, 0x02)
end
else
-- Неподдерживаемая функция
send_error(client, trans_id_hi, trans_id_lo, protocol_id_hi, protocol_id_lo, unit_id, fcode, 0x01)
end
end
end
client:close()
end
socket.sleep(0.01)
end
from pymodbus.client import ModbusTcpClient
# Конфигурация подключения
HOST = "192.168.8.227"
PORT = 502
try:
# Используем контекстный менеджер для автоматического закрытия соединения
with ModbusTcpClient(HOST, port=PORT) as client:
if not client.connect():
raise Exception("Не удалось подключиться к устройству")
# Считываем 6 регистров, начиная с адреса 0
start_address = 0
count = 6
response = client.read_holding_registers(address=start_address, count=count)
# Проверяем ответ
if response is not None and not response.isError():
print("Считанные регистры:", response.registers)
else:
print("Ошибка при чтении:", response if response else "Нет ответа от устройства")
except Exception as e:
print(f"Произошла ошибка: {e}")
from gc import collect
import uasyncio as asyncio
import struct
from machine import Pin
import sys
from os import uname
from time import time
from rfid_utils import RFID, BUZZ, GLED, CardFilter
from machine import WDT
from config import settings
# Коды функций Modbus
READ_HOLDING_REGISTERS = 0x03
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_REGISTERS = 0x10
# Коды ошибок
ILLEGAL_FUNCTION = 0x01
ILLEGAL_DATA_ADDRESS = 0x02
ILLEGAL_DATA_VALUE = 0x03
SERVER_DEVICE_FAILURE = 0x04
MODBUS_REGISTERS = [0, 0, 0, 0, 0, 0, 0, 0]
def uid_to_registers(uid_str, registers):
# Проверка формата UID (должен быть 8 символов для 4 байт или 14 символов для 7 байт)
if len(uid_str) not in [8, 14]:
raise ValueError("UID должен быть 4 или 7 байт (8 или 14 символов в hex)")
# Преобразование строки в байты вручную для MicroPython
uid_bytes = bytearray()
for i in range(0, len(uid_str), 2):
if i + 1 < len(uid_str):
byte_str = uid_str[i:i+2]
try:
byte_val = int(byte_str, 16)
uid_bytes.append(byte_val)
except ValueError:
raise ValueError("UID должен содержать только шестнадцатеричные символы")
# Если UID 7-байтный, добавляем нулевой байт для выравнивания до четного числа
if len(uid_bytes) == 7:
uid_bytes.append(0)
# Заполняем регистры (каждый регистр - 2 байта)
for i in range(0, len(uid_bytes), 2):
reg_index = i // 2
if reg_index < len(registers):
# В формате big-endian (старший байт первый)
registers[reg_index] = (uid_bytes[i] << 8) | uid_bytes[i + 1]
return registers
# Класс для обработки Modbus запросов
class ModbusHandler:
def __init__(self, holding_registers):
self.holding_registers = holding_registers
def toggle_led(self):
if self.led:
self.led.value(not self.led.value())
def process_request(self, function_code, data, unit_id):
"""Обработка Modbus запроса и генерация ответа"""
if function_code == READ_HOLDING_REGISTERS:
return self.handle_read_holding_registers(data)
elif function_code == WRITE_SINGLE_REGISTER:
return self.handle_write_single_register(data)
elif function_code == WRITE_MULTIPLE_REGISTERS:
return self.handle_write_multiple_registers(data)
else:
# Неподдерживаемый код функции
return struct.pack('BB', function_code | 0x80, ILLEGAL_FUNCTION)
def handle_read_holding_registers(self, data):
"""Обработка функции Modbus 03 (Чтение регистров хранения)"""
starting_address, quantity = struct.unpack('>HH', data)
# Проверка диапазона адресов
if starting_address + quantity > len(self.holding_registers):
return struct.pack('BB', READ_HOLDING_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
# Подготовка ответа
response = bytearray()
response.append(READ_HOLDING_REGISTERS) # Код функции
response.append(quantity * 2) # Количество байт
# Добавление значений регистров
for i in range(quantity):
register_value = self.holding_registers[starting_address + i]
response.extend(struct.pack('>H', register_value))
return response
def handle_write_single_register(self, data):
"""Обработка функции Modbus 06 (Запись в один регистр)"""
address, value = struct.unpack('>HH', data)
# Проверка адреса
if address >= len(self.holding_registers):
return struct.pack('BB', WRITE_SINGLE_REGISTER | 0x80, ILLEGAL_DATA_ADDRESS)
# Обновление значения регистра
self.holding_registers[address] = value
# Возврат эхо запроса
return struct.pack('BHH', WRITE_SINGLE_REGISTER, address, value)
def handle_write_multiple_registers(self, data):
"""Обработка функции Modbus 16 (Запись в несколько регистров)"""
if len(data) < 5: # Необходимо минимум 5 байт для этой функции
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
starting_address, quantity, byte_count = struct.unpack('>HHB', data[:5])
# Проверка диапазона адресов
if starting_address + quantity > len(self.holding_registers):
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_ADDRESS)
# Проверка количества (1-123 регистров)
if quantity < 1 or quantity > 123 or byte_count != quantity * 2:
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
# Проверка достаточности данных
if len(data) < 5 + byte_count:
return struct.pack('BB', WRITE_MULTIPLE_REGISTERS | 0x80, ILLEGAL_DATA_VALUE)
# Обновление значений регистров
for i in range(quantity):
value = struct.unpack('>H', data[5 + i * 2:7 + i * 2])[0]
self.holding_registers[starting_address + i] = value
# Подготовка ответа
return struct.pack('>BHH', WRITE_MULTIPLE_REGISTERS, starting_address, quantity)
# Обработчик клиентских соединений
async def handle(reader, writer):
modbus_handler = ModbusHandler(MODBUS_REGISTERS)
print('Новое подключение Modbus')
while True:
try:
# Получение заголовка Modbus TCP (7 байт)
mbap_header = await reader.read(7)
if not mbap_header or len(mbap_header) < 7:
print('Клиент разорвал соединение')
break
# Разбор заголовка MBAP
(transaction_id, protocol_id, length, unit_id) = struct.unpack('>HHHB', mbap_header)
# Проверка валидности Modbus TCP пакета
if protocol_id != 0:
print(f"Неверный ID протокола: {protocol_id}")
break
# Получение кода функции и данных
data = await reader.read(length - 1)
if not data:
break
function_code = data[0]
print(f"Получен запрос Modbus, функция: {function_code}, адрес устройства: {unit_id}")
# Обработка запроса в зависимости от кода функции
response = modbus_handler.process_request(function_code, data[1:], unit_id)
# Создание заголовка MBAP для ответа
response_length = len(response) + 1 # +1 для unit_id
mbap_response = struct.pack('>HHHB', transaction_id, protocol_id, response_length, unit_id)
# Отправка полного ответа
await writer.awrite(mbap_response + response)
except Exception as e:
print('Ошибка сервера {} ({})'.format(e, type(e)))
break
await asyncio.sleep_ms(50)
await writer.aclose()
print('Соединение закрыто')
# Запуск сервера
async def start(host='0.0.0.0', port=502): # Стандартный порт Modbus TCP - 502
loop = asyncio.get_event_loop()
loop.create_task(asyncio.start_server(handle, host, port))
print(f"Modbus TCP сервер запущен на {host}:{port}")
async def run():
rfid = RFID()
BUZZ.beeps()
dst_ip = settings.get('rfid-dest')
dst_port = settings.get('rfid-i-dport')
format_str = settings.get('rfid-format')
wdt_en = settings.get('sys-i-wdt', False)
wdt = None
await start()
if wdt_en:
wdt = WDT(timeout=10000)
while 1:
if wdt_en:
wdt.feed()
uid = rfid.read(fmt=format_str)
if uid:
uid_to_registers(uid, MODBUS_REGISTERS)
await asyncio.sleep_ms(250)