当前位置:   article > 正文

python如何用pymodbus库进行modbus tcp通信_用python编写变频器和电脑用modbus通讯控制

用python编写变频器和电脑用modbus通讯控制

这篇文章主要介绍了python如何用pymodbus库进行modbus tcp通信问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

使用pymodbus库进行modbus tcp通信
使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。

本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。

安装pymodbus:

pip install -U pymodbus

创建modbus tcp server
这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。

modbus_server.py

  1. '''
  2. * @Author: liuzhao
  3. * @Last Modified time: 2022-10-05 09:56:13
  4. '''
  5. from pymodbus.server.sync import (
  6. StartTcpServer,
  7. )
  8. from pymodbus.datastore import (
  9. ModbusSequentialDataBlock,
  10. ModbusServerContext,
  11. ModbusSlaveContext,
  12. )
  13. from pymodbus.version import version
  14. datablock = ModbusSequentialDataBlock.create()
  15. context = ModbusSlaveContext(
  16. di=datablock,
  17. co=datablock,
  18. hr=datablock,
  19. ir=datablock,
  20. )
  21. single = True
  22. # Build data storage
  23. store = ModbusServerContext(slaves=context, single=single)
  24. if __name__ == '__main__':
  25. address = ("0.0.0.0", 503)
  26. StartTcpServer(
  27. context=store, # Data storage
  28. address=address, # listen address
  29. allow_reuse_address=True, # allow the reuse of an address
  30. )

直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。

读写modbus变量

modbus变量类型以及地址

coil是线圈,Discrete input是数字量输入,Input register是模拟量输入,Holding register是保持寄存器。一般地址范围是0-65535

读取常规变量
读写线圈 | 读取输入变量 | 读写保持寄存器

  1. from pymodbus.client.sync import ModbusTcpClient
  2. from pymodbus.bit_read_message import ReadCoilsResponse
  3. from pymodbus.register_read_message import ReadInputRegistersResponse
  4. from pymodbus.exceptions import ConnectionException # 连接失败,用于异常处理
  5. host = '127.0.0.1'
  6. port = 503
  7. client = ModbusTcpClient(host,port)
  8. # 写入线圈
  9. client.write_coil(1, True)
  10. client.write_coil(2, False)
  11. client.write_coil(3, True)
  12. # 读取线圈 注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个
  13. result:ReadCoilsResponse = client.read_coils(address=1,cout=8) # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题
  14. print(result.isError())
  15. # 不建议使用
  16. print(result.getBit(7)) # 这里的参数address不是plc里的地址,而是python列表的address
  17. print('read_coils ')
  18. # 建议使用
  19. print(result.bits) # 打印读取结果,一共8
  20. # 读取其中的位
  21. print(
  22. result.bits[0],
  23. result.bits[1],
  24. result.bits[2]
  25. ) # 相当于result.getBit(0)
  26. # 读取数字输入
  27. result = client.read_discrete_inputs(address=10001,count=8) # 从10001开始读,读取8
  28. print(result.bits)
  29. # 读取模拟输入寄存器
  30. input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8)
  31. # print(f'is_error:{input_register_result.isError()}')
  32. print('read_input_registers ')
  33. print(input_register_result.registers)
  34. print(input_register_result.getRegister(0))
  35. # 读写保持寄存器
  36. client.write_register(address=40001,value=100)
  37. result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1)
  38. print('read_holding_registers ')
  39. print(result.registers)
  40. # 关闭连接
  41. client.close()

读取复杂变量
字符串、浮点数、负数等

这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。

根据不同的设备,对照下表调整正确的组合方式。

  1. # 复杂数据类型
  2. from collections import OrderedDict
  3. import logging
  4. from pymodbus.client.sync import ModbusTcpClient as ModbusClient
  5. from pymodbus.constants import Endian
  6. from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
  7. ORDER_DICT = {"<": "LITTLE", ">": "BIG"}
  8. def run_binary_payload_client(host:str,port:int):
  9. for word_endian, byte_endian in (
  10. (Endian.Big, Endian.Big),
  11. (Endian.Big, Endian.Little),
  12. (Endian.Little, Endian.Big),
  13. (Endian.Little, Endian.Little),
  14. ):
  15. print("-" * 60)
  16. print(f"Word Order: {ORDER_DICT[word_endian]}")
  17. print(f"Byte Order: {ORDER_DICT[byte_endian]}")
  18. print()
  19. builder = BinaryPayloadBuilder(
  20. wordorder=word_endian,
  21. byteorder=byte_endian,
  22. )
  23. # 写入的变量
  24. my_string = "abcd-efgh123345765432"
  25. builder.add_string(my_string)
  26. builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
  27. builder.add_8bit_int(-0x12)
  28. builder.add_8bit_uint(0x12)
  29. builder.add_16bit_int(-0x5678)
  30. builder.add_16bit_uint(0x1234)
  31. builder.add_32bit_int(-0x1234)
  32. builder.add_32bit_uint(0x12345678)
  33. builder.add_16bit_float(12.34)
  34. builder.add_16bit_float(-12.34)
  35. builder.add_32bit_float(22.34)
  36. builder.add_32bit_float(-22.34)
  37. builder.add_64bit_int(-0xDEADBEEF)
  38. builder.add_64bit_uint(0x12345678DEADBEEF)
  39. builder.add_64bit_uint(0x12345678DEADBEEF)
  40. builder.add_64bit_float(123.45)
  41. builder.add_64bit_float(-123.45)
  42. registers = builder.to_registers()
  43. print("Writing Registers:")
  44. print(registers)
  45. print("\n")
  46. payload = builder.build()
  47. address = 40001 # 从40001开始写入
  48. # We can write registers
  49. client.write_registers(address, registers, unit=1) # 写入
  50. # 读取复杂变量
  51. print("Reading Registers:")
  52. address = 40001
  53. count = len(payload)
  54. print(f"payload_len {count}")
  55. result = client.read_holding_registers(address, count, slave=1)
  56. print(result.registers)
  57. print("\n")
  58. decoder = BinaryPayloadDecoder.fromRegisters(
  59. result.registers, byteorder=byte_endian, wordorder=word_endian
  60. )
  61. # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder
  62. assert (
  63. decoder._byteorder == builder._byteorder # pylint: disable=protected-access
  64. ) # nosec
  65. assert (
  66. decoder._wordorder == builder._wordorder # pylint: disable=protected-access
  67. ) # nosec
  68. decoded = OrderedDict(
  69. [
  70. ("string", decoder.decode_string(len(my_string))),
  71. ("bits", decoder.decode_bits()),
  72. ("8int", decoder.decode_8bit_int()),
  73. ("8uint", decoder.decode_8bit_uint()),
  74. ("16int", decoder.decode_16bit_int()),
  75. ("16uint", decoder.decode_16bit_uint()),
  76. ("32int", decoder.decode_32bit_int()),
  77. ("32uint", decoder.decode_32bit_uint()),
  78. ("16float", decoder.decode_16bit_float()),
  79. ("16float2", decoder.decode_16bit_float()),
  80. ("32float", decoder.decode_32bit_float()),
  81. ("32float2", decoder.decode_32bit_float()),
  82. ("64int", decoder.decode_64bit_int()),
  83. ("64uint", decoder.decode_64bit_uint()),
  84. ("ignore", decoder.skip_bytes(8)),
  85. ("64float", decoder.decode_64bit_float()),
  86. ("64float2", decoder.decode_64bit_float()),
  87. ]
  88. )
  89. print("Decoded Data")
  90. for name, value in iter(decoded.items()):
  91. print(
  92. "%s\t" % name, # pylint: disable=consider-using-f-string
  93. hex(value) if isinstance(value, int) else value,
  94. )
  95. print("\n")
  96. # 关闭连接
  97. client.close()
  98. if __name__ == "__main__":
  99. run_binary_payload_client("127.0.0.1", 503)

总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持vb.net教程C#教程python教程SQL教程access 2010教程xin3721自学网
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/991493
推荐阅读
相关标签
  

闽ICP备14008679号