123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-19 13:13:17
- # @Last Modified time: 2024-02-21 13:30:33
- from threading import Thread
- import pandas as pd
- class UserVisitAnalyzeSpace(object):
- """用户访问分析
- 接收用户创建的分析任务
- 时间范围:起始时间~结束时间
- 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
- 执行submit,将taskid作为参数传递给submit
- """
- def __init__(self, user: str):
- self.user = user
- self.pool_size = 10
- self.start_point = 0
- self.end_point = 0
- # 一级缓存
- self.cache_item = dict()
- # 二级缓存数据
- self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])
- def __repr__(self):
- return f'<User: {self.user} DF: {self.df_queue}>'
- def push(self, item: dict):
- # 消息进队列
- self.df_queue.loc[self.end_point] = item
- self.end_point = (self.end_point + 1) % self.pool_size
- def pop(self):
- # 消息出队列
- item = self.df_queue.loc[self.start_point]
- self.start_point = (self.start_point + 1) % self.pool_size
- return item
- def send(self, item: dict):
- # 当前缓存无数据
- if not self.cache_item:
- self.cache_item = item
- else:
- # 判断缓存内数据和新数据哪个更新
- if item['time'] > self.cache_item['time']:
- # 当前数据较新,计算页面持续时间
- delay = (item['time'] - self.cache_item['time']).seconds
- if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']):
- self.cache_item['stay'] += delay
- else:
- self.push(self.cache_item)
- self.cache_item = item
- t = Thread(target=func, args=(self.pop(),))
- t.start()
- else:
- # 老数据较新,数据发送产生异常,数据产生积压
- pass
- # self.df = pd.concat([self.df, pd.DataFrame([item])])
- # if self.df.shape[0] >= self.pool_size:
- # # 新建临时表
- # dt = self.df[0:1]
- # index = 0
- # for i in range(1, self.pool_size):
- # d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
- # if isinstance(d2, pd.DataFrame):
- # dt.iloc[index] = d1
- # index += 1
- # dt.iloc[index] = d2
- # session = DM8Session()
- # result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
- # args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
- # else:
- # dt.iloc[index] = d1
- # self.df = pd.concat([dt, self.df[self.pool_size:]])
- def getActionByDateRange(self):
- """获取指定日期范围内的用户访问行为数据
- """
- pass
- def getSession2Action(self) -> DataFrame:
- """获取sessionid到访问行为数据的映射的
- """
- pass
- def aggregateBySession(self):
- """对行为数据按session粒度进行聚合,并将用户信息join后聚合
- """
- pass
- def filterSsessionAndAggrStat(self):
- """过滤session数据,并进行计数器值累加
- """
- pass
- def getSession2detail(self):
- """获取通过筛选条件的session的访问明细数据
- """
- pass
- def getTopKCategory(self):
- """获取topK热门分类
- 第一步:获取符合条件的访问过的所有品类
- 第二步:计算各品类的点击的次数
- 第三步:自定义二次排序key
- 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
- 第五步:取出top10热门品类,并写入DM
- """
- pass
- def getClickCategory2Count(self) -> DataFrame:
- """获取各分类点击次数
- 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
- """
- pass
- def getTopKSession(self):
- """获取每个分类top10活跃用户
- 1、将topK热门分类的id,生成DF
- 2、计算topK分类被各用户访问的次数
- 3、分组取TopK算法实现,获取每个分类的topK活跃用户
- 4、获取topK活跃用户的明细数据,并写入DM
- """
- pass
- # --- 池化 --- #
- def pooling(self, line1: dict, line2: dict):
- if line1['time'] > line2['time']:
- deltat = (line1['time'] - line2['time']).seconds
- if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
- line2['stay'] += deltat
- return line2, None
- else:
- return line2, line1
- else:
- deltat = (line2['time'] - line1['time']).seconds
- if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
- line1['stay'] += deltat
- return line1, None
- else:
- return line1, line2
|