test_api.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-18 15:29:28
  4. # @Last Modified time: 2024-01-31 11:43:10
  5. # import re
  6. # import json
  7. # import unittest
  8. # from base64 import b64encode
  9. # from flask import url_for
  10. # from app import create_app, db
  11. # from app.models import *
  12. # class APITestCase(unittest.TestCase):
  13. # def setUp(self):
  14. # self.app = create_app('testing')
  15. # self.app_context = self.app.app_context()
  16. # self.app_context.push()
  17. # db.create_all()
  18. # self.client = self.app.test_client()
  19. # def tearDown(self):
  20. # db.session.remove()
  21. # db.drop_all()
  22. # self.app_context.pop()
  23. # def get_api_headers(self, username, password):
  24. # return {
  25. # 'Authorization': 'Basic ' + b64encode(
  26. # (username + ':' + password).encode('utf-8')).decode('utf-8'),
  27. # 'Accept': 'application/json',
  28. # 'Content-Type': 'application/json'
  29. # }
  30. # def test_404(self):
  31. # response = self.client.get(
  32. # '/wrong/url',
  33. # headers=self.get_api_headers('email', 'password'))
  34. # self.assertTrue(response.status_code == 404)
  35. # json_response = json.loads(response.data.decode('utf-8'))
  36. # self.assertTrue(json_response['error'] == 'not found')
  37. # def test_no_auth(self):
  38. # response = self.client.get(url_for('api.get_posts'),
  39. # content_type='application/json')
  40. # self.assertTrue(response.status_code == 401)
  41. # def test_bad_auth(self):
  42. # # add a user
  43. # r = Role.query.filter_by(name='User').first()
  44. # self.assertIsNotNone(r)
  45. # u = User(email='john@example.com', password='cat', confirmed=True,
  46. # role=r)
  47. # db.session.add(u)
  48. # db.session.commit()
  49. # # authenticate with bad password
  50. # response = self.client.get(
  51. # url_for('api.get_posts'),
  52. # headers=self.get_api_headers('john@example.com', 'dog'))
  53. # self.assertTrue(response.status_code == 401)
  54. # def test_token_auth(self):
  55. # # add a user
  56. # r = Role.query.filter_by(name='User').first()
  57. # self.assertIsNotNone(r)
  58. # u = User(email='john@example.com', password='cat', confirmed=True,
  59. # role=r)
  60. # db.session.add(u)
  61. # db.session.commit()
  62. # # issue a request with a bad token
  63. # response = self.client.get(
  64. # url_for('api.get_posts'),
  65. # headers=self.get_api_headers('bad-token', ''))
  66. # self.assertTrue(response.status_code == 401)
  67. # # get a token
  68. # response = self.client.get(
  69. # url_for('api.get_token'),
  70. # headers=self.get_api_headers('john@example.com', 'cat'))
  71. # self.assertTrue(response.status_code == 200)
  72. # json_response = json.loads(response.data.decode('utf-8'))
  73. # self.assertIsNotNone(json_response.get('token'))
  74. # token = json_response['token']
  75. # # issue a request with the token
  76. # response = self.client.get(
  77. # url_for('api.get_posts'),
  78. # headers=self.get_api_headers(token, ''))
  79. # self.assertTrue(response.status_code == 200)
  80. # def test_anonymous(self):
  81. # response = self.client.get(
  82. # url_for('api.get_posts'),
  83. # headers=self.get_api_headers('', ''))
  84. # self.assertTrue(response.status_code == 200)
  85. # def test_unconfirmed_account(self):
  86. # # add an unconfirmed user
  87. # r = Role.query.filter_by(name='User').first()
  88. # self.assertIsNotNone(r)
  89. # u = User(email='john@example.com', password='cat', confirmed=False,
  90. # role=r)
  91. # db.session.add(u)
  92. # db.session.commit()
  93. # # get list of posts with the unconfirmed account
  94. # response = self.client.get(
  95. # url_for('api.get_posts'),
  96. # headers=self.get_api_headers('john@example.com', 'cat'))
  97. # self.assertTrue(response.status_code == 403)
  98. # def test_posts(self):
  99. # # add a user
  100. # r = Role.query.filter_by(name='User').first()
  101. # self.assertIsNotNone(r)
  102. # u = User(email='john@example.com', password='cat', confirmed=True,
  103. # role=r)
  104. # db.session.add(u)
  105. # db.session.commit()
  106. # # write an empty post
  107. # response = self.client.post(
  108. # url_for('api.new_post'),
  109. # headers=self.get_api_headers('john@example.com', 'cat'),
  110. # data=json.dumps({'body': ''}))
  111. # self.assertTrue(response.status_code == 400)
  112. # # write a post
  113. # response = self.client.post(
  114. # url_for('api.new_post'),
  115. # headers=self.get_api_headers('john@example.com', 'cat'),
  116. # data=json.dumps({'body': 'body of the *blog* post'}))
  117. # self.assertTrue(response.status_code == 201)
  118. # url = response.headers.get('Location')
  119. # self.assertIsNotNone(url)
  120. # # get the new post
  121. # response = self.client.get(
  122. # url,
  123. # headers=self.get_api_headers('john@example.com', 'cat'))
  124. # self.assertTrue(response.status_code == 200)
  125. # json_response = json.loads(response.data.decode('utf-8'))
  126. # self.assertTrue(json_response['url'] == url)
  127. # self.assertTrue(json_response['body'] == 'body of the *blog* post')
  128. # self.assertTrue(json_response['body_html'] ==
  129. # '<p>body of the <em>blog</em> post</p>')
  130. # json_post = json_response
  131. # # get the post from the user
  132. # response = self.client.get(
  133. # url_for('api.get_user_posts', id=u.id),
  134. # headers=self.get_api_headers('john@example.com', 'cat'))
  135. # self.assertTrue(response.status_code == 200)
  136. # json_response = json.loads(response.data.decode('utf-8'))
  137. # self.assertIsNotNone(json_response.get('posts'))
  138. # self.assertTrue(json_response.get('count', 0) == 1)
  139. # self.assertTrue(json_response['posts'][0] == json_post)
  140. # # get the post from the user as a follower
  141. # response = self.client.get(
  142. # url_for('api.get_user_followed_posts', id=u.id),
  143. # headers=self.get_api_headers('john@example.com', 'cat'))
  144. # self.assertTrue(response.status_code == 200)
  145. # json_response = json.loads(response.data.decode('utf-8'))
  146. # self.assertIsNotNone(json_response.get('posts'))
  147. # self.assertTrue(json_response.get('count', 0) == 1)
  148. # self.assertTrue(json_response['posts'][0] == json_post)
  149. # # edit post
  150. # response = self.client.put(
  151. # url,
  152. # headers=self.get_api_headers('john@example.com', 'cat'),
  153. # data=json.dumps({'body': 'updated body'}))
  154. # self.assertTrue(response.status_code == 200)
  155. # json_response = json.loads(response.data.decode('utf-8'))
  156. # self.assertTrue(json_response['url'] == url)
  157. # self.assertTrue(json_response['body'] == 'updated body')
  158. # self.assertTrue(json_response['body_html'] == '<p>updated body</p>')
  159. # def test_users(self):
  160. # # add two users
  161. # r = Role.query.filter_by(name='User').first()
  162. # self.assertIsNotNone(r)
  163. # u1 = User(email='john@example.com', username='john',
  164. # password='cat', confirmed=True, role=r)
  165. # u2 = User(email='susan@example.com', username='susan',
  166. # password='dog', confirmed=True, role=r)
  167. # db.session.add_all([u1, u2])
  168. # db.session.commit()
  169. # # get users
  170. # response = self.client.get(
  171. # url_for('api.get_user', id=u1.id),
  172. # headers=self.get_api_headers('susan@example.com', 'dog'))
  173. # self.assertTrue(response.status_code == 200)
  174. # json_response = json.loads(response.data.decode('utf-8'))
  175. # self.assertTrue(json_response['username'] == 'john')
  176. # response = self.client.get(
  177. # url_for('api.get_user', id=u2.id),
  178. # headers=self.get_api_headers('susan@example.com', 'dog'))
  179. # self.assertTrue(response.status_code == 200)
  180. # json_response = json.loads(response.data.decode('utf-8'))
  181. # self.assertTrue(json_response['username'] == 'susan')
  182. # def test_comments(self):
  183. # # add two users
  184. # r = Role.query.filter_by(name='User').first()
  185. # self.assertIsNotNone(r)
  186. # u1 = User(email='john@example.com', username='john',
  187. # password='cat', confirmed=True, role=r)
  188. # u2 = User(email='susan@example.com', username='susan',
  189. # password='dog', confirmed=True, role=r)
  190. # db.session.add_all([u1, u2])
  191. # db.session.commit()
  192. # # add a post
  193. # post = Post(body='body of the post', author=u1)
  194. # db.session.add(post)
  195. # db.session.commit()
  196. # # write a comment
  197. # response = self.client.post(
  198. # url_for('api.new_post_comment', id=post.id),
  199. # headers=self.get_api_headers('susan@example.com', 'dog'),
  200. # data=json.dumps({'body': 'Good [post](http://example.com)!'}))
  201. # self.assertTrue(response.status_code == 201)
  202. # json_response = json.loads(response.data.decode('utf-8'))
  203. # url = response.headers.get('Location')
  204. # self.assertIsNotNone(url)
  205. # self.assertTrue(json_response['body'] ==
  206. # 'Good [post](http://example.com)!')
  207. # self.assertTrue(
  208. # re.sub('<.*?>', '', json_response['body_html']) == 'Good post!')
  209. # # get the new comment
  210. # response = self.client.get(
  211. # url,
  212. # headers=self.get_api_headers('john@example.com', 'cat'))
  213. # self.assertTrue(response.status_code == 200)
  214. # json_response = json.loads(response.data.decode('utf-8'))
  215. # self.assertTrue(json_response['url'] == url)
  216. # self.assertTrue(json_response['body'] ==
  217. # 'Good [post](http://example.com)!')
  218. # # add another comment
  219. # comment = Comment(body='Thank you!', author=u1, post=post)
  220. # db.session.add(comment)
  221. # db.session.commit()
  222. # # get the two comments from the post
  223. # response = self.client.get(
  224. # url_for('api.get_post_comments', id=post.id),
  225. # headers=self.get_api_headers('susan@example.com', 'dog'))
  226. # self.assertTrue(response.status_code == 200)
  227. # json_response = json.loads(response.data.decode('utf-8'))
  228. # self.assertIsNotNone(json_response.get('posts'))
  229. # self.assertTrue(json_response.get('count', 0) == 2)
  230. # # get all the comments
  231. # response = self.client.get(
  232. # url_for('api.get_comments', id=post.id),
  233. # headers=self.get_api_headers('susan@example.com', 'dog'))
  234. # self.assertTrue(response.status_code == 200)
  235. # json_response = json.loads(response.data.decode('utf-8'))
  236. # self.assertIsNotNone(json_response.get('posts'))
  237. # self.assertTrue(json_response.get('count', 0) == 2)
  238. import json, time
  239. from datetime import datetime
  240. import requests
  241. with open("../fuli_data.json", "r", encoding="utf-8") as fp:
  242. # with open("../liguiwen@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  243. # with open("../houhuiteng@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  244. # with open("../lijie1013@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  245. # with open("../zengjiyi@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  246. # with open("../zengjunhao@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  247. # with open("../houyue@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  248. # with open("../longruifan@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  249. # with open("../wangyongqiang@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  250. # with open("../liuhuan0828@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  251. # with open("../heweiting@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  252. # with open("../ganwenfeng@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  253. # with open("../chenweichang0607@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  254. # with open("../caiwujiancha1@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  255. # with open("../wenyongxian@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp:
  256. num = 0
  257. for line in fp.readlines():
  258. json_obj = json.loads(line)
  259. # json_obj['record_Time'] = json_obj['record_Time']/1000
  260. json_obj['device_Id'] = 'did'
  261. json_obj['full_Url'] = json_obj['current_Url']
  262. response = requests.post('http://localhost:7788/record', json=json_obj)
  263. print(response.text)
  264. time.sleep(0.2)
  265. num += 1
  266. if num > 100:
  267. break
  268. response = requests.get('http://localhost:7788/screen3')
  269. # response = requests.get('http://localhost:7788/screen3?user_id=liguiwen@mz.gd.csg.cn')
  270. # response = requests.get('http://localhost:7788/screen3?user_id=houhuiteng@mz.gd.csg.cn')
  271. # response = requests.get('http://localhost:7788/screen3?user_id=lijie1013@mz.gd.csg.cn')
  272. # response = requests.get('http://localhost:7788/screen3?user_id=zengjiyi@mz.gd.csg.cn')
  273. # response = requests.get('http://localhost:7788/screen3?user_id=zengjunhao@mz.gd.csg.cn')
  274. # response = requests.get('http://localhost:7788/screen3?user_id=houyue@mz.gd.csg.cn')
  275. # response = requests.get('http://localhost:7788/screen3?user_id=longruifan@mz.gd.csg.cn')
  276. # response = requests.get('http://localhost:7788/screen3?user_id=wangyongqiang@mz.gd.csg.cn')
  277. # response = requests.get('http://localhost:7788/screen3?user_id=liuhuan0828@mz.gd.csg.cn')
  278. # response = requests.get('http://localhost:7788/screen3?user_id=heweiting@mz.gd.csg.cn')
  279. # response = requests.get('http://localhost:7788/screen3?user_id=ganwenfeng@mz.gd.csg.cn')
  280. # response = requests.get('http://localhost:7788/screen3?user_id=chenweichang0607@mz.gd.csg.cn')
  281. # response = requests.get('http://localhost:7788/screen3?user_id=caiwujiancha1@mz.gd.csg.cn')
  282. # response = requests.get('http://localhost:7788/screen3?user_id=wenyongxian@mz.gd.csg.cn')