#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-08 15:46:48
# @Last Modified time: 2024-02-23 13:02:59
"""Flask views for collecting user-behaviour records and running analysis.

The ingestion endpoints (``/batch``, ``/record``) hand each record to
``tasks.predict``, persist it as a ``Record`` row and maintain two per-user
keys in the Redis-compatible store exposed as ``current_app.redis``.
The remaining endpoints operate on per-user analysis objects kept in the
module-level ``g`` dictionary.
"""
import json
import pandas as pd
from typing import Optional, List
from datetime import datetime

from pydantic import BaseModel
from flask import request, jsonify, current_app
from flask_pydantic import validate

from . import main
from ..models import *
from ..tasks import tasks

# In-process cache: user_id -> analysis object (populated by the /test view
# and passed to tasks.predict).
g = dict()

# Timestamp layout used for the "start"/"end" values cached per user.
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


class RecodModel(BaseModel):
    """Request payload describing a single collected record."""

    record_Time: datetime
    current_Url: str
    full_Url: Optional[str] = None
    sim_Url: Optional[str] = None
    user_Id: str
    device_Id: Optional[str] = None
    request_Method: str
    event_Type: str
    form_Data: Optional[str] = None
    json_Data: Optional[str] = None
    value_Data: Optional[str] = None
    status_Code: Optional[int] = None
    target_Url: Optional[str] = None
    iframe_Url: Optional[str] = None
    button_Text: Optional[str] = None


class RocordListModel(BaseModel):
    """Request payload for ``/batch``: a list of records under ``items``."""

    items: List[RecodModel]


def _build_record(item):
    """Create a ``Record`` row from the fields shared by both ingest views."""
    return Record(
        Record_Time=item.record_Time,
        Current_Url=item.current_Url,
        User_Id=item.user_Id,
        Request_Method=item.request_Method,
        Event_Type=item.event_Type,
        Target_Url=item.target_Url,
        Button_Text=item.button_Text,
    )


def _touch_session_window(redis_store, user_id, first_time, last_time):
    """Create or extend the cached ``{"start", "end"}`` window of a user.

    When no value is cached under ``user_id`` a new window spanning
    ``first_time``..``last_time`` is stored; otherwise only ``"end"`` is
    moved to ``last_time``.
    """
    cached = redis_store.get(user_id)
    if not cached:
        window = {
            "start": first_time.strftime(_TIME_FORMAT),
            "end": last_time.strftime(_TIME_FORMAT),
        }
    else:
        window = json.loads(cached)
        window["end"] = last_time.strftime(_TIME_FORMAT)
    redis_store.set(user_id, json.dumps(window))


def _grow_cached_size(redis_store, user_id, count):
    """Add ``count`` to the ``<user_id>_size`` counter, creating it if absent."""
    key = user_id + '_size'
    cached = redis_store.get(key)
    total = count if not cached else int(cached) + count
    redis_store.set(key, total)


def _unknown_user_response():
    """Build the JSON/404 reply used when ``g`` holds no entry for the user."""
    return jsonify({"status": "failed", "message": "unknown user_id"}), 404


@main.route('/batch', methods=['GET', 'POST'])
@validate()
def batch_record(body: RocordListModel) -> dict:
    """Ingest a batch of records.

    Each item is passed to ``tasks.predict`` and stored as a ``Record`` row;
    afterwards the per-user time window and size counter are updated.

    Args:
        body: validated request body carrying the records in ``items``.

    Returns:
        JSON response ``{"status": "success"}``.
    """
    # An empty batch has no first/last item to read the user and times from;
    # treat it as a no-op instead of failing with IndexError.
    if not body.items:
        return jsonify({"status": "success"})

    # 1. Collect the raw data: received as BaseModel, handed on as dict,
    #    persisted through the database session.
    # NOTE(review): unlike /record, Full_Url is not stored here — confirm
    # whether that omission is intended.
    records = []
    for item in body.items:
        tasks.predict(g, item=item.dict())
        records.append(_build_record(item))

    try:
        db.session.add_all(records)
        db.session.commit()
    except Exception:
        # Best effort, as before: a failed insert is rolled back and logged,
        # and the cache bookkeeping below still runs.
        db.session.rollback()
        current_app.logger.exception('failed to store record batch')

    # 2. Track user id and time window in the cache (the original note says
    #    these keys are cleared daily; that reset does not happen here).
    # NOTE(review): the whole batch is attributed to the first item's user —
    # presumably a batch never mixes users; verify against the sender.
    redis_store = current_app.redis
    first, last = body.items[0], body.items[-1]
    _touch_session_window(
        redis_store, first.user_Id, first.record_Time, last.record_Time
    )

    # 3. Track how many records are currently cached for this user.
    _grow_cached_size(redis_store, first.user_Id, len(body.items))

    return jsonify({"status": "success"})


@main.route('/record', methods=['GET', 'POST'])
@validate()
def record(body: RecodModel) -> dict:
    """Ingest a single collected record into the raw record table.

    Args:
        body: validated request body describing one record.

    Returns:
        JSON response ``{"status": "success"}``, or ``{"status": "failed"}``
        when the database insert raises.
    """
    # 1. Hand the raw data to the prediction task as a plain dict.
    tasks.predict(g, item=body.dict())

    # 2. Track user id and time window in the cache; for a single record the
    #    window starts and ends at the same timestamp when first created.
    _touch_session_window(
        current_app.redis, body.user_Id, body.record_Time, body.record_Time
    )

    # --- raw record table ---
    # NOTE: a total-count increment and per-domain visit-flag updates used to
    # follow the insert; that code was already disabled and has been dropped.
    try:
        row = _build_record(body)
        row.Full_Url = body.full_Url
        db.session.add(row)
        db.session.commit()
    except Exception:
        db.session.rollback()
        current_app.logger.exception('failed to store record')
        return jsonify({"status": "failed"})

    return jsonify({"status": "success"})


@main.route('/analyse', methods=['GET', 'POST'])
def analyse():
    """Post-process and post back the records of one user.

    Reads ``user_id`` from the query string, iterates the rows returned by
    that user's ``getActionByDateRange()``, runs each through
    ``tasks.UrlParser`` and posts successfully parsed rows via
    ``tasks.postback``.

    Returns:
        An empty JSON object, or a ``failed`` JSON body with status 404 when
        no analysis object is registered for ``user_id``.
    """
    user_id = request.args.get('user_id')
    space = g.get(user_id)
    if space is None:
        return _unknown_user_response()

    # Walk the user's records for the current time slice.
    for item in space.getActionByDateRange().itertuples():
        row = {
            "Record_Time": item.time,
            # The query-string parts (cur_que / tar_que) are left off.
            "Current_Url": item.cur_url,
            "Target_Url": item.tar_url,
            "User_Id": user_id,
            "Request_Method": item.req_met,
            "Form_Data": item.for_dat,
            "Button_Text": item.but_txt,
            "Is_Op": item.is_op,
        }

        # Look up the probability for this "<current>-<target>-<acc>" key,
        # with query strings stripped from both URLs.
        prob = space.getHeatMapValue(
            item.cur_url.split("?")[0] + '-'
            + item.tar_url.split("?")[0] + '-'
            + item.acc
        )
        current_app.logger.debug('heat map value: %s', prob)

        # URL post-processing.
        parser = tasks.UrlParser()
        result = parser.process(row)

        # Rows whose post-processing produced no 'time' are discarded.
        if result.get('time'):
            json_result = tasks.ResultJson(
                time=result['time'],
                user_id=result['user'],
                domain=result['domain'],
                title1=result['一级标题'],
                title2=result['二级标题'],
                title3=result['三级标题'],
                text=result['text'],
                is_op=item.is_op,
                is_bus=item.is_bus,
                is_bias=bool(item.is_bias or (prob < 1e-3)),
            ).to_json()
            current_app.logger.debug('postback payload: %s', json_result)
            tasks.postback(json_result)

    return jsonify({})


@main.route('/heatmap', methods=['GET', 'POST'])
def heatmap():
    """Refresh the behaviour path and its probability map for one user.

    Returns:
        An empty JSON object, or a ``failed`` JSON body with status 404 when
        no analysis object is registered for ``user_id``.
    """
    user_id = request.args.get('user_id')
    space = g.get(user_id)
    if space is None:
        return _unknown_user_response()

    # Fetch the user's behaviour path.
    space.getActionByDateRange()
    # Build the behaviour-path probability map.
    space.getClickCategory2Count()
    return jsonify({})


@main.route('/test', methods=['GET'])
def test():
    """Create (or replace) the analysis object stored in ``g`` for a user."""
    user_id = request.args.get('user_id')
    from ..tasks import UserActivateDegreeAnalyze
    g[user_id] = UserActivateDegreeAnalyze.UserActivateDegreeAnalyzeSpace(user_id)
    return jsonify({"status": "success"})


@main.route('/testpush', methods=['GET'])
def testpush():
    """Call ``read_from_json()`` on the analysis object of one user.

    Returns:
        ``{"status": "success"}``, or a ``failed`` JSON body with status 404
        when no analysis object is registered for ``user_id``.
    """
    user_id = request.args.get('user_id')
    space = g.get(user_id)
    if space is None:
        return _unknown_user_response()

    space.read_from_json()
    return jsonify({"status": "success"})