UserActivateDegreeAnalyze.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-22 13:47:11
  4. # @Last Modified time: 2024-01-23 16:46:12
  5. import pandas as pd
  6. from pprint import pprint
  7. # 用户活跃度分析
  8. # 第一个功能:统计指定时间范围内的访问次数最多的10个用户
  9. # 第二个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户
  10. class UserActivateDegreeAnalyzeSpace(object):
  11. """用户访问分析
  12. 接收用户创建的分析任务
  13. 时间范围:起始时间~结束时间
  14. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  15. 执行submit,将taskid作为参数传递给submit
  16. """
  17. def __init__(self, user: str):
  18. self.user = user
  19. self.pool_size = 10
  20. self.DataStream = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt'])
  21. def push(self, item: dict) -> None:
  22. self.DataStream = self.DataStream.append({
  23. "time": item['Record_Time'],
  24. "cur_url": item['Current_Url'],
  25. "tar_url": item['Target_Url'],
  26. },
  27. ignore_index=True)
  28. def getActionByDateRange(self) -> pd.DataFrame:
  29. """获取指定日期范围内的用户访问行为数据
  30. """
  31. current = self.DataStream.iloc[0]['cur_url']
  32. target = dict()
  33. result = pd.DataFrame()
  34. # 拷贝数据,清除数据
  35. CopyDataStream = self.DataStream.copy(deep=True)
  36. self.DataStream.drop(self.DataStream.index, inplace=True)
  37. # 设置时间格式
  38. CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time'])
  39. # 时间排序
  40. CopyDataStream = CopyDataStream.set_index('time')
  41. # 遍历数据
  42. for idx, item in enumerate(CopyDataStream.itertuples()):
  43. if item.cur_url == current: # 当前地址没变,页面无跳转(需判断AJAX)
  44. if item.tar_url not in target: # 目标url不在临时目录
  45. target[item.tar_url] = idx # 添加目标URL到临时目录
  46. else: # 当前地址发生变化,页面跳转
  47. current = item.cur_url # 当前地址修改
  48. if target.get(current): # 查看是否有请求当前URL的历史记录
  49. print(target.get(current))
  50. # pprint({
  51. # "源地址": CopyDataStream.loc[target.get(current)]['cur_url'],
  52. # "跳转地": CopyDataStream.loc[target.get(current)]['tar_url']
  53. # }) # 查看历史记录
  54. result = result.append([CopyDataStream.iloc[target.get(current)]])
  55. else: # 为找到历史记录
  56. print("*********非正常跳转*********") # 非正常跳转
  57. target = dict() # 清除历史记录
  58. target[item.tar_url] = idx # 添加当前记录
  59. return result
  60. def getSession2Action(self):
  61. """获取sessionid到访问行为数据的映射的
  62. """
  63. pass
  64. def aggregateBySession(self):
  65. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  66. """
  67. pass
  68. def filterSessionAndAggrStat(self):
  69. """过滤session数据,并进行计数器值累加
  70. """
  71. pass
  72. def getSession2detail(self):
  73. """获取通过筛选条件的session的访问明细数据
  74. """
  75. pass
  76. def getTopKCategory(self):
  77. """获取topK热门分类
  78. 第一步:获取符合条件的访问过的所有品类
  79. 第二步:计算各品类的点击的次数
  80. 第三步:自定义二次排序key
  81. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  82. 第五步:取出top10热门品类,并写入DM
  83. """
  84. pass
  85. def getClickCategory2Count(self):
  86. """获取各分类点击次数
  87. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  88. """
  89. pass
  90. def getTopKSession(self):
  91. """获取每个分类top10活跃用户
  92. 1、将topK热门分类的id,生成DF
  93. 2、计算topK分类被各用户访问的次数
  94. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  95. 4、获取topK活跃用户的明细数据,并写入DM
  96. """
  97. pass