#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-22 13:47:11
# @Last Modified time: 2024-02-23 10:28:27
import os
import json

import pandas as pd

# User-activity analysis.
# Feature 1: find the 10 users with the most visits in a given time range.
# Feature 2: find the 10 users whose visit count grew the most in the latest
#            period relative to the previous period.
- class UserActivateDegreeAnalyzeSpace(object):
- """用户访问分析
- 接收用户创建的分析任务
- 时间范围:起始时间~结束时间
- 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
- 执行submit,将taskid作为参数传递给submit
- """
- def __init__(self, user: str):
- self.user = user
- self.pool_size = 10
- self.DataStream = pd.DataFrame(columns=[
- 'time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat',
- 'sta_cod', 'ifr_url', 'but_txt', 'is_op', 'is_bus', 'is_bias']
- )
- self.loadHeatMapByJson()
- # 设置变量名称无关化
- def push(self, item: dict) -> None:
- self.DataStream.loc[len(self.DataStream.index)] = {
- "time": item['record_time'], # 请求时间
- "cur_url": item['current_url'], # 当前URL
- "tar_url": item['target_url'], # 目标URL
- "ifr_url": item['iframe_url'], # 内嵌URL
- "req_met": item['request_method'], # 请求方法
- "for_dat": item['form_data'], # 表单数据
- "but_txt": item['button_text'], # 按钮文本
- "is_op": False, # 是否为人工操作
- "is_bus": False, # 是否为业务项
- "is_bias": False, # 是否偏离
- }
- # 从文件读取数据(测试用)
- def read_from_json(self) -> None:
- self.DataStream = pd.read_json(f'/root/mzinfo/V2/{self.user}_data.json', orient='records', lines=True)
- self.DataStream['time'] = self.DataStream['record_Time']
- self.DataStream['cur_url'] = self.DataStream['current_Url']
- self.DataStream['tar_url'] = self.DataStream['target_Url']
- self.DataStream['ifr_url'] = self.DataStream['iframe_Url']
- self.DataStream['req_met'] = self.DataStream['request_Method']
- self.DataStream['for_dat'] = self.DataStream['form_Data']
- self.DataStream['but_txt'] = self.DataStream['button_Text']
- self.DataStream['is_op'] = False
- self.DataStream['is_bus'] = False
- self.DataStream['is_bias'] = False
- # 获取指定日期范围内的用户访问行为数据
- def getActionByDateRange(self) -> pd.DataFrame:
- """获取指定日期范围内的用户访问行为数据
- """
- # 1、拷贝数据,保存数据并清除数据
- CopyDataStream = self.DataStream.copy(deep=True)
- # 清除数据
- self.DataStream.drop(self.DataStream.index, inplace=True)
- # 保存数据到 HDF
- # CopyDataStream.to_hdf('DataStream.h5', key=self.user)
- CopyDataStream.to_json(f'DataStream-{self.user}.h5', orient='records', lines=True, force_ascii=False)
- # 2、URL分解,将URL的地址和参数进行分解
- # print(CopyDataStream)
- # 当前URL参数分离
- # CopyDataStream['cur_que'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[-1])
- # CopyDataStream['cur_url'] = CopyDataStream['cur_url'].apply(lambda x: x.split("?")[0])
- # # 目标URL参数分离
- # CopyDataStream['tar_que'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[-1])
- # CopyDataStream['tar_url'] = CopyDataStream['tar_url'].apply(lambda x: x.split("?")[0])
- # # 内嵌URL参数分离
- # CopyDataStream['ifr_que'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[-1])
- # CopyDataStream['ifr_url'] = CopyDataStream['ifr_url'].apply(lambda x: x.split("?")[0])
- # print(CopyDataStream)
- # 3、时序排列,将请求按照时间进行排序
- # 设置时间格式
- CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time'])
- CopyDataStream['idx'] = pd.to_datetime(CopyDataStream['time'])
- # 时间排序
- CopyDataStream = CopyDataStream.set_index('idx')
- CopyDataStream.to_csv('debug.csv', index=True, columns=['cur_url', 'tar_url'])
- # 4、功能判定
- # 设定当前时间
- curtime = CopyDataStream.iloc[0]['time']
- # 设定当前URL
- # current = CopyDataStream.iloc[0]['cur_url']
- current = ''
- # 设定目标 URL 集
- target = dict()
- # 设定按键文本
- curbut = ''
- # 结果缓存
- result = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt', 'is_op', 'is_bus', 'is_bias', 'cur_que', 'tar_que', 'ifr_que', 'acc'])
- # 遍历数据
- for idx, item in enumerate(CopyDataStream.itertuples()):
- # 当前地址发生变化,页面跳转(真实可靠)
- if item.cur_url != current: # 当前地址发生变化,页面跳转
- if target.get(current): # 查看是否有请求当前URL的历史记录,有历史记录则该历史记录就为页面变更记录
- result.loc[len(result.index)] = CopyDataStream.iloc[target.get(current)]
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- elif '4a.gd.csg.local' in item.cur_url: # 未找到历史记录,该页面并非由请求获得,判定是否首次登录
- # print("*********是首次登录*********") # 首次登录
- result.loc[len(result.index)] = CopyDataStream.iloc[idx]# 记录当前变更项
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- # print(result.loc[ len(result.index) -1 ])
- else: # 非正常跳转(既不是由统一认证平台进入、也不是直接点击跳转进入)
- # print("*********非正常跳转*********") # 非正常跳转
- result.loc[len(result.index)] = CopyDataStream.iloc[idx]# 记录当前变更项
- # print(result.loc[ len(result.index) -1 ])
- result.at[len(result.index) - 1, 'acc'] = 'url'
- curbut = item.but_txt # 当前文本修改
- target = dict() # 清除历史记录
- target[item.tar_url] = idx # 添加当前记录
- current = item.cur_url # 当前地址修改
- # 点击文本发生变动,操作发生变化(无法判断第一条数据的真实可靠性)
- elif item.but_txt != curbut:
- curbut = item.but_txt # 当前文本修改
- result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 记录当前变更项
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- result.at[len(result.index) - 1, 'acc'] = 'btx'
- # 判断是否重新上线(无法保证数据的可靠性)
- elif (item.time - curtime).seconds > 600: # 请求时间距离上次请求相隔大于600秒,判断上次请求以离线
- result.loc[len(result.index)] = CopyDataStream.iloc[idx -1] # 将上次的请求记录下来
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- result.at[len(result.index) - 1, 'is_bias'] = True # 用户超时行为偏离
- result.at[len(result.index) - 1, 'acc'] = 'tim'
- # print(result.loc[ len(result.index) -1 ])
- result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 将新请求记录下来(新记录为人工操作)
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- result.at[len(result.index) - 1, 'acc'] = 'tim'
- # print(result.loc[ len(result.index) -1 ])
- curbut = item.but_txt # 当前文本修改
- current = item.cur_url # 当前地址修改
- target = dict() # 清除历史记录
- target[item.tar_url] = idx # 添加当前记录
- # 判断是否营销域(真实可靠)
- elif '_INVOKE_FUNC_TITLE_' in item.for_dat:
- current = item.cur_url
- curbut = item.but_txt # 当前文本修改
- result.loc[len(result.index)] = CopyDataStream.iloc[idx] # 记录当前变更项
- result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- result.at[len(result.index) - 1, 'is_bus'] = True # 变更记录为人工
- result.at[len(result.index) - 1, 'acc'] = 'form'
- # 判断是否appCode
- # elif 'appCode' in item.for_dat and item.but_txt != curbut:
- # current = item.cur_url
- # result.loc[len(result.index)] = CopyDataStream.iloc[idx]
- # result.at[len(result.index) - 1, 'is_op'] = True # 变更记录为人工
- # result.at[len(result.index) - 1, 'is_bus'] = True # 变更记录为人工
- # 当前地址没变,页面无跳转(需判断AJAX,需判断请求时间)
- elif item.cur_url == current: # 当前地址没变,页面无跳转(需判断AJAX,需判断请求时间)
- if item.tar_url not in target: # 目标URL不在临时目录
- target[item.tar_url] = idx # 添加目标URL到临时目录
- curtime = item.time # 修正时间戳
- # # 保存文件
- # result.to_csv(f'{self.user}.csv', sep=',')
- # 结果暂存
- self.result = result
- # 结果返回
- return result
- # 生成行为概率图
- def getClickCategory2Count(self) -> None:
- """获取各分类点击次数
- 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
- 2、对点击行为进行计数
- 3、保存计数器
- """
- for row in self.result.itertuples():
- # 1、过滤出点击行为数据
- key = row.cur_url.split("?")[0] + '-' + row.tar_url.split("?")[0] + '-' + row.acc
- # 2、对点击行为进行计数
- if self.heatmap.get(key):
- self.heatmap[key] += 1
- self.heatmap['total'] += 1
- else:
- self.heatmap[key] = 1
- self.heatmap['total'] += 1
- # 3、保存计数器
- self.saveHeatMap2Json()
- # 加载或初始化行为概率图
- def loadHeatMapByJson(self) -> None:
- """加载点击行为计数器
- 如果有概率图,就加载概率图,如果没有,就初始化概率图
- """
- if os.path.exists('heatmap.json'):
- with open('heatmap.json', 'r', encoding='utf-8') as fp:
- self.heatmap = json.load(fp)
- else:
- self.heatmap = dict()
- # 设置非零数保证除法正常
- self.heatmap['total'] = 1
- # 保存行为概率图
- def saveHeatMap2Json(self) -> None:
- """保存点击行为计数器
- """
- with open('heatmap.json', 'w', encoding='utf-8') as fp:
- json.dump(self.heatmap, fp)
- # 获取某个行为的概率(无权重计算)
- def getHeatMapValue(self, key: str) -> float:
- """获取某个行为的概率
- """
- value = self.heatmap.get(key)
- total = self.heatmap['total']
- if value:
- return value / total
- else:
- return 0.
- def getTopKSession(self):
- """获取每个分类top10活跃用户
- 1、将topK热门分类的id,生成DF
- 2、计算topK分类被各用户访问的次数
- 3、分组取TopK算法实现,获取每个分类的topK活跃用户
- 4、获取topK活跃用户的明细数据,并写入DM
- """
- pass
- def getSession2Action(self):
- """获取sessionid到访问行为数据的映射的
- """
- pass
- def aggregateBySession(self):
- """对行为数据按session粒度进行聚合,并将用户信息join后聚合
- """
- pass
- def filterSessionAndAggrStat(self):
- """过滤session数据,并进行计数器值累加
- """
- pass
- def getSession2detail(self):
- """获取通过筛选条件的session的访问明细数据
- """
- pass
- def getTopKCategory(self):
- """获取topK热门分类
- 第一步:获取符合条件的访问过的所有品类
- 第二步:计算各品类的点击的次数
- 第三步:自定义二次排序key
- 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
- 第五步:取出top10热门品类,并写入DM
- """
- pass