123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- """
- 根据依赖case的结果更改参数
- Author: songjian at <songjian12@baidu.com>
- Created: 2020/6/12
- """
- import sys
- import logging
- import json
- from readYaml import read_yaml
- import os
- import time
- import datetime
- import subprocess
- import random
- import uuid
- work_path = os.path.dirname(os.path.abspath(__file__))
- sdk_path = os.path.dirname(os.path.dirname(work_path))
- reload(sys)
- sys.setdefaultencoding('utf-8')
- func_list = ['to_timestamp', 'bw_date', 'bw_week', 'func_get_faq_list', 'get_index',
- 'compatible_api', 'timestamp_index', 'random_str', 'func_get_pattern_id',
- 'func_get_tree_node_list', 'random_id', 'date_index']
- def random_id():
- """
- 生成随机id
- :return:
- """
- dataid = str(uuid.uuid1()).replace("-", '')
- return {"dataid": dataid}
- def random_str():
- """
- 生成随机字符串
- :return:
- """
- num = random.randint(4, 12)
- name = random.sample('zyxwvutsrqponmlkjihgfedcba', num)
- name = ''.join(name)
- now = datetime.datetime.now()
- name = name + str(now.second)
- return {"random_name": name}
- def str_to_int(index):
- """
- 字符串转数字
- """
- for ch in index:
- if ch < '0' or ch > '9':
- return index
- return int(index)
- def func_get_faq_list(case_result):
- """
- 设置修改faq的参数
- """
- item = case_result["get_faq_list"]["data"]["items"][0]
- item["title"] = item["title"]
- item["answer"] = "China"
- now = int(time.time()*1000)
- item["updateTime"] = now
- item["@updateTime"] = now
- return {"item": item}
- def bw_date():
- """
- 获取 间隔30天的日期整数 例如 2020-07-27 00:00:00 对应的时间戳
- """
- now = datetime.datetime.now()
- en_date = datetime.date(now.year, now.month, now.day)
- be_date = en_date + datetime.timedelta(days=-30)
- return {"beginDate": int(time.mktime(be_date.timetuple())*1000),
- "endDate": int(time.mktime(en_date.timetuple())*1000)}
- def bw_week():
- """
- 获取间隔7天的日期整数,包括当天
- """
- now = datetime.datetime.now()
- en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59)
- be_date = en_date + datetime.timedelta(days=-8)
- return {"beginDate": int(time.mktime(be_date.timetuple()) * 1000),
- "endDate": int(time.mktime(en_date.timetuple()) * 1000)}
- def get_index(case_data, indice):
- """
- 获取指定类目的索引
- """
- host = case_data['host']
- url = host + "/_cat/indices"
- cmd = "curl %s |grep %s |awk '{print $3}'" % (url, indice)
- ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
- out, err = ret.communicate()
- indices = out.split()[-1]
- return {"index": indices}
- def to_timestamp():
- """
- 返回时间戳
- """
- return {"to_timestamp": int(round(time.time() * 1000))}
- def date_index(n, k):
- """
- 返回当前日期前后几天的时间格式
- 假设今天是21号,n=-3,k=3则返回16-18号的时间格式,2022-01-16 00:00:00 2022-01-18 23:59:59
- :param n: 可填正负数,正数为往后推日期,负数为往前推日期,0为当天
- :param k: 可填正整数,表示时间间隔
- :return:
- """
- now = datetime.datetime.now()
- en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) + datetime.timedelta(days=n)
- be_date = datetime.datetime(now.year, now.month, now.day, 0, 0, 0) + datetime.timedelta(days=n - k + 1)
- return {"beginDate": str(be_date), "endDate": str(en_date)}
- def timestamp_index(n, k):
- """
- 返回当前日期前后几天的时间戳
- 假设今天是21号,n=-3,k=3则返回16-18号的时间戳
- :param n: 可填正负数,正数为往后推日期,负数为往前推日期,0为当天
- :param k: 可填正整数,表示时间间隔
- :return:
- """
- now = datetime.datetime.now()
- en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) + datetime.timedelta(days=n)
- be_date = datetime.datetime(now.year, now.month, now.day, 0, 0, 0) + datetime.timedelta(days=n - k + 1)
- return {"beginDate": int(time.mktime(be_date.timetuple()) * 1000),
- "endDate": int(time.mktime(en_date.timetuple()) * 1000)}
- def compatible_api(case_name, case_result):
- """
- 兼容api列表
- :param case_name:
- :param case_result:
- :return:
- """
- result = case_result[case_name]
- if isinstance(result['data'], list):
- temp = result['data']
- del result['data']
- result['data'] = {}
- result['data']['list'] = temp
- return result
- def find_key(data_list, reg):
- """
- 在数组中寻找符合要求的键值
- """
- # reg = reg.strip('[').strip(']')
- reg = reg[1:-1]
- tree_node = reg.split('#')
- for data in data_list:
- key = []
- temp = data
- for node in tree_node:
- if ':' in node:
- key = node.split(':', 1)
- else:
- try:
- temp = temp[str_to_int(node)]
- except Exception as e:
- logging.info('该条结果缺少该字段:%s' % e)
- break
- if len(key) != 2 or key[0] not in temp.keys():
- continue
- elif temp[key[0]] == str_to_int(key[1]):
- return data
- return None
- def func_get_pattern_id(case_name, case_result, pattern_string):
- """
- 从上一个case 里获取 pattern_id
- """
- result = case_result[case_name]
- pattern_string = unicode(pattern_string, 'utf-8')
- for part in result['result']['pattern_body']:
- if pattern_string == part['pattern_string']:
- return part['pattern_id'], True
- return 0, False
- def func_get_tree_node_list(case_name, case_result):
- """
- 从之前 case 的结果里,获取知识树节点叶子节点 list
- """
- result = case_result[case_name]
- result_list = []
- for part in result['data']['list']:
- result_list.append(part['id'])
- return result_list, False
- def get_prop_map(case_name, pre_prop, back_prop, case_result, cache_data):
- """
- 获取替换的参数值映射表
- # 本文件测试用例存储在case_result
- # 其他文件的结果存储在cache_data
- """
- prop_map = {}
- case_name_list = case_name.split('||')
- pre_props = pre_prop.split('||')
- back_props = back_prop.split('||')
- if len(case_name_list) != len(pre_props) or len(case_name_list) != len(back_props):
- logging.error('parameter length not equal')
- return prop_map, False
-
- for n in range(len(case_name_list)):
- func_name = case_name_list[n].split('(')[0]
- if case_name_list[n] not in case_result.keys() and func_name not in func_list:
- # 模式2:外部输入的方式
- conf_list = func_name.split('.')
- if conf_list[0] == 'input':
- try:
- result = cache_data[".".join(conf_list[1:])]
- except Exception as e:
- logging.info('case_name:%s is not exist or input para by conf file' % case_name_list[n])
- continue
- else:
- logging.error('case_name:%s is not exist' % case_name_list[n])
- return prop_map, False
- elif func_name in func_list: # 模式1:自定义函数
- func = case_name_list[n]
- if case_name_list[n].find('(') < 0:
- func = '%s()' % case_name_list[n]
- result = eval(func)
- else: # 模式2:本文件测试用例来源
- result = case_result[case_name_list[n]]
-
- # 生成替换的dict
- pre_prop_list = pre_props[n].split('|')
- back_prop_list = back_props[n].split('|')
- if len(pre_prop_list) != len(back_prop_list):
- logging.error('pre_prop and back_prop length not equal')
- return prop_map, False
- for i in range(len(pre_prop_list)):
- try:
- temp = result
- tree_list = pre_prop_list[i].split('>')
- for item in tree_list:
- if '[' in item:
- temp = find_key(temp, item)
- else:
- temp = temp[str_to_int(item)]
- prop_map[back_prop_list[i]] = temp
- except Exception as e:
- logging.error(e)
- logging.error("无法替换对应的参数,请检查测试用例替换部分")
- return prop_map, False
- return prop_map, True
- def init_relevance(case_data, case_result, cache_data):
- """
- 支持2个模式,
- 模式1:自定义函数
- 模式2:直接依赖其他case的结果;
- 1)依赖本文件的测试用例结果,case_name直接写明即可,由case_result传入
- 2)依赖其他case文件结果,以input为开头;由cache_data结果传入
- 模式1:可以将case_result传入,随意处理
- 模式2:不支持嵌套的获取值,若想复杂情况,请直接func函数
- 根据依赖case结果替换当前case参数
- eg. {"error":0,"data":[{"id":1},{"id":2, "name":"demo"},{"id":3, "test":{"id_in":33}, "name":"demo2"}]}
- 得到demo的值:data>[id:2]>name
- 得到demo2的值:data>[test#id_in:33]>name
- 支持通过index位置获取,得到demo2的值:data>2>name
- >表示dict进入下一层
- #表示列表进入下一层
- """
- # step1: get_para
- # 若不依赖多余的参数
- if 'precondition' not in case_data.keys():
- return case_data, True
- try:
- # 判断参数是否预期
- case_name = case_data['precondition']['case_name'].split(';')
- pre_prop = case_data['precondition']['pre_prop'].split(';')
- back_prop = case_data['precondition']['back_prop'].split(';')
- if 'replace_key' not in case_data['precondition'].keys() \
- or not case_data['precondition']['replace_key']:
- replace_key = ['parameter']
- else:
- replace_key = case_data['precondition']['replace_key'].split(';')
- except Exception as e:
- logging.error(e)
- return case_data, False
- if len(case_name) != len(pre_prop) or len(case_name) != len(back_prop) \
- or len(case_name) != len(replace_key):
- logging.error('precondition parameter length not equal')
- return case_data, False
- replace_list = case_data
- for round in range(len(case_name)):
- # STEP2: get prop_map
- prop_map, code = get_prop_map(case_name[round], pre_prop[round], back_prop[round], case_result, cache_data)
- if code is False:
- logging.error('无法获取正确的参数替换映射')
- return case_data, False
- # Step3: replace para
- try:
- parameter = ""
- if not isinstance(replace_list[replace_key[round]], dict) \
- and not isinstance(replace_list[replace_key[round]], list):
- parameter = replace_list[replace_key[round]]
- else:
- parameter = json.dumps(replace_list[replace_key[round]], ensure_ascii=False)
- for back_prop_key, back_prop_value in prop_map.items():
- if isinstance(back_prop_value, str) or isinstance(back_prop_value, unicode) \
- or replace_key[round] == 'check' or replace_key[round] == 'skip':
- parameter = parameter.replace('%s' % back_prop_key, str(back_prop_value))
- elif isinstance(back_prop_value, int):
- if '\\"%s\\"' % back_prop_key in parameter:
- parameter = parameter.replace('\\"%s\\"' % back_prop_key, str(back_prop_value))
- else:
- parameter = parameter.replace('"%s"' % back_prop_key, str(back_prop_value))
- else:
- parameter = parameter.replace('"%s"' % back_prop_key,
- json.dumps(back_prop_value, ensure_ascii=False))
- if not isinstance(replace_list[replace_key[round]], dict) \
- and not isinstance(replace_list[replace_key[round]], list):
- replace_list[replace_key[round]] = parameter
- else:
- replace_list[replace_key[round]] = json.loads(parameter, strict=False)
- except Exception as e:
- logging.error(e)
- return case_data, False
- return replace_list, True
|