当前位置:   article > 正文

Pyside6多线程实现对linux主机进行巡检_pyside6 linux

pyside6 linux

在制作巡检工具页面时,在执行处理时,出现界面无影响。

pyside6多线程Qthread的实现有两种方法:

        方法一:实例化一个thread类,继承Qthread类,通过重写Qthread的run方法来实现

        方法二:通过继承Qobject类,通过movetothread将需要处理的函数加入到一个新的子线程中进行处理。

如果不使用多线程,在程序点击开始按钮后,程序会处于无影响的状态,直至所有的巡检任务执行完后,将数据返回至主界面才会解除。为了避免这种情况的发生,必须需要使用多线程进行处理。使用多线程的核心代码如下:

  1. self.my_thread = QThread() # 实例化一个子线程
  2. self.obj = MyObject() # 实例化子线程巡检
  3. self.obj.moveToThread(self.my_thread) # 将子线程移至子线程中处理
  4. self.my_thread.started.connect(self.obj.ssh_to_host)
  5. self.my_thread.start() # 启动子线程

实现步骤: 

  1. 实例化一个线程self.my_thread
  2. 实例化执行任务类MyObject(),MyObject()为执行巡检任务工作的类
  3. 将执行任务的实例移至子线程中
  4. 将新产生的线程连接至执行任务的子线程的槽函数
  5. 启动子线程

程序实现思路:

程序通过两个类来实现,一个类用于页面显示(MainForm),一个类用于处理巡检任务功能(MyObject)。页面显示类,将获取的主机信息通过信号host_signal与处理巡检任务的槽函数建立连接,并将信息发送过去;巡检任务类接收信息并处理完后,将命令执行结果通过信号(update_signal,end_signal)传回显示在文本框中。

程序完整代码如下:

  1. # 测试多线程moveToThread 动态传递参数
  2. import sys
  3. import time
  4. from PySide6.QtCore import *
  5. from PySide6.QtGui import *
  6. from PySide6.QtWidgets import *
  7. import paramiko
  8. # 任务处理类
  9. class MyObject(QObject):
  10. update_signal = Signal(str) # 发送执行命令结果给主进程用于返回显示至界面
  11. end_signal = Signal(str) # 处理完任务发送信息
  12. def __init__(self,parent=None):
  13. super(MyObject, self).__init__(parent)
  14. self.ip = None
  15. # 执行巡检
  16. def ssh_to_host(self):
  17. print('执行任务中的Ip:',self.ip)
  18. for i in self.ip:
  19. try:
  20. self.trans = paramiko.Transport((i, 22)) # 使用Transport方式连接
  21. self.trans.start_client(timeout=0.5)
  22. # paramiko.util.log_to_file('paramiko-log.log') # 记录执行日志
  23. # 用户名密码方式
  24. self.trans.auth_timeout = 5
  25. self.trans.auth_password(username='root', password='123456', fallback=True)
  26. except Exception as e:
  27. print('连接错误:', e)
  28. else:
  29. print('连接主机:{}:{} ---> 正常'.format('host-70', i))
  30. # 打开一个通道
  31. self.channel = self.trans.open_session()
  32. self.channel.settimeout(100)
  33. # 获取一个终端
  34. self.channel.get_pty()
  35. # 激活器
  36. self.channel.invoke_shell()
  37. # 根据配置文件定义command项执行脚本
  38. cmd_file = (('date\r\nhostname\r\nuname\r\nifconfig',),)
  39. # print('cmd_file', cmd_file)
  40. if len(cmd_file) > 0:
  41. # print('cmd_file',cmd_file)
  42. single_cmd = cmd_file[0][0].split('\r\n') # 提取配置文件中脚本命令
  43. # print('single_cmd',single_cmd)
  44. for c in single_cmd: # 遍历每个命令
  45. # print('命令c:', c,type(c))
  46. # 发送要执行的命令
  47. time.sleep(1)
  48. self.channel.send(c + '\n') # 在每一个命令后加上换行
  49. # self.channel.send(c) # 在每一个命令后加上换行
  50. end_symbol = ('# ', '$ ', '$', '> ', '>') # 设置我们定义的结束符
  51. # 将命令执行结果保存到display_result
  52. display_result = ''
  53. # # 回显很长的命令可能执行较久,通过循环分批次取回回显
  54. time.sleep(0.1)
  55. while True:
  56. result = self.channel.recv(256)
  57. try:
  58. result = result.decode('utf-8')
  59. # logging.warning('使用UTF-8编码!')
  60. except:
  61. result = result.decode('gb18030')
  62. # logging.warning('使用gb18030编码!')
  63. display_result += result # 输出到日志显示窗口
  64. if result.endswith(end_symbol):
  65. break
  66. self.update_signal.emit(display_result) # 发送命令返回至主窗口结果
  67. print()
  68. print('=' * 80)
  69. else:
  70. print('没有配置相关命令!,请配置检查脚本命令后再操作!!')
  71. return
  72. finally:
  73. self.channel.close()
  74. self.trans.close()
  75. self.end_signal.emit('断开SSH连接') # 发送巡检命令执行完信号
  76. # 接收主界面线程发送的主机IP信息
  77. def accpet_hostsinfo(self,ip):
  78. self.ip = ip
  79. class MainForm(QWidget):
  80. host_signal = Signal(list) # 定义主机信息信号
  81. def __init__(self, parent=None):
  82. super(MainForm, self).__init__(parent)
  83. self.setWindowTitle('多线程测试-定时发送消息')
  84. self.resize(800, 600)
  85. layout = QVBoxLayout()
  86. self.start_btu = QPushButton('开始')
  87. layout.addWidget(self.start_btu)
  88. self.text = QTextEdit(self)
  89. layout.addWidget(self.text)
  90. self.setLayout(layout)
  91. self.ip = None
  92. self.my_thread = QThread() # 实例化一个子线程
  93. self.start_btu.clicked.connect(self.send_hosts_to_child) # 将开始按钮的点击信号连接至发送主机信息给子线程的槽函数self.send_hosts_to_child
  94. self.start_btu.clicked.connect(self.do_worker) # 将开始按钮的点击事件信号连接至执行任务的槽函数
  95. # 执行巡检任务命令发送
  96. def do_worker(self):
  97. print('开始运行程序。。')
  98. print('当前线程:', QThread.currentThread(), self.my_thread.isRunning())
  99. self.text.clear()
  100. self.obj = MyObject() # 实例化子线程巡检
  101. self.host_signal.connect(self.obj.accpet_hostsinfo(self.ip)) # 主机信号连接至任务处理的子线程获取主机信息槽函数
  102. self.obj.update_signal.connect(self.update_text) # 任务处理子线程显示巡检结果信号连接至主线程更新槽函数self.update_text)
  103. self.obj.end_signal.connect(self.stop) # 任务子线程执行巡检结果完毕信号连接至主线程关闭子线程槽函数self.stop
  104. self.obj.moveToThread(self.my_thread) # 将子线程移至子线程中处理
  105. self.my_thread.started.connect(self.obj.ssh_to_host)
  106. self.my_thread.start() # 启动子线程
  107. # 传递主机IP信息给任务处理子线程
  108. def send_hosts_to_child(self):
  109. self.ip = ['192.168.1.70','192.168.1.61']
  110. self.host_signal.emit(self.ip) # 发送主机信息self.ip
  111. # 更新文本框显示内容
  112. def update_text(self,text):
  113. cursor = self.text.textCursor()
  114. self.text.moveCursor(cursor.End) # 将光标移动到最后
  115. self.text.insertPlainText(text) # 插入文本
  116. # 关闭线程
  117. def stop(self):
  118. print('关闭当前线程')
  119. self.my_thread.quit() # 退出子线程
  120. self.my_thread.wait()
  121. if __name__ == '__main__':
  122. app = QApplication(sys.argv)
  123. win = MainForm()
  124. win.show()
  125. sys.exit(app.exec())

以上是巡检任务部分的核心功能,基本功能已经实现,其它优化即可。

 

注:在子进程执行完后,需要手动退出线程,否则再次点击运行时,不能执行,将异常退出。

  1. # 关闭线程
  2. def stop(self):
  3. print('关闭当前线程')
  4. self.my_thread.quit() # 退出子线程
  5. self.my_thread.wait()
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/854097?site
推荐阅读
相关标签
  

闽ICP备14008679号