在服务端的测试中,除了考虑服务端的业务功能和API的各个兼容性外,还需要考虑的就是服务端的稳定性以及高并发请求下服务端的承载能力。关于并发多少的数量以及具体的响应时间要求,其实每个产品的形态都是不一样的,很难使用标准的说法来进行统一。这具体看被测试的组件它所面对的业务形态,如果业务形态是是很少使用的产品,其实对性能也就没什么要求了。所以关于这点还是得根据被测组件的架构设计,承载的量以及业务目标。本文章主要分享使用Python语言编写一个简单的并发请求的测试代码。
在Python的并发编程模式中,主要涉及的点是线程以及进程,还有对应的协程。而locust主要是基于协程来进行设计,协程我们可以把它理解为微线程。在IO密集性和CPU密集性中,如果是IO密集性的,建议使用多线程的方式更加高效,如果是CPU密集性,建议使用多进程的方式更加高效。本文章主要分享基于IO密集性,也就是多线程的方式。开启一个线程的方式是非常简单,我们可以通过函数式的编程方式,也可以使用面向对象的编程方式,见如下的具体案例代码,函数式的方式:
from threading import Thread
import time as t
import random
def job(name):
print('我是{0},我要开始工作啦'.format(name))
if __name__ == '__main__':
t=Thread(target=job,args=('李四',))
t.start()
print('主线程执行结束')
面向对象的方式:
from threading import Thread
import time as t
import random
class Job(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self) -> None:
print('我是{0},我要开始工作啦'.format(self.name))
if __name__ == '__main__':
t=Job('李四')
t.start()
print('主线程程序执行结束')
其实在Thread的类中,并没有返回被测函数的返回值,也就是说我们在测试API接口的时候,需要拿到被测接口的状态码,请求响应时间,和响应数据,那么我们就需要重新Thread类继承后对run()的方法进行重写,拿到被测函数中我们所期望的数据,具体案例代码如下:
#!coding:utf-8
from threading import Thread
class ThreadTest(Thread):
def __init__(self,func,args=()):
'''
:param func: 被测试的函数
:param args: 被测试的函数的返回值
'''
super(ThreadTest,self).__init__()
self.func=func
self.args=args
def run(self) -> None:
self.result=self.func(*self.args)
def getResult(self):
try:
return self.result
except BaseException as e:
return e.args[0]
这里我们以测试百度首页作为案例,来并发请求后,拿到并发请求后响应时间,状态码,然后依据响应时间拿到中位数以及其他的数据,具体完整案例代码如下:
#!/usr/bin/env python
#!coding:utf-8
from threading import Thread
import requests
import matplotlib.pyplot as plt
import datetime
import time
import numpy as np
import json
class ThreadTest(Thread):
def __init__(self,func,args=()):
'''
:param func: 被测试的函数
:param args: 被测试的函数的返回值
'''
super(ThreadTest,self).__init__()
self.func=func
self.args=args
def run(self) -> None:
self.result=self.func(*self.args)
def getResult(self):
try:
return self.result
except BaseException as e:
return e.args[0]
def baiDu(code,seconds):
'''
:param code: 状态码
:param seconds: 请求响应时间
:return:
'''
r=requests.get(url='http://www.baidu.com/')
code=r.status_code
seconds=r.elapsed.total_seconds()
return code,seconds
def calculationTime(startTime,endTime):
'''计算两个时间之差,单位是秒'''
return (endTime-startTime).seconds
def getResult(seconds):
'''获取服务端的响应时间信息'''
data={
'Max':sorted(seconds)[-1],
'Min':sorted(seconds)[0],
'Median':np.median(seconds),
'99%Line':np.percentile(seconds,99),
'95%Line':np.percentile(seconds,95),
'90%Line':np.percentile(seconds,90)
}
return data
def highConcurrent(count):
'''
对服务端发送高并发的请求
:param cout: 并发数
:return:
'''
startTime=datetime.datetime.now()
sum=0
list_count=list()
tasks=list()
results = list()
#失败的信息
fails=[]
#成功任务数
success=[]
codes = list()
seconds = list()
for i in range(1,count):
t=ThreadTest(baiDu,args=(i,i))
tasks.append(t)
t.start()
for t in tasks:
t.join()
if t.getResult()[0]!=200:
fails.append(t.getResult())
results.append(t.getResult())
endTime=datetime.datetime.now()
for item in results:
codes.append(item[0])
seconds.append(item[1])
for i in range(len(codes)):
list_count.append(i)
#生成可视化的趋势图
fig,ax=plt.subplots()
ax.plot(list_count,seconds)
ax.set(xlabel='number of times', ylabel='Request time-consuming',
title='olap continuous request response time (seconds)')
ax.grid()
fig.savefig('olap.png')
plt.show()
for i in seconds:
sum+=i
rate=sum/len(list_count)
# print('\n总共持续时间:\n',endTime-startTime)
totalTime=calculationTime(startTime=startTime,endTime=endTime)
if totalTime
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?