#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-08 15:46:48
# @Last Modified time: 2024-01-31 11:27:23
"""Blueprint routes that receive collected browsing records and replay them.

``/record`` validates an incoming record and forwards it to ``tasks.predict``;
``/screen3`` walks the cached actions of one user through ``tasks.UrlParser``.
"""

import threading
import pandas as pd
from typing import Optional
from datetime import datetime
from pydantic import BaseModel
from flask import request, jsonify
from flask_pydantic import validate
from multiprocessing.connection import Client

from . import main
from .preprocess import parse_url
from ..models import *
from ..tasks import tasks

# client = Client(('127.0.0.1', 35010))

# Module-level cache shared by both routes. It is handed to ``tasks.predict``
# in ``record`` and read by ``screen3`` keyed by user id.
# NOTE(review): presumably ``tasks.predict`` fills it with per-user objects
# exposing ``getActionByDateRange()`` -- confirm against ``tasks``.
g = dict()

# User id used by ``screen3`` when the request carries no ``user_id`` argument.
_DEFAULT_USER_ID = 'fuli@mz.gd.csg.cn'


class RecodModel(BaseModel):
    """Request body accepted by the ``/record`` endpoint.

    The class name (sic: "Recod") and the mixed-case field names are part of
    the wire contract and are kept unchanged.
    """

    # Required fields.
    record_Time: datetime
    current_Url: str
    user_Id: str
    request_Method: str
    event_Type: str

    # Optional fields; each defaults to None when absent from the payload.
    full_Url: Optional[str] = None
    sim_Url: Optional[str] = None
    device_Id: Optional[str] = None
    form_Data: Optional[str] = None
    json_Data: Optional[str] = None
    value_Data: Optional[str] = None
    status_Code: Optional[int] = None
    target_Url: Optional[str] = None
    iframe_Url: Optional[str] = None
    button_Text: Optional[str] = None


@main.route('/record', methods=['GET', 'POST'])
@validate()
def record(body: RecodModel):
    """Receive one collected record and store it in the temporary table.

    Args:
        body: The request body, validated by ``flask_pydantic``.

    Returns:
        A JSON response ``{"status": "success"}``.
    """
    # Step 1: collect the raw data first -- received as a BaseModel,
    # transported as a dict, cached in a DataFrame, stored in DM.
    tasks.predict(g, item=body.dict())

    # NOTE: an earlier, now disabled, implementation did the work inline here:
    # it inserted a ``Record`` row, sent ``body.dict()`` over ``client``,
    # incremented the ``TotalCount`` row named '采集量', added a
    # ``PersonLogin`` row for first-time users, and set the RZ/ZC/CW/YX
    # ``*_Visit`` flag according to the parsed domain, answering
    # ``{"status": "failed"}`` on error. See version control for that code.
    return jsonify({"status": "success"})


@main.route('/screen3', methods=['GET', 'POST'])
def screen3():
    """Parse every cached action of one user and print the parsed result.

    The user is taken from the ``user_id`` query argument, falling back to
    ``_DEFAULT_USER_ID`` when it is missing or empty.

    Returns:
        An empty JSON object. When nothing is cached for the user the same
        empty object is returned with HTTP status 404 instead of letting the
        lookup raise ``KeyError`` (which surfaced as a 500).
    """
    user_id = request.args.get('user_id') or _DEFAULT_USER_ID

    if user_id not in g:
        # Nothing recorded for this user yet; answer explicitly rather than
        # crashing on the dictionary lookup below.
        return jsonify({}), 404

    for item in g[user_id].getActionByDateRange().itertuples():
        row = {
            # Position 0 of an ``itertuples()`` row is the DataFrame index.
            "Record_Time": item[0],
            "Current_Url": item.cur_url + '?' + item.cur_que,
            "Target_Url": item.tar_url + '?' + item.tar_que,
            "User_Id": user_id,
            "Request_Method": item.req_met,
            "Form_Data": item.for_dat,
            "Button_Text": item.but_txt,
            "Is_Op": item.is_op,
        }
        print(item.cur_url, item.tar_url)

        # NOTE(review): a new parser is built for every row, as in the
        # original; hoist it out of the loop only if ``UrlParser`` is known
        # to be stateless.
        parser = tasks.UrlParser()
        result = parser.process(row)

        try:
            print(tasks.ResultJson(time=result['time'],
                                   user_id=result['user'],
                                   domain=result['domain'],
                                   title1=result['一级标题'],
                                   title2=result['二级标题'],
                                   title3=result['三级标题'],
                                   text=result['text'],
                                   is_op=item.is_op,
                                   is_bus=False).to_json())
        except Exception as e:
            # Best-effort debug output: a row that cannot be rendered must
            # not abort the remaining rows or fail the request.
            print(e)
        # tasks.postback(result)

    return jsonify({})