赞
踩
一 单接口
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Single-endpoint load test: log in once, then POST to one URL from N threads."""
import json
import random
import datetime
import requests
import threading
import time

# Load-test settings, read as module globals by Presstest. They live at module
# level (not under the __main__ guard) so the class also works when imported.
THREAD_NUM = 1  # total number of concurrent threads
ONE_WORKER_NUM = 5  # number of requests issued by each thread
LOOP_SLEEP = 0.1  # pause between two requests of one thread (seconds)
ERROR_NUM = 0  # number of failed requests, updated by the worker threads

# "ERROR_NUM += 1" is a read-modify-write; the lock keeps concurrent workers
# from losing increments.
_ERROR_LOCK = threading.Lock()


def _record_error():
    """Increment the global failure counter under the lock."""
    global ERROR_NUM
    with _ERROR_LOCK:
        ERROR_NUM += 1


class Presstest(object):
    """Logs in to obtain a token, then load-tests ``press_url`` with threads."""

    # Default headers; copied per instance in __init__, never mutated here.
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL that ``login`` posts the credentials to.
            press_url: URL that ``testinterface`` posts to.
            phone: value sent as ``mobile`` in the login payload.
            password: value sent as ``pwd`` in the login payload.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Copy the class-level dict: login() adds the token to the session
        # headers, and that must not leak into Presstest.headers, which is
        # shared by every instance.
        self.session.headers = dict(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: if the login response contains no token.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        payload = res.json()
        token = (payload.get('data') or {}).get('token')
        if not token:
            # Fail here with the server's answer instead of running the whole
            # load test without a token.
            raise RuntimeError('login failed, no token in response: %r' % (payload,))
        self.session.headers['token'] = token

    def testinterface(self):
        """Send one request to ``press_url`` and count it as failed if needed.

        A request counts as failed when the JSON body's ``code`` is not 0 or
        when sending/parsing raises.
        """
        data = {"id": 418}
        try:
            print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
            response = self.session.post(self.press_url, data=json.dumps(data))
            body = response.json()  # parse once, reuse for check and log
            if body.get('code') != 0:
                print(body)
                _record_error()
        except Exception as e:
            # Deliberately broad: any failure of a single request is recorded
            # and must not kill the worker thread.
            print(e)
            _record_error()

    def testonework(self):
        """Worker body: issue ONE_WORKER_NUM requests, sleeping between them."""
        for _ in range(ONE_WORKER_NUM):
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    def run(self):
        """Run THREAD_NUM worker threads to completion and print a summary."""
        started = time.perf_counter()
        workers = [
            threading.Thread(target=self.testonework, name="T" + str(i), daemon=True)
            for i in range(THREAD_NUM)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.perf_counter() - started

        total = THREAD_NUM * ONE_WORKER_NUM
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("总耗时(秒):", elapsed)
        # Skip the averages when there was nothing to average; the original
        # expression divided by zero in that case.
        if total > 0 and elapsed > 0:
            print("每次请求耗时(秒):", elapsed / total)
            print("每秒承载请求数:", total / elapsed)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/sale/login/login'
    press_url = 'http://test/sale/hsOrder/afterExamineAdopt'
    phone = "150000000"
    password = "123456"

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()
二、多接口参数化(实现多接口参数化并发:data 和 url 必须一一对应,且并发线程数 THREAD_NUM 不能大于 url_list 的长度)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Multi-endpoint load test: each thread gets its own (url, data) pair."""
import json
import random
import datetime
import requests
import threading
import time

# Load-test settings, read as module globals by Presstest. They live at module
# level (not under the __main__ guard) so the class also works when imported.
THREAD_NUM = 3  # total number of concurrent threads
ONE_WORKER_NUM = 1  # number of loop iterations per thread
LOOP_SLEEP = 0  # pause between two iterations of one thread (seconds)
ERROR_NUM = 0  # number of failed requests, updated by the worker threads

# "ERROR_NUM += 1" is a read-modify-write; the lock keeps concurrent workers
# from losing increments.
_ERROR_LOCK = threading.Lock()


def _record_error():
    """Increment the global failure counter under the lock."""
    global ERROR_NUM
    with _ERROR_LOCK:
        ERROR_NUM += 1


class Presstest(object):
    """Logs in to obtain a token, then load-tests several endpoints with threads."""

    # Default headers; copied per instance in __init__, never mutated here.
    headers = {
        'Content-Type': 'application/json; charset=UTF-8'
    }

    def __init__(self, login_url, press_url, phone, password):
        """Store the target URLs and credentials and create the HTTP session.

        Args:
            login_url: URL that ``login`` posts the credentials to.
            press_url: URL that ``testinterface`` posts to.
            phone: value sent as ``mobile`` in the login payload.
            password: value sent as ``pwd`` in the login payload.
        """
        self.login_url = login_url
        self.press_url = press_url
        self.phone = phone
        self.password = password
        self.session = requests.Session()
        # Copy the class-level dict: login() adds the token to the session
        # headers, and that must not leak into Presstest.headers, which is
        # shared by every instance.
        self.session.headers = dict(self.headers)

    def login(self):
        """Log in and store the returned token in the session headers.

        Raises:
            RuntimeError: if the login response contains no token.
        """
        data = {"mobile": self.phone, "pwd": self.password, "type": "1"}
        res = self.session.post(self.login_url, data=json.dumps(data))
        payload = res.json()
        token = (payload.get('data') or {}).get('token')
        if not token:
            # Fail here with the server's answer instead of running the whole
            # load test without a token.
            raise RuntimeError('login failed, no token in response: %r' % (payload,))
        self.session.headers['token'] = token

    def _post_and_check(self, url, data):
        """POST ``data`` as JSON to ``url``; count the call as failed if needed.

        A request counts as failed when the JSON body's ``code`` is not 0 or
        when sending/parsing raises.
        """
        try:
            response = self.session.post(url, data=json.dumps(data))
            body = response.json()  # parse once, reuse for check and log
            if body.get('code') != 0:
                print(body)
                _record_error()
        except Exception as e:
            # Deliberately broad: any failure of a single request is recorded
            # and must not kill the worker thread.
            print(e)
            _record_error()

    def testinterface(self):
        """Send one request with the fixed payload to ``press_url``."""
        data = {"id": 418}
        print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
        self._post_and_check(self.press_url, data)

    def testinterface2(self, url, data):
        """Send one request with the caller-supplied ``data`` to ``url``."""
        print('开始调接口2222222:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
        print("接口请求入参url,data==", url, data)
        self._post_and_check(url, data)

    def testonework(self, url, data):
        """Worker body: ONE_WORKER_NUM iterations of both requests.

        Each iteration calls ``testinterface2(url, data)``, then
        ``testinterface()``, then sleeps LOOP_SLEEP seconds.
        """
        for _ in range(ONE_WORKER_NUM):
            self.testinterface2(url, data)
            self.testinterface()
            time.sleep(LOOP_SLEEP)

    def run(self, url_list=None, data_list=None):
        """Run THREAD_NUM threads, each on a distinct endpoint, and print a summary.

        Args:
            url_list: endpoints to test; defaults to the built-in sample list.
            data_list: payloads, ``data_list[i]`` belonging to ``url_list[i]``;
                defaults to the built-in sample list.

        Raises:
            ValueError: if ``url_list`` is shorter than ``data_list`` or
                THREAD_NUM exceeds the number of available pairs.
        """
        if data_list is None:
            data_list = [{"orderId": 1194, "cause": "让人"},
                         {"orderId": 1193, "cause": "让人"},
                         {"orderId": 1192, "cause": "让人"}]
        if url_list is None:
            url_list = ["http://test/api1", "http://test/api2", "http://test/api3"]
        if len(url_list) < len(data_list):
            raise ValueError("url_list must provide one URL per data_list entry")
        if THREAD_NUM > len(data_list):
            raise ValueError("THREAD_NUM must not exceed the number of url/data pairs")

        started = time.perf_counter()
        list_arr = list(range(len(data_list)))
        print("index========", list_arr)
        # One distinct random pair per thread (sampling without replacement).
        chosen = random.sample(list_arr, THREAD_NUM)
        # Pass the callable plus args. Writing target=self.testonework(...)
        # would run the worker right here in the calling thread, one after the
        # other, and hand Thread a target of None.
        workers = [
            threading.Thread(
                target=self.testonework,
                args=(url_list[index], data_list[index]),
                name="T" + str(i),
                daemon=True,
            )
            for i, index in enumerate(chosen)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        elapsed = time.perf_counter() - started

        # NOTE: a "task" is one loop iteration of testonework, which issues
        # two HTTP requests; the figures below are per task.
        total = THREAD_NUM * ONE_WORKER_NUM
        print("===============压测结果===================")
        print("URL:", self.press_url)
        print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", total)
        print("总耗时(秒):", elapsed)
        # Skip the averages when there was nothing to average; the original
        # expression divided by zero in that case.
        if total > 0 and elapsed > 0:
            print("每次请求耗时(秒):", elapsed / total)
            print("每秒承载请求数:", total / elapsed)
        print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
    login_url = 'http://test/login'
    press_url = 'http://test/afterExamineAdopt'
    phone = "1500000000"
    password = "123456"

    obj = Presstest(login_url=login_url, press_url=press_url, phone=phone, password=password)
    obj.login()
    obj.run()
三 多接口并发调用方法二
import datetime
import json
import requests
import threading
import time


def post_request(url, data, timeout=None):
    """POST ``data`` as JSON to ``url`` and print the response and its duration.

    Args:
        url: endpoint to call.
        data: JSON-serialisable payload.
        timeout: optional timeout in seconds passed to ``requests.post``;
            ``None`` (the default) waits indefinitely, as before.
    """
    print('开始调接口111111:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    # Time only the HTTP call, with a monotonic clock, so console output and
    # wall-clock adjustments do not distort the measurement.
    start_time = time.perf_counter()
    response = requests.post(
        url,
        data=json.dumps(data),
        headers={"Content-Type": "application/json;charset=UTF-8"},
        timeout=timeout,
    )
    duration = time.perf_counter() - start_time
    print('调用结束:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    print("Response from", url, ":", data, response.text)
    print("Request duration:", duration * 1000, "ms")


# data[i] is the payload sent to urls[i].
data = [{"orderId": 1194, "cause": "让人"},
        {"orderId": 1193, "cause": "让人"},
        {"orderId": 1192, "cause": "让人"}]
urls = ["http://test/api1", "http://test/api2", "http://test/api3"]

# Start one thread per (url, payload) pair, then wait for all of them.
threads = []
for url, payload in zip(urls, data):
    t = threading.Thread(target=post_request, args=(url, payload))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
四 多接口同时并发(相当于集合点)(异步实现集合点)
import asyncio
import datetime
import aiohttp


async def make_request(session, url, data, content_type='text/html'):
    """POST ``data`` as JSON to ``url`` and return the decoded JSON body.

    Args:
        session: open ``aiohttp.ClientSession`` used to send the request.
        url: endpoint to call.
        data: JSON-serialisable payload.
        content_type: response Content-Type accepted by the JSON decoder;
            pass ``None`` to disable the check.

    Returns:
        The response body parsed as JSON.
    """
    print('开始调接口:', datetime.datetime.now().strftime('%Y-%m-%d- %H:%M:%S:%f'))
    async with session.post(url, json=data) as response:
        return await response.json(content_type=content_type, encoding='utf-8')


async def run_concurrent_requests(urls, datas, max_concurrent_requests):
    """Send ``datas[i]`` to ``urls[i]`` with a cap on in-flight requests.

    Args:
        urls: endpoints, one per payload.
        datas: payloads, ``datas[i]`` belonging to ``urls[i]``.
        max_concurrent_requests: maximum number of requests in flight.

    Returns:
        List of decoded JSON bodies in the order of ``datas``.

    Raises:
        ValueError: if ``urls`` has fewer entries than ``datas``.
    """
    if len(urls) < len(datas):
        # Checked up front so a mismatch cannot fail halfway through the run.
        raise ValueError("urls must provide one URL per datas entry")

    # NOTE(review): hard-coded sample token; replace with a real credential
    # before running against a live service.
    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "token": "BEE010F2A6D0696DD90A82FF28B21AF2",
    }
    tasks = []
    # Default headers go through the constructor; assigning session.headers
    # after creation is not a supported way to set them.
    async with aiohttp.ClientSession(headers=headers) as session:
        sem = asyncio.Semaphore(max_concurrent_requests)
        for url, data in zip(urls, datas):
            # Take a slot before spawning; the done-callback gives it back
            # whether the request succeeded or raised.
            await sem.acquire()
            task = asyncio.ensure_future(make_request(session, url, data))
            task.add_done_callback(lambda _done: sem.release())
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    return responses


if __name__ == '__main__':
    urls = ['http://test/SubmitSettlement'] * 5
    datas = [{"idArr": [2086]}, {"idArr": [2090]}, {"idArr": [2089]}, {"idArr": [2086]}, {"idArr": [2090]}]
    max_concurrent_requests = 5
    # asyncio.run creates and closes the event loop; get_event_loop() outside
    # a running loop is deprecated.
    responses = asyncio.run(run_concurrent_requests(urls, datas, max_concurrent_requests))
    print(responses)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。