#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-18 09:47:28 # @Last Modified time: 2024-01-23 11:53:14 import re import datetime import dmPython import pandas as pd from pandas import DataFrame from threading import Thread from .preprocess import parse_url from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace def func(item): print(item) session = DM8Session() try: result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Method", "Has_Form", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?,?,?,?)""", (item['time'],item['user'],item['method'],item['hasform'],item['domain'],item['一级标题'],item['二级标题'],item['三级标题'],item['四级标题'],item['text'], item['stay'])) except Exception as e: print(e) # class filteredRealTimeLogDStream(object): # time: str # user_id: str # method: str # hasform: bool # domain: str # title1: str # title2: str # title3: str # title4: str # behavior: str # stay: int = 0 class DM8Session(object): def __init__(self): self.conn = dmPython.connect(user='SYSDBA', password='SYSDBA001', server='192.168.1.202', port=30236) def query(self, sql_cmd: str, args=None): '''以数据框形式返回查询据结果''' try: self.cursor = self.conn.cursor() if args: self.cursor.execute(sql_cmd, args) else: self.cursor.execute(sql_cmd) data = self.cursor.fetchall() # 以元组形式返回查询数据 header = [t[0] for t in self.cursor.description] df = pd.DataFrame(list(data), columns=header) # pd.DataFrame 对列表具有更好的兼容性 except Exception as e: print(e) finally: self.cursor.close() return df def insert(self, sql_cmd: str, args): try: self.cursor = self.conn.cursor() # 执行sql语句 self.cursor.execute(sql_cmd, args) self.conn.commit() status = True except Exception as e: # 发生错误时回滚 self.conn.rollback() print(e) status = False finally: self.cursor.close() return status def update(self, sql_cmd: str, args): try: self.cursor = self.conn.cursor() self.cursor.execute(sql_cmd, args) self.conn.commit() status = True except Exception as e: self.conn.rollback() print(e) status = False finally: self.cursor.close() return status def __del__(self): self.conn.close() class UrlParser(object): def __init__(self): self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8') self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8') self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8') self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8') def process(self, row: dict): # row['Current_Url'] = row['Current_Url'].apply(lambda x: (x)) row['Current_Url'] = parse_url(row['Current_Url']) row['Target_Url'] = parse_url(row['Target_Url']) # return self.process_record(row.iloc[0]) return self.process_record(row) def process_record(self, item: dict): i = item['Current_Url'] j = item['Target_Url'] # result = item result = None if i[1] == '10.10.21.23': result = {'domain': '人资域'} if i[5].get("appCode"): task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[1] == '10.10.21.28': result = {'domain': '资产域'} if i[5].get('appCode'): task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '资产域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[1] == 'fms.gmp.cloud.hq.iv.csg': result = {'domain': '财务域'} if i[5].get('appCode'): task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '财务域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[1] == '10.150.23.1:8010': result = {'domain': '营销域'} fd = item['Form_Data'] if item['Form_Data'] and item['Form_Data'] != 'None': try: if item['Form_Data'][0] != "{": item["Params_Data"] = item['Form_Data'] form_data = None else: item['Form_Data'] = item['Form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[") item['Form_Data'] = item['Form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}") item['Form_Data'] = item['Form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"") item['Form_Data'] = item['Form_Data'].replace("\n", "") form_data = json.loads(item['Form_Data']) except Exception as e: logging.error(item['Form_Data']) logging.error(fd) form_data = None raise e else: form_data = None if form_data and '_INVOKE_FUNC_TITLE_' in form_data: title = form_data['_INVOKE_FUNC_TITLE_'][0] appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1] logging.debug(appcontext) task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''四级标题 == "{title}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}"''') try: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True, 'domain': '营销域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], '四级标题': task['四级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } except Exception as e: logging.error(task) logging.error(item['Form_Data']) logging.error(title) logging.error(e) logging.error(form_data['_INVOKE_FUNC_URL_'][0]) else: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'domain': '营销域', 'hasform': False, 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[1] == '4a.gd.csg.local': result = { 'time': item['Record_Time'], 'user': item['User_Id'], '域': '登录门户' } return result class UserVisitAnalyzeSpace(object): """用户访问分析 接收用户创建的分析任务 时间范围:起始时间~结束时间 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装 执行submit,将taskid作为参数传递给submit """ def __init__(self, user: str): self.user = user self.pool_size = 10 self.start_point = 0 self.end_point = 0 # 一级缓存 self.cache_item = dict() # 二级缓存数据 self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay']) def __repr__(self): return f'' def push(self, item: dict): # 消息进队列 self.df_queue.loc[self.end_point] = item self.end_point = (self.end_point + 1) % self.pool_size def pop(self): # 消息出队列 item = self.df_queue.loc[self.start_point] self.start_point = (self.start_point + 1) % self.pool_size return item def send(self, item: dict): # 当前缓存无数据 if not self.cache_item: self.cache_item = item else: # 判断缓存内数据和新数据哪个更新 if item['time'] > self.cache_item['time']: # 当前数据较新,计算页面持续时间 delay = (item['time'] - self.cache_item['time']).seconds if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']): self.cache_item['stay'] += delay else: self.push(self.cache_item) self.cache_item = item t = Thread(target=func, args=(self.pop(),)) t.start() else: # 老数据较新,数据发送产生异常,数据产生积压 pass # self.df = pd.concat([self.df, pd.DataFrame([item])]) # if self.df.shape[0] >= self.pool_size: # # 新建临时表 # dt = self.df[0:1] # index = 0 # for i in range(1, self.pool_size): # d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i]) # if isinstance(d2, pd.DataFrame): # dt.iloc[index] = d1 # index += 1 # dt.iloc[index] = d2 # session = DM8Session() # result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""", # args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay'])) # else: # dt.iloc[index] = d1 # self.df = pd.concat([dt, self.df[self.pool_size:]]) def getActionByDateRange(self): """获取指定日期范围内的用户访问行为数据 """ pass def getSession2Action(self) -> DataFrame: """获取sessionid到访问行为数据的映射的 """ pass def aggregateBySession(self): """对行为数据按session粒度进行聚合,并将用户信息join后聚合 """ pass def filterSsessionAndAggrStat(self): """过滤session数据,并进行计数器值累加 """ pass def getSession2detail(self): """获取通过筛选条件的session的访问明细数据 """ pass def getTopKCategory(self): """获取topK热门分类 第一步:获取符合条件的访问过的所有品类 第二步:计算各品类的点击的次数 第三步:自定义二次排序key 第四步:将数据映射成格式的RDD,然后进行二次排序(降序) 第五步:取出top10热门品类,并写入DM """ pass def getClickCategory2Count(self):# -> DataFrame """获取各分类点击次数 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀 """ pass def getTopKSession(self): """获取每个分类top10活跃用户 1、将topK热门分类的id,生成DF 2、计算topK分类被各用户访问的次数 3、分组取TopK算法实现,获取每个分类的topK活跃用户 4、获取topK活跃用户的明细数据,并写入DM """ pass # --- 池化 --- # def pooling(self, line1: dict, line2: dict): if line1['time'] > line2['time']: deltat = (line1['time'] - line2['time']).seconds if (deltat < 2) and (line1['三级标题'] == line2['三级标题']): line2['stay'] += deltat return line2, None else: return line2, line1 else: deltat = (line2['time'] - line1['time']).seconds if (deltat < 2) and (line1['三级标题'] == line2['三级标题']): line1['stay'] += deltat return line1, None else: return line1, line2 class BusinessClickRealTimeSpace(object): def __init__(self): self.aggregatedDStream = pd.DataFrame(columns=['word', 'count']) def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame: """计算业务访问流量实时统计 计算每天业务的访问量 设计维度:日期时间、用户、业务、访问次数 可以看到当天所有的实时数据, 通过DF,直接统计出全局的访问次数,在DF缓存一份,在DM保存一份 """ """ 我们要对原始数据进行map,映射成格式 然后,对上述格式的数据,执行updateStateByKey算子 """ def _filter(data: dict) -> tuple: return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"], "count": 1} mappedDStream = _filter(sourceDStream) self.aggregatedDStream = self.aggregatedDStream.append(mappedDStream, ignore_index=True) self.aggregatedDStream = self.aggregatedDStream.groupby('word').apply(lambda x: sum(x['count'])).reset_index(name="count") # 将计算出来的结果同步一份到DM session = DM8Session() for row in self.aggregatedDStream.itertuples(): try: result = session.insert("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (row.word, row.count)) except Exception as e: print(e) return self.aggregatedDStream def calculateTopKBusiness(self): """计算每天用户的TopK访问业务 计算出每天各业务的点击量 """ pass def calculateBusinessCountByWindow(self): """计算最近1小时滑动窗口内的业务访问趋势 """ pass # class PageOneStepSpace(object): # def __init__(self): # pass # def getSession2Action(self): # """获取格式的数据 # """ # pass # def persistConvertRate(self): # """持久转化率 # """ # pass def capital_to_higher(dict_info): new_dict = {} for i, j in dict_info.items(): new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j return new_dict def predict(g: dict, item: dict): item = capital_to_higher(item) if g.get(item['User_Id']): g[item['User_Id']].push(item) else: g[item['User_Id']] = UserActivateDegreeAnalyzeSpace(item['User_Id']) g[item['User_Id']].push(item) # session = DM8Session() # # 添加采集量统计 # try: # df = session.query("""SELECT "ID", "Count" FROM TOTALCOUNT WHERE "Name" = ?""", args="采集量") # if df.shape[0] == 1: # result = session.update("""UPDATE TOTALCOUNT SET "Count"=? WHERE TOTALCOUNT.ID = ?""", (int(df['Count'])+1, int(df['ID']))) # except Exception as e: # print(e) # # URL预处理存储 # parser = UrlParser() # frag = parser.process(item) # frag['stay'] = 0 # 添加序列 # if g.get(item['User_Id']): # # g[item['User_Id']].push(item) # g[item['User_Id']].send(frag) # else: # g[item['User_Id']] = UserSpace(item['User_Id']) # # g[item['User_Id']].push(item) # g[item['User_Id']].send(frag) # if g.get('BusinessClickRealTimeSpace'): # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag) # else: # g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace() # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag) # print(result) """ from sqlalchemy import Column, Integer, String, Date, Numeric, Text from sqlalchemy.ext.declarative import declarative_base # 创建对象的基类: Base = declarative_base() class Product(Base): # 表的名字: __tablename__ = 'product' # 表的结构: PRODUCTID = Column(Integer,autoincrement=True, primary_key=True) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from Product import Base, Product def fun_select_all(DBSession): # 创建Session session = DBSession() # 查询所有的 list_product = session.query(Product).all() print('查询所有结果:') for product in list_product: print(product.NAME)# , product.AUTHOR, product.PUBLISHER ) print('') session.close() def fun_insert(DBSession): # 创建Session session = DBSession() new_product = Product() new_product.NAME = '水浒传' session.add(new_product) session.commit() print('插入成功') session.close() def fun_update(DBSession): # 创建Session session = DBSession() product = session.query(Product).filter(Product.NAME == '水浒传').one() product.NAME = '水浒' session.commit() print('更新成功') session.close() def fun_delete(DBSession): # 创建Session session = DBSession() session.query(Product).filter(Product.NAME == '水浒').delete() session.commit() print('删除成功') session.close() def main(): #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。 conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236' #创建Engine对象 engine = create_engine(conn_url) #创建DBSession对象 DBSession = sessionmaker(bind=engine) Base.metadata.create_all(engine) # 创建表结构 fun_select_all(DBSession) # # 插入 fun_insert(DBSession) fun_select_all(DBSession) # # 插入 fun_insert1(DBSession) fun_select_all(DBSession) # # 更新 fun_update(DBSession) fun_select_all(DBSession) # # 删除 fun_delete(DBSession) fun_select_all(DBSession) """