跳转到3.0版本https://www.cnblogs.com/df888/p/12031649.html
AIM是我用python搭建的第一款接口自动化测试框架,随着技术的提升,框架也在升级,故有了AIM2.0。
附上 AIM1.0链接
什么是AIM2.0?
AIM2.0 makes it easier to interface testing more quickly and with less code.
————改编自django
先皮一下啦,哈哈哈。
AIM2.0特点
- 原生python,简单易用
- for while if,逻辑控制灵活
- 随机值、响应值、数据库值,取值简单
- 友好支持代码级json显示,参数化直接
- 写代码,调试方便
- parewise算法等,覆盖场景更多
为了能更好的发挥这些特点,需要一定的编程能力
现在这时代不懂点编程,怎么做测试呢?
AIM2.0怎么用
unittest->pytest
最简单的一个case代码
from common.request import post
from common.assertion import *
def test():
"""一个简单的post请求结构"""
r = post({
"url": "",
"headers": {},
"body": {}
})
check_flag(r)
执行测试
目录结构
Postar
----case 用例
----common 公共
----config 配置
----data 数据
----result 结果
简洁明了的风格
核心代码
request
request是基于requests封装的请求,重中之中。
相比AIM1.0,做了一些重构。1.采用dict来存取请求相关数据;2.改用装饰器来处理post、get、put等方法 3.csv解耦内聚,从csv_reader.py挪到了request.py
req = {
"default_headres": {"Content-Type": "application/json"},
"p": None,
"method": None,
"r": None,
"elapsed": None,
"log_level": 1 # req日志级别 0不log不record 1log 2record 3log+record
}
def _sender(f):
method = f.__name__
def s(p, param='json'):
param_map = {
'post': param, # json data
'get': 'params',
'put': 'json'
}
p['headers'] = p['headers'] if p['headers'] else req['default_headers']
start = time.process_time()
_timeout_try(f"requests.{method}(\"{p['url']}\", headers={p['headers']}, "
f"{param_map[method]}={p['body']}, verify=False)")
end = time.process_time()
req['elapsed'] = decimal.Decimal("%.2f" % float(end - start))
req['p'] = p
req['method'] = method
_req_log()
return req['r']
return s
def _timeout_try(expression):
while True:
try:
req['r'] = eval(expression)
except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as e:
if 'bad handshake' in str(e) or '10054' in str(e):
continue
else:
raise Exception(e)
break
def _req_log():
if req['log_level'] in (1, 3):
logger.info(f"{req['p']['url']} {req['method']} 请求头{obj2json(req['p']['headers'])} "
f"参数{obj2json(req['p']['body'])}\n响应\n{req['r'].text}\n")
if req['log_level'] in (2, 3):
title = ['time', 'status', 'elapsed(s)', 'url', 'headers', 'method', 'body', 'response']
row = [current_time(),
req['r'].status_code,
req['elapsed'],
req['p']['url'],
obj2json(req['p']['headers']),
req['method'],
obj2json(req['p']['body']),
req['r'].text,
' '] # 多写一列空格,response列数据显示不会溢出单元格
write_record(csv_path, title, row)
@_sender
def post():
"""post"""
@_sender
def get():
"""get"""
@_sender
def put():
"""put"""
dao
Data Access Object,目前仅支持mysql。
借助pandas和sqlalchemy做了极大的简化
class Dao:
def __init__(self, address, user, password):
self.engine = create_engine(f'mysql+pymysql://{user}:{password}@{address}')
def select(self, sql):
"""
查询
:param sql:
:return: DataFrame 可以按二维数组来查值 data['col_name']['row_num']
"""
logger.info(f'执行sql:{sql}')
return pd.read_sql(sql, self.engine)
randomer
专门用来产生随机数据的
def p_sample(seq, n):
"""
postar的sample,防止越界
:param seq:
:param n:
:return:
"""
return random.sample(seq, n if n < len(seq) else len(seq))
def random_hanzi(n):
"""随机中文汉字"""
return ''.join(random.sample(hanzi.replace('\r', '').replace('\n', ''), n))
def random_id(n):
"""随机id"""
s = ""
for i in range(n):
s += str(random.randint(0, 9))
return int(s)
def random_01(n):
"""随机01编码 '1010100' """
return ''.join([random.choice('01') for i in range(n)])
def _timestamp(t):
"""时间戳"""
y, m, d = tuple(int(x) for x in t.split('-'))
m = (y, m, d, 0, 0, 0, 0, 0, 0)
return time.mktime(m)
def random_date(start, end, n):
"""默认返回n个日期 列表"""
start = _timestamp(start)
end = _timestamp(end)
return [time.strftime("%Y-%m-%d", time.localtime(random.randint(start, end))) for i in range(n)]
def range_num(a, b, interval=1.0):
if interval == 1:
return [i for i in range(a, b)]
if interval == 0.5:
out = []
for i in range(a, b):
out.append(i)
out.append(i + 0.5)
return out
def range_str(a, b, random_n=None):
out = [str(i) for i in range(a, b)]
return out if random_n is None else p_sample(out, random_n)
log
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 设置日志级别,默认为WARN
_formatter = logging.Formatter('\n[%(asctime)s]%(message)s', '%Y-%m-%d %H:%M:%S')
# 文件
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setFormatter(_formatter)
logger.addHandler(fh)
# 控制台
stdout = sys.stdout
ch = logging.StreamHandler(stdout)
ch.setFormatter(_formatter)
logger.addHandler(ch)
def stdout_write(msg):
stdout.write(msg)
stdout.flush()
if __name__ == '__main__':
logger.info('hello')
func
一些公共函数
def encrypt(pwd):
"""base64加密"""
return base64.b64encode(pwd.encode()).decode()
def current_time(n=0):
time_format = ['%Y-%m-%d %H:%M:%S', # 时间
'%Y-%m-%d', # 日期
'%Y%m%d%H%M%S'] # 数字
return time.strftime(time_format[n], time.localtime(time.time()))
def last_natural_day():
"""
上个自然日
:return:
"""
return (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
def obj2json(x):
"""
obj to json
:param x:
:return: json
"""
if isinstance(x, dict):
return json.dumps(x, ensure_ascii=False)
elif isinstance(x, str):
return ast.literal_eval(x.replace('null', '\'\'').replace('false', '\'\''))
def lower_id(s):
"""report/downExcel to report_down_excel"""
s = ''.join(['_' + c.lower() if c.isupper()
else c for c in s.replace('/', '_')])
return s[1:] if s[0] == '_' else s
def upper_id(s):
"""national_market_index to NationalMarketIndex"""
return ''.join(x[0].upper() + x[1:] for x in s.replace(' ', '').replace('/', '_').split('_'))
def bar(i):
"""
进度条
:param i: 百分比
:return:
"""
c = int(i / 10)
jd = '\r %2d%% [%s%s]'
a = '■' * c
b = '□' * (10 - c)
stdout_write(jd % (i, a, b))
def json2form(body):
"""
json转表单
:param body: {"a": 1, "b": 2}
:return: a=1&b=2
"""
return '&'.join([f"{k}={v}" for k, v in body.items()])
def form2json(form):
"""
表单转json
:param form: a=1&b=2
:return: {"a": 1, "b": 2}
"""
body = {}
for kv in form.split('&'):
k, v = kv.split('=')
body[k] = v
return json.dumps(body, indent=4, ensure_ascii=False)
在实际工作中,发现还有不少优化的余地,期待在未来做到3.0,再出个“手把手教你用AIM做接口自动化”。