#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-18 13:15:22
# @Last Modified time: 2024-02-21 13:16:17
import logging

import pandas as pd

logger = logging.getLogger(__name__)


class BusinessClickRealTimeAnalyzeSpace(object):
    """Real-time analytics over a stream of business click/visit events.

    Events are fed one at a time to ``calculateRealTimeStat``, which keeps a
    cumulative per-key visit count in ``self.aggregatedDStream`` (a pandas
    DataFrame with columns ``word`` and ``count``) and mirrors it to the DM
    database table ``TOTALVISIT``.
    """

    def __init__(self):
        # Cumulative state: one row per "<time>_<user>_<domain>" key together
        # with the number of times that key has been seen so far.
        self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])

    def calculateRealTimeStat(self, sourceDStream: dict, session=None) -> pd.DataFrame:
        """Compute real-time statistics of business access traffic.

        Counts visits per business. The dimensions are date-time, user,
        business (domain) and visit count. Every event seen so far is kept, so
        global visit counts are computed directly on the DataFrame. One copy
        is cached in the DataFrame (``self.aggregatedDStream``) and one is
        written to the DM database.

        The raw event is first mapped to a ``{"word": key, "count": 1}``
        record. The record is then merged into the running totals, in the
        spirit of Spark's ``updateStateByKey`` operator.

        Args:
            sourceDStream: A single event. It must provide ``"time"`` (an
                object with ``strftime``, e.g. ``datetime.datetime``), and
                ``"user"`` and ``"domain"`` (strings, because they are
                concatenated into the key).
            session: Optional database session exposing
                ``execute(sql, params)``. When omitted, a new ``DM8Session()``
                is created, which is the original behavior.

        Returns:
            The updated cumulative DataFrame (columns ``word`` and ``count``),
            sorted by ``word``.
        """
        # Map the raw event to its aggregation key: "<ISO time>_<user>_<domain>".
        key = (
            sourceDStream["time"].strftime('%Y-%m-%dT%H:%M:%S')
            + "_" + sourceDStream["user"]
            + "_" + sourceDStream["domain"]
        )
        new_row = pd.DataFrame({"word": [key], "count": [1]})

        # DataFrame.append was removed in pandas 2.0; use concat instead, and
        # skip concatenating onto an empty frame (deprecated in recent pandas).
        if self.aggregatedDStream.empty:
            combined = new_row
        else:
            combined = pd.concat([self.aggregatedDStream, new_row], ignore_index=True)
        self.aggregatedDStream = combined.groupby('word', as_index=False)['count'].sum()

        # Sync the computed totals to DM.
        # NOTE(review): every call re-inserts ALL cumulative rows, so TOTALVISIT
        # accumulates duplicate rows per key; an upsert (e.g. MERGE INTO) is
        # probably intended -- confirm against the table's consumers.
        # NOTE(review): the session is never committed or closed here; confirm
        # whether DM8Session requires that.
        if session is None:
            session = DM8Session()
        for word, count in self.aggregatedDStream.itertuples(index=False, name=None):
            try:
                # int(): hand the driver a plain Python int, not a numpy scalar.
                session.execute("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (word, int(count)))
            except Exception:
                # Best-effort sync: a failed write must not stop the stream,
                # but it should be visible in the logs rather than printed.
                logger.exception("Failed to sync visit count for %r to TOTALVISIT", word)
        return self.aggregatedDStream

    def calculateTopKBusiness(self):
        """Compute each user's top-K visited businesses per day.

        Intended to compute the daily click count of each business.
        Not implemented yet; currently returns None.
        """
        pass

    def calculateBusinessCountByWindow(self):
        """Compute the business visit trend within a sliding window over the last hour.

        Not implemented yet; currently returns None.
        """
        pass