1、模块导入

from locust import HttpUser, task, constant, between, constant_pacing, events(监听器)

LoadTestShape, tag(标签)

import logging(模块)

2、用requests库封装一个登陆的方法

def login():

import requests

res = requests.post(url="http://139.***.***.129:8899/adminapi/login",json={"account":"admin","pwd":"123456","imgcode":"A"})

return res.json()["data"]["token"]

3、退出码设置(所有项目都是这么用不会有任何区别)完全拷贝就可以

"""

满足以下任何条件,则将退出代码设置为非零:

超过0.01的请求失败

平均响应时间大于2000ms

响应时间的第95个百分位数大于5000毫秒

"""

@events.quitting.add_listener

def _(environment, **kw):

if environment.stats.total.fail_ratio > 0.01:

logging.error("Test failed due to failure ratio > 1%")

environment.process_exit_code = 1

elif environment.stats.total.avg_response_time > 2000:

logging.error("Test failed due to average response time ratio > 2000 ms")

environment.process_exit_code = 1

elif environment.stats.total.get_response_time_percentile(0.95) > 5000:

logging.error("Test failed due to 95th percentile response time > 800 ms")

environment.process_exit_code = 1

else:

environment.process_exit_code = 0

4、setup,在整个负载测试开始前,会执行此方法

@events.test_start.add_listener # todo:在整个负载测试开始前的setup方法

def on_test_start(environment, **kwargs):

print('test start event called.')

print('test start event finished.')

5、teardown,在整个负载测试开始后,会执行此方法(看情况)

@events.test_stop.add_listener # todo:在整个负载测试结束后的 teardown方法

def on_test_stop(environment, **kwargs):

print('test stop event called.')

6、创建一个类

class DoStressTestOnAPI(HttpUser): # todo: 继承HttpUser类,表明新创建的类是支持HTTP协议的API测试

wait_time = constant_pacing(1) # 系统自动调配等待时间保证每秒中每个user instance最多只能有一个task在执行

global project_host # 声明一个全局变量

host ="http://139.***.***.129:8899/" # todo: 定义项目地址

headers = { # todo: 放置请求头

"Accept": "application/json, text/plain, */*",

"Authori-zation": "Bearer " + login(), #(这把token信息放下来了)

}

def on_start(self): # todo:每个用户启动都会调用此方法 on_start则是每个用户开始时运行(开始前seatup方法)

print('test started.')

def on_stop(self): # todo:每个用户启动都会调用此方法 on_stop则是每个用户停止时运行(结束后的thendang方法)

logging.info('test stopped')

用例逻辑层:

 """

    tag:表示这个用例的名字,在运行时,可以通过相同的tag名称来执行性能测试场景

    task:权重的设置。locust的权重表示着被用户选择并请求的概率,随着权重越高,则被用户请求的几率就会越高。

    locust的权重设置比较灵活,可以设置每个方法接口访问的比例

    catch_response:

    布尔参数

    <1>如果设置为True,则表示,如果响应代码是正常(2XX),则标记通过。

    <2>如果设置为False,则表示,如果响应代码是正常(2XX),则标记不通过。

    name:

    name分组,如果API传参是在url后传递,也就是pathvariable。可以加上name做一个分组统计,最后报告会计算共请求了多少次

"""

7、标签

@tag("商品管理-商品搜索","1") # tag标签>(运行的标签设置)

@task(1) # todo:运行DoStressTestOnAPI时,会从多个task(表示权重)任务中随机选择一个,权重增加了他的选择几率(task值越高被用户选择的概率越大)

def Commodity_Search(self): # Commodity_Search接口名称命名

with self.client.get(url = f"adminapi/product/product",headers=DoStressTestOnAPI.headers, # with语句 get请求 url=接口 headers=请求头

data={"page":1,"limit":15,"cate_id":"","type":1,"store_name":"","excel":0})\ # data请求传参

as response:

logging.info('+++++++++++++++++' + str(response.status_code))

logging.info(response.request.headers) #日志

print(response.json())

if response.status_code == 200 and response.json()['status'] == 200:

# response.success()

logging.info(response.json()) # 成功,写在logging日志里

else:

response.failure('test failed and response code is: ' + str(response.json()['code']))

logging.info('error occured') # 失败,写在失败里

8、用例

@tag("商品分类-商品搜索","2",'1','3')

@task(2)

def Commodity_Classification(self):

with self.client.get(url=f"adminapi/product/category", headers=DoStressTestOnAPI.headers,

data={"pid": "", "is_show": "", "page": "1", "cate_name": "", "limit": "15"}) \

as response:

logging.info('+++++++++++++++++' + str(response.status_code))

logging.info(response.request.headers)

print(response.json())

if response.status_code == 200 and response.json()['status'] == 200:

# response.success()

logging.info(response.json())

else:

response.failure('test failed and response code is: ' + str(response.json()['code']))

logging.info('error occured')

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。