UserVisitAnalyze.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-19 13:13:17
  4. # @Last Modified time: 2024-02-21 13:30:33
  5. from threading import Thread
  6. import pandas as pd
  7. class UserVisitAnalyzeSpace(object):
  8. """用户访问分析
  9. 接收用户创建的分析任务
  10. 时间范围:起始时间~结束时间
  11. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  12. 执行submit,将taskid作为参数传递给submit
  13. """
  14. def __init__(self, user: str):
  15. self.user = user
  16. self.pool_size = 10
  17. self.start_point = 0
  18. self.end_point = 0
  19. # 一级缓存
  20. self.cache_item = dict()
  21. # 二级缓存数据
  22. self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])
  23. def __repr__(self):
  24. return f'<User: {self.user} DF: {self.df_queue}>'
  25. def push(self, item: dict):
  26. # 消息进队列
  27. self.df_queue.loc[self.end_point] = item
  28. self.end_point = (self.end_point + 1) % self.pool_size
  29. def pop(self):
  30. # 消息出队列
  31. item = self.df_queue.loc[self.start_point]
  32. self.start_point = (self.start_point + 1) % self.pool_size
  33. return item
  34. def send(self, item: dict):
  35. # 当前缓存无数据
  36. if not self.cache_item:
  37. self.cache_item = item
  38. else:
  39. # 判断缓存内数据和新数据哪个更新
  40. if item['time'] > self.cache_item['time']:
  41. # 当前数据较新,计算页面持续时间
  42. delay = (item['time'] - self.cache_item['time']).seconds
  43. if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']):
  44. self.cache_item['stay'] += delay
  45. else:
  46. self.push(self.cache_item)
  47. self.cache_item = item
  48. t = Thread(target=func, args=(self.pop(),))
  49. t.start()
  50. else:
  51. # 老数据较新,数据发送产生异常,数据产生积压
  52. pass
  53. # self.df = pd.concat([self.df, pd.DataFrame([item])])
  54. # if self.df.shape[0] >= self.pool_size:
  55. # # 新建临时表
  56. # dt = self.df[0:1]
  57. # index = 0
  58. # for i in range(1, self.pool_size):
  59. # d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
  60. # if isinstance(d2, pd.DataFrame):
  61. # dt.iloc[index] = d1
  62. # index += 1
  63. # dt.iloc[index] = d2
  64. # session = DM8Session()
  65. # result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
  66. # args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
  67. # else:
  68. # dt.iloc[index] = d1
  69. # self.df = pd.concat([dt, self.df[self.pool_size:]])
  70. def getActionByDateRange(self):
  71. """获取指定日期范围内的用户访问行为数据
  72. """
  73. pass
  74. def getSession2Action(self) -> DataFrame:
  75. """获取sessionid到访问行为数据的映射的
  76. """
  77. pass
  78. def aggregateBySession(self):
  79. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  80. """
  81. pass
  82. def filterSsessionAndAggrStat(self):
  83. """过滤session数据,并进行计数器值累加
  84. """
  85. pass
  86. def getSession2detail(self):
  87. """获取通过筛选条件的session的访问明细数据
  88. """
  89. pass
  90. def getTopKCategory(self):
  91. """获取topK热门分类
  92. 第一步:获取符合条件的访问过的所有品类
  93. 第二步:计算各品类的点击的次数
  94. 第三步:自定义二次排序key
  95. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  96. 第五步:取出top10热门品类,并写入DM
  97. """
  98. pass
  99. def getClickCategory2Count(self) -> DataFrame:
  100. """获取各分类点击次数
  101. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  102. """
  103. pass
  104. def getTopKSession(self):
  105. """获取每个分类top10活跃用户
  106. 1、将topK热门分类的id,生成DF
  107. 2、计算topK分类被各用户访问的次数
  108. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  109. 4、获取topK活跃用户的明细数据,并写入DM
  110. """
  111. pass
  112. # --- 池化 --- #
  113. def pooling(self, line1: dict, line2: dict):
  114. if line1['time'] > line2['time']:
  115. deltat = (line1['time'] - line2['time']).seconds
  116. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  117. line2['stay'] += deltat
  118. return line2, None
  119. else:
  120. return line2, line1
  121. else:
  122. deltat = (line2['time'] - line1['time']).seconds
  123. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  124. line1['stay'] += deltat
  125. return line1, None
  126. else:
  127. return line1, line2