123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-18 15:29:28
- # @Last Modified time: 2024-01-23 15:54:23
- # import re
- # import json
- # import unittest
- # from base64 import b64encode
- # from flask import url_for
- # from app import create_app, db
- # from app.models import *
- # class APITestCase(unittest.TestCase):
- # def setUp(self):
- # self.app = create_app('testing')
- # self.app_context = self.app.app_context()
- # self.app_context.push()
- # db.create_all()
- # self.client = self.app.test_client()
- # def tearDown(self):
- # db.session.remove()
- # db.drop_all()
- # self.app_context.pop()
- # def get_api_headers(self, username, password):
- # return {
- # 'Authorization': 'Basic ' + b64encode(
- # (username + ':' + password).encode('utf-8')).decode('utf-8'),
- # 'Accept': 'application/json',
- # 'Content-Type': 'application/json'
- # }
- # def test_404(self):
- # response = self.client.get(
- # '/wrong/url',
- # headers=self.get_api_headers('email', 'password'))
- # self.assertTrue(response.status_code == 404)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['error'] == 'not found')
- # def test_no_auth(self):
- # response = self.client.get(url_for('api.get_posts'),
- # content_type='application/json')
- # self.assertTrue(response.status_code == 401)
- # def test_bad_auth(self):
- # # add a user
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u = User(email='john@example.com', password='cat', confirmed=True,
- # role=r)
- # db.session.add(u)
- # db.session.commit()
- # # authenticate with bad password
- # response = self.client.get(
- # url_for('api.get_posts'),
- # headers=self.get_api_headers('john@example.com', 'dog'))
- # self.assertTrue(response.status_code == 401)
- # def test_token_auth(self):
- # # add a user
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u = User(email='john@example.com', password='cat', confirmed=True,
- # role=r)
- # db.session.add(u)
- # db.session.commit()
- # # issue a request with a bad token
- # response = self.client.get(
- # url_for('api.get_posts'),
- # headers=self.get_api_headers('bad-token', ''))
- # self.assertTrue(response.status_code == 401)
- # # get a token
- # response = self.client.get(
- # url_for('api.get_token'),
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertIsNotNone(json_response.get('token'))
- # token = json_response['token']
- # # issue a request with the token
- # response = self.client.get(
- # url_for('api.get_posts'),
- # headers=self.get_api_headers(token, ''))
- # self.assertTrue(response.status_code == 200)
- # def test_anonymous(self):
- # response = self.client.get(
- # url_for('api.get_posts'),
- # headers=self.get_api_headers('', ''))
- # self.assertTrue(response.status_code == 200)
- # def test_unconfirmed_account(self):
- # # add an unconfirmed user
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u = User(email='john@example.com', password='cat', confirmed=False,
- # role=r)
- # db.session.add(u)
- # db.session.commit()
- # # get list of posts with the unconfirmed account
- # response = self.client.get(
- # url_for('api.get_posts'),
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 403)
- # def test_posts(self):
- # # add a user
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u = User(email='john@example.com', password='cat', confirmed=True,
- # role=r)
- # db.session.add(u)
- # db.session.commit()
- # # write an empty post
- # response = self.client.post(
- # url_for('api.new_post'),
- # headers=self.get_api_headers('john@example.com', 'cat'),
- # data=json.dumps({'body': ''}))
- # self.assertTrue(response.status_code == 400)
- # # write a post
- # response = self.client.post(
- # url_for('api.new_post'),
- # headers=self.get_api_headers('john@example.com', 'cat'),
- # data=json.dumps({'body': 'body of the *blog* post'}))
- # self.assertTrue(response.status_code == 201)
- # url = response.headers.get('Location')
- # self.assertIsNotNone(url)
- # # get the new post
- # response = self.client.get(
- # url,
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['url'] == url)
- # self.assertTrue(json_response['body'] == 'body of the *blog* post')
- # self.assertTrue(json_response['body_html'] ==
- # '<p>body of the <em>blog</em> post</p>')
- # json_post = json_response
- # # get the post from the user
- # response = self.client.get(
- # url_for('api.get_user_posts', id=u.id),
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertIsNotNone(json_response.get('posts'))
- # self.assertTrue(json_response.get('count', 0) == 1)
- # self.assertTrue(json_response['posts'][0] == json_post)
- # # get the post from the user as a follower
- # response = self.client.get(
- # url_for('api.get_user_followed_posts', id=u.id),
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertIsNotNone(json_response.get('posts'))
- # self.assertTrue(json_response.get('count', 0) == 1)
- # self.assertTrue(json_response['posts'][0] == json_post)
- # # edit post
- # response = self.client.put(
- # url,
- # headers=self.get_api_headers('john@example.com', 'cat'),
- # data=json.dumps({'body': 'updated body'}))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['url'] == url)
- # self.assertTrue(json_response['body'] == 'updated body')
- # self.assertTrue(json_response['body_html'] == '<p>updated body</p>')
- # def test_users(self):
- # # add two users
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u1 = User(email='john@example.com', username='john',
- # password='cat', confirmed=True, role=r)
- # u2 = User(email='susan@example.com', username='susan',
- # password='dog', confirmed=True, role=r)
- # db.session.add_all([u1, u2])
- # db.session.commit()
- # # get users
- # response = self.client.get(
- # url_for('api.get_user', id=u1.id),
- # headers=self.get_api_headers('susan@example.com', 'dog'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['username'] == 'john')
- # response = self.client.get(
- # url_for('api.get_user', id=u2.id),
- # headers=self.get_api_headers('susan@example.com', 'dog'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['username'] == 'susan')
- # def test_comments(self):
- # # add two users
- # r = Role.query.filter_by(name='User').first()
- # self.assertIsNotNone(r)
- # u1 = User(email='john@example.com', username='john',
- # password='cat', confirmed=True, role=r)
- # u2 = User(email='susan@example.com', username='susan',
- # password='dog', confirmed=True, role=r)
- # db.session.add_all([u1, u2])
- # db.session.commit()
- # # add a post
- # post = Post(body='body of the post', author=u1)
- # db.session.add(post)
- # db.session.commit()
- # # write a comment
- # response = self.client.post(
- # url_for('api.new_post_comment', id=post.id),
- # headers=self.get_api_headers('susan@example.com', 'dog'),
- # data=json.dumps({'body': 'Good [post](http://example.com)!'}))
- # self.assertTrue(response.status_code == 201)
- # json_response = json.loads(response.data.decode('utf-8'))
- # url = response.headers.get('Location')
- # self.assertIsNotNone(url)
- # self.assertTrue(json_response['body'] ==
- # 'Good [post](http://example.com)!')
- # self.assertTrue(
- # re.sub('<.*?>', '', json_response['body_html']) == 'Good post!')
- # # get the new comment
- # response = self.client.get(
- # url,
- # headers=self.get_api_headers('john@example.com', 'cat'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertTrue(json_response['url'] == url)
- # self.assertTrue(json_response['body'] ==
- # 'Good [post](http://example.com)!')
- # # add another comment
- # comment = Comment(body='Thank you!', author=u1, post=post)
- # db.session.add(comment)
- # db.session.commit()
- # # get the two comments from the post
- # response = self.client.get(
- # url_for('api.get_post_comments', id=post.id),
- # headers=self.get_api_headers('susan@example.com', 'dog'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertIsNotNone(json_response.get('posts'))
- # self.assertTrue(json_response.get('count', 0) == 2)
- # # get all the comments
- # response = self.client.get(
- # url_for('api.get_comments', id=post.id),
- # headers=self.get_api_headers('susan@example.com', 'dog'))
- # self.assertTrue(response.status_code == 200)
- # json_response = json.loads(response.data.decode('utf-8'))
- # self.assertIsNotNone(json_response.get('posts'))
- # self.assertTrue(json_response.get('count', 0) == 2)
- import json, time
- from datetime import datetime
- import requests
# Read the data file one line at a time, parse each line as a JSON object,
# and POST it to the local /record endpoint.
DATA_FILE = "../fuli_data.json"
RECORD_URL = 'http://localhost:7788/record'
POST_LIMIT = 40      # the loop stops once MORE than this many records were sent
POST_DELAY = 0.2     # seconds to pause after each request
POST_TIMEOUT = 10    # seconds; without a timeout requests.post can block forever

with open(DATA_FILE, "r", encoding="utf-8") as fp:
    num = 0
    # Iterate the file lazily: only the first few dozen lines are consumed,
    # so there is no need to load the whole file with readlines().
    for line in fp:
        if not line.strip():
            # A blank line (e.g. a trailing newline) is not a JSON document
            # and would make json.loads raise; skip it without counting it.
            continue
        json_obj = json.loads(line)
        # NOTE: json_obj['record_Time'] is forwarded unchanged; a "/1000"
        # rescale existed in the original script but was left disabled.
        json_obj['device_Id'] = 'did'
        # Each object must carry 'current_Url' (KeyError otherwise); it is
        # duplicated under 'full_Url' before sending.
        json_obj['full_Url'] = json_obj['current_Url']
        response = requests.post(RECORD_URL, json=json_obj,
                                 timeout=POST_TIMEOUT)
        print(response.text)
        time.sleep(POST_DELAY)
        num += 1
        if num > POST_LIMIT:
            break
|