views.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-08 15:46:48
  4. # @Last Modified time: 2024-02-23 13:02:59
  5. import json
  6. import pandas as pd
  7. from typing import Optional, List
  8. from datetime import datetime
  9. from pydantic import BaseModel
  10. from flask import request, jsonify, current_app
  11. from flask_pydantic import validate
  12. from . import main
  13. from ..models import *
  14. from ..tasks import tasks
  # Module-level registry of per-user analysis objects, keyed by user ID.
  # The /test route fills it; /analyse, /heatmap and /testpush read from it,
  # and it is also handed to tasks.predict by the record routes.
  g = {}
  class RecodModel(BaseModel):
      """Request schema for a single collected record.

      ``record_Time``, ``current_Url``, ``user_Id``, ``request_Method`` and
      ``event_Type`` are required; every other field is optional and defaults
      to ``None``. Field order is kept as declared because it is visible in the
      output of ``.dict()``.
      """

      record_Time: datetime  # required
      current_Url: str  # required
      full_Url: Optional[str] = None
      sim_Url: Optional[str] = None
      user_Id: str  # required
      device_Id: Optional[str] = None
      request_Method: str  # required
      event_Type: str  # required
      form_Data: Optional[str] = None
      json_Data: Optional[str] = None
      value_Data: Optional[str] = None
      status_Code: Optional[int] = None
      target_Url: Optional[str] = None
      iframe_Url: Optional[str] = None
      button_Text: Optional[str] = None
  class RocordListModel(BaseModel):
      """Request schema for the batch route: the records arrive under ``items``."""

      items: List[RecodModel]
  @main.route('/batch', methods=['GET', 'POST'])
  @validate()
  def batch_record(body: RocordListModel) -> dict:
      """Store a batch of collected records and update the per-user Redis keys.

      Every item is passed to ``tasks.predict`` and saved as a ``Record`` row.
      Two Redis keys are then maintained, both derived from the user ID of the
      first item: ``<user_id>`` holds a JSON object with the ``start`` / ``end``
      timestamps, and ``<user_id>_size`` holds the running number of items
      received.

      Args:
          body: Validated request body carrying the records under ``items``.

      Returns:
          A JSON response ``{"status": "success"}``. An empty ``items`` list is
          answered the same way without touching the database or Redis.
      """
      time_format = '%Y-%m-%d %H:%M:%S'
      items = body.items
      if not items:
          # Nothing to store; also avoids an IndexError on items[0] below.
          return jsonify({"status": "success"})

      # 1. Feed each item to the predictor and build the rows to persist.
      records = []
      for item in items:
          tasks.predict(g, item=item.dict())
          records.append(Record(
              Record_Time=item.record_Time,
              Current_Url=item.current_Url,
              Full_Url=item.full_Url,  # kept in step with what record() stores
              User_Id=item.user_Id,
              Request_Method=item.request_Method,
              Event_Type=item.event_Type,
              Target_Url=item.target_Url,
              Button_Text=item.button_Text,
          ))
      try:
          db.session.add_all(records)
          db.session.commit()
      except Exception:
          # A failed insert is logged and rolled back; the Redis bookkeeping
          # below still runs, as it did before.
          db.session.rollback()
          current_app.logger.exception('Failed to store batch records')

      # 2. Track the first/last record time seen for the user in Redis.
      #    (The original note says these keys are reset daily; that reset does
      #    not happen in this function.)
      redis_store = current_app.redis
      user_id = items[0].user_Id  # the whole batch is keyed by the first item's user
      first_seen = items[0].record_Time.strftime(time_format)
      last_seen = items[-1].record_Time.strftime(time_format)
      cached_window = redis_store.get(user_id)
      if cached_window:
          window = json.loads(cached_window)
          window["end"] = last_seen
      else:
          # The first batch for this user spans from its first to its last item.
          window = {"start": first_seen, "end": last_seen}
      redis_store.set(user_id, json.dumps(window))

      # 3. Accumulate the number of items received for the user.
      size_key = user_id + '_size'
      cached_size = redis_store.get(size_key)
      total = len(items) + (int(cached_size) if cached_size else 0)
      redis_store.set(size_key, total)

      return jsonify({"status": "success"})
  @main.route('/record', methods=['GET', 'POST'])
  @validate()
  def record(body: RecodModel) -> dict:
      """Receive one collected record and persist it as a ``Record`` row.

      The record is first passed to ``tasks.predict``; then the Redis entry
      keyed by the user ID is updated with the ``start`` / ``end`` timestamps,
      and finally the row is written to the database.

      Args:
          body: Validated request body describing a single record.

      Returns:
          A JSON response ``{"status": "success"}``, or ``{"status": "failed"}``
          when the database write raises (the session is rolled back).
      """
      time_format = '%Y-%m-%d %H:%M:%S'

      # 1. Feed the raw record to the predictor.
      tasks.predict(g, item=body.dict())

      # 2. Track the first/last record time seen for the user in Redis.
      #    (The original note says these keys are reset daily; that reset does
      #    not happen in this function.)
      redis_store = current_app.redis
      seen_at = body.record_Time.strftime(time_format)
      cached_window = redis_store.get(body.user_Id)
      if cached_window:
          window = json.loads(cached_window)
          window["end"] = seen_at
      else:
          window = {"start": seen_at, "end": seen_at}
      redis_store.set(body.user_Id, json.dumps(window))

      # 3. Persist the raw record.
      # NOTE: disabled code that used to follow the commit (incrementing a
      # TotalCount row and flagging the visited domain on PersonLogin) was
      # removed from this function; recover it from version control if needed.
      try:
          entry = Record(
              Record_Time=body.record_Time,
              Current_Url=body.current_Url,
              Full_Url=body.full_Url,
              User_Id=body.user_Id,
              Request_Method=body.request_Method,
              Event_Type=body.event_Type,
              Target_Url=body.target_Url,
              Button_Text=body.button_Text,
          )
          db.session.add(entry)
          db.session.commit()
      except Exception:
          db.session.rollback()
          current_app.logger.exception('Failed to store record')
          return jsonify({"status": "failed"})
      return jsonify({"status": "success"})
  @main.route('/analyse', methods=['GET', 'POST'])
  def analyse():
      """Post-process the actions of one user and push the results back.

      The user is chosen by the ``user_id`` query parameter and must already
      have an analysis object in ``g``. Each row returned by
      ``getActionByDateRange()`` is run through ``tasks.UrlParser``; rows whose
      parse result has a ``time`` value are wrapped in ``tasks.ResultJson`` and
      sent with ``tasks.postback``, other rows are dropped.

      Returns:
          An empty JSON object on success, or ``{"status": "failed", ...}`` with
          HTTP 404 when no analysis object is registered for ``user_id``.
      """
      user_id = request.args.get('user_id')
      analyzer = g.get(user_id)
      if analyzer is None:
          # Previously this surfaced as an unhandled KeyError (HTTP 500).
          return jsonify({"status": "failed", "message": "unknown user_id"}), 404

      for item in analyzer.getActionByDateRange().itertuples():
          row = {
              "Record_Time": item.time,
              "Current_Url": item.cur_url,
              "Target_Url": item.tar_url,
              "User_Id": user_id,
              "Request_Method": item.req_met,
              "Form_Data": item.for_dat,
              "Button_Text": item.but_txt,
              "Is_Op": item.is_op,
          }

          # Heat-map lookup key: both URLs without their query string, plus item.acc.
          transition = (item.cur_url.split("?")[0] + '-'
                        + item.tar_url.split("?")[0] + '-' + item.acc)
          prob = analyzer.getHeatMapValue(transition)
          current_app.logger.debug('heat-map value for %s: %s', transition, prob)

          # NOTE(review): a new parser is built for every row; hoist it above
          # the loop if UrlParser keeps no per-call state.
          parser = tasks.UrlParser()
          result = parser.process(row)

          # Rows whose post-processing produced no 'time' value are discarded.
          if result.get('time'):
              json_result = tasks.ResultJson(
                  time=result['time'],
                  user_id=result['user'],
                  domain=result['domain'],
                  title1=result['一级标题'],
                  title2=result['二级标题'],
                  title3=result['三级标题'],
                  text=result['text'],
                  is_op=item.is_op,
                  is_bus=item.is_bus,
                  # Flagged when the row is already marked or the heat-map value is below 1e-3.
                  is_bias=bool(item.is_bias or (prob < 1e-3)),
              ).to_json()
              current_app.logger.debug('postback payload: %s', json_result)
              tasks.postback(json_result)
      return jsonify({})
  @main.route('/heatmap', methods=['GET', 'POST'])
  def heatmap():
      """Rebuild the heat-map data of one user.

      The user is chosen by the ``user_id`` query parameter and must already
      have an analysis object in ``g``.

      Returns:
          An empty JSON object on success, or ``{"status": "failed", ...}`` with
          HTTP 404 when no analysis object is registered for ``user_id``.
      """
      user_id = request.args.get('user_id')
      analyzer = g.get(user_id)
      if analyzer is None:
          # Previously this surfaced as an unhandled KeyError (HTTP 500).
          return jsonify({"status": "failed", "message": "unknown user_id"}), 404

      # Per the original comments: load the user's action path first, then
      # generate the action-path probability map from it.
      analyzer.getActionByDateRange()
      analyzer.getClickCategory2Count()
      return jsonify({})
  @main.route('/test', methods=['GET'])
  def test():
      """Create and register the analysis object for one user.

      The user is chosen by the ``user_id`` query parameter; the new
      ``UserActivateDegreeAnalyzeSpace`` replaces any object already stored in
      ``g`` under that ID.

      Returns:
          ``{"status": "success"}``, or ``{"status": "failed", ...}`` with
          HTTP 400 when ``user_id`` is missing or empty.
      """
      user_id = request.args.get('user_id')
      if not user_id:
          # Without this check an object would be registered under the key None.
          return jsonify({"status": "failed", "message": "user_id is required"}), 400

      # Imported here, as in the original, rather than at module import time.
      from ..tasks import UserActivateDegreeAnalyze
      g[user_id] = UserActivateDegreeAnalyze.UserActivateDegreeAnalyzeSpace(user_id)
      return jsonify({"status": "success"})
  @main.route('/testpush', methods=['GET'])
  def testpush():
      """Call ``read_from_json()`` on the analysis object of one user.

      The user is chosen by the ``user_id`` query parameter and must already
      have an analysis object in ``g``.

      Returns:
          ``{"status": "success"}``, or ``{"status": "failed", ...}`` with
          HTTP 404 when no analysis object is registered for ``user_id``.
      """
      user_id = request.args.get('user_id')
      analyzer = g.get(user_id)
      if analyzer is None:
          # Previously this surfaced as an unhandled KeyError (HTTP 500).
          return jsonify({"status": "failed", "message": "unknown user_id"}), 404

      analyzer.read_from_json()
      return jsonify({"status": "success"})