云计算百科
云计算领域专业知识百科平台

ModbusLink 服务器使用说明

📚 ModbusLink 服务器使用说明

本文档详细介绍了 ModbusLink 软件包中服务器组件的使用方法。ModbusLink 的服务器基于异步 IO (asyncio) 构建,能够高效地处理并发连接(TCP)或快速响应串口请求(RTU/ASCII),非常适合用于开发虚拟从站设备、硬件仿真器或网关应用。


1. 核心概念:数据存储 (ModbusDataStore)

所有 Modbus 服务器的核心都是 ModbusDataStore。它代表了设备的内存映射,负责存储线圈、离散输入和寄存器的值。服务器接收到客户端请求后,会自动读写这个对象中的数据。

1.1 初始化与内存区域

在创建服务器之前,必须先实例化数据存储,并定义各内存区域的大小。

from modbuslink import ModbusDataStore

# 初始化数据存储,定义四个标准 Modbus 区域的大小
store = ModbusDataStore(
coils_size=1000, # 0x: 线圈 (Read/Write)
discrete_inputs_size=1000, # 1x: 离散输入 (Read Only)
holding_registers_size=1000, # 4x: 保持寄存器 (Read/Write)
input_registers_size=1000 # 3x: 输入寄存器 (Read Only)
)

1.2 内部读写操作

服务器程序本身可以通过以下方法初始化或更新数据(例如模拟传感器变化)。

区域写入方法读取方法参数说明
线圈 write_coils read_coils address: 起始地址values: List[bool] / count
离散输入 write_discrete_inputs read_discrete_inputs 同上
保持寄存器 write_holding_registers read_holding_registers address: 起始地址values: List[int] / count
输入寄存器 write_input_registers read_input_registers 同上

代码示例:

# 初始化地址 0 开始的 5 个保持寄存器
store.write_holding_registers(address=0, values=[10, 20, 30, 40, 50])

# 读取地址 2 开始的 2 个输入寄存器
data = store.read_input_registers(address=2, count=2)

# 设置地址 5 的线圈为 True
store.write_coils(address=5, values=[True])

1.3 数据变更回调

当外部客户端(或其他内部逻辑)修改了数据存储中的值时,您可以注册回调函数来接收通知。这对于实现控制逻辑非常有用(例如:当客户端写入"启动"线圈时,服务器执行实际的硬件操作)。

# 定义回调函数
def on_coil_change(address, values):
print(f"警告:客户端修改了线圈 @ {address}, 新值: {values}")

# 注册回调
# 区域键名: 'coils', 'discrete_inputs', 'holding_registers', 'input_registers'
store.add_callback('coils', on_coil_change)


2. TCP 服务器 (AsyncTcpModbusServer)

创建一个监听 TCP 端口的 Modbus 服务器。支持多客户端同时连接。

初始化与运行

import asyncio
from modbuslink import AsyncTcpModbusServer, ModbusDataStore

async def main():
# 1. 准备数据存储
store = ModbusDataStore(holding_registers_size=100)
store.write_holding_registers(0, [123, 456])

# 2. 配置并创建服务器
server = AsyncTcpModbusServer(
host='0.0.0.0', # 监听所有网卡
port=502, # Modbus TCP 标准端口
data_store=store, # 关联数据存储
slave_id=1 # 响应的单元标识符 (Unit ID)
)

print("TCP 服务器启动中…")

# 3. 启动服务器
await server.start()

# 4. 持续运行
# serve_forever() 会保持运行直到被取消
try:
await server.serve_forever()
finally:
await server.stop()

if __name__ == "__main__":
asyncio.run(main())

特有方法:

  • get_connected_clients_count(): 获取当前连接的 TCP 客户端数量。

3. RTU 服务器 (AsyncRtuModbusServer)

创建一个通过串口通信的 Modbus RTU 从站。

初始化与运行

import asyncio
from modbuslink import AsyncRtuModbusServer, ModbusDataStore

async def main():
store = ModbusDataStore(coils_size=50)

# 创建 RTU 服务器
server = AsyncRtuModbusServer(
port='COM3', # 串口号 (Linux: '/dev/ttyUSB0')
baudrate=9600, # 波特率
bytesize=8, # 数据位
parity='N', # 校验位 ('N', 'E', 'O')
stopbits=1, # 停止位
data_store=store, # 数据存储
slave_id=1 # 本机作为从站的地址 (非常重要)
)

print(f"RTU 从站 (ID={server.slave_id}) 运行在 {server.port}")

await server.start()
try:
await server.serve_forever()
finally:
await server.stop()

if __name__ == "__main__":
asyncio.run(main())

参数说明:

  • slave_id: RTU 模式下,服务器只会响应请求中从站 ID 与此参数匹配的数据包。

4. ASCII 服务器 (AsyncAsciiModbusServer)

创建一个 Modbus ASCII 从站。通常用于老旧设备或特定的文本传输场景。

初始化与运行

import asyncio
from modbuslink import AsyncAsciiModbusServer, ModbusDataStore

async def main():
store = ModbusDataStore(input_registers_size=50)

# 标准 ASCII 设置通常是 7位数据位, 偶校验
server = AsyncAsciiModbusServer(
port='COM4',
baudrate=9600,
bytesize=7, # ASCII 标准通常为 7
parity='E', # ASCII 标准通常为 Even
stopbits=1,
data_store=store,
slave_id=1
)

print("ASCII 服务器运行中…")

await server.start()
try:
await server.serve_forever()
finally:
await server.stop()

if __name__ == "__main__":
asyncio.run(main())


5. 高级功能:动态模拟传感器数据

Modbus 服务器通常不仅仅是静态存储,还需要模拟传感器数据的实时变化。利用 asyncio,我们可以轻松创建一个后台任务来更新数据。

完整示例:动态 TCP 服务器

以下代码演示了如何创建一个随时间自动更新数据的 TCP 服务器,并监控服务器状态。

import asyncio
import random
from modbuslink import AsyncTcpModbusServer, ModbusDataStore

async def simulate_sensor(store: ModbusDataStore):
"""后台任务:模拟温度传感器数据波动"""
print("传感器模拟任务已启动…")
address = 0
while True:
# 生成 200 到 300 之间的随机数
new_value = random.randint(200, 300)

# 更新输入寄存器 (Input Registers, 3x区)
store.write_input_registers(address, [new_value])

# 也可以同时更新线圈状态
store.write_coils(0, [random.choice([True, False])])

# 每秒更新一次
await asyncio.sleep(1.0)

async def main():
# 1. 创建数据存储
store = ModbusDataStore(
input_registers_size=10,
coils_size=10
)

# 2. 创建服务器
server = AsyncTcpModbusServer(
host='127.0.0.1',
port=502,
data_store=store
)

# 3. 启动服务器
await server.start()

# 4. 创建并发任务
# 使用 asyncio.gather 同时运行服务器主循环和模拟任务
try:
await asyncio.gather(
server.serve_forever(), # 处理客户端请求
simulate_sensor(store) # 后台更新数据
)
except asyncio.CancelledError:
print("任务取消")
finally:
await server.stop()

if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("服务器已停止")

GitHub地址

Miraitowa-la/ModbusLink

赞(0)
未经允许不得转载:网硕互联帮助中心 » ModbusLink 服务器使用说明
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!