#!/usr/bin/env python # -*- coding:utf-8 -*- """ 根据依赖case的结果更改参数 Author: songjian at Created: 2020/6/12 """ import sys import logging import json from readYaml import read_yaml import os import time import datetime import subprocess import random import uuid work_path = os.path.dirname(os.path.abspath(__file__)) sdk_path = os.path.dirname(os.path.dirname(work_path)) reload(sys) sys.setdefaultencoding('utf-8') func_list = ['to_timestamp', 'bw_date', 'bw_week', 'func_get_faq_list', 'get_index', 'compatible_api', 'timestamp_index', 'random_str', 'func_get_pattern_id', 'func_get_tree_node_list', 'random_id', 'date_index'] def random_id(): """ 生成随机id :return: """ dataid = str(uuid.uuid1()).replace("-", '') return {"dataid": dataid} def random_str(): """ 生成随机字符串 :return: """ num = random.randint(4, 12) name = random.sample('zyxwvutsrqponmlkjihgfedcba', num) name = ''.join(name) now = datetime.datetime.now() name = name + str(now.second) return {"random_name": name} def str_to_int(index): """ 字符串转数字 """ for ch in index: if ch < '0' or ch > '9': return index return int(index) def func_get_faq_list(case_result): """ 设置修改faq的参数 """ item = case_result["get_faq_list"]["data"]["items"][0] item["title"] = item["title"] item["answer"] = "China" now = int(time.time()*1000) item["updateTime"] = now item["@updateTime"] = now return {"item": item} def bw_date(): """ 获取 间隔30天的日期整数 例如 2020-07-27 00:00:00 对应的时间戳 """ now = datetime.datetime.now() en_date = datetime.date(now.year, now.month, now.day) be_date = en_date + datetime.timedelta(days=-30) return {"beginDate": int(time.mktime(be_date.timetuple())*1000), "endDate": int(time.mktime(en_date.timetuple())*1000)} def bw_week(): """ 获取间隔7天的日期整数,包括当天 """ now = datetime.datetime.now() en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) be_date = en_date + datetime.timedelta(days=-8) return {"beginDate": int(time.mktime(be_date.timetuple()) * 1000), "endDate": int(time.mktime(en_date.timetuple()) * 1000)} def get_index(case_data, indice): """ 获取指定类目的索引 """ host = case_data['host'] url = host + "/_cat/indices" cmd = "curl %s |grep %s |awk '{print $3}'" % (url, indice) ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) out, err = ret.communicate() indices = out.split()[-1] return {"index": indices} def to_timestamp(): """ 返回时间戳 """ return {"to_timestamp": int(round(time.time() * 1000))} def date_index(n, k): """ 返回当前日期前后几天的时间格式 假设今天是21号,n=-3,k=3则返回16-18号的时间格式,2022-01-16 00:00:00 2022-01-18 23:59:59 :param n: 可填正负数,正数为往后推日期,负数为往前推日期,0为当天 :param k: 可填正整数,表示时间间隔 :return: """ now = datetime.datetime.now() en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) + datetime.timedelta(days=n) be_date = datetime.datetime(now.year, now.month, now.day, 0, 0, 0) + datetime.timedelta(days=n - k + 1) return {"beginDate": str(be_date), "endDate": str(en_date)} def timestamp_index(n, k): """ 返回当前日期前后几天的时间戳 假设今天是21号,n=-3,k=3则返回16-18号的时间戳 :param n: 可填正负数,正数为往后推日期,负数为往前推日期,0为当天 :param k: 可填正整数,表示时间间隔 :return: """ now = datetime.datetime.now() en_date = datetime.datetime(now.year, now.month, now.day, 23, 59, 59) + datetime.timedelta(days=n) be_date = datetime.datetime(now.year, now.month, now.day, 0, 0, 0) + datetime.timedelta(days=n - k + 1) return {"beginDate": int(time.mktime(be_date.timetuple()) * 1000), "endDate": int(time.mktime(en_date.timetuple()) * 1000)} def compatible_api(case_name, case_result): """ 兼容api列表 :param case_name: :param case_result: :return: """ result = case_result[case_name] if isinstance(result['data'], list): temp = result['data'] del result['data'] result['data'] = {} result['data']['list'] = temp return result def find_key(data_list, reg): """ 在数组中寻找符合要求的键值 """ # reg = reg.strip('[').strip(']') reg = reg[1:-1] tree_node = reg.split('#') for data in data_list: key = [] temp = data for node in tree_node: if ':' in node: key = node.split(':', 1) else: try: temp = temp[str_to_int(node)] except Exception as e: logging.info('该条结果缺少该字段:%s' % e) break if len(key) != 2 or key[0] not in temp.keys(): continue elif temp[key[0]] == str_to_int(key[1]): return data return None def func_get_pattern_id(case_name, case_result, pattern_string): """ 从上一个case 里获取 pattern_id """ result = case_result[case_name] pattern_string = unicode(pattern_string, 'utf-8') for part in result['result']['pattern_body']: if pattern_string == part['pattern_string']: return part['pattern_id'], True return 0, False def func_get_tree_node_list(case_name, case_result): """ 从之前 case 的结果里,获取知识树节点叶子节点 list """ result = case_result[case_name] result_list = [] for part in result['data']['list']: result_list.append(part['id']) return result_list, False def get_prop_map(case_name, pre_prop, back_prop, case_result, cache_data): """ 获取替换的参数值映射表 # 本文件测试用例存储在case_result # 其他文件的结果存储在cache_data """ prop_map = {} case_name_list = case_name.split('||') pre_props = pre_prop.split('||') back_props = back_prop.split('||') if len(case_name_list) != len(pre_props) or len(case_name_list) != len(back_props): logging.error('parameter length not equal') return prop_map, False for n in range(len(case_name_list)): func_name = case_name_list[n].split('(')[0] if case_name_list[n] not in case_result.keys() and func_name not in func_list: # 模式2:外部输入的方式 conf_list = func_name.split('.') if conf_list[0] == 'input': try: result = cache_data[".".join(conf_list[1:])] except Exception as e: logging.info('case_name:%s is not exist or input para by conf file' % case_name_list[n]) continue else: logging.error('case_name:%s is not exist' % case_name_list[n]) return prop_map, False elif func_name in func_list: # 模式1:自定义函数 func = case_name_list[n] if case_name_list[n].find('(') < 0: func = '%s()' % case_name_list[n] result = eval(func) else: # 模式2:本文件测试用例来源 result = case_result[case_name_list[n]] # 生成替换的dict pre_prop_list = pre_props[n].split('|') back_prop_list = back_props[n].split('|') if len(pre_prop_list) != len(back_prop_list): logging.error('pre_prop and back_prop length not equal') return prop_map, False for i in range(len(pre_prop_list)): try: temp = result tree_list = pre_prop_list[i].split('>') for item in tree_list: if '[' in item: temp = find_key(temp, item) else: temp = temp[str_to_int(item)] prop_map[back_prop_list[i]] = temp except Exception as e: logging.error(e) logging.error("无法替换对应的参数,请检查测试用例替换部分") return prop_map, False return prop_map, True def init_relevance(case_data, case_result, cache_data): """ 支持2个模式, 模式1:自定义函数 模式2:直接依赖其他case的结果; 1)依赖本文件的测试用例结果,case_name直接写明即可,由case_result传入 2)依赖其他case文件结果,以input为开头;由cache_data结果传入 模式1:可以将case_result传入,随意处理 模式2:不支持嵌套的获取值,若想复杂情况,请直接func函数 根据依赖case结果替换当前case参数 eg. {"error":0,"data":[{"id":1},{"id":2, "name":"demo"},{"id":3, "test":{"id_in":33}, "name":"demo2"}]} 得到demo的值:data>[id:2]>name 得到demo2的值:data>[test#id_in:33]>name 支持通过index位置获取,得到demo2的值:data>2>name >表示dict进入下一层 #表示列表进入下一层 """ # step1: get_para # 若不依赖多余的参数 if 'precondition' not in case_data.keys(): return case_data, True try: # 判断参数是否预期 case_name = case_data['precondition']['case_name'].split(';') pre_prop = case_data['precondition']['pre_prop'].split(';') back_prop = case_data['precondition']['back_prop'].split(';') if 'replace_key' not in case_data['precondition'].keys() \ or not case_data['precondition']['replace_key']: replace_key = ['parameter'] else: replace_key = case_data['precondition']['replace_key'].split(';') except Exception as e: logging.error(e) return case_data, False if len(case_name) != len(pre_prop) or len(case_name) != len(back_prop) \ or len(case_name) != len(replace_key): logging.error('precondition parameter length not equal') return case_data, False replace_list = case_data for round in range(len(case_name)): # STEP2: get prop_map prop_map, code = get_prop_map(case_name[round], pre_prop[round], back_prop[round], case_result, cache_data) if code is False: logging.error('无法获取正确的参数替换映射') return case_data, False # Step3: replace para try: parameter = "" if not isinstance(replace_list[replace_key[round]], dict) \ and not isinstance(replace_list[replace_key[round]], list): parameter = replace_list[replace_key[round]] else: parameter = json.dumps(replace_list[replace_key[round]], ensure_ascii=False) for back_prop_key, back_prop_value in prop_map.items(): if isinstance(back_prop_value, str) or isinstance(back_prop_value, unicode) \ or replace_key[round] == 'check' or replace_key[round] == 'skip': parameter = parameter.replace('%s' % back_prop_key, str(back_prop_value)) elif isinstance(back_prop_value, int): if '\\"%s\\"' % back_prop_key in parameter: parameter = parameter.replace('\\"%s\\"' % back_prop_key, str(back_prop_value)) else: parameter = parameter.replace('"%s"' % back_prop_key, str(back_prop_value)) else: parameter = parameter.replace('"%s"' % back_prop_key, json.dumps(back_prop_value, ensure_ascii=False)) if not isinstance(replace_list[replace_key[round]], dict) \ and not isinstance(replace_list[replace_key[round]], list): replace_list[replace_key[round]] = parameter else: replace_list[replace_key[round]] = json.loads(parameter, strict=False) except Exception as e: logging.error(e) return case_data, False return replace_list, True