1、模块导入
from locust import HttpUser, task, constant, between, constant_pacing, events(监听器)
LoadTestShape, tag(标签)
import logging(模块)
2、用requests库封装一个登陆的方法
def login():
import requests
res = requests.post(url="http://139.***.***.129:8899/adminapi/login",json={"account":"admin","pwd":"123456","imgcode":"A"})
return res.json()["data"]["token"]
3、退出码设置(所有项目都是这么用不会有任何区别)完全拷贝就可以
"""
满足以下任何条件,则将退出代码设置为非零:
超过0.01的请求失败
平均响应时间大于2000ms
响应时间的第95个百分位数大于5000毫秒
"""
@events.quitting.add_listener
def _(environment, **kw):
if environment.stats.total.fail_ratio > 0.01:
logging.error("Test failed due to failure ratio > 1%")
environment.process_exit_code = 1
elif environment.stats.total.avg_response_time > 2000:
logging.error("Test failed due to average response time ratio > 2000 ms")
environment.process_exit_code = 1
elif environment.stats.total.get_response_time_percentile(0.95) > 5000:
logging.error("Test failed due to 95th percentile response time > 800 ms")
environment.process_exit_code = 1
else:
environment.process_exit_code = 0
4、setup,在整个负载测试开始前,会执行此方法
@events.test_start.add_listener # todo:在整个负载测试开始前的setup方法
def on_test_start(environment, **kwargs):
print('test start event called.')
print('test start event finished.')
5、teardown,在整个负载测试开始后,会执行此方法(看情况)
@events.test_stop.add_listener # todo:在整个负载测试结束后的 teardown方法
def on_test_stop(environment, **kwargs):
print('test stop event called.')
6、创建一个类
class DoStressTestOnAPI(HttpUser): # todo: 继承HttpUser类,表明新创建的类是支持HTTP协议的API测试
wait_time = constant_pacing(1) # 系统自动调配等待时间保证每秒中每个user instance最多只能有一个task在执行
global project_host # 声明一个全局变量
host ="http://139.***.***.129:8899/" # todo: 定义项目地址
headers = { # todo: 放置请求头
"Accept": "application/json, text/plain, */*",
"Authori-zation": "Bearer " + login(), #(这把token信息放下来了)
}
def on_start(self): # todo:每个用户启动都会调用此方法 on_start则是每个用户开始时运行(开始前seatup方法)
print('test started.')
def on_stop(self): # todo:每个用户启动都会调用此方法 on_stop则是每个用户停止时运行(结束后的thendang方法)
logging.info('test stopped')
用例逻辑层:
"""
tag:表示这个用例的名字,在运行时,可以通过相同的tag名称来执行性能测试场景
task:权重的设置。locust的权重表示着被用户选择并请求的概率,随着权重越高,则被用户请求的几率就会越高。
locust的权重设置比较灵活,可以设置每个方法接口访问的比例
catch_response:
布尔参数
<1>如果设置为True,则表示,如果响应代码是正常(2XX),则标记通过。
<2>如果设置为False,则表示,如果响应代码是正常(2XX),则标记不通过。
name:
name分组,如果API传参是在url后传递,也就是pathvariable。可以加上name做一个分组统计,最后报告会计算共请求了多少次
"""
7、标签
@tag("商品管理-商品搜索","1") # tag标签>(运行的标签设置)
@task(1) # todo:运行DoStressTestOnAPI时,会从多个task(表示权重)任务中随机选择一个,权重增加了他的选择几率(task值越高被用户选择的概率越大)
def Commodity_Search(self): # Commodity_Search接口名称命名
with self.client.get(url = f"adminapi/product/product",headers=DoStressTestOnAPI.headers, # with语句 get请求 url=接口 headers=请求头
data={"page":1,"limit":15,"cate_id":"","type":1,"store_name":"","excel":0})\ # data请求传参
as response:
logging.info('+++++++++++++++++' + str(response.status_code))
logging.info(response.request.headers) #日志
print(response.json())
if response.status_code == 200 and response.json()['status'] == 200:
# response.success()
logging.info(response.json()) # 成功,写在logging日志里
else:
response.failure('test failed and response code is: ' + str(response.json()['code']))
logging.info('error occured') # 失败,写在失败里
8、用例
@tag("商品分类-商品搜索","2",'1','3')
@task(2)
def Commodity_Classification(self):
with self.client.get(url=f"adminapi/product/category", headers=DoStressTestOnAPI.headers,
data={"pid": "", "is_show": "", "page": "1", "cate_name": "", "limit": "15"}) \
as response:
logging.info('+++++++++++++++++' + str(response.status_code))
logging.info(response.request.headers)
print(response.json())
if response.status_code == 200 and response.json()['status'] == 200:
# response.success()
logging.info(response.json())
else:
response.failure('test failed and response code is: ' + str(response.json()['code']))
logging.info('error occured')
推荐阅读
发表评论