case_lib.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. """
  4. Created: 2021/01/29
  5. """
  6. import sys
  7. import pytest
  8. import os
  9. import logging
  10. import time
  11. import json
  12. from bin.utils.sendApi import send_request
  13. from bin.utils.checkResult import check_result
  14. from bin.utils.initRelevance import init_relevance
  15. from bin.user import customer
  16. reload(sys)
  17. sys.setdefaultencoding('utf-8')
  18. def run_case(case_data, request, case_result):
  19. """
  20. 单个测试用例运行函数
  21. case_data: 表示为运行的函数
  22. request: 表示缓存区的内容
  23. case_result : 表示case的结果
  24. """
  25. logging.info('[%s]info:%s' % (case_data['test_name'], case_data['info']))
  26. logging.info('[%s]request_data:%s' % (case_data['test_name'], json.dumps(case_data, ensure_ascii=False)))
  27. if 'detail' in case_data.keys():
  28. logging.info(case_data['detail'])
  29. #step1 若为无效的用例,则直接跳过,基本废弃
  30. if case_data['invalid']:
  31. assert case_data['invalid'] == True
  32. return
  33. if 'skip' in case_data.keys() \
  34. and check_result(case_data['skip'],
  35. [(200, case_result[case_data['skip']['case_name']])]) == case_data['skip']['type']:
  36. assert True
  37. return
  38. # step2 有效测试用例的判断,动态替换参数
  39. cache_data = request.config.cache.get('cache_data', {})
  40. case_data, code = init_relevance(case_data, case_result, cache_data)
  41. assert code == True
  42. result = {}
  43. # step3: 自定义函数
  44. if case_data['custom']:
  45. func = 'customer.%s' % case_data['custom']
  46. try:
  47. result = eval(func)
  48. case_result[case_data['test_name']] = result[0]
  49. code = result[1]
  50. except Exception as e:
  51. logging.error(e)
  52. code = False
  53. finally:
  54. assert code == True
  55. else:#非用户自定义函数
  56. result = send_request(case_data)
  57. case_result[case_data['test_name']] = result[0][1]
  58. assert check_result(case_data['check'], result) == True
  59. #step4:断言执行成功,获取正确的结果,将其存储cache
  60. #存储的模式 [output][case_name]
  61. data = {}
  62. if 'output' in case_data.keys():
  63. key = "%s.%s" % (case_data['output'], case_data['test_name'])
  64. if key not in cache_data.keys():
  65. cache_data[key] = {}
  66. if not case_data['custom']:
  67. data = result[0][1]
  68. else:
  69. data = result[0]
  70. cache_data[key] = data
  71. request.config.cache.set('cache_data', cache_data)
  72. time.sleep(case_data['waittime'])