#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-18 15:29:28 # @Last Modified time: 2024-01-31 11:43:10 # import re # import json # import unittest # from base64 import b64encode # from flask import url_for # from app import create_app, db # from app.models import * # class APITestCase(unittest.TestCase): # def setUp(self): # self.app = create_app('testing') # self.app_context = self.app.app_context() # self.app_context.push() # db.create_all() # self.client = self.app.test_client() # def tearDown(self): # db.session.remove() # db.drop_all() # self.app_context.pop() # def get_api_headers(self, username, password): # return { # 'Authorization': 'Basic ' + b64encode( # (username + ':' + password).encode('utf-8')).decode('utf-8'), # 'Accept': 'application/json', # 'Content-Type': 'application/json' # } # def test_404(self): # response = self.client.get( # '/wrong/url', # headers=self.get_api_headers('email', 'password')) # self.assertTrue(response.status_code == 404) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['error'] == 'not found') # def test_no_auth(self): # response = self.client.get(url_for('api.get_posts'), # content_type='application/json') # self.assertTrue(response.status_code == 401) # def test_bad_auth(self): # # add a user # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u = User(email='john@example.com', password='cat', confirmed=True, # role=r) # db.session.add(u) # db.session.commit() # # authenticate with bad password # response = self.client.get( # url_for('api.get_posts'), # headers=self.get_api_headers('john@example.com', 'dog')) # self.assertTrue(response.status_code == 401) # def test_token_auth(self): # # add a user # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u = User(email='john@example.com', password='cat', confirmed=True, # role=r) # db.session.add(u) # db.session.commit() # # issue a request with a bad token # response = self.client.get( # url_for('api.get_posts'), # headers=self.get_api_headers('bad-token', '')) # self.assertTrue(response.status_code == 401) # # get a token # response = self.client.get( # url_for('api.get_token'), # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertIsNotNone(json_response.get('token')) # token = json_response['token'] # # issue a request with the token # response = self.client.get( # url_for('api.get_posts'), # headers=self.get_api_headers(token, '')) # self.assertTrue(response.status_code == 200) # def test_anonymous(self): # response = self.client.get( # url_for('api.get_posts'), # headers=self.get_api_headers('', '')) # self.assertTrue(response.status_code == 200) # def test_unconfirmed_account(self): # # add an unconfirmed user # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u = User(email='john@example.com', password='cat', confirmed=False, # role=r) # db.session.add(u) # db.session.commit() # # get list of posts with the unconfirmed account # response = self.client.get( # url_for('api.get_posts'), # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 403) # def test_posts(self): # # add a user # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u = User(email='john@example.com', password='cat', confirmed=True, # role=r) # db.session.add(u) # db.session.commit() # # write an empty post # response = self.client.post( # url_for('api.new_post'), # headers=self.get_api_headers('john@example.com', 'cat'), # data=json.dumps({'body': ''})) # self.assertTrue(response.status_code == 400) # # write a post # response = self.client.post( # url_for('api.new_post'), # headers=self.get_api_headers('john@example.com', 'cat'), # data=json.dumps({'body': 'body of the *blog* post'})) # self.assertTrue(response.status_code == 201) # url = response.headers.get('Location') # self.assertIsNotNone(url) # # get the new post # response = self.client.get( # url, # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['url'] == url) # self.assertTrue(json_response['body'] == 'body of the *blog* post') # self.assertTrue(json_response['body_html'] == # '
body of the blog post
') # json_post = json_response # # get the post from the user # response = self.client.get( # url_for('api.get_user_posts', id=u.id), # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertIsNotNone(json_response.get('posts')) # self.assertTrue(json_response.get('count', 0) == 1) # self.assertTrue(json_response['posts'][0] == json_post) # # get the post from the user as a follower # response = self.client.get( # url_for('api.get_user_followed_posts', id=u.id), # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertIsNotNone(json_response.get('posts')) # self.assertTrue(json_response.get('count', 0) == 1) # self.assertTrue(json_response['posts'][0] == json_post) # # edit post # response = self.client.put( # url, # headers=self.get_api_headers('john@example.com', 'cat'), # data=json.dumps({'body': 'updated body'})) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['url'] == url) # self.assertTrue(json_response['body'] == 'updated body') # self.assertTrue(json_response['body_html'] == 'updated body
') # def test_users(self): # # add two users # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u1 = User(email='john@example.com', username='john', # password='cat', confirmed=True, role=r) # u2 = User(email='susan@example.com', username='susan', # password='dog', confirmed=True, role=r) # db.session.add_all([u1, u2]) # db.session.commit() # # get users # response = self.client.get( # url_for('api.get_user', id=u1.id), # headers=self.get_api_headers('susan@example.com', 'dog')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['username'] == 'john') # response = self.client.get( # url_for('api.get_user', id=u2.id), # headers=self.get_api_headers('susan@example.com', 'dog')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['username'] == 'susan') # def test_comments(self): # # add two users # r = Role.query.filter_by(name='User').first() # self.assertIsNotNone(r) # u1 = User(email='john@example.com', username='john', # password='cat', confirmed=True, role=r) # u2 = User(email='susan@example.com', username='susan', # password='dog', confirmed=True, role=r) # db.session.add_all([u1, u2]) # db.session.commit() # # add a post # post = Post(body='body of the post', author=u1) # db.session.add(post) # db.session.commit() # # write a comment # response = self.client.post( # url_for('api.new_post_comment', id=post.id), # headers=self.get_api_headers('susan@example.com', 'dog'), # data=json.dumps({'body': 'Good [post](http://example.com)!'})) # self.assertTrue(response.status_code == 201) # json_response = json.loads(response.data.decode('utf-8')) # url = response.headers.get('Location') # self.assertIsNotNone(url) # self.assertTrue(json_response['body'] == # 'Good [post](http://example.com)!') # self.assertTrue( # re.sub('<.*?>', '', json_response['body_html']) == 'Good post!') # # get the new comment # response = self.client.get( # url, # headers=self.get_api_headers('john@example.com', 'cat')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertTrue(json_response['url'] == url) # self.assertTrue(json_response['body'] == # 'Good [post](http://example.com)!') # # add another comment # comment = Comment(body='Thank you!', author=u1, post=post) # db.session.add(comment) # db.session.commit() # # get the two comments from the post # response = self.client.get( # url_for('api.get_post_comments', id=post.id), # headers=self.get_api_headers('susan@example.com', 'dog')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertIsNotNone(json_response.get('posts')) # self.assertTrue(json_response.get('count', 0) == 2) # # get all the comments # response = self.client.get( # url_for('api.get_comments', id=post.id), # headers=self.get_api_headers('susan@example.com', 'dog')) # self.assertTrue(response.status_code == 200) # json_response = json.loads(response.data.decode('utf-8')) # self.assertIsNotNone(json_response.get('posts')) # self.assertTrue(json_response.get('count', 0) == 2) import json, time from datetime import datetime import requests with open("../fuli_data.json", "r", encoding="utf-8") as fp: # with open("../liguiwen@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../houhuiteng@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../lijie1013@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../zengjiyi@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../zengjunhao@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../houyue@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../longruifan@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../wangyongqiang@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../liuhuan0828@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../heweiting@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../ganwenfeng@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../chenweichang0607@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../caiwujiancha1@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: # with open("../wenyongxian@mz.gd.csg.cn_data.json", "r", encoding="utf-8") as fp: num = 0 for line in fp.readlines(): json_obj = json.loads(line) # json_obj['record_Time'] = json_obj['record_Time']/1000 json_obj['device_Id'] = 'did' json_obj['full_Url'] = json_obj['current_Url'] response = requests.post('http://localhost:7788/record', json=json_obj) print(response.text) time.sleep(0.2) num += 1 if num > 100: break response = requests.get('http://localhost:7788/screen3') # response = requests.get('http://localhost:7788/screen3?user_id=liguiwen@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=houhuiteng@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=lijie1013@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=zengjiyi@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=zengjunhao@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=houyue@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=longruifan@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=wangyongqiang@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=liuhuan0828@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=heweiting@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=ganwenfeng@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=chenweichang0607@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=caiwujiancha1@mz.gd.csg.cn') # response = requests.get('http://localhost:7788/screen3?user_id=wenyongxian@mz.gd.csg.cn')