123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-08 15:46:48
- # @Last Modified time: 2024-02-23 13:02:59
- import json
- import pandas as pd
- from typing import Optional, List
- from datetime import datetime
- from pydantic import BaseModel
- from flask import request, jsonify, current_app
- from flask_pydantic import validate
- from . import main
- from ..models import *
- from ..tasks import tasks
# Process-local registry shared by the views below, keyed by user_id:
# /test stores an analyzer object per user, /analyse, /heatmap and /testpush
# look it up, and /record and /batch hand the whole dict to tasks.predict.
# NOTE: this is a plain module-level dict (not flask.g), so each worker
# process holds its own copy.
g = {}
class RecodModel(BaseModel):
    """One collected client-side event, as posted to ``/record`` (and as each
    element of the ``/batch`` payload).

    The class name (sic) and the mixed-case field names form the request
    schema, so they are left unchanged. Only ``record_Time``, ``current_Url``,
    ``user_Id``, ``request_Method`` and ``event_Type`` are required; every
    other field defaults to ``None``.
    """

    # Event timestamp; the views format it as '%Y-%m-%d %H:%M:%S' for Redis.
    record_Time: datetime
    current_Url: str
    full_Url: Optional[str] = None
    sim_Url: Optional[str] = None
    # Used as the Redis key for the user's activity window.
    user_Id: str
    device_Id: Optional[str] = None
    request_Method: str
    event_Type: str
    # The following payload fields are not written to the Record table by the
    # views; they only reach tasks.predict through ``.dict()``.
    form_Data: Optional[str] = None
    json_Data: Optional[str] = None
    value_Data: Optional[str] = None
    status_Code: Optional[int] = None
    target_Url: Optional[str] = None
    iframe_Url: Optional[str] = None
    button_Text: Optional[str] = None
class RocordListModel(BaseModel):
    """Request body for ``/batch``: a list of collected events.

    The class name (sic) is left as-is because other code may reference it.
    """

    # Events in client order; /batch treats the last one as the most recent.
    items: List[RecodModel]
@main.route('/batch', methods=['GET', 'POST'])
@validate()
def batch_record(body: RocordListModel) -> dict:
    """Receive a batch of collected events, persist them and update user stats.

    Steps:
      1. Hand every event (as a dict) to ``tasks.predict`` and bulk-insert one
         ``Record`` row per event.
      2. For each user in the batch, update the activity window in
         Tendis/Redis: key ``<user_Id>`` holds JSON ``{"start", "end"}``. A new
         key gets ``start`` = the user's first event and ``end`` = the user's
         last event in this batch. An existing key only has ``end`` advanced.
      3. Atomically add the user's event count to ``<user_Id>_size``.

    Returns:
        ``{"status": "success"}``, or ``{"status": "failed"}`` when the
        database insert raised. In that case the session is rolled back and
        the error is logged. Redis tracking is still updated, matching
        ``/record``, which touches Redis before its DB write.
    """
    time_fmt = '%Y-%m-%d %H:%M:%S'
    items = body.items
    if not items:
        # Nothing to store or track (previously crashed on items[0]).
        return jsonify({"status": "success"})

    # 1. Raw collection: feed the predictor and build ORM rows.
    records = []
    for item in items:
        tasks.predict(g, item=item.dict())
        row = Record(Record_Time=item.record_Time, Current_Url=item.current_Url,
                     User_Id=item.user_Id, Request_Method=item.request_Method,
                     Event_Type=item.event_Type, Target_Url=item.target_Url,
                     Button_Text=item.button_Text)
        # Set like /record does, so both endpoints persist the same columns.
        row.Full_Url = item.full_Url
        records.append(row)

    db_ok = True
    try:
        db.session.add_all(records)
        db.session.commit()
    except Exception:
        db.session.rollback()
        current_app.logger.exception('batch insert of %d records failed', len(records))
        db_ok = False

    # Group per user so a mixed-user batch is not attributed to items[0] only.
    by_user = {}
    for item in items:
        by_user.setdefault(item.user_Id, []).append(item)

    redis_store = current_app.redis
    for user_id, user_items in by_user.items():
        first = user_items[0].record_Time.strftime(time_fmt)
        last = user_items[-1].record_Time.strftime(time_fmt)
        # 2. Activity window. The original note says it is reset daily; that
        # reset happens outside this function.
        raw = redis_store.get(user_id)  # single round-trip
        if raw:
            span = json.loads(raw)
            span["end"] = last
        else:
            span = {"start": first, "end": last}
        redis_store.set(user_id, json.dumps(span))
        # 3. Cached item count. INCRBY is atomic and treats a missing key as 0,
        # replacing the racy get/modify/set.
        redis_store.incrby(user_id + '_size', len(user_items))

    return jsonify({"status": "success" if db_ok else "failed"})
@main.route('/record', methods=['GET', 'POST'])
@validate()
def record(body: RecodModel) -> dict:
    """Receive one collected event and write it to the raw record table.

    Steps:
      1. Hand the event (as a dict) to ``tasks.predict`` together with the
         shared registry ``g``.
      2. Update the user's activity window in Tendis/Redis. Key ``<user_Id>``
         holds JSON ``{"start": ..., "end": ...}``. A new key gets both fields
         set to this event's time; an existing key only has ``end`` advanced.
      3. Insert a ``Record`` row built from a subset of the event's fields.

    Returns:
        ``{"status": "success"}``, or ``{"status": "failed"}`` if building or
        committing the row raised. In that case the session is rolled back
        and the error is logged.
    """
    # 1. Raw collection: pass the event on for prediction/caching.
    tasks.predict(g, item=body.dict())

    # 2. Activity window. The original note says it is reset daily; that
    # reset happens outside this function.
    stamp = body.record_Time.strftime('%Y-%m-%d %H:%M:%S')
    redis_store = current_app.redis
    raw = redis_store.get(body.user_Id)  # single round-trip instead of two
    if raw:
        span = json.loads(raw)
        span["end"] = stamp
    else:
        span = {"start": stamp, "end": stamp}
    redis_store.set(body.user_Id, json.dumps(span))
    # NOTE(review): /batch also increments "<user_Id>_size" for every event it
    # feeds to tasks.predict; this endpoint does not. Confirm that is intended.

    # 3. Raw record table.
    try:
        row = Record()  # not named `record`, which would shadow this view
        row.Record_Time = body.record_Time
        row.Current_Url = body.current_Url
        row.Full_Url = body.full_Url
        row.User_Id = body.user_Id
        row.Request_Method = body.request_Method
        row.Event_Type = body.event_Type
        row.Target_Url = body.target_Url
        row.Button_Text = body.button_Text
        db.session.add(row)
        db.session.commit()
    except Exception:
        db.session.rollback()
        current_app.logger.exception('failed to store record for user %s', body.user_Id)
        return jsonify({"status": "failed"})
    return jsonify({"status": "success"})
@main.route('/analyse', methods=['GET', 'POST'])
def analyse():
    """Post-process one user's recorded actions and push the results back.

    Query parameters:
        user_id: key into the module-level ``g`` registry. The analyzer must
            already exist (``/test`` creates it).

    Each action row from ``getActionByDateRange()`` is run through a
    ``tasks.UrlParser``. Rows the parser resolves (the result has a truthy
    ``time``) are wrapped in ``tasks.ResultJson`` and sent with
    ``tasks.postback``; all other rows are dropped. A row is flagged
    ``is_bias`` when the analyzer already marked it, or when the value
    ``getHeatMapValue`` returns for its (current URL, target URL, acc) key is
    below ``bias_threshold``.

    Returns:
        An empty JSON object. If no analyzer is registered for ``user_id``,
        returns ``{"status": "failed", ...}`` with HTTP 404 instead of
        raising KeyError.
    """
    # Heat-map values below this are treated as biased (presumably a
    # low-probability transition; confirm with the analyzer's semantics).
    bias_threshold = 1e-3
    user_id = request.args.get('user_id')
    analyzer = g.get(user_id)
    if analyzer is None:
        return jsonify({"status": "failed", "message": "unknown user_id"}), 404

    # Process the user's records in the analyzer's time window.
    for item in analyzer.getActionByDateRange().itertuples():
        row = {
            "Record_Time": item.time,
            "Current_Url": item.cur_url,
            "Target_Url": item.tar_url,
            "User_Id": user_id,
            "Request_Method": item.req_met,
            "Form_Data": item.for_dat,
            "Button_Text": item.but_txt,
            "Is_Op": item.is_op,
        }
        # Heat-map key: both URLs with their query strings stripped, plus acc.
        heat_key = item.cur_url.split("?")[0] + '-' + item.tar_url.split("?")[0] + '-' + item.acc
        prob = analyzer.getHeatMapValue(heat_key)
        current_app.logger.debug('heat-map value for %s: %s', heat_key, prob)

        # URL post-processing. The parser is still built per row, as before,
        # in case process() keeps state between calls.
        parser = tasks.UrlParser()
        result = parser.process(row)
        if not result.get('time'):
            continue  # post-processing failed: discard this row

        json_result = tasks.ResultJson(
            time=result['time'], user_id=result['user'], domain=result['domain'],
            title1=result['一级标题'], title2=result['二级标题'], title3=result['三级标题'],
            text=result['text'], is_op=item.is_op, is_bus=item.is_bus,
            is_bias=bool(item.is_bias or prob < bias_threshold),
        ).to_json()
        current_app.logger.debug('postback payload: %s', json_result)
        tasks.postback(json_result)
    return jsonify({})
@main.route('/heatmap', methods=['GET', 'POST'])
def heatmap():
    """Refresh the behaviour-path data for one user's analyzer.

    Query parameters:
        user_id: key into the module-level ``g`` registry (``/test`` creates
            the entry).

    Calls ``getActionByDateRange()`` (the user's behaviour path) and then
    ``getClickCategory2Count()`` (the original note: "generate the
    behaviour-path probability graph"). Both return values are discarded, so
    presumably both methods update analyzer state. TODO confirm.

    Returns:
        An empty JSON object. If no analyzer is registered for ``user_id``,
        returns ``{"status": "failed", ...}`` with HTTP 404 instead of
        raising KeyError.
    """
    user_id = request.args.get('user_id')
    analyzer = g.get(user_id)
    if analyzer is None:
        return jsonify({"status": "failed", "message": "unknown user_id"}), 404
    analyzer.getActionByDateRange()
    analyzer.getClickCategory2Count()
    return jsonify({})
@main.route('/test', methods=['GET'])
def test():
    """Create (or replace) the analyzer for ``user_id`` in the ``g`` registry.

    Query parameters:
        user_id: the user to build a ``UserActivateDegreeAnalyzeSpace`` for.

    Returns:
        ``{"status": "success"}``. If ``user_id`` is missing or empty, returns
        ``{"status": "failed", ...}`` with HTTP 400 instead of storing an
        analyzer under ``None``.
    """
    user_id = request.args.get('user_id')
    if not user_id:
        return jsonify({"status": "failed", "message": "missing user_id"}), 400
    # Function-level import kept from the original (possibly to avoid an
    # import cycle; verify before hoisting).
    from ..tasks import UserActivateDegreeAnalyze
    g[user_id] = UserActivateDegreeAnalyze.UserActivateDegreeAnalyzeSpace(user_id)
    return jsonify({"status": "success"})
@main.route('/testpush', methods=['GET'])
def testpush():
    """Call ``read_from_json()`` on the analyzer registered for ``user_id``.

    Query parameters:
        user_id: key into the module-level ``g`` registry (``/test`` creates
            the entry).

    Returns:
        ``{"status": "success"}``. If no analyzer is registered for
        ``user_id``, returns ``{"status": "failed", ...}`` with HTTP 404
        instead of raising KeyError.
    """
    user_id = request.args.get('user_id')
    analyzer = g.get(user_id)
    if analyzer is None:
        return jsonify({"status": "failed", "message": "unknown user_id"}), 404
    analyzer.read_from_json()
    return jsonify({"status": "success"})
|