#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-18 09:47:28
# @Last Modified time: 2024-02-02 13:37:27
"""Real-time user-behaviour pipeline: URL parsing, visit pooling and DM8 storage."""

import re
import json
import logging
import datetime
import dmPython
import numpy as np
import pandas as pd
import requests
from pandas import DataFrame
from threading import Thread
from .preprocess import parse_url
from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace


class ResultJson:
    """One recognised user action, serialisable into the post-back payload."""

    def __init__(self, time, user_id, domain, title1, title2, title3, device_id=None, text=None,
                 is_op=True, is_bus=False, is_bias=False, status="正常"):
        """Store the fields of one action.

        Args:
            time: object exposing ``strftime`` (e.g. ``datetime.datetime``);
                stored formatted as ``YYYY-mm-dd HH:MM:SS``.
            user_id: user who performed the action.
            domain: business domain that was accessed.
            title1: first-level title.
            title2: second-level title.
            title3: third-level title; reported as the business item.
            device_id: device identifier, if known.
            text: operation step / button text.
            is_op: whether this record is a user operation.
            is_bus: whether this record is a business item.
            is_bias: whether the action deviates from the expected flow.
            status: execution result label.
        """
        # Execution time
        self.time = time.strftime('%Y-%m-%d %H:%M:%S')
        # Executing user
        self.user_id = user_id
        # Business domain
        self.domain = domain
        self.title1 = title1
        self.title2 = title2
        # Business item
        self.title3 = title3
        # Device
        self.device_id = device_id
        # Operation step
        self.text = text
        # Is a user operation
        self.is_op = is_op
        # Is a business item
        self.is_bus = is_bus
        # Deviates from the expected flow
        self.is_bias = is_bias
        # Execution result
        self.status = status

    def to_json(self):
        """Return the payload as a dict with a ``real`` and a ``base`` section."""
        return {
            "real": {  # real-time information
                "time": self.time,  # record time
                "userId": self.user_id,  # user ID
                "deviceId": self.device_id,  # device ID
                "domain": self.domain,  # accessed domain
                "business": self.title3,  # business type
                "option": self.text,  # executed operation
                "isOp": self.is_op,  # is an operation
                "isBus": self.is_bus,  # is a business item
                "isBias": self.is_bias,  # deviates
                "status": self.status  # execution result
            },
            "base": {  # periodically refreshed information (placeholders only here)
                "用户ID": self.user_id,
                "常用时段": "",
                "所在部门": "",
                "使用设备": "",
                "操作系统": "",
                "常用业务": ""
            }
        }


def func(item):
    """Insert one finished page-visit fragment into the FRAGMENT table.

    Used as a thread target by ``UserVisitAnalyzeSpace.send``. ``item`` must
    support ``item[key]`` for the fragment keys read below.
    """
    print(item)
    session = DM8Session()
    try:
        session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Method", "Has_Form", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?,?,?,?)""",
                        (item['time'], item['user'], item['method'], item['hasform'], item['domain'],
                         item['一级标题'], item['二级标题'], item['三级标题'], item['四级标题'],
                         item['text'], item['stay']))
    except Exception as e:
        print(e)


def postback(json_data: dict, url: str = 'http://192.168.1.166:8080/statistics/algorithm') -> None:
    """POST a ResultJson payload to the statistics service and print the outcome.

    Args:
        json_data: currently unused.
            NOTE(review): the payload below is a fixed sample record, not
            ``json_data`` -- confirm whether that is still intended.
        url: endpoint to post to; defaults to the previously hard-coded address.
    """
    data = ResultJson(time=datetime.datetime.now(), user_id="fuli@mz.gd.csg.cn", domain="财务域",
                      title1="计划预算管理", title2="预算查询分析", title3="编制过程查询",
                      text="69", is_op=True, is_bus=True)
    print(data.to_json())
    response = requests.post(url, json={'parameter': data.to_json()})
    if response.status_code == 200:
        print('success')
    else:
        # ``Response.text`` is a property; calling it raised TypeError.
        print(response.text)


class DM8Session(object):
    """Thin wrapper around one dmPython connection."""

    # NOTE(review): credentials are embedded in source as defaults; consider
    # loading them from configuration instead.
    def __init__(self, user='SYSDBA', password='SYSDBA001', server='192.168.1.202', port=30236):
        """Open the connection; the defaults are the previously hard-coded values."""
        self.conn = dmPython.connect(user=user, password=password, server=server, port=port)

    def query(self, sql_cmd: str, args=None):
        """Run a query and return the rows as a DataFrame.

        Errors are printed, not raised; in that case an empty DataFrame is
        returned (previously the method failed with UnboundLocalError).
        """
        df = pd.DataFrame()
        cursor = None
        try:
            cursor = self.cursor = self.conn.cursor()
            if args:
                cursor.execute(sql_cmd, args)
            else:
                cursor.execute(sql_cmd)
            data = cursor.fetchall()  # rows come back as tuples
            header = [t[0] for t in cursor.description]
            df = pd.DataFrame(list(data), columns=header)  # DataFrame handles lists more reliably
        except Exception as e:
            print(e)
        finally:
            # The cursor may never have been created if conn.cursor() failed.
            if cursor is not None:
                cursor.close()
        return df

    def execute(self, sql_cmd: str, args):
        """Run a statement and commit.

        Returns:
            True on success; False after an error, which is printed and rolled back.
        """
        cursor = None
        try:
            cursor = self.cursor = self.conn.cursor()
            # Execute the SQL statement
            cursor.execute(sql_cmd, args)
            self.conn.commit()
            status = True
        except Exception as e:
            # Roll back on error
            self.conn.rollback()
            print(e)
            status = False
        finally:
            if cursor is not None:
                cursor.close()
        return status

    def __del__(self):
        # ``conn`` is missing when __init__ failed before assigning it.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()


class UrlParser(object):
    """Map a captured request record to a domain / title fragment."""

    # Static pages per host: URL path -> (level-1, level-2, level-3 title).
    _HR_PAGES = {
        '/gmp/login.html': ('人资域登录', '人资域登录', '人资域登录'),
        '/gmp/static/gmpweb/hrIndex.html': ('人资域首页', '人资域首页', '人资域首页'),
        '/gmp/static/gmpweb/workbench/todo/TodoCenter.html': ('人资域', '公共应用', '代办中心'),
        # NOTE(review): unlike its siblings this path has no leading '/' -- confirm.
        'hrcore_web/labormanagement/contractagreementview/myAgreement.html': ('人资域', '个人应用', '我的协议'),
    }
    _ASSET_PAGES = {
        '/gmp/static/gmpweb/index.html': ('资产域首页', '资产域首页', '资产域首页'),
    }
    _FINANCE_PAGES = {
        '/gmp/login.html': ('登录财务域', '登录财务域', '登录财务域'),
        '/gmp/index.html': ('财务域首页', '财务域首页', '财务域首页'),
        '/gmp/': ('财务域认证', '财务域认证', '财务域认证'),  # to be confirmed
    }
    _PORTAL_TITLES = ('登录门户', '登录门户', '登录门户')
    _TITLE_COLUMNS = ('一级标题', '二级标题', '三级标题')

    # Clean-up applied, in this order, to captured form data before JSON parsing.
    _FORM_DATA_FIXES = (
        ('"remark":"[', '"remark":""['),
        ('"[', '['),
        (']"', ']'),
        ('"{', '{'),
        ('}"', '}'),
        ('object HTMLInputElement', ''),
        ('[null]', '[]'),
        ('"null"', '""'),
        ('\n', ''),
    )

    def __init__(self, data_dir='/mnt/d/desktop/梅州电网'):
        """Load the four per-domain menu tables (JSON lines) from ``data_dir``."""
        def _load(name):
            return pd.read_json(path_or_buf=f'{data_dir}/{name}.json', orient='records',
                                lines=True, encoding='utf-8')

        self.rzdf = _load('人资域')
        self.zcdf = _load('资产域')
        self.cwdf = _load('财务域')
        self.yxdf = _load('营销域')

    def process(self, row: dict):
        """Parse both URLs of ``row`` in place, then classify the record."""
        row['Current_Url'] = parse_url(row['Current_Url'])
        row['Target_Url'] = parse_url(row['Target_Url'])
        return self.process_record(row)

    def process_record(self, item: dict):
        """Classify one record by the host and path of its parsed current URL.

        ``item['Current_Url']`` is indexed as ``[1]`` host, ``[2]`` path and
        ``[5]`` a mapping read with ``.get('appCode')``.

        Returns:
            A full fragment dict; or ``{'domain': ...}`` alone when the host is
            known but the page could not be resolved; or None for an unknown host.

        Raises:
            IndexError: an appCode lookup on a GMP host matched no menu row.
            Exception: marketing form data could not be parsed (logged, re-raised).
        """
        url = item['Current_Url']
        host = url[1]
        if host == '10.10.21.23':
            return self._gmp_record(item, url, '人资域', self._HR_PAGES, self.rzdf)
        if host == '10.10.21.28':
            return self._gmp_record(item, url, '资产域', self._ASSET_PAGES, self.zcdf)
        if host == 'fms.gmp.cloud.hq.iv.csg':
            # The finance branch reports the button text unmodified.
            return self._gmp_record(item, url, '财务域', self._FINANCE_PAGES, self.cwdf, raw_text=True)
        if host == '10.150.23.1:8010':
            return self._marketing_record(item, url)
        if host == '4a.gd.csg.local':
            return self._fragment(item, '登录门户', self._PORTAL_TITLES)
        return None

    @staticmethod
    def _fragment(item, domain, titles, *, hasform=None, raw_text=False):
        """Build a fragment dict; ``titles`` holds 3 or 4 title levels."""
        fragment = {
            'time': item['Record_Time'],
            'user': item['User_Id'],
            'method': item['Request_Method'],
            'hasform': bool(item['Form_Data']) if hasform is None else hasform,
            'domain': domain,
        }
        fragment.update(zip(('一级标题', '二级标题', '三级标题', '四级标题'), titles))
        text = item['Button_Text']
        fragment['text'] = text if raw_text else (text or None)
        return fragment

    @staticmethod
    def _task_titles(task, columns):
        """Return the first row's value for each column; IndexError if ``task`` is empty."""
        return tuple(task[column].values[0] for column in columns)

    def _gmp_record(self, item, url, domain, static_pages, menu, *, raw_text=False):
        """Resolve a GMP-hosted page: static table first, then appCode menu lookup."""
        path = url[2]
        titles = static_pages.get(path)
        if titles is None:
            app_codes = url[5].get('appCode')
            if not app_codes:
                # Not resolvable yet (iframe pages are to be handled later).
                return {'domain': domain}
            # Boolean masks instead of a string-built query: quotes in the URL
            # can no longer break or alter the expression.
            task = menu[(menu['path'] == path) & (menu['appCode'] == app_codes[0])]
            titles = self._task_titles(task, self._TITLE_COLUMNS)
        return self._fragment(item, domain, titles, raw_text=raw_text)

    def _parse_form_data(self, item):
        """Decode ``item['Form_Data']`` as JSON, or return None.

        Mutates ``item``: data that does not start with '{' is copied to
        ``item['Params_Data']``; otherwise ``item['Form_Data']`` is replaced by
        the cleaned-up text that was parsed.
        """
        raw = item['Form_Data']
        if not raw or raw == 'None':
            return None
        try:
            if raw[0] != "{":
                item["Params_Data"] = raw
                return None
            cleaned = raw
            for old, new in self._FORM_DATA_FIXES:
                cleaned = cleaned.replace(old, new)
            item['Form_Data'] = cleaned
            return json.loads(cleaned)
        except Exception:
            logging.error(item['Form_Data'])
            logging.error(raw)
            raise

    def _find_marketing_task(self, title, appcontext):
        """Look up ``title`` from the most to the least specific match.

        Order: level-4 title + appcontext, level-3 title + appcontext,
        level-4 title only, level-3 title only. The returned frame may be empty.
        """
        menu = self.yxdf
        task = menu.iloc[0:0]
        for column, scoped in (('四级标题', True), ('三级标题', True), ('四级标题', False), ('三级标题', False)):
            mask = menu[column] == title
            if scoped:
                mask = mask & (menu['appcontext'] == appcontext)
            task = menu[mask]
            if not task.empty:
                break
        return task

    def _marketing_record(self, item, url):
        """Resolve a marketing-domain (营销域) request from its form data."""
        domain = '营销域'
        if url[2] == '/PMS/login.do':
            return self._fragment(item, domain, ('营销域登录', '营销域登录', '营销域登录'))

        form_data = self._parse_form_data(item)
        if not (form_data and '_INVOKE_FUNC_TITLE_' in form_data):
            return self._fragment(item, domain, self._PORTAL_TITLES, hasform=False)

        title = form_data['_INVOKE_FUNC_TITLE_'][0]
        appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
        logging.debug(appcontext)
        task = self._find_marketing_task(title, appcontext)
        try:
            titles = self._task_titles(task, self._TITLE_COLUMNS + ('四级标题',))
            return self._fragment(item, domain, titles, hasform=True)
        except Exception as e:
            # Best effort: log what could not be resolved, report the domain only.
            logging.error(task)
            logging.error(item['Form_Data'])
            logging.error(title)
            logging.error(e)
            logging.error(form_data['_INVOKE_FUNC_URL_'][0])
        return {'domain': domain}


class UserVisitAnalyzeSpace(object):
    """User visit analysis.

    Accepts analysis tasks created by the user for a time range
    (start time ~ end time). After a task request is received, the task
    information is inserted into the task table with its parameters packed
    as JSON, and submit is run with the task id as its argument.
    """

    def __init__(self, user: str):
        self.user = user
        self.pool_size = 10
        self.start_point = 0
        self.end_point = 0
        # First-level cache: the fragment currently being accumulated
        self.cache_item = dict()
        # Second-level cache: ring buffer of finished fragments
        self.df_queue = pd.DataFrame(columns=['time', 'user', 'method', 'hasform', 'domain',
                                              '一级标题', '二级标题', '三级标题', '四级标题', 'text', 'stay'])

    def __repr__(self):
        # Previously returned an empty string, which was useless for debugging.
        return f'{type(self).__name__}(user={self.user!r})'

    def push(self, item: dict):
        """Enqueue ``item`` at the write position of the ring buffer."""
        self.df_queue.loc[self.end_point] = item
        self.end_point = (self.end_point + 1) % self.pool_size

    def pop(self):
        """Dequeue and return the row at the read position of the ring buffer."""
        item = self.df_queue.loc[self.start_point]
        self.start_point = (self.start_point + 1) % self.pool_size
        return item

    def send(self, item: dict):
        """Merge ``item`` into the cached fragment, or flush the cache and start anew.

        An item is merged when it shows the same level-3 title and arrives
        less than 2 seconds after the last merged activity. Otherwise the
        cached fragment is queued and written to the database on a new
        thread, and ``item`` becomes the new cache. Items that are not newer
        than the cache are dropped.
        """
        # Nothing cached yet
        if not self.cache_item:
            self.cache_item = item
            return

        cached = self.cache_item
        if item['time'] <= cached['time']:
            # Out-of-order (stale) data: delivery went wrong upstream; drop it.
            return

        # ``total_seconds`` rather than ``.seconds``, which discards whole days.
        elapsed = int((item['time'] - cached['time']).total_seconds())
        if elapsed < cached['stay'] + 2 and item['三级标题'] == cached['三级标题']:
            # ``elapsed`` is measured from the first cached event, so it already
            # is the total stay; adding it to ``stay`` double-counted.
            cached['stay'] = elapsed
            return

        self.push(cached)
        self.cache_item = item
        Thread(target=func, args=(self.pop(),)).start()

        # Legacy batch implementation, retained for reference:
        # self.df = pd.concat([self.df, pd.DataFrame([item])])
        # if self.df.shape[0] >= self.pool_size:
        #     # create a temporary table
        #     dt = self.df[0:1]
        #     index = 0
        #     for i in range(1, self.pool_size):
        #         d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
        #         if isinstance(d2, pd.DataFrame):
        #             dt.iloc[index] = d1
        #             index += 1
        #             dt.iloc[index] = d2
        #             session = DM8Session()
        #             result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
        #                 args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
        #         else:
        #             dt.iloc[index] = d1
        #     self.df = pd.concat([dt, self.df[self.pool_size:]])

    def getActionByDateRange(self):
        """Fetch user visit actions within the given date range (not implemented)."""
        pass

    def getSession2Action(self) -> DataFrame:
        """Map session ids to visit actions (not implemented)."""
        pass

    def aggregateBySession(self):
        """Aggregate actions per session and join user information (not implemented)."""
        pass

    def filterSsessionAndAggrStat(self):
        """Filter sessions and accumulate counter values (not implemented)."""
        pass

    def getSession2detail(self):
        """Fetch visit details of the sessions that pass the filter (not implemented)."""
        pass

    def getTopKCategory(self):
        """Compute the top-K popular categories (not implemented).

        Step 1: collect every visited category that matches the conditions.
        Step 2: count the clicks of each category.
        Step 3: define a custom secondary sort key.
        Step 4: map the data and sort it by that key (descending).
        Step 5: take the top 10 categories and write them to DM.
        """
        pass

    def getClickCategory2Count(self) -> DataFrame:
        """Count clicks per category (not implemented).

        1. Filter the click actions; clicks are only a small share of the
           data, so the filtered data may be unevenly distributed.
        """
        pass

    def getTopKSession(self):
        """Compute the top-10 active users of each category (not implemented).

        1. Build a DataFrame from the ids of the top-K categories.
        2. Count how often each user visited the top-K categories.
        3. Apply a grouped top-K to get the most active users per category.
        4. Fetch the details of those users and write them to DM.
        """
        pass

    # --- pooling --- #
    def pooling(self, line1: dict, line2: dict):
        """Merge two fragments when they are the same page less than 2 s apart.

        Returns:
            ``(earlier, None)`` when merged, the gap having been added to
            ``earlier['stay']``; otherwise ``(earlier, later)``.
        """
        if line1['time'] > line2['time']:
            earlier, later = line2, line1
        else:
            earlier, later = line1, line2
        # ``total_seconds`` rather than ``.seconds``, which discards whole days.
        deltat = int((later['time'] - earlier['time']).total_seconds())
        if deltat < 2 and line1['三级标题'] == line2['三级标题']:
            earlier['stay'] += deltat
            return earlier, None
        return earlier, later


class BusinessClickRealTimeSpace(object):
    """Running visit counters keyed by time, user and domain."""

    def __init__(self):
        self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])

    def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame:
        """Add one fragment to the running counts and sync the counts to DM.

        The fragment is mapped to a ``<time>_<user>_<domain>`` key with count
        1, merged into the cached frame, summed per key, and every aggregated
        row is then inserted into TOTALVISIT.

        NOTE(review): all aggregated rows are re-inserted on every call, and
        the key has second resolution although the original notes describe
        per-day counts -- confirm the intended behaviour.

        Returns:
            The aggregated frame with columns ``word`` and ``count``.
        """
        def _filter(data: dict) -> dict:
            return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"],
                    "count": 1}

        mapped = pd.DataFrame([_filter(sourceDStream)])
        # ``DataFrame.append`` no longer exists in pandas 2.x; use concat, and
        # skip it for the initial empty frame to keep an integer ``count`` dtype.
        if self.aggregatedDStream.empty:
            combined = mapped
        else:
            combined = pd.concat([self.aggregatedDStream, mapped], ignore_index=True)
        self.aggregatedDStream = combined.groupby('word', as_index=False)['count'].sum()

        # Sync a copy of the computed result to DM
        session = DM8Session()
        for word, count in self.aggregatedDStream.itertuples(index=False, name=None):
            try:
                session.execute("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (word, int(count)))
            except Exception as e:
                print(e)
        return self.aggregatedDStream

    def calculateTopKBusiness(self):
        """Compute each user's daily top-K visited businesses (not implemented)."""
        pass

    def calculateBusinessCountByWindow(self):
        """Compute the visit trend over a 1-hour sliding window (not implemented)."""
        pass


# Capitalise the first letter of every dict key (legacy, disabled)
# def capital_to_higher(dict_info):
#     new_dict = {}
#     for i, j in dict_info.items():
#         new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j
#     return new_dict


def capital_to_lower(dict_info):
    """Return a copy of ``dict_info`` with every key lower-cased."""
    return {key.lower(): value for key, value in dict_info.items()}


def predict(g: dict, item: dict):
    """Route one record to the per-user analysis space stored in ``g``.

    Args:
        g: shared registry mapping user id -> UserActivateDegreeAnalyzeSpace;
            a space is created on first use.
        item: raw record; its keys are lower-cased before use and it must
            then contain ``user_id``.
    """
    item = capital_to_lower(item)
    user_id = item['user_id']
    space = g.get(user_id)
    if not space:
        space = g[user_id] = UserActivateDegreeAnalyzeSpace(user_id)
    space.push(item)

    # Legacy pipeline, retained for reference:
    # item = capital_to_higher(item)
    # if g.get(item['User_Id']):
    #     g[item['User_Id']].push(item)
    # else:
    #     g[item['User_Id']] = UserActivateDegreeAnalyzeSpace(item['User_Id'])
    #     g[item['User_Id']].push(item)

    # session = DM8Session()
    # # update the collected-volume counter
    # try:
    #     df = session.query("""SELECT "ID", "Count" FROM TOTALCOUNT WHERE "Name" = ?""", args="采集量")
    #     if df.shape[0] == 1:
    #         result = session.execute("""UPDATE TOTALCOUNT SET "Count"=? WHERE TOTALCOUNT.ID = ?""", (int(df['Count'])+1, int(df['ID'])))
    # except Exception as e:
    #     print(e)
    # # URL pre-processing and storage
    # parser = UrlParser()
    # frag = parser.process(item)
    # frag['stay'] = 0
    # append to the per-user sequence
    # if g.get(item['User_Id']):
    #     # g[item['User_Id']].push(item)
    #     g[item['User_Id']].send(frag)
    # else:
    #     g[item['User_Id']] = UserSpace(item['User_Id'])
    #     # g[item['User_Id']].push(item)
    #     g[item['User_Id']].send(frag)

    # if g.get('BusinessClickRealTimeSpace'):
    #     result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
    # else:
    #     g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace()
    #     result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
    # print(result)


# The string below is never executed; it is a retained SQLAlchemy-on-DM8 usage example.
"""
from sqlalchemy import Column, Integer, String, Date, Numeric, Text
from sqlalchemy.ext.declarative import declarative_base

# 创建对象的基类:
Base = declarative_base()

class Product(Base):
    # 表的名字:
    __tablename__ = 'product'
    # 表的结构:
    PRODUCTID = Column(Integer,autoincrement=True, primary_key=True)


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from Product import Base, Product

def fun_select_all(DBSession):
    # 创建Session
    session = DBSession()
    # 查询所有的
    list_product = session.query(Product).all()
    print('查询所有结果:')
    for product in list_product:
        print(product.NAME)# , product.AUTHOR, product.PUBLISHER )
    print('')
    session.close()

def fun_insert(DBSession):
    # 创建Session
    session = DBSession()
    new_product = Product()
    new_product.NAME = '水浒传'
    session.add(new_product)
    session.commit()
    print('插入成功')
    session.close()

def fun_update(DBSession):
    # 创建Session
    session = DBSession()
    product = session.query(Product).filter(Product.NAME == '水浒传').one()
    product.NAME = '水浒'
    session.commit()
    print('更新成功')
    session.close()

def fun_delete(DBSession):
    # 创建Session
    session = DBSession()
    session.query(Product).filter(Product.NAME == '水浒').delete()
    session.commit()
    print('删除成功')
    session.close()

def main():
    #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。
    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236'
    #创建Engine对象
    engine = create_engine(conn_url)
    #创建DBSession对象
    DBSession = sessionmaker(bind=engine)
    Base.metadata.create_all(engine) # 创建表结构
    fun_select_all(DBSession)
    # # 插入
    fun_insert(DBSession)
    fun_select_all(DBSession)
    # # 插入
    fun_insert1(DBSession)
    fun_select_all(DBSession)
    # # 更新
    fun_update(DBSession)
    fun_select_all(DBSession)
    # # 删除
    fun_delete(DBSession)
    fun_select_all(DBSession)
"""