赞
踩
MultiProcessWrapper.py
# -*- coding: utf-8 -*-
#!/usr/bin/env python
#############################################################
# Teaching wisdom to my machine; please call me Croco.
#############################################################
# graceful_exit_event.py
# UTF-8 without BOM
#
# References:
# http://stackoverflow.com/questions/26414704/how-does-a-python-process-exit-gracefully-after-receiving-sigterm-while-waiting?rq=1
# http://www.cnblogs.com/kaituorensheng/p/4445418.html
# Initially created: 2016-07-13
# Last updated: 2016-07-14
#
#######################################################################
- import os
- import signal
- import multiprocessing
- import sys
- from abc import ABCMeta,abstractmethod
- import time
- import traceback
-
-
###################################################
# WorkerInterface
# Defines the functional interface every worker must provide.
###################################################
class WorkerInterface(ABCMeta("_WorkerInterfaceBase", (object,), {})):
    """Abstract contract between ``worker_proc`` and a concrete worker.

    The base class is created by calling ``ABCMeta`` directly so that the
    abstract-method check is enforced on both Python 2 and Python 3 (a
    ``__metaclass__`` class attribute is ignored by Python 3).
    """

    @abstractmethod
    def onStart(self):
        """Called once by ``worker_proc`` before the run loop starts."""

    @abstractmethod
    def onEnd(self, end_code, end_reason):
        """Called when the run loop ends.

        ``worker_proc`` uses these codes: 0 = worker reported finished,
        1 = parent requested stop, -1 = graceful-exit exception,
        -2 = unhandled exception (``end_reason`` holds the traceback text),
        2 = final call that is always made from the ``finally`` clause.
        """

    @abstractmethod
    def onRunOnce(self):
        """Perform one unit of work; called on every loop iteration."""

    @abstractmethod
    def onTimer(self):
        """Called from the run loop once ``timeout_ms()`` has elapsed."""

    @abstractmethod
    def finished(self):
        """Return a truthy value when the worker has no more work to do."""

    @abstractmethod
    def timeout_ms(self):
        """Return the interval, in milliseconds, between ``onTimer`` calls."""

    @abstractmethod
    def log(self, txt):
        """Emit the log message ``txt``."""

    @abstractmethod
    def daemon(self):
        """Return the value assigned to ``Process.daemon`` for this worker."""
-
-
-
###################################################
# SimpleWorker
# A simple worker implemented on top of WorkerInterface,
# provided as a usage example.
###################################################
class SimpleWorker(WorkerInterface):
    """Minimal example worker: logs each callback and finishes after one run."""

    def __init__(self):
        # Flag polled through finished(); set by done().
        self.is_finished = False

    def onStart(self):
        """Log that the worker process has started."""
        self.log("SimpleWorker(%d) start ..." % os.getpid())

    def onEnd(self, end_code, end_reason):
        """Log the end code and reason the worker was stopped with."""
        self.log("SimpleWorker(%d) end.with reason(%d:%s)"
                 % (os.getpid(), end_code, end_reason))

    def onRunOnce(self):
        """Log one iteration and mark the worker as finished."""
        self.log("SimpleWorker(%d) onRunOnce" % os.getpid())
        self.done()

    def onTimer(self):
        """Log a timer tick."""
        self.log("SimpleWorker(%d) onTimer" % os.getpid())

    def finished(self):
        """Return True once done() has been called."""
        return self.is_finished

    def done(self):
        """Mark the worker as finished so the run loop stops."""
        self.is_finished = True

    def timeout_ms(self):
        """Return the onTimer interval: 3000 ms."""
        return 3000

    def log(self, txt):
        """Print ``txt`` to stdout prefixed with ``=>``."""
        # Single-argument form prints identically on Python 2 and 3.
        print("=> %s" % (txt,))

    def daemon(self):
        """Run the worker process as a daemon process."""
        return True
-
-
- ###################################################
- # GracefulExitException
- #
- ###################################################
class GracefulExitException(Exception):
    """Exception raised by the SIGTERM handler to unwind running code."""

    @staticmethod
    def sigterm_handler(signum, frame):
        """Signal handler: convert the signal into a GracefulExitException.

        ``signum`` and ``frame`` are the standard signal-handler arguments
        and are not used.
        """
        raise GracefulExitException()
-
###################################################
# GracefulExitEvent
#
###################################################
class GracefulExitEvent(object):
    """Shared stop flag plus bookkeeping for the worker processes.

    The main process registers each worker process with ``reg_worker`` and
    then blocks in ``wait_all``; workers poll ``is_stop`` to learn that the
    parent asked them to stop.
    """

    def __init__(self):
        self.workers = []
        self.exit_event = multiprocessing.Event()

        # Use a signal handler to throw an exception which can be caught
        # by the worker process to allow a graceful exit.
        signal.signal(signal.SIGTERM, GracefulExitException.sigterm_handler)

    def reg_worker(self, wp):
        """Remember the worker process ``wp`` so ``wait_all`` can join it."""
        self.workers.append(wp)

    def is_stop(self):
        """Return True once a stop has been requested."""
        return self.exit_event.is_set()

    def notify_stop(self):
        """Request that all workers stop."""
        self.exit_event.set()

    def wait_all(self):
        """Join every registered worker, then return.

        If a GracefulExitException interrupts the wait, the stop flag is set
        and the join loop is restarted so the workers can wind down.  Any
        other exception sets the stop flag and ends the wait immediately.
        """
        while True:
            try:
                for wp in self.workers:
                    print("main process({0}) observe child status=>name:{1} pid:{2} is_alive:{3}".format(
                        os.getpid(), wp.name, wp.pid, wp.is_alive()))
                    wp.join()
                print("main process(%d) exit." % os.getpid())
                break
            except GracefulExitException:
                self.notify_stop()
                print("main process(%d) got GracefulExitException." % os.getpid())
            except Exception as ex:
                self.notify_stop()
                print("main process(%d) got unexpected Exception: %r" % (os.getpid(), ex))
                break
-
-
- #######################################################################
def worker_proc(gee, worker_interface=None):
    """Entry point of a worker process: drive ``worker_interface`` until done.

    Args:
        gee: object exposing ``is_stop()``; the loop ends when it returns True.
        worker_interface: object implementing the WorkerInterface callbacks.
            When omitted, a fresh ``SimpleWorker`` is created for this call
            (previously one instance was built at definition time and shared).

    The loop calls ``onRunOnce`` repeatedly, stops as soon as ``finished()``
    is true, and calls ``onTimer`` whenever ``timeout_ms()`` milliseconds
    have elapsed since the previous tick.  ``onEnd`` receives 0 (finished),
    1 (parent requested stop), -1 (graceful-exit exception) or -2 (unhandled
    exception, with traceback text), and is always called once more with
    code 2 from the ``finally`` clause.

    Raises:
        SystemExit: always, with code 0, after a normal or handled shutdown.
    """
    if worker_interface is None:
        worker_interface = SimpleWorker()
    try:
        worker_interface.onStart()
        last = int(time.time() * 1000)
        while not gee.is_stop():
            worker_interface.onRunOnce()
            if worker_interface.finished():
                worker_interface.onEnd(0, "finished")
                break
            current = int(time.time() * 1000)
            if current >= last + worker_interface.timeout_ms():
                worker_interface.onTimer()
                last = current
        else:
            # Reached only when the loop condition failed, i.e. no break.
            worker_interface.onEnd(1, "got parent exit notify")
    except GracefulExitException:
        worker_interface.onEnd(-1, "got graceful exit exception")
    except Exception:
        info = sys.exc_info()
        parts = ["caught unhandle exception\n"]
        for filename, lineno, function, text in traceback.extract_tb(info[2]):
            parts.append("{0} line:{1} in function:{2}\n".format(filename, lineno, function))
            # extract_tb yields None when the source line is unavailable.
            parts.append((text or "") + "\n")
        parts.append("** %s: %s" % info[:2])
        parts.append("\n")
        worker_interface.onEnd(-2, "".join(parts))
    finally:
        worker_interface.onEnd(2, "exit")
    sys.exit(0)
-
-
- #######################################################################
class MultiProcessWrapper(object):
    """Starts one process per worker and waits for all of them to end."""

    def __init__(self):
        pass

    def startAsForver(self, worker_interface_array=None):
        """Start a process for each worker and block until all have exited.

        Args:
            worker_interface_array: iterable of worker objects; each one is
                run by ``worker_proc`` in its own process.  ``None`` (the
                default) starts no workers.
        """
        gee = GracefulExitEvent()
        for worker_interface in (worker_interface_array or ()):
            wp = multiprocessing.Process(target=worker_proc,
                                         args=(gee, worker_interface))
            # Must be set before start().
            wp.daemon = worker_interface.daemon()
            wp.start()
            gee.reg_worker(wp)
        gee.wait_all()
-
-
if __name__ == "__main__":
    print("main process(%d) start" % os.getpid())
    # Usage example (startAsForver iterates its argument, so pass a list):
    #   p = MultiProcessWrapper()
    #   p.startAsForver([SimpleWorker()])
    #   print("main process(%d) end" % os.getpid())
-
-

TestHttpMain.py
# -*- coding: utf-8 -*-
# !/usr/bin/env python
#############################################################
# Teaching wisdom to my machine; please call me Croco.
# This is a framework for multi-process concurrent stress testing.
# Do not set the process count above 10000; that can easily
# make the machine freeze.
#############################################################
-
- import HttpModule
- import MultiProcessWrapper
- import os
- import sys
- import time
- from multiprocessing.sharedctypes import Array
- import ctypes
- import socket
# Default timeout, in seconds, for every socket created after this point.
socket.setdefaulttimeout(3)

# ---- Global settings ----

# Number of worker processes started by the main block.
WORKER_COUNT = 3
# A worker finishes once it has issued this many requests ...
REQ_MAX_COUNT = 1000
# ... or once this many milliseconds have passed since it was created.
USE_MAX_TIME = 12000

# Raw HTTP request sent on every iteration.  Earlier variants that were
# immediately overwritten (a "Connection: close" request and a keep-alive
# request with a "Python-urllib/2.7" User-Agent) have been dropped; each
# worker reuses one connection, so keep-alive is the variant in effect.
REQ = ('GET / HTTP/1.1\r\n'
       'Accept-Encoding: identity\r\n'
       'Host: 127.0.0.1\r\n'
       'Connection: keep-alive\r\n'
       'User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n'
       '\r\n')
-
-
-
-
-
def current_ms():
    """Return the current ``time.time()`` value as integer milliseconds."""
    return int(time.time() * 1000)
-
'''
############################################
class WorkerData
Defines the status data a worker records when it
finishes. The data is stored in shared memory so that
the main process can aggregate it after all workers end.
############################################
'''
class WorkerData(ctypes.Structure):
    """Per-worker result record stored in the shared array.

    All fields are 64-bit (``c_longlong``): ``startms``/``endms`` hold
    epoch milliseconds, which do not fit in ``c_long`` on platforms where
    that type is 32 bits wide, and ctypes does not check for overflow.
    """

    _fields_ = [
        ('worker_id', ctypes.c_longlong),   # index of the worker, -1 = unset
        ('reqcount', ctypes.c_longlong),    # requests sent
        ('rspcount', ctypes.c_longlong),    # responses received
        ('rspokcount', ctypes.c_longlong),  # non-empty responses
        ('sendbytes', ctypes.c_longlong),   # total bytes sent
        ('recvbytes', ctypes.c_longlong),   # total bytes received
        ('startms', ctypes.c_longlong),     # start time, epoch milliseconds
        ('endms', ctypes.c_longlong),       # end time, epoch milliseconds
    ]

    @staticmethod
    def createWorkerDataList(worker_count):
        """Return ``worker_count`` fresh records with ``worker_id`` = -1.

        Every other field starts at 0 (ctypes zero-fills new structures).
        """
        return [WorkerData(worker_id=-1) for _ in range(worker_count)]
-
-
def normalizeNetIO(total_bytes, total_ms):
    """Format a transfer rate as a whole number of Bytes/s, KB/s or MB/s.

    Args:
        total_bytes: number of bytes transferred (integer).
        total_ms: elapsed time in milliseconds (integer).

    Returns:
        A string such as ``"512Bytes/s"``, ``"3KB/s"`` or ``"7MB/s"``, or
        ``"error net io"`` when ``total_ms`` is zero or negative.
    """
    if total_ms <= 0:
        return "error net io"
    # Multiply before dividing: dividing first truncated every rate below
    # one byte per millisecond down to zero.
    speed = total_bytes * 1000 // total_ms
    if speed < 1024:
        return "{0}Bytes/s".format(speed)
    if speed < 1048576:
        return "{0}KB/s".format(speed // 1024)
    return "{0}MB/s".format(speed // 1048576)
-
'''
############################################
gatherResult()
Function the main process uses to aggregate the results.
############################################
'''
def gatherResult(g_share_array, worker_count):
    """Sum the per-worker records, print a summary and append it to a file.

    Args:
        g_share_array: indexable collection of records exposing the
            ``WorkerData`` fields (``worker_id``, ``reqcount``, ...).
        worker_count: number of leading entries of ``g_share_array`` to read.

    The elapsed time used for the averages is the longest single-worker
    duration (``endms - startms``).  The summary is printed and appended to
    ``result.txt`` in the current working directory.
    """
    print("--> gather result---")
    reqcount = 0
    rspcount = 0
    rspokcount = 0
    sendbytes = 0
    recvbytes = 0
    use_ms = 0
    for i in range(worker_count):
        s = g_share_array[i]
        print(" ".join(str(v) for v in (
            s.worker_id, s.reqcount, s.rspcount, s.rspokcount,
            s.startms, s.endms, s.sendbytes, s.recvbytes)))
        reqcount += s.reqcount
        rspcount += s.rspcount
        rspokcount += s.rspokcount
        sendbytes += s.sendbytes
        recvbytes += s.recvbytes
        use_ms = max(use_ms, s.endms - s.startms)

    # Multiply before dividing so fewer than one response per millisecond
    # is not truncated to a rate of zero.
    tps = rspcount * 1000 // use_ms if use_ms > 0 else 0

    result = "".join([
        "------------------------------------------\n",
        "worker count:{0}\n".format(worker_count),
        "total reqcount:{0}\n".format(reqcount),
        "total rspcount:{0}\n".format(rspcount),
        "total rspokcount:{0}\n".format(rspokcount),
        "total use_ms:{0}\n".format(use_ms),
        "total sendbytes:{0}\n".format(sendbytes),
        "total recvbytes:{0}\n".format(recvbytes),
        "averge tps:{0}\n".format(tps),
        "averge sendspeed:{0}\n".format(normalizeNetIO(sendbytes, use_ms)),
        "averge recvspeed:{0}\n".format(normalizeNetIO(recvbytes, use_ms)),
        "=========================================\n",
    ])
    print(result)
    with open("result.txt", 'a') as f:
        f.write(result)
    print("Everying is Finished!")
-
-
-
'''
############################################
class HttpTestWorker
Implements WorkerInterface; this is the class that a
worker process actually runs.
############################################
'''
class HttpTestWorker(MultiProcessWrapper.SimpleWorker):
    """Worker that repeatedly sends ``REQ`` over one TCP connection.

    The connection is opened in ``onStart`` and reused for every request.
    Counters are kept on the instance and copied into
    ``share_array[worker_id]`` by ``onExit``.
    """

    def __init__(self, worker_id, share_array):
        """Initialise the counters.

        Args:
            worker_id: index of this worker's slot in ``share_array``.
            share_array: shared array of ``WorkerData`` records.
        """
        super(HttpTestWorker, self).__init__()  # sets is_finished = False
        self.worker_id = worker_id
        self.share_array = share_array
        self.clientSocket = None   # opened by buildConnection()
        self.reqcount = 0
        self.rspcount = 0
        self.reqmaxcount = REQ_MAX_COUNT
        self.rspokcount = 0
        self.sendbytes = 0
        self.recvbytes = 0
        self.startms = current_ms()
        self.rsponsetime = 0       # duration of the last request, seconds

    def onStart(self):
        """Log the start and connect to 127.0.0.1:80."""
        self.log("HttpTestWorker(%d) start ..." % os.getpid())
        self.buildConnection(addr=("127.0.0.1", 80))

    def onEnd(self, end_code, end_reason):
        """Print the reason; on the final call (code 2) publish the stats."""
        print(end_reason)
        if end_code == 2:
            self.onExit()

    def report(self):
        """Print a one-line progress report for this worker."""
        use_ms = current_ms() - self.startms
        if use_ms <= 0:
            return
        s_speed = normalizeNetIO(self.sendbytes, use_ms)
        r_speed = normalizeNetIO(self.recvbytes, use_ms)
        tps = self.rspcount * 1000 // use_ms
        print("report id:{0} req:{1} rsptime:{2} sspeed:{3} rspeed:{4} tps:{5}".format(
            os.getpid(), self.reqcount, self.rsponsetime, s_speed, r_speed, tps))

    def onRunOnce(self):
        """Send one request and update the counters.

        The worker is marked done once ``reqmaxcount`` requests have been
        sent or ``USE_MAX_TIME`` milliseconds have passed since creation.
        """
        self.reqcount += 1
        response = self.ioCtrlMockHttpGet(REQ)
        self.rspcount += 1
        if response:
            self.rspokcount += 1
        timed_out = current_ms() > self.startms + USE_MAX_TIME
        if self.reqcount >= self.reqmaxcount or timed_out:
            self.report()
            self.done()
        elif self.reqcount % 1000 == 0:
            # Periodic progress report (skipped when the final report above
            # was already printed for this request).
            self.report()

    def buildConnection(self, addr=("127.0.0.1", 80)):
        """Open the TCP connection to ``addr``; exit with code 1 on failure."""
        self.clientSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.clientSocket.connect(addr)
        except socket.error as err:
            print("Connect server failed:  %s %s" % (err, addr))
            self.clientSocket.close()
            self.clientSocket = None
            sys.exit(1)
        print("TcpConnector connect succ: %s" % (addr,))

    def _closeConnection(self):
        """Close the socket if it is open."""
        if self.clientSocket is not None:
            self.clientSocket.close()
            self.clientSocket = None

    def _recvChunk(self, zero_tag):
        """Receive up to 512 bytes; exit with code 103 if the peer closed."""
        chunk = self.clientSocket.recv(512)
        if not chunk:
            print("worker:{0} {1}".format(os.getpid(), zero_tag))
            sys.exit(103)
        return chunk

    @staticmethod
    def _parseContentLength(header):
        """Return the Content-Length value from raw header bytes, or None."""
        for line in header.split(b"\r\n")[1:]:
            name, sep, value = line.partition(b":")
            if sep and name.strip().lower() == b"content-length":
                return int(value.strip())
        return None

    # I/O routine that mimics an HTTP GET request.
    def ioCtrlMockHttpGet(self, req=""):
        """Send ``req`` and read one full response from the connection.

        The response must carry a ``Content-Length`` header; the body is
        read until header size + content length bytes have arrived.

        Returns:
            The raw response bytes (header and body).

        Raises:
            SystemExit: 100 when there is no connection, 103 when the peer
                closes the connection, 104 when Content-Length is missing.
        """
        started = time.time()
        if not self.clientSocket:
            print("worker:{0} socketerror,or connection is reset,or cannot connect to server".format(os.getpid()))
            sys.exit(100)
        if not isinstance(req, bytes):
            # Sockets carry bytes; text requests are plain ASCII HTTP.
            req = req.encode("ascii")
        # sendall() retries partial writes instead of treating them as fatal.
        self.clientSocket.sendall(req)
        self.sendbytes += len(req)

        # Phase 1: read until the blank line that ends the header.  Use the
        # FIRST blank line; a later one may belong to the body.
        recvbuf = b""
        head_end = -1
        while head_end < 0:
            recvbuf += self._recvChunk("recv zero1")
            head_end = recvbuf.find(b"\r\n\r\n")
        headsize = head_end + 4

        contentlength = self._parseContentLength(recvbuf[:headsize])
        if contentlength is None:
            print("worker:{0} cannot find Content-Length,check httpheader".format(os.getpid()))
            sys.exit(104)
        totalsize = headsize + contentlength

        # Phase 2: read the rest of the body.  Check the size BEFORE calling
        # recv so a response that already arrived in full does not block.
        while len(recvbuf) < totalsize:
            recvbuf += self._recvChunk("recv zero2")

        self.recvbytes += len(recvbuf)
        self.rsponsetime = time.time() - started
        return recvbuf

    def onTimer(self):
        """Timer ticks are intentionally ignored by this worker."""
        return

    def onExit(self):
        """Print a final report, publish the counters and close the socket."""
        self.report()
        s = self.share_array[self.worker_id]
        s.worker_id = self.worker_id
        s.reqcount = self.reqcount
        # reqmaxcount is not copied: WorkerData declares no such field.
        s.rspcount = self.rspcount
        s.rspokcount = self.rspokcount
        s.startms = self.startms
        s.endms = current_ms()
        s.sendbytes = self.sendbytes
        s.recvbytes = self.recvbytes
        self._closeConnection()
-
-
if __name__ == '__main__':
    print("main process(%d) start" % os.getpid())
    worker_count = WORKER_COUNT
    # One shared record per worker; each worker fills its own slot on exit.
    g_share_array = Array(WorkerData, WorkerData.createWorkerDataList(worker_count))
    worker_interface_array = [HttpTestWorker(i, g_share_array)
                              for i in range(worker_count)]
    p = MultiProcessWrapper.MultiProcessWrapper()
    # Blocks until every worker process has exited.
    p.startAsForver(worker_interface_array)
    print("main process(%d) end" % (os.getpid()))
    print("*" * 100)
    gatherResult(g_share_array, worker_count)
    sys.exit(0)

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。