#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-18 13:15:22
# @Last Modified time: 2024-02-21 13:16:17
class BusinessClickRealTimeAnalyzeSpace:
    """Accumulate real-time business click counts and mirror them to DM.

    The running state lives in ``aggregatedDStream``, a DataFrame with the
    columns ``word`` (the aggregation key) and ``count`` (number of events
    seen so far for that key).
    """

    def __init__(self):
        # Empty placeholder; rows are added by calculateRealTimeStat.
        self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])

    def calculateRealTimeStat(self, sourceDStream: dict) -> "pd.DataFrame":
        """Fold one access event into the running totals and sync them to DM.

        The event is mapped to a ``<time_user_domain, 1>`` record, merged
        into the accumulated totals (one row per key), and the full set of
        totals is then written to the ``TOTALVISIT`` table.

        Args:
            sourceDStream: One event. Must provide ``"time"`` (an object
                with ``strftime``), ``"user"`` and ``"domain"`` (strings).

        Returns:
            The updated totals, with columns ``word`` and ``count``.
        """
        totals = self._accumulate(sourceDStream)
        # Mirror the computed totals to DM.
        self._sync_to_dm()
        return totals

    @staticmethod
    def _build_record(event: dict) -> dict:
        """Map one event to a ``{"word": key, "count": 1}`` record."""
        # NOTE(review): the key has one-second resolution although the
        # original description speaks of per-day statistics - confirm the
        # intended granularity before changing the format.
        key = "_".join((
            event["time"].strftime('%Y-%m-%dT%H:%M:%S'),
            event["user"],
            event["domain"],
        ))
        return {"word": key, "count": 1}

    def _accumulate(self, event: dict) -> "pd.DataFrame":
        """Add one event to ``aggregatedDStream`` and re-total per key."""
        new_row = pd.DataFrame([self._build_record(event)])
        if self.aggregatedDStream.empty:
            # Skip the concat with the empty placeholder so the ``count``
            # column keeps its integer dtype.
            combined = new_row
        else:
            # DataFrame.append was removed in pandas 2.0; concat is the
            # supported replacement.
            combined = pd.concat(
                [self.aggregatedDStream, new_row], ignore_index=True
            )
        self.aggregatedDStream = (
            combined.groupby('word', as_index=False)['count'].sum()
        )
        return self.aggregatedDStream

    def _sync_to_dm(self) -> None:
        """Insert every aggregated row into the ``TOTALVISIT`` table."""
        # NOTE(review): each call re-inserts all rows, and the session is
        # neither committed nor closed here - confirm against DM8Session's
        # contract whether an upsert / explicit cleanup is required.
        session = DM8Session()
        for row in self.aggregatedDStream.itertuples():
            try:
                session.execute(
                    """INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""",
                    (row.word, row.count),
                )
            except Exception as e:
                # Best effort: a failed insert must not abort the rest.
                print(e)

    def calculateTopKBusiness(self):
        """Compute each user's daily top-K visited businesses.

        Intended to derive the per-day click count of every business.
        Not implemented yet.
        """
        pass

    def calculateBusinessCountByWindow(self):
        """Compute the business access trend over a 1-hour sliding window.

        Not implemented yet.
        """
        pass
|