UserActivateDegreeAnalyze.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-22 13:47:11
  4. # @Last Modified time: 2024-02-02 13:20:51
  5. import pandas as pd
  6. # 用户活跃度分析
  7. # 第一个功能:统计指定时间范围内的访问次数最多的10个用户
  8. # 第二个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户
  9. class UserActivateDegreeAnalyzeSpace(object):
  10. """用户访问分析
  11. 接收用户创建的分析任务
  12. 时间范围:起始时间~结束时间
  13. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  14. 执行submit,将taskid作为参数传递给submit
  15. """
  16. def __init__(self, user: str):
  17. self.user = user
  18. self.pool_size = 10
  19. self.DataStream = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt', 'is_op', 'is_bus', 'is_bias'])
  20. # 设置变量名称无关化
  21. def push(self, item: dict) -> None:
  22. self.DataStream.loc[len(self.DataStream.index)] = {
  23. "time": item['record_time'], # 请求时间
  24. "cur_url": item['current_url'], # 当前URL
  25. "tar_url": item['target_url'], # 目标URL
  26. "ifr_url": item['iframe_url'], # 内嵌URL
  27. "req_met": item['request_method'], # 请求方法
  28. "for_dat": item['form_data'], # 表单数据
  29. "but_txt": item['button_text'], # 按钮文本
  30. "is_op": False, # 是否为人工操作
  31. "is_bus": False, # 是否为业务项
  32. "is_bias": False, # 是否偏离
  33. }
  34. # 获取指定日期范围内的用户访问行为数据
  35. def getActionByDateRange(self) -> pd.DataFrame:
  36. """获取指定日期范围内的用户访问行为数据
  37. """
  38. # 1、拷贝数据,保存数据并清除数据
  39. CopyDataStream = self.DataStream.copy(deep=True)
  40. # 清除数据
  41. self.DataStream.drop(self.DataStream.index, inplace=True)
  42. # 保存数据到 HDF
  43. # CopyDataStream.to_hdf('DataStream.h5', key=self.user)
  44. CopyDataStream.to_json(f'DataStream-{self.user}.h5', orient='records', lines=True, force_ascii=False)
  45. # 2、URL分解,将URL的地址和参数进行分解
  46. # print(CopyDataStream)
  47. # 当前URL参数分离
  48. CopyDataStream['cur_que'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[-1])
  49. CopyDataStream['cur_url'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[0])
  50. # 目标URL参数分离
  51. CopyDataStream['tar_que'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[-1])
  52. CopyDataStream['tar_url'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[0])
  53. # 内嵌URL参数分离
  54. CopyDataStream['ifr_que'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[-1])
  55. CopyDataStream['ifr_url'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[0])
  56. # print(CopyDataStream)
  57. # 3、时序排列,将请求按照时间进行排序
  58. # 设置时间格式
  59. CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time'])
  60. CopyDataStream['idx'] = pd.to_datetime(CopyDataStream['time'])
  61. # 时间排序
  62. # CopyDataStream = CopyDataStream.sort_values(by='time')
  63. # CopyDataStream = CopyDataStream.reset_index()
  64. CopyDataStream = CopyDataStream.set_index('idx')
  65. CopyDataStream.to_csv('debug.csv', index=True, columns=['cur_url', 'tar_url'])
  66. # 4、功能判定
  67. # 设定当前时间
  68. curtime = CopyDataStream.iloc[0]['time']
  69. # 设定当前URL
  70. # current = CopyDataStream.iloc[0]['cur_url']
  71. current = ''
  72. # 设定目标 URL 集
  73. target = dict()
  74. # 结果缓存
  75. result = pd.DataFrame()
  76. # 将第一个请求放入结果
  77. result = result.append([CopyDataStream.iloc[0]])
  78. result.iloc[ len(result.index) -1 ]['is_op'] = True
  79. # 遍历数据
  80. for idx, item in enumerate(CopyDataStream.itertuples()):
  81. # 判断是否营销域
  82. if ('10.150.23.1:8010' in item.cur_url) and ('_INVOKE_FUNC_TITLE_' in item.for_dat):
  83. current = item.cur_url
  84. result = result.append([CopyDataStream.iloc[idx]])
  85. # 判断是否重新上线
  86. elif (item.time - curtime).seconds > 300: # 请求时间距离上次请求相隔大于300秒,判断上次请求以离线
  87. result = result.append([CopyDataStream.iloc[idx -1]]) # 将上次的请求记录下来
  88. result.iloc[ len(result.index) -1 ]['is_op'] = True # 变更记录为人工
  89. result = result.append([CopyDataStream.iloc[idx]]) # 将新请求记录下来(新记录为人工操作)
  90. result.iloc[ len(result.index) -1 ]['is_op'] = True # 变更记录为人工
  91. current = item.cur_url # 当前地址修改
  92. target = dict() # 清除历史记录
  93. target[item.tar_url] = idx # 添加当前记录
  94. elif item.cur_url == current: # 当前地址没变,页面无跳转(需判断AJAX,需判断请求时间)
  95. if item.tar_url not in target: # 目标URL不在临时目录
  96. target[item.tar_url] = idx # 添加目标URL到临时目录
  97. else: # 当前地址发生变化,页面跳转
  98. if target.get(current): # 查看是否有请求当前URL的历史记录,有历史记录则该历史记录就为页面变更记录
  99. result = result.append([CopyDataStream.iloc[target.get(current)]])
  100. result.iloc[ len(result.index) -1 ]['is_op'] = True # 变更记录为人工
  101. elif '4a.gd.csg.local' in item.cur_url: # 未找到历史记录,该页面并非由请求获得,判定是否首次登录
  102. print("*********是首次登录*********") # 首次登录
  103. result = result.append([CopyDataStream.iloc[idx]]) # 记录当前变更项
  104. result.iloc[ len(result.index) -1 ]['is_op'] = True # 变更记录为人工
  105. else: # 非正常跳转(既不是由统一认证平台进入、也不是直接点击跳转进入)
  106. print("*********非正常跳转*********") # 非正常跳转
  107. target = dict() # 清除历史记录
  108. target[item.tar_url] = idx # 添加当前记录
  109. current = item.cur_url # 当前地址修改
  110. curtime = item.time # 修正时间戳
  111. # 结果返回
  112. print(result)
  113. return result
  114. def getSession2Action(self):
  115. """获取sessionid到访问行为数据的映射的
  116. """
  117. pass
  118. def aggregateBySession(self):
  119. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  120. """
  121. pass
  122. def filterSessionAndAggrStat(self):
  123. """过滤session数据,并进行计数器值累加
  124. """
  125. pass
  126. def getSession2detail(self):
  127. """获取通过筛选条件的session的访问明细数据
  128. """
  129. pass
  130. def getTopKCategory(self):
  131. """获取topK热门分类
  132. 第一步:获取符合条件的访问过的所有品类
  133. 第二步:计算各品类的点击的次数
  134. 第三步:自定义二次排序key
  135. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  136. 第五步:取出top10热门品类,并写入DM
  137. """
  138. pass
  139. def getClickCategory2Count(self):
  140. """获取各分类点击次数
  141. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  142. """
  143. pass
  144. def getTopKSession(self):
  145. """获取每个分类top10活跃用户
  146. 1、将topK热门分类的id,生成DF
  147. 2、计算topK分类被各用户访问的次数
  148. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  149. 4、获取topK活跃用户的明细数据,并写入DM
  150. """
  151. pass