当前位置:   article > 正文

python写的多进程并发测试框架_python 多进程框架

python 多进程框架

MultiProcessWrapper.py

  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. #############################################################
  4. #teaching wisdom to my machine, please call me Croco#
  5. #############################################################
  6. # graceful_exit_event.py
  7. # UTF-8 without BOM
  8. #
  9. # refer:
  10. # http://stackoverflow.com/questions/26414704/how-does-a-python-process-exit-gracefully-after-receiving-sigterm-while-waiting?rq=1
  11. # http://www.cnblogs.com/kaituorensheng/p/4445418.html
  12. # init created: 2016-07-13
  13. # last updated: 2016-07-14
  14. #
  15. #######################################################################
  16. import os
  17. import signal
  18. import multiprocessing
  19. import sys
  20. from abc import ABCMeta,abstractmethod
  21. import time
  22. import traceback
  23. ###################################################
  24. # WorkerInterface
  25. # 定义了一个worker必备的功能接口
  26. ###################################################
  27. class WorkerInterface(object):
  28. __metaclass__ = ABCMeta
  29. @abstractmethod
  30. def onStart(self):
  31. pass
  32. @abstractmethod
  33. def onEnd(self,end_code,end_reason):
  34. pass
  35. @abstractmethod
  36. def onRunOnce(self):
  37. pass
  38. @abstractmethod
  39. def onTimer(self):
  40. pass
  41. @abstractmethod
  42. def finished(self):
  43. pass
  44. @abstractmethod
  45. def timeout_ms(self):
  46. pass
  47. @abstractmethod
  48. def log(self,txt):
  49. pass
  50. @abstractmethod
  51. def daemon(self):
  52. pass
  53. ###################################################
  54. # SimpleWorker
  55. # 基于WorkerInterface实现一个简单的worker,
  56. # 提供使用示例
  57. ###################################################
  58. class SimpleWorker(WorkerInterface):
  59. def __init__(self):
  60. self.is_finished=False
  61. def onStart(self):
  62. t= "SimpleWorker(%d) start ..." % os.getpid()
  63. self.log(t)
  64. pass
  65. def onEnd(self,end_code,end_reason):
  66. t = "SimpleWorker(%d) end.with reason(%d:%s)" % (os.getpid(),end_code,end_reason)
  67. self.log(t)
  68. pass
  69. def onRunOnce(self):
  70. t = "SimpleWorker(%d) onRunOnce" % (os.getpid())
  71. self.log(t)
  72. self.is_finished=True
  73. pass
  74. def onTimer(self):
  75. t = "SimpleWorker(%d) onTimer" % (os.getpid())
  76. self.log(t)
  77. pass
  78. def finished(self):
  79. return self.is_finished
  80. def done(self):
  81. self.is_finished=True
  82. def timeout_ms(self):
  83. return 3000
  84. def log(self,txt):
  85. print "=>", txt
  86. pass
  87. def daemon(self):
  88. return True
  89. ###################################################
  90. # GracefulExitException
  91. #
  92. ###################################################
  93. class GracefulExitException(Exception):
  94. @staticmethod
  95. def sigterm_handler(signum, frame):
  96. raise GracefulExitException()
  97. pass
  98. ###################################################
  99. # GracefulExitEvent
  100. #
  101. ###################################################
  102. class GracefulExitEvent(object):
  103. def __init__(self):
  104. self.workers = []
  105. self.exit_event = multiprocessing.Event()
  106. # Use signal handler to throw exception which can be caught
  107. # by worker process to allow graceful exit.
  108. signal.signal(signal.SIGTERM, GracefulExitException.sigterm_handler)
  109. pass
  110. def reg_worker(self, wp):
  111. self.workers.append(wp)
  112. pass
  113. def is_stop(self):
  114. return self.exit_event.is_set()
  115. def notify_stop(self):
  116. self.exit_event.set()
  117. def wait_all(self):
  118. while True:
  119. try:
  120. for wp in self.workers:
  121. print "main process({0}) observe child status=>name:{1} pid:{2} is_alive:{3}".format(os.getpid(), wp.name,wp.pid,wp.is_alive(),)
  122. wp.join()
  123. print "main process(%d) exit." % os.getpid()
  124. break
  125. except GracefulExitException:
  126. self.notify_stop()
  127. print "main process(%d) got GracefulExitException." % os.getpid()
  128. except Exception, ex:
  129. self.notify_stop()
  130. print "main process(%d) got unexpected Exception: %r" % (os.getpid(), ex)
  131. break
  132. pass
  133. #######################################################################
  134. def worker_proc(gee,worker_interface=SimpleWorker()):
  135. try:
  136. worker_interface.onStart()
  137. last = int(time.time()*1000)
  138. while not gee.is_stop():
  139. worker_interface.onRunOnce()
  140. if worker_interface.finished():
  141. worker_interface.onEnd(0, "finished")
  142. break
  143. current = int( time.time() * 1000)
  144. if current >= last+worker_interface.timeout_ms():
  145. worker_interface.onTimer()
  146. last=current
  147. else:
  148. worker_interface.onEnd(1,"got parent exit notify")
  149. except GracefulExitException:
  150. t= "got graceful exit exception"
  151. worker_interface.onEnd(-1,t)
  152. except Exception, ex:
  153. t = "caught unhandle exception\n"
  154. info = sys.exc_info()
  155. for file, lineno, function, text in traceback.extract_tb(info[2]):
  156. t += "{0} line:{1} in function:{2}\n".format(file, lineno, function)
  157. t += text+"\n"
  158. t += "** %s: %s" % info[:2]
  159. t += "\n"
  160. worker_interface.onEnd(-2, t)
  161. finally:
  162. worker_interface.onEnd(2,"exit")
  163. sys.exit(0)
  164. #######################################################################
  165. class MultiProcessWrapper(object):
  166. def __init__(self):
  167. pass
  168. # Start some workers process and run forever
  169. def startAsForver(self,worker_interface_array=list()):
  170. gee = GracefulExitEvent()
  171. for worker_interface in worker_interface_array:
  172. wp = multiprocessing.Process(target=worker_proc, args=(gee,worker_interface))
  173. wp.daemon=worker_interface.daemon()
  174. wp.start()
  175. gee.reg_worker(wp)
  176. gee.wait_all()
  177. if __name__ == "__main__":
  178. print "main process(%d) start" % os.getpid()
  179. # p=MultiProcessWrapper()
  180. # p.startAsForver(SimpleWorker() )
  181. # print "main process(%d) end" % (os.getpid())


TestHttpMain.py
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. #############################################################
  4. # teaching wisdom to my machine, please call me Croco#
  5. # 这是一个多进程并发的压力测试程序框架
  6. # 注意不要把进程数设置大于10000,这容易把机器跑卡死。
  7. #############################################################
  8. import HttpModule
  9. import MultiProcessWrapper
  10. import os
  11. import sys
  12. import time
  13. from multiprocessing.sharedctypes import Array
  14. import ctypes
  15. import socket
  16. socket.setdefaulttimeout(3)
  17. '''
  18. GLOBAL Vars
  19. '''
  20. WORKER_COUNT=3
  21. REQ_MAX_COUNT=1000
  22. USE_MAX_TIME =12000
  23. REQ = 'GET / HTTP/1.1\r\nAccept-Encoding: identity\r\nHost: 127.0.0.1\r\nConnection: close\r\nUser-Agent: Python-urllib/2.7\r\n\r\n'
  24. REQ = 'GET / HTTP/1.1\r\nAccept-Encoding: identity\r\nHost: 127.0.0.1\r\nConnection: keep-alive\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n\r\n'
  25. REQ = 'GET / HTTP/1.1\r\nAccept-Encoding: identity\r\nHost: 127.0.0.1\r\nConnection: keep-alive\r\nUser-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n\r\n'
  26. # req='GET / HTTP/1.1\r\nAccept-Encoding: identity\r\nHost: 127.0.0.1\r\nConnection: keep-alive\r\nUser-Agent: Python-urllib/2.7\r\n\r\n'
  27. def current_ms():
  28. return int(time.time() * 1000)
  29. '''
  30. ############################################
  31. class WorkerData
  32. 定义Worker工作结束后的状态数据,该数据
  33. 存储在shared memory,便于全部worker结束后
  34. main process做汇总
  35. ############################################
  36. '''
  37. class WorkerData(ctypes.Structure):
  38. _fields_ = [('worker_id', ctypes.c_long),
  39. ('reqcount', ctypes.c_long),
  40. ('rspcount', ctypes.c_long),
  41. ('rspokcount', ctypes.c_long),
  42. ('sendbytes', ctypes.c_long),
  43. ('recvbytes', ctypes.c_long),
  44. ('startms', ctypes.c_long),
  45. ('endms', ctypes.c_long),
  46. ]
  47. @staticmethod
  48. def createWorkerDataList(worker_count):
  49. worker_data_list = []
  50. for i in xrange(worker_count):
  51. w = WorkerData()
  52. w.worker_id = -1
  53. w.reqcount = 0
  54. w.rspcount = 0
  55. w.rspokcount = 0
  56. w.startms = 0
  57. w.endms = 0
  58. w.sendbytes = 0
  59. w.recvbytes = 0
  60. worker_data_list.append(w)
  61. return worker_data_list
  62. def normalizeNetIO(total_bytes, total_ms):
  63. if total_ms == 0:
  64. return "error net io"
  65. speed = total_bytes / total_ms
  66. speed = speed * 1000
  67. if speed < 1024:
  68. return "{0}Bytes/s".format(speed)
  69. if 1024 <= speed < 1048576:
  70. return "{0}KB/s".format(speed / 1024)
  71. return "{0}MB/s".format(speed / 1048576)
  72. '''
  73. ############################################
  74. gatherResult()
  75. main process做汇总的函数
  76. ############################################
  77. '''
  78. def gatherResult(g_share_array, worker_count):
  79. print "--> gather result---"
  80. reqcount = 0
  81. rspcount = 0
  82. rspokcount = 0
  83. sendbytes = 0
  84. recvbytes = 0
  85. use_ms = 0
  86. for i in xrange(worker_count):
  87. s = g_share_array[i]
  88. print s.worker_id, s.reqcount, s.rspcount, s.rspokcount, s.startms, s.endms, s.sendbytes, s.recvbytes
  89. reqcount += s.reqcount
  90. rspcount += s.rspcount
  91. rspokcount += s.rspokcount
  92. sendbytes += s.sendbytes
  93. recvbytes += s.recvbytes
  94. tmp_ms = s.endms - s.startms
  95. if tmp_ms > use_ms:
  96. use_ms = tmp_ms
  97. result = "------------------------------------------\n"
  98. result += "worker count:{0}\n".format(worker_count)
  99. result += "total reqcount:{0}\n".format(reqcount)
  100. result += "total rspcount:{0}\n".format(rspcount)
  101. result += "total rspokcount:{0}\n".format(rspokcount)
  102. result += "total use_ms:{0}\n".format(use_ms)
  103. result += "total sendbytes:{0}\n".format(sendbytes)
  104. result += "total recvbytes:{0}\n".format(recvbytes)
  105. tps = 0
  106. if use_ms > 0:
  107. tps = rspcount / use_ms
  108. result += "averge tps:{0}\n".format(tps * 1000)
  109. result += "averge sendspeed:{0}\n".format(normalizeNetIO(sendbytes, use_ms))
  110. result += "averge recvspeed:{0}\n".format(normalizeNetIO(recvbytes, use_ms))
  111. result += "=========================================\n"
  112. print result
  113. with open("result.txt", 'a') as f:
  114. f.write(result)
  115. print "Everying is Finished!"
  116. '''
  117. ############################################
  118. class HttpTestWorker
  119. 实现了WorkerInterface,是一个worker进程实质
  120. 运行的类
  121. ############################################
  122. '''
  123. class HttpTestWorker(MultiProcessWrapper.SimpleWorker):
  124. def __init__(self, worker_id, share_array):
  125. global REQ_MAX_COUNT
  126. self.worker_id = worker_id
  127. self.share_array = share_array
  128. self.is_finished = False
  129. self.reqcount = 0
  130. self.rspcount = 0
  131. self.reqmaxcount = REQ_MAX_COUNT
  132. self.rspokcount = 0
  133. self.sendbytes = 0
  134. self.recvbytes = 0
  135. self.startms = current_ms()
  136. self.rsponsetime = 0
  137. super(HttpTestWorker, self).__init__()
  138. def onStart(self):
  139. t = "HttpTestWorker(%d) start ..." % os.getpid()
  140. self.log(t)
  141. self.buildConnection(addr=("127.0.0.1", 80))
  142. pass
  143. def onEnd(self, end_code, end_reason):
  144. print end_reason
  145. # t = "HttpTestWorker(%d) end.withreason:%d\n%s" % (os.getpid(),end_code,end_reason)
  146. # self.log(t)
  147. # use_ms = current_ms() - self.startms
  148. # print "HttpTestWorker({0}) finishd use_ms:{1}.".format(os.getpid(),use_ms)
  149. if end_code == 2:
  150. self.onExit()
  151. pass
  152. def report(self):
  153. use_ms = current_ms() - self.startms
  154. if use_ms == 0:
  155. return
  156. s_speed = normalizeNetIO(self.sendbytes,use_ms)
  157. r_speed = normalizeNetIO(self.recvbytes,use_ms)
  158. tps = self.rspcount*1000/use_ms
  159. print "report id:{0} req:{1} rsptime:{2} sspeed:{3} rspeed:{4} tps:{5}".format(os.getpid(), self.reqcount, self.rsponsetime,s_speed,r_speed,tps)
  160. def onRunOnce(self):
  161. global REQ
  162. global USE_MAX_TIME
  163. # in_data="zhangtao"
  164. # url="http://127.0.0.1:5000/user/{0}".format(in_data)
  165. url = "http://127.0.0.1"
  166. self.reqcount += 1
  167. # response = HttpModule.doHttpGetRequest(url)
  168. response = self.ioCtrlMockHttpGet(REQ)
  169. # print response
  170. # print len(response)
  171. self.rspcount += 1
  172. if response and len(response) >= 1:
  173. self.rspokcount += 1
  174. if 1 == 0 or self.reqcount >= self.reqmaxcount or current_ms() > self.startms + USE_MAX_TIME:
  175. self.report()
  176. self.done()
  177. if self.reqcount % 1000 == 0:
  178. self.report()
  179. pass
  180. def buildConnection(self, addr=("127.0.0.1", 80)):
  181. self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  182. try:
  183. self.clientSocket.connect(addr)
  184. print "TcpConnector connect succ:", addr
  185. except:
  186. (ErrorType, ErrorValue, ErrorTB) = sys.exc_info()
  187. print "Connect server failed: ", ErrorValue, addr
  188. self.clientSocket.close()
  189. self.clientSocket = None
  190. sys.exit(1)
  191. # 模拟HTTPGET的请求的IOCtrl
  192. def ioCtrlMockHttpGet(self, req=""):
  193. t1 = time.time()
  194. if not self.clientSocket:
  195. print "worker:{0} socketerror,or connection is reset,or cannot connect to server".format(os.getpid())
  196. sys.exit(100)
  197. return ""
  198. ret = self.clientSocket.send(req)
  199. if ret != len(req):
  200. print "worker:{0} senderror".format(os.getpid())
  201. sys.exit(102)
  202. return ""
  203. self.sendbytes += ret
  204. recvbuf = ""
  205. headsize = 0
  206. totalsize = 0
  207. while True:
  208. rsp = self.clientSocket.recv(512)
  209. if len(rsp) == 0:
  210. print "worker:{0} recv zero1".format(os.getpid())
  211. sys.exit(103)
  212. recvbuf += rsp
  213. rsp = recvbuf
  214. p = rsp.rfind("\r\n\r\n")
  215. if p < 0:
  216. continue
  217. headsize = p + 4
  218. p1 = rsp.find("Content-Length: ")
  219. if p1 < 0:
  220. print "worker:{0} cannot find Content-Length,check httpheader".format(os.getpid())
  221. sys.exit(104)
  222. p2 = rsp.find("\r\n", p1 + 16)
  223. if p2 < 0:
  224. print "worker:{0} cannot find Content-Length2,check httpheader".format(os.getpid())
  225. sys.exit(105)
  226. contentlength = rsp[p1 + 16:p2]
  227. totalsize = headsize + int(contentlength)
  228. break
  229. ###########################################
  230. while True:
  231. rsp = self.clientSocket.recv(512)
  232. if len(rsp) == 0:
  233. print "worker:{0} recv zero2".format(os.getpid())
  234. sys.exit(103)
  235. recvbuf += rsp
  236. rsp = recvbuf
  237. if len(rsp) >= totalsize:
  238. break
  239. self.recvbytes += len(rsp)
  240. t2 = time.time()
  241. t3 = t2 - t1
  242. self.rsponsetime = t3
  243. # print len(req),len(rsp)
  244. # print "--------------"
  245. # print req
  246. # print rsp
  247. # print "==========="
  248. # sys.exit(0)
  249. return rsp
  250. def onTimer(self):
  251. return
  252. t = "SimpleWorker(%d) onTimer" % (os.getpid())
  253. self.log(t)
  254. def onExit(self):
  255. pid = os.getpid()
  256. self.report()
  257. s = self.share_array[self.worker_id]
  258. s.worker_id = self.worker_id
  259. s.reqcount = self.reqcount
  260. s.reqmaxcount = self.reqmaxcount
  261. s.rspcount = self.rspcount
  262. s.rspokcount = self.rspokcount
  263. s.startms = self.startms
  264. s.endms = current_ms()
  265. s.sendbytes = self.sendbytes
  266. s.recvbytes = self.recvbytes
  267. pass
  268. if __name__ == '__main__':
  269. print "main process(%d) start" % os.getpid()
  270. worker_count = WORKER_COUNT
  271. g_share_array = Array(WorkerData, WorkerData.createWorkerDataList(worker_count))
  272. worker_interface_array = list()
  273. for i in xrange(worker_count):
  274. w = HttpTestWorker(i, g_share_array)
  275. worker_interface_array.append(w)
  276. p = MultiProcessWrapper.MultiProcessWrapper()
  277. p.startAsForver(worker_interface_array)
  278. print "main process(%d) end" % (os.getpid())
  279. print "*" * 100
  280. gatherResult(g_share_array, worker_count)
  281. sys.exit(0)




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/236073
推荐阅读
相关标签
  

闽ICP备14008679号