UserActivateDegreeAnalyze.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-22 13:47:11
  4. # @Last Modified time: 2024-02-23 10:28:27
  5. import os, json
  6. import pandas as pd
  7. # 用户活跃度分析
  8. # 第一个功能:统计指定时间范围内的访问次数最多的10个用户
  9. # 第二个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户
  10. class UserActivateDegreeAnalyzeSpace(object):
  11. """用户访问分析
  12. 接收用户创建的分析任务
  13. 时间范围:起始时间~结束时间
  14. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  15. 执行submit,将taskid作为参数传递给submit
  16. """
  17. def __init__(self, user: str):
  18. self.user = user
  19. self.pool_size = 10
  20. self.DataStream = pd.DataFrame(columns=[
  21. 'time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat',
  22. 'sta_cod', 'ifr_url', 'but_txt', 'is_op', 'is_bus', 'is_bias']
  23. )
  24. self.loadHeatMapByJson()
  25. # 设置变量名称无关化
  26. def push(self, item: dict) -> None:
  27. self.DataStream.loc[len(self.DataStream.index)] = {
  28. "time": item['record_time'], # 请求时间
  29. "cur_url": item['current_url'], # 当前URL
  30. "tar_url": item['target_url'], # 目标URL
  31. "ifr_url": item['iframe_url'], # 内嵌URL
  32. "req_met": item['request_method'], # 请求方法
  33. "for_dat": item['form_data'], # 表单数据
  34. "but_txt": item['button_text'], # 按钮文本
  35. "is_op": False, # 是否为人工操作
  36. "is_bus": False, # 是否为业务项
  37. "is_bias": False, # 是否偏离
  38. }
  39. # 从文件读取数据(测试用)
  40. def read_from_json(self) -> None:
  41. self.DataStream = pd.read_json(f'/root/mzinfo/V2/{self.user}_data.json', orient='records', lines=True)
  42. self.DataStream['time'] = self.DataStream['record_Time']
  43. self.DataStream['cur_url'] = self.DataStream['current_Url']
  44. self.DataStream['tar_url'] = self.DataStream['target_Url']
  45. self.DataStream['ifr_url'] = self.DataStream['iframe_Url']
  46. self.DataStream['req_met'] = self.DataStream['request_Method']
  47. self.DataStream['for_dat'] = self.DataStream['form_Data']
  48. self.DataStream['but_txt'] = self.DataStream['button_Text']
  49. self.DataStream['is_op'] = False
  50. self.DataStream['is_bus'] = False
  51. self.DataStream['is_bias'] = False
  52. # 获取指定日期范围内的用户访问行为数据
  53. def getActionByDateRange(self) -> pd.DataFrame:
  54. """获取指定日期范围内的用户访问行为数据
  55. """
  56. # 1、拷贝数据,保存数据并清除数据
  57. CopyDataStream = self.DataStream.copy(deep=True)
  58. # 清除数据
  59. self.DataStream.drop(self.DataStream.index, inplace=True)
  60. # 保存数据到 HDF
  61. # CopyDataStream.to_hdf('DataStream.h5', key=self.user)
  62. CopyDataStream.to_json(f'DataStream-{self.user}.h5', orient='records', lines=True, force_ascii=False)
  63. # 2、URL分解,将URL的地址和参数进行分解
  64. # print(CopyDataStream)
  65. # 当前URL参数分离
  66. # CopyDataStream['cur_que'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[-1])
  67. # CopyDataStream['cur_url'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[0])
  68. # # 目标URL参数分离
  69. # CopyDataStream['tar_que'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[-1])
  70. # CopyDataStream['tar_url'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[0])
  71. # # 内嵌URL参数分离
  72. # CopyDataStream['ifr_que'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[-1])
  73. # CopyDataStream['ifr_url'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[0])
  74. # print(CopyDataStream)
  75. # 3、时序排列,将请求按照时间进行排序
  76. # 设置时间格式
  77. CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time'])
  78. CopyDataStream['idx'] = pd.to_datetime(CopyDataStream['time'])
  79. # 时间排序
  80. CopyDataStream = CopyDataStream.set_index('idx')
  81. CopyDataStream.to_csv('debug.csv', index=True, columns=['cur_url', 'tar_url'])
  82. # 4、功能判定
  83. # 设定当前时间
  84. curtime = CopyDataStream.iloc[0]['time']
  85. # 设定当前URL
  86. # current = CopyDataStream.iloc[0]['cur_url']
  87. current = ''
  88. # 设定目标 URL 集
  89. target = dict()
  90. # 设定按键文本
  91. curbut = ''
  92. # 结果缓存
  93. result = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt', 'is_op', 'is_bus', 'is_bias', 'cur_que', 'tar_que', 'ifr_que', 'acc'])
  94. # 遍历数据
  95. for idx, item in enumerate(CopyDataStream.itertuples()):
  96. # 当前地址发生变化,页面跳转(真实可靠)
  97. if item.cur_url != current: # 当前地址发生变化,页面跳转
  98. if target.get(current): # 查看是否有请求当前URL的历史记录,有历史记录则该历史记录就为页面变更记录
  99. result.loc[len(result.index)] = CopyDataStream.iloc[target.get(current)]
  100. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  101. elif '4a.gd.csg.local' in item.cur_url: # 未找到历史记录,该页面并非由请求获得,判定是否首次登录
  102. # print("*********是首次登录*********") # 首次登录
  103. result.loc[len(result.index)] = CopyDataStream.iloc[idx]# 记录当前变更项
  104. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  105. # print(result.loc[ len(result.index) -1 ])
  106. else: # 非正常跳转(既不是由统一认证平台进入、也不是直接点击跳转进入)
  107. # print("*********非正常跳转*********") # 非正常跳转
  108. result.loc[len(result.index)] = CopyDataStream.iloc[idx]# 记录当前变更项
  109. # print(result.loc[ len(result.index) -1 ])
  110. result.at[len(result.index) - 1, 'acc'] = 'url'
  111. curbut = item.but_txt # 当前文本修改
  112. target = dict() # 清除历史记录
  113. target[item.tar_url] = idx # 添加当前记录
  114. current = item.cur_url # 当前地址修改
  115. # 点击文本发生变动,操作发生变化(无法判断第一条数据的真实可靠性)
  116. elif item.but_txt != curbut:
  117. curbut = item.but_txt # 当前文本修改
  118. result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 记录当前变更项
  119. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  120. result.at[len(result.index) - 1, 'acc'] = 'btx'
  121. # 判断是否重新上线(无法保证数据的可靠性)
  122. elif (item.time - curtime).seconds > 600: # 请求时间距离上次请求相隔大于600秒,判断上次请求以离线
  123. result.loc[len(result.index)] = CopyDataStream.iloc[idx -1] # 将上次的请求记录下来
  124. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  125. result.at[len(result.index) - 1, 'is_bias'] = True # 用户超时行为偏离
  126. result.at[len(result.index) - 1, 'acc'] = 'tim'
  127. # print(result.loc[ len(result.index) -1 ])
  128. result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 将新请求记录下来(新记录为人工操作)
  129. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  130. result.at[len(result.index) - 1, 'acc'] = 'tim'
  131. # print(result.loc[ len(result.index) -1 ])
  132. curbut = item.but_txt # 当前文本修改
  133. current = item.cur_url # 当前地址修改
  134. target = dict() # 清除历史记录
  135. target[item.tar_url] = idx # 添加当前记录
  136. # 判断是否营销域(真实可靠)
  137. elif '_INVOKE_FUNC_TITLE_' in item.for_dat:
  138. current = item.cur_url
  139. curbut = item.but_txt # 当前文本修改
  140. result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 记录当前变更项
  141. result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  142. result.at[len(result.index) - 1, 'is_bus'] = True # 变更记录为人工
  143. result.at[len(result.index) - 1, 'acc'] = 'form'
  144. # 判断是否appCode
  145. # elif 'appCode' in item.for_dat and item.but_txt != curbut:
  146. # current = item.cur_url
  147. # result.loc[len(result.index)] = CopyDataStream.iloc[idx]
  148. # result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
  149. # result.at[len(result.index) - 1, 'is_bus'] = True # 变更记录为人工
  150. # 当前地址没变,页面无跳转(需判断AJAX,需判断请求时间)
  151. elif item.cur_url == current: # 当前地址没变,页面无跳转(需判断AJAX,需判断请求时间)
  152. if item.tar_url not in target: # 目标URL不在临时目录
  153. target[item.tar_url] = idx # 添加目标URL到临时目录
  154. curtime = item.time # 修正时间戳
  155. # # 保存文件
  156. # result.to_csv(f'{self.user}.csv', sep=',')
  157. # 结果暂存
  158. self.result = result
  159. # 结果返回
  160. return result
  161. # 生成行为概率图
  162. def getClickCategory2Count(self) -> None:
  163. """获取各分类点击次数
  164. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  165. 2、对点击行为进行计数
  166. 3、保存计数器
  167. """
  168. for row in self.result.itertuples():
  169. # 1、过滤出点击行为数据
  170. key = row.cur_url.split("?")[0] + '-' + row.tar_url.split("?")[0] + '-' + row.acc
  171. # 2、对点击行为进行计数
  172. if self.heatmap.get(key):
  173. self.heatmap[key] += 1
  174. self.heatmap['total'] += 1
  175. else:
  176. self.heatmap[key] = 1
  177. self.heatmap['total'] += 1
  178. # 3、保存计数器
  179. self.saveHeatMap2Json()
  180. # 加载或初始化行为概率图
  181. def loadHeatMapByJson(self) -> None:
  182. """加载点击行为计数器
  183. 如果有概率图,就加载概率图,如果没有,就初始化概率图
  184. """
  185. if os.path.exists('heatmap.json'):
  186. with open('heatmap.json', 'r', encoding='utf-8') as fp:
  187. self.heatmap = json.load(fp)
  188. else:
  189. self.heatmap = dict()
  190. # 设置非零数保证除法正常
  191. self.heatmap['total'] = 1
  192. # 保存行为概率图
  193. def saveHeatMap2Json(self) -> None:
  194. """保存点击行为计数器
  195. """
  196. with open('heatmap.json', 'w', encoding='utf-8') as fp:
  197. json.dump(self.heatmap, fp)
  198. # 获取某个行为的概率(无权重计算)
  199. def getHeatMapValue(self, key: str) -> float:
  200. """获取某个行为的概率
  201. """
  202. value = self.heatmap.get(key)
  203. total = self.heatmap['total']
  204. if value:
  205. return value / total
  206. else:
  207. return 0.
  208. def getTopKSession(self):
  209. """获取每个分类top10活跃用户
  210. 1、将topK热门分类的id,生成DF
  211. 2、计算topK分类被各用户访问的次数
  212. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  213. 4、获取topK活跃用户的明细数据,并写入DM
  214. """
  215. pass
  216. def getSession2Action(self):
  217. """获取sessionid到访问行为数据的映射的
  218. """
  219. pass
  220. def aggregateBySession(self):
  221. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  222. """
  223. pass
  224. def filterSessionAndAggrStat(self):
  225. """过滤session数据,并进行计数器值累加
  226. """
  227. pass
  228. def getSession2detail(self):
  229. """获取通过筛选条件的session的访问明细数据
  230. """
  231. pass
  232. def getTopKCategory(self):
  233. """获取topK热门分类
  234. 第一步:获取符合条件的访问过的所有品类
  235. 第二步:计算各品类的点击的次数
  236. 第三步:自定义二次排序key
  237. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  238. 第五步:取出top10热门品类,并写入DM
  239. """
  240. pass