BusinessClickRealTimeAnalyze.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-18 13:15:22
# @Last Modified time: 2024-02-21 13:16:17
  5. class BusinessClickRealTimeAnalyzeSpace(object):
  6. def __init__(self):
  7. self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])
  8. def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame:
  9. """计算业务访问流量实时统计
  10. 计算每天业务的访问量
  11. 设计维度:日期时间、用户、业务、访问次数
  12. 可以看到当天所有的实时数据,
  13. 通过DF,直接统计出全局的访问次数,在DF缓存一份,在DM保存一份
  14. """
  15. """
  16. 我们要对原始数据进行map,映射成<date_user_domain,1>格式
  17. 然后,对上述格式的数据,执行updateStateByKey算子
  18. """
  19. def _filter(data: dict) -> tuple:
  20. return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"], "count": 1}
  21. mappedDStream = _filter(sourceDStream)
  22. self.aggregatedDStream = self.aggregatedDStream.append(mappedDStream, ignore_index=True)
  23. self.aggregatedDStream = self.aggregatedDStream.groupby('word').apply(lambda x: sum(x['count'])).reset_index(name="count")
  24. # 将计算出来的结果同步一份到DM
  25. session = DM8Session()
  26. for row in self.aggregatedDStream.itertuples():
  27. try:
  28. result = session.execute("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (row.word, row.count))
  29. except Exception as e:
  30. print(e)
  31. return self.aggregatedDStream
  32. def calculateTopKBusiness(self):
  33. """计算每天用户的TopK访问业务
  34. 计算出每天各业务的点击量
  35. """
  36. pass
  37. def calculateBusinessCountByWindow(self):
  38. """计算最近1小时滑动窗口内的业务访问趋势
  39. """
  40. pass