#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-19 13:13:17
# @Last Modified time: 2024-02-21 13:30:33
"""Per-user page-visit aggregation with a small two-level cache."""
from threading import Thread
from typing import Callable, Optional

import pandas as pd


class UserVisitAnalyzeSpace(object):
    """User visit analysis.

    Receives the analysis task created by a user.
    Time range: start time ~ end time.
    After a task request is received, the task information is inserted into
    the ``task`` table of MD, with the task parameters packed as JSON.
    ``submit`` is then executed with the task id passed as its argument.

    Incoming visit records go through two cache levels:

    * ``cache_item`` (level 1) holds the record currently being accumulated;
      consecutive records for the same page are folded into its ``'stay'``.
    * ``df_queue`` (level 2) is a ring buffer of ``pool_size`` rows holding
      finished records until they are popped.
    """

    # Two records closer together than this many seconds (plus the stay time
    # already accumulated, in ``send``) are treated as one continuous visit.
    MERGE_GAP_SECONDS = 2

    def __init__(self, user: str,
                 handler: Optional[Callable[..., object]] = None):
        """Create an empty analysis space for *user*.

        Args:
            user: Identifier of the user whose visits are aggregated.
            handler: Optional callable invoked in a new thread with each
                finished record (a ``pandas.Series`` row). When omitted,
                ``send`` falls back to a module-level ``func`` if one exists;
                otherwise finished records stay in the queue until ``pop``
                is called.
        """
        self.user = user
        self.handler = handler
        self.pool_size = 10
        # Read / write cursors of the ring buffer.
        self.start_point = 0
        self.end_point = 0
        # Number of rows pushed but not yet popped.
        self._pending = 0
        # Level-1 cache: the record currently being accumulated.
        self.cache_item = dict()
        # Level-2 cache: ring buffer of finished records.
        self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])

    def __repr__(self):
        return f'<{type(self).__name__} user={self.user!r}>'

    def push(self, item: dict):
        """Append *item* to the ring buffer.

        When the buffer already holds ``pool_size`` unread rows the oldest
        one is overwritten and the read cursor moves past it, so ``pop``
        keeps returning rows in arrival order.
        """
        if self._pending >= self.pool_size:
            # Full: the slot about to be written is the oldest unread row.
            self.start_point = (self.start_point + 1) % self.pool_size
        else:
            self._pending += 1
        self.df_queue.loc[self.end_point] = item
        self.end_point = (self.end_point + 1) % self.pool_size

    def pop(self):
        """Remove and return the oldest unread row as a ``pandas.Series``.

        Raises:
            KeyError: If the queue holds no unread rows.
        """
        if self._pending <= 0:
            raise KeyError('pop from an empty queue')
        # Copy so a later overwrite of this slot cannot change the row that
        # was already handed out (possibly to another thread).
        item = self.df_queue.loc[self.start_point].copy()
        self.start_point = (self.start_point + 1) % self.pool_size
        self._pending -= 1
        return item

    def _flush_handler(self):
        """Return the callable that consumes finished records, or None."""
        if self.handler is not None:
            return self.handler
        # The original code referenced a bare name ``func``; honour it when
        # the surrounding module defines one.
        return globals().get('func')

    def send(self, item: dict):
        """Feed one visit record into the cache.

        * Empty cache: *item* becomes the cached record.
        * *item* newer than the cached record and on the same page
          (``'三级标题'``) within ``stay + MERGE_GAP_SECONDS`` seconds: the gap
          is added to the cached record's ``'stay'``.
        * *item* newer but on another page or too far apart: the cached
          record is pushed to the queue, *item* becomes the cached record,
          and the queued record is handed to the flush handler in a new
          thread (when a handler is available).
        * *item* not newer than the cached record: ignored.

        The record is copied, so the caller's dict is never modified.
        """
        if not self.cache_item:
            # Nothing cached yet.
            self.cache_item = dict(item)
            return

        if not item['time'] > self.cache_item['time']:
            # The cached record is newer: delivery was out of order or is
            # backed up. Such records are dropped.
            return

        # total_seconds() keeps whole days; ``.seconds`` would discard them.
        delay = int((item['time'] - self.cache_item['time']).total_seconds())
        same_page = item['三级标题'] == self.cache_item['三级标题']
        if (delay < self.cache_item['stay'] + self.MERGE_GAP_SECONDS) and same_page:
            self.cache_item['stay'] += delay
            return

        self.push(self.cache_item)
        self.cache_item = dict(item)
        handler = self._flush_handler()
        if handler is not None:
            t = Thread(target=handler, args=(self.pop(),))
            t.start()

        # Legacy batch implementation, kept for reference only:
        # self.df = pd.concat([self.df, pd.DataFrame([item])])
        # if self.df.shape[0] >= self.pool_size:
        #     # build a temporary table
        #     dt = self.df[0:1]
        #     index = 0
        #     for i in range(1, self.pool_size):
        #         d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
        #         if isinstance(d2, pd.DataFrame):
        #             dt.iloc[index] = d1
        #             index += 1
        #             dt.iloc[index] = d2
        #             session = DM8Session()
        #             result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
        #                                      args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'],
        #                                            d1.get('四级标题'), d1['text'], d1['stay']))
        #         else:
        #             dt.iloc[index] = d1
        #     self.df = pd.concat([dt, self.df[self.pool_size:]])

    def getActionByDateRange(self):
        """Get the user visit-behaviour data within the given date range.

        Not implemented yet.
        """
        pass

    def getSession2Action(self) -> pd.DataFrame:
        """Get the mapping from session id to visit-behaviour data.

        Not implemented yet.
        """
        pass

    def aggregateBySession(self):
        """Aggregate behaviour data per session and join the user information.

        Not implemented yet.
        """
        pass

    def filterSsessionAndAggrStat(self):
        """Filter the session data and accumulate the counter values.

        Not implemented yet.
        """
        pass

    def getSession2detail(self):
        """Get the visit details of the sessions that pass the filter.

        Not implemented yet.
        """
        pass

    def getTopKCategory(self):
        """Get the top-K hottest categories.

        Step 1: collect every visited category that matches the conditions.
        Step 2: count the clicks of each category.
        Step 3: define a custom secondary-sort key.
        Step 4: map the data into an RDD of the required format, then run
                the secondary sort (descending).
        Step 5: take the top 10 hottest categories and write them to DM.

        Not implemented yet.
        """
        pass

    def getClickCategory2Count(self) -> pd.DataFrame:
        """Get the click count of each category.

        1. Filter out the click-behaviour data. Clicks are only a small part
           of all data, so the filtered data may be unevenly distributed.

        Not implemented yet.
        """
        pass

    def getTopKSession(self):
        """Get the top 10 active users of each category.

        1. Build a DataFrame from the ids of the top-K hottest categories.
        2. Count how often each user visited the top-K categories.
        3. Run a grouped top-K to get the most active users per category.
        4. Fetch the details of those users and write them to DM.

        Not implemented yet.
        """
        pass

    # --- pooling --- #
    def pooling(self, line1: dict, line2: dict):
        """Merge two visit records when they belong to one continuous visit.

        The records are ordered by ``'time'`` (``line1`` counts as earlier on
        a tie). If they are less than ``MERGE_GAP_SECONDS`` seconds apart and
        share the same ``'三级标题'``, the gap is added to the earlier record's
        ``'stay'`` in place and the later record is discarded.

        Returns:
            ``(earlier, None)`` when merged, otherwise ``(earlier, later)``.
        """
        if line1['time'] > line2['time']:
            earlier, later = line2, line1
        else:
            earlier, later = line1, line2

        # total_seconds() keeps whole days; ``.seconds`` would discard them.
        deltat = int((later['time'] - earlier['time']).total_seconds())
        if (deltat < self.MERGE_GAP_SECONDS) and (line1['三级标题'] == line2['三级标题']):
            earlier['stay'] += deltat
            return earlier, None
        return earlier, later