123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-18 09:47:28
- # @Last Modified time: 2024-02-23 13:25:42
- import os, re, json
- import datetime
- import requests
- import pandas as pd
- from pandas import DataFrame
- import logging
- from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace
- from ..preprocess import parse_url
class ResultJson:
    """One analysed user action, ready to be serialised for the postback API.

    The constructor stores its arguments as attributes; ``time`` is the only
    one that is transformed (formatted to ``YYYY-mm-dd HH:MM:SS``), and a
    falsy ``text`` is stored as the empty string.
    """

    def __init__(self, time, user_id, domain, title1, title2, title3, device_id=None, text=None, is_op=True, is_bus=False, is_bias=False, status="正常"):
        # Who / where / when.
        self.user_id = user_id          # acting user
        self.device_id = device_id      # device the action came from
        self.domain = domain            # business domain that was accessed
        # ``time`` must provide ``strftime`` (e.g. a ``datetime``).
        self.time = time.strftime('%Y-%m-%d %H:%M:%S')

        # Title hierarchy; ``title3`` is the one reported as the business item.
        self.title1 = title1
        self.title2 = title2
        self.title3 = title3

        # Operation step text; falsy values collapse to "".
        self.text = text or ""

        # Classification flags and the execution result.
        self.is_op = is_op              # is this a user operation
        self.is_bus = is_bus            # is this a business item
        self.is_bias = is_bias          # does it deviate
        self.status = status            # execution result

    def to_json(self) -> dict:
        """Return the payload as a dict with a ``real`` and a ``base`` section.

        ``real`` carries the per-record (real-time) fields; ``base`` carries
        the periodic profile, of which only the user id is filled in here.
        """
        realtime = {
            "time": self.time,            # record time
            "userId": self.user_id,       # user id
            "deviceId": self.device_id,   # device id
            "domain": self.domain,        # accessed domain
            "business": self.title3,      # business type
            "option": self.text,          # operation performed
            "isOp": self.is_op,           # is an operation
            "isBus": self.is_bus,         # is a business item
            "isBias": self.is_bias,       # deviates
            "status": self.status,        # execution result
        }
        profile = {
            "用户ID": self.user_id,
            "常用时段": "",
            "所在部门": "",
            "使用设备": "",
            "操作系统": "",
            "常用业务": "",
        }
        return {"real": realtime, "base": profile}
def postback(json_data: dict) -> None:
    """POST ``json_data`` to the callback endpoint named by ``$BKURL``.

    The payload is sent as the JSON body ``{"parameter": json_data}``.

    If ``BKURL`` is not set, nothing is sent and an error is logged instead of
    handing ``None`` to ``requests`` (which fails with an opaque invalid-URL
    error). A timeout is applied so an unresponsive endpoint cannot block the
    caller indefinitely.

    Raises:
        requests.RequestException: on connection failures or timeouts.
    """
    url = os.environ.get('BKURL')
    if not url:
        logging.error('postback skipped: environment variable BKURL is not set')
        return

    response = requests.post(url, json={'parameter': json_data}, timeout=10)
    if response.status_code == 200:
        logging.info('success')
    else:
        # Non-200: surface the server's response body for diagnosis.
        logging.error(response.text)
class UrlParser(object):
    """Classify a request record into a business domain and title hierarchy.

    Routing is by host (``Current_Url[1]``), then by path (``Current_Url[2]``)
    against a small table of well-known pages, and finally by a lookup in the
    per-domain table loaded from JSON-lines resources.

    NOTE(review): ``Current_Url`` is whatever ``parse_url`` returns; this
    class only relies on index 1 being the host, index 2 the path and index 5
    a dict whose ``appCode`` entry is indexable (``[0]`` is taken) - confirm
    against ``parse_url``.
    """

    # Default location of the four lookup tables.
    _DEFAULT_RESOURCE_DIR = '/root/mzinfo/V2/app/resources'

    # Result keys for the title hierarchy, outermost first.
    _TITLE_KEYS = ('一级标题', '二级标题', '三级标题', '四级标题')

    _HR_HOST = '10.10.21.23'
    _ASSET_HOST = '10.10.21.28'
    _FINANCE_HOST = 'fms.gmp.cloud.hq.iv.csg'
    _MARKETING_HOST = '10.150.23.1:8010'
    _PORTAL_HOST = '4a.gd.csg.local'

    # Well-known pages per domain: path -> (level-1, level-2, level-3) titles.
    _HR_PAGES = {
        '/gmp/login.html': ('人资域登录',) * 3,
        '/gmp/static/gmpweb/hrIndex.html': ('人资域首页',) * 3,
        '/gmp/static/gmpweb/workbench/todo/TodoCenter.html': ('人资域', '公共应用', '代办中心'),
        # NOTE(review): unlike every other entry this path has no leading
        # '/'. If parse_url always yields a leading slash this entry can
        # never match - confirm and fix the key if so.
        'hrcore_web/labormanagement/contractagreementview/myAgreement.html': ('人资域', '个人应用', '我的协议'),
        '/gmp/static/hr/peixun/user/index.html': ('人资域', '培训管理', '培训管理'),
    }
    _ASSET_PAGES = {
        '/gmp/static/gmpweb/index.html': ('资产域首页',) * 3,
    }
    _FINANCE_PAGES = {
        '/gmp/login.html': ('登录财务域',) * 3,
        '/gmp/index.html': ('财务域首页',) * 3,
        '/gmp/': ('财务域认证',) * 3,  # to be confirmed
    }

    def __init__(self, resource_dir: str = _DEFAULT_RESOURCE_DIR):
        """Load the four domain lookup tables from ``resource_dir``.

        Args:
            resource_dir: directory containing ``人资域.json``, ``资产域.json``,
                ``财务域.json`` and ``营销域.json`` (JSON-lines, UTF-8).
                Defaults to the original hard-coded deployment path.
        """
        self.rzdf = self._load_table(resource_dir, '人资域.json')
        self.zcdf = self._load_table(resource_dir, '资产域.json')
        self.cwdf = self._load_table(resource_dir, '财务域.json')
        self.yxdf = self._load_table(resource_dir, '营销域.json')

    @staticmethod
    def _load_table(resource_dir, filename):
        """Read one JSON-lines lookup table into a DataFrame."""
        return pd.read_json(
            path_or_buf=os.path.join(resource_dir, filename),
            orient='records',
            lines=True,
            encoding='utf-8',
        )

    def process(self, row: dict):
        """Parse the two URL fields of ``row`` in place, then classify it.

        ``row['Current_Url']`` and ``row['Target_Url']`` are replaced by the
        output of ``parse_url``. Returns whatever ``process_record`` returns.
        """
        row['Current_Url'] = parse_url(row['Current_Url'])
        row['Target_Url'] = parse_url(row['Target_Url'])
        return self.process_record(row)

    def process_record(self, item: dict):
        """Classify one record whose ``Current_Url`` is already parsed.

        Returns:
            * a full result dict (``time``, ``user``, ``method``, ``hasform``,
              ``domain``, the title keys and ``text``) when the page is
              recognised;
            * ``{'domain': <name>}`` when the host is known but the page
              could not be resolved;
            * ``None`` for an unknown host.

        Raises:
            Exception: whatever parsing the marketing ``Form_Data`` raises
                (logged, then re-raised).
        """
        url = item['Current_Url']
        host = url[1]

        if host == self._HR_HOST:
            return self._resolve_gmp(item, url, '人资域', self._HR_PAGES, self.rzdf, True)
        if host == self._ASSET_HOST:
            return self._resolve_gmp(item, url, '资产域', self._ASSET_PAGES, self.zcdf, True)
        if host == self._FINANCE_HOST:
            # The finance domain has always reported Button_Text verbatim
            # (an empty string is NOT turned into None); kept as-is so that
            # existing consumers see the same values.
            return self._resolve_gmp(item, url, '财务域', self._FINANCE_PAGES, self.cwdf, False)
        if host == self._MARKETING_HOST:
            return self._resolve_marketing(item)
        if host == self._PORTAL_HOST:
            return self._build_result(item, '登录门户', ('登录门户',) * 3)
        return None

    def _build_result(self, item, domain, titles, hasform=None, blank_text_to_none=True):
        """Assemble the result dict shared by every recognised page.

        ``titles`` holds three or four titles (outermost first). ``hasform``
        defaults to the truthiness of ``item['Form_Data']``.
        """
        text = item['Button_Text']
        if blank_text_to_none and not text:
            text = None
        result = {
            'time': item['Record_Time'],
            'user': item['User_Id'],
            'method': item['Request_Method'],
            'hasform': bool(item['Form_Data']) if hasform is None else hasform,
            'domain': domain,
        }
        result.update(zip(self._TITLE_KEYS, titles))
        result['text'] = text
        return result

    def _resolve_gmp(self, item, url, domain, pages, table, blank_text_to_none):
        """Resolve a page of the HR / asset / finance domains.

        Tries the static ``pages`` table first, then the ``appCode`` lookup in
        ``table``. Falls back to ``{'domain': domain}`` when neither matches
        (pages without an appCode are expected to be handled via iframes
        later).
        """
        path = url[2]
        titles = pages.get(path)
        if titles is None:
            app_codes = url[5].get('appCode')
            if not app_codes:
                return {'domain': domain}
            titles = self._lookup_by_app_code(table, path, app_codes[0])
            if titles is None:
                logging.warning('错误 code %s path %s', app_codes, path)
                return {'domain': domain}
        return self._build_result(item, domain, titles, blank_text_to_none=blank_text_to_none)

    @staticmethod
    def _lookup_by_app_code(table, path, app_code):
        """Return the three titles of the first row matching path + appCode.

        Uses boolean masks rather than ``DataFrame.query`` so that
        URL-derived text is compared as data and never parsed as an
        expression. Returns ``None`` when nothing matches or the table lacks
        a required column.
        """
        try:
            matches = table[(table['path'] == path) & (table['appCode'] == app_code)]
            if matches.empty:
                return None
            return tuple(matches[key].values[0] for key in ('一级标题', '二级标题', '三级标题'))
        except KeyError as exc:
            logging.error('lookup table is missing column %s', exc)
            return None

    def _resolve_marketing(self, item):
        """Resolve a marketing-domain record from its submitted form data."""
        domain = '营销管理'
        form_data = self._parse_marketing_form(item)
        if not (form_data and '_INVOKE_FUNC_TITLE_' in form_data):
            # No function title in the form: reported as the login portal.
            return self._build_result(item, domain, ('登录门户',) * 3, hasform=False)

        title = form_data['_INVOKE_FUNC_TITLE_'][0]
        try:
            # The application context is the first path segment of the URL.
            appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
        except (KeyError, IndexError, AttributeError, TypeError):
            # Missing or malformed function URL: match on the title alone.
            appcontext = None
        logging.debug('%s %s', title, appcontext)

        titles = self._lookup_marketing(title, appcontext)
        if titles is None:
            logging.error(item['Form_Data'])
            logging.error(title)
            logging.error(form_data.get('_INVOKE_FUNC_URL_'))
            return {'domain': domain}
        return self._build_result(item, domain, titles, hasform=True)

    def _lookup_marketing(self, title, appcontext):
        """Return the four titles of the best marketing match, or ``None``.

        Candidates are tried from most to least specific: level-4 title with
        context, level-3 title with context, level-4 title alone, level-3
        title alone. The first non-empty match wins.
        """
        table = self.yxdf
        attempts = []
        if appcontext is not None:
            attempts += [('四级标题', True), ('三级标题', True)]
        attempts += [('四级标题', False), ('三级标题', False)]
        try:
            for column, with_context in attempts:
                mask = table[column] == title
                if with_context:
                    mask = mask & (table['appcontext'] == appcontext)
                matches = table[mask]
                if not matches.empty:
                    return tuple(matches[key].values[0] for key in self._TITLE_KEYS)
        except KeyError as exc:
            logging.error('lookup table is missing column %s', exc)
        return None

    @staticmethod
    def _parse_marketing_form(item):
        """Decode ``item['Form_Data']`` for the marketing domain.

        Side effects on ``item`` (relied upon by existing callers):
        * non-JSON form data is copied to ``item['Params_Data']``;
        * JSON form data is replaced by its cleaned-up text.

        Returns the decoded JSON value, or ``None`` when there is no form
        data or it is not JSON. Any failure is logged and re-raised.
        """
        raw = item['Form_Data']
        if not raw or raw == 'None':
            return None
        try:
            if raw[0] != '{':
                item['Params_Data'] = raw
                return None
            # Best-effort repair of the captured form text before decoding;
            # presumably nested JSON was captured as quoted strings, so the
            # quotes around embedded arrays/objects are stripped. Order
            # matters: the "remark" rewrite must precede the '"[' rewrite.
            cleaned = raw.replace('"remark":"[', '"remark":""[')
            cleaned = cleaned.replace('"[', '[').replace(']"', ']').replace('"{', '{').replace('}"', '}')
            cleaned = cleaned.replace('object HTMLInputElement', '').replace('[null]', '[]').replace('"null"', '""')
            cleaned = cleaned.replace('\n', '')
            item['Form_Data'] = cleaned
            return json.loads(cleaned)
        except Exception:
            # Log both the (possibly cleaned) and the original text, then
            # let the caller see the failure.
            logging.error(item['Form_Data'])
            logging.error(raw)
            raise
- # 字典键首字母大写
- # def capital_to_higher(dict_info: dict) -> dict:
- # new_dict = {}
- # for i, j in dict_info.items():
- # new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j
- # return new_dict
- # 字典键全改小写
def capital_to_lower(dict_info: dict) -> dict:
    """Return a copy of ``dict_info`` whose keys are all lower-cased.

    Values are carried over unchanged. When two keys differ only by case,
    the one that appears later in ``dict_info`` wins.
    """
    return {key.lower(): value for key, value in dict_info.items()}
def predict(g: dict, item: dict):
    """Route one record to its user's analysis space, creating it on demand.

    Args:
        g: shared registry mapping user id -> ``UserActivateDegreeAnalyzeSpace``;
            it is updated in place when a new user is seen.
        item: the raw record; its keys are lower-cased before use and it must
            contain a ``user_id`` key after lower-casing.

    Returns:
        None. The record is handed to the space's ``push`` method.
    """
    # Normalise all keys to lower case.
    item = capital_to_lower(item)
    user_id = item['user_id']

    # Test for absence explicitly: a plain truthiness check would discard an
    # existing space (and its accumulated state) if that object ever
    # evaluates as falsy, e.g. because it defines __len__ and is empty.
    space = g.get(user_id)
    if space is None:
        space = UserActivateDegreeAnalyzeSpace(user_id)
        g[user_id] = space
    space.push(item)
- # if g.get('BusinessClickRealTimeSpace'):
- # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
- # else:
- # g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace()
- # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
- # print(result)
|