123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- """
- 定义结果校验方法
- Author: songjian at <songjian12@baidu.com>
- Created: 2020/4/21
- """
- import sys
- import re
- import logging
- import os
- import json
- reload(sys)
- sys.setdefaultencoding('utf-8')
- def check_equal(check_data, golden):
- """
- 完全匹配校验
- """
- logging.info('golden:%s, check_data:%s' % (golden, check_data))
- logging.info('check result:%s' % (check_data == golden))
- return check_data == golden
- def check_re(check_data, golden):
- """
- 正则匹配校验
- """
- regex = ''
- if '&' in golden:
- re_words = golden.split('&')
- for word in re_words:
- regex = regex + "(?=.*" + word + ")"
- else:
- regex = golden
- logging.info('regex:%s, check_data:%s' % (regex, check_data))
- logging.info('check result:%s' % (len(re.findall(regex, check_data)) != 0))
- return len(re.findall(regex, check_data)) != 0
- def check_result(data, results):
- """
- 校验入口
- """
- if 'check_type' not in data.keys():
- data['check_type'] = 'regular'
- if not data['check_type']:
- logging.error('校验类型为空')
- return False
- logging.info('校验类型:%s' % data['check_type'])
- if data['check_type'] == 'regular':
- return check(data, results)
- elif data['check_type'] == 'not_in':
- return not check(data, results)
- elif data['check_type'] == 'check_http':
- return check_http(results)
- else:
- logging.error('check type is not define')
- return False
- def str_to_int(index):
- """
- 字符串转数字
- """
- for ch in index:
- if ch < '0' or ch > '9':
- return index
- return int(index)
- def find_key(data_list, reg):
- """
- 在数组中寻找符合要求的键值
- """
- # reg = reg.strip('[').strip(']')
- reg = reg[1:-1]
- tree_node = reg.split('#')
- for data in data_list:
- key = []
- temp = data
- for node in tree_node:
- if ':' in node:
- key = node.split(':', 1)
- else:
- try:
- temp = temp[str_to_int(node)]
- except Exception as e:
- logging.info('该条结果缺少该字段:%s' % e)
- break
- if len(key) != 2 or key[0] not in temp.keys():
- continue
- elif temp[key[0]] == str_to_int(key[1]):
- return data
- return {}
- def get_prop(data, path):
- """
- 获取指定字段
- :param data:
- :return:
- """
- try:
- temp = data
- tree_list = path.split('>')
- for item in tree_list:
- if '[' in item:
- temp = find_key(temp, item)
- else:
- temp = temp[str_to_int(item)]
- return temp, True
- except KeyError as e:
- logging.error('KeyError:%s' % e)
- return data, False
- except IndexError as e:
- logging.error('IndexError:%s' % e)
- return data, False
- def check(data, results):
- """
- 校验函数
- """
- lines = []
- if os.path.isfile(str(data['assert_words'])):
- with open(data['assert_words'], 'r') as f:
- lines = f.readlines()
- if len(lines) != len(results):
- logging.error('check file length not equal to result file')
- return False
- for i in range(len(results)):
- if 'path' not in data.keys():
- temp = (results[i][1], True)
- else:
- logging.info('校验路径:%s' % data['path'])
- temp = get_prop(results[i][1], data['path'])
- if not temp[1]:
- logging.error("获取指定参数失败")
- return False
- if isinstance(temp[0], int):
- parameter = str(temp[0])
- elif not isinstance(temp[0], dict) \
- and not isinstance(temp[0], list):
- parameter = temp[0]
- else:
- parameter = json.dumps(temp[0], ensure_ascii=False)
- if os.path.isfile(str(data['assert_words'])):
- if not check_re(parameter, lines[i].strip().decode('utf-8')):
- return False
- elif not check_re(parameter, data['assert_words']):
- return False
- return True
- def check_http(results):
- """
- 验证http 链接是否可达
- """
- if results[0][0] in [200, 304]:
- return True
- else:
- return False
- if __name__ == "__main__":
- print check_result({"check_type": "regular", "assert_words": "error\": 0"}, [[200, {"error": 0}]])
|