#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-22 13:47:11 # @Last Modified time: 2024-01-23 16:46:12 import pandas as pd from pprint import pprint # 用户活跃度分析 # 第一个功能:统计指定时间范围内的访问次数最多的10个用户 # 第二个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户 class UserActivateDegreeAnalyzeSpace(object): """用户访问分析 接收用户创建的分析任务 时间范围:起始时间~结束时间 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装 执行submit,将taskid作为参数传递给submit """ def __init__(self, user: str): self.user = user self.pool_size = 10 self.DataStream = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt']) def push(self, item: dict) -> None: self.DataStream = self.DataStream.append({ "time": item['Record_Time'], "cur_url": item['Current_Url'], "tar_url": item['Target_Url'], }, ignore_index=True) def getActionByDateRange(self) -> pd.DataFrame: """获取指定日期范围内的用户访问行为数据 """ current = self.DataStream.iloc[0]['cur_url'] target = dict() result = pd.DataFrame() # 拷贝数据,清除数据 CopyDataStream = self.DataStream.copy(deep=True) self.DataStream.drop(self.DataStream.index, inplace=True) # 设置时间格式 CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time']) # 时间排序 CopyDataStream = CopyDataStream.set_index('time') # 遍历数据 for idx, item in enumerate(CopyDataStream.itertuples()): if item.cur_url == current: # 当前地址没变,页面无跳转(需判断AJAX) if item.tar_url not in target: # 目标url不在临时目录 target[item.tar_url] = idx # 添加目标URL到临时目录 else: # 当前地址发生变化,页面跳转 current = item.cur_url # 当前地址修改 if target.get(current): # 查看是否有请求当前URL的历史记录 print(target.get(current)) # pprint({ # "源地址": CopyDataStream.loc[target.get(current)]['cur_url'], # "跳转地": CopyDataStream.loc[target.get(current)]['tar_url'] # }) # 查看历史记录 result = result.append([CopyDataStream.iloc[target.get(current)]]) else: # 为找到历史记录 print("*********非正常跳转*********") # 非正常跳转 target = dict() # 清除历史记录 target[item.tar_url] = idx # 添加当前记录 return result def getSession2Action(self): """获取sessionid到访问行为数据的映射的 """ pass def aggregateBySession(self): """对行为数据按session粒度进行聚合,并将用户信息join后聚合 """ pass def filterSessionAndAggrStat(self): """过滤session数据,并进行计数器值累加 """ pass def getSession2detail(self): """获取通过筛选条件的session的访问明细数据 """ pass def getTopKCategory(self): """获取topK热门分类 第一步:获取符合条件的访问过的所有品类 第二步:计算各品类的点击的次数 第三步:自定义二次排序key 第四步:将数据映射成格式的RDD,然后进行二次排序(降序) 第五步:取出top10热门品类,并写入DM """ pass def getClickCategory2Count(self): """获取各分类点击次数 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀 """ pass def getTopKSession(self): """获取每个分类top10活跃用户 1、将topK热门分类的id,生成DF 2、计算topK分类被各用户访问的次数 3、分组取TopK算法实现,获取每个分类的topK活跃用户 4、获取topK活跃用户的明细数据,并写入DM """ pass