#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-18 09:47:28 # @Last Modified time: 2024-02-23 13:25:42 import os, re, json import datetime import requests import pandas as pd from pandas import DataFrame import logging from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace from ..preprocess import parse_url class ResultJson: def __init__(self, time, user_id, domain, title1, title2, title3, device_id=None, text=None, is_op=True, is_bus=False, is_bias=False, status="正常"): # 执行时间 self.time = time.strftime('%Y-%m-%d %H:%M:%S') # 执行用户 self.user_id = user_id # 执行域 self.domain = domain self.title1 = title1 self.title2 = title2 # 业务项 self.title3 = title3 # 设备 self.device_id = device_id # 操作步骤 self.text = text if text else "" # 是否用户操作 self.is_op = is_op # 是否业务 self.is_bus = is_bus # 是否偏离 self.is_bias = is_bias # 执行结果 self.status = status def to_json(self) -> dict: return { "real": { # 实时信息 "time": self.time, # 记录时间 "userId": self.user_id, # 用户ID "deviceId": self.device_id, # 设备ID "domain": self.domain, # 访问域 "business": self.title3, # 业务类型 "option": self.text, # 执行操作 "isOp": self.is_op, # 是否是一条操作 "isBus": self.is_bus, # 是否是一项业务 "isBias": self.is_bias, # 是否偏离 "status": self.status # 执行结果 }, "base": { # 定时信息 "用户ID": self.user_id, "常用时段": "", "所在部门": "", "使用设备": "", "操作系统": "", "常用业务": "" } } def postback(json_data: dict) -> None: url = os.environ.get('BKURL') response = requests.post(url, json={'parameter': json_data}) if response.status_code == 200: print('success') else: print(response.text) class UrlParser(object): def __init__(self): self.rzdf = pd.read_json(path_or_buf='/root/mzinfo/V2/app/resources/人资域.json', orient='records', lines=True, encoding='utf-8') self.zcdf = pd.read_json(path_or_buf='/root/mzinfo/V2/app/resources/资产域.json', orient='records', lines=True, encoding='utf-8') self.cwdf = pd.read_json(path_or_buf='/root/mzinfo/V2/app/resources/财务域.json', orient='records', lines=True, encoding='utf-8') self.yxdf = pd.read_json(path_or_buf='/root/mzinfo/V2/app/resources/营销域.json', orient='records', lines=True, encoding='utf-8') def process(self, row: dict): row['Current_Url'] = parse_url(row['Current_Url']) row['Target_Url'] = parse_url(row['Target_Url']) return self.process_record(row) def process_record(self, item: dict): i = item['Current_Url'] result = None if i[1] == '10.10.21.23': result = {'domain': '人资域'} if i[2] == '/gmp/login.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': '人资域登录', '二级标题': '人资域登录', '三级标题': '人资域登录', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[2] == '/gmp/static/gmpweb/hrIndex.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': '人资域首页', '二级标题': '人资域首页', '三级标题': '人资域首页', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[2] == '/gmp/static/gmpweb/workbench/todo/TodoCenter.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': '人资域', '二级标题': '公共应用', '三级标题': '代办中心', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[2] == 'hrcore_web/labormanagement/contractagreementview/myAgreement.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': '人资域', '二级标题': '个人应用', '三级标题': '我的协议', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[2] == '/gmp/static/hr/peixun/user/index.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': '人资域', '二级标题': '培训管理', '三级标题': '培训管理', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[5].get("appCode"): task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') try: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '人资域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } except Exception as e: print(f"\033[0;33;40m 错误 code {i[5].get('appCode')} {task}\033[0m") else: # 后续使用 iframe 处理 pass elif i[1] == '10.10.21.28': result = {'domain': '资产域'} if i[2] == '/gmp/static/gmpweb/index.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '资产域', '一级标题': '资产域首页', '二级标题': '资产域首页', '三级标题': '资产域首页', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[5].get('appCode'): task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') try: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '资产域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } except Exception as e: print(f"\033[0;33;40m 错误 code {i[5].get('appCode')} {task}\033[0m") elif i[1] == 'fms.gmp.cloud.hq.iv.csg': result = {'domain': '财务域'} if i[2] == '/gmp/login.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '财务域', '一级标题': '登录财务域', '二级标题': '登录财务域', '三级标题': '登录财务域', 'text': item['Button_Text'] } elif i[2] == '/gmp/index.html': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '财务域', '一级标题': '财务域首页', '二级标题': '财务域首页', '三级标题': '财务域首页', 'text': item['Button_Text'] } elif i[2] == '/gmp/': # 待确认 result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '财务域', '一级标题': '财务域认证', '二级标题': '财务域认证', '三级标题': '财务域认证', 'text': item['Button_Text'] } elif i[5].get('appCode'): try: task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '财务域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['Button_Text'] } except Exception as e: print(f"\033[0;33;40m 错误 code {i[5].get('appCode')} {task}\033[0m") elif i[1] == '10.150.23.1:8010': result = {'domain': '营销管理'} fd = item['Form_Data'] if item['Form_Data'] and item['Form_Data'] != 'None': try: if item['Form_Data'][0] != "{": item["Params_Data"] = item['Form_Data'] form_data = None else: item['Form_Data'] = item['Form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[") item['Form_Data'] = item['Form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}") item['Form_Data'] = item['Form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"") item['Form_Data'] = item['Form_Data'].replace("\n", "") form_data = json.loads(item['Form_Data']) except Exception as e: logging.error(item['Form_Data']) logging.error(fd) form_data = None raise e else: form_data = None if form_data and '_INVOKE_FUNC_TITLE_' in form_data: title = form_data['_INVOKE_FUNC_TITLE_'][0] appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1] print(title, appcontext) task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''四级标题 == "{title}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}"''') try: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True, 'domain': '营销管理', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], '四级标题': task['四级标题'].values[0], 'text': item['Button_Text'] if item['Button_Text'] else None } except Exception as e: logging.error(task) logging.error(item['Form_Data']) logging.error(title) logging.error(e) logging.error(form_data['_INVOKE_FUNC_URL_'][0]) else: result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': False, 'domain': '营销管理', '一级标题': '登录门户', '二级标题': '登录门户', '三级标题': '登录门户', 'text': item['Button_Text'] if item['Button_Text'] else None } elif i[1] == '4a.gd.csg.local': result = { 'time': item['Record_Time'], 'user': item['User_Id'], 'method': item['Request_Method'], 'hasform': True if item['Form_Data'] else False, 'domain': '登录门户', '一级标题': '登录门户', '二级标题': '登录门户', '三级标题': '登录门户', 'text': item['Button_Text'] if item['Button_Text'] else None } return result # 字典键首字母大写 # def capital_to_higher(dict_info: dict) -> dict: # new_dict = {} # for i, j in dict_info.items(): # new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j # return new_dict # 字典键全改小写 def capital_to_lower(dict_info: dict) -> dict: new_dict = {} for i, j in dict_info.items(): new_dict[i.lower()] = j return new_dict def predict(g: dict, item: dict): # 字典键全改小写 item = capital_to_lower(item) # 放入全局 if g.get(item['user_id']): g[item['user_id']].push(item) else: g[item['user_id']] = UserActivateDegreeAnalyzeSpace(item['user_id']) g[item['user_id']].push(item) # if g.get('BusinessClickRealTimeSpace'): # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag) # else: # g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace() # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag) # print(result)