#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-04 11:57:15 # @Last Modified time: 2024-01-23 11:09:55 from datetime import datetime import json import urllib # import pymysql import pandas as pd # pd.set_option('display.width', 1000) # pd.set_option('display.max_rows', None) # pd.set_option('display.max_columns', None) pd.set_option('max_colwidth', 1000) import logging from app.main.preprocess import parse_url """ rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8') zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8') cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8') yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8') """ """ from flask.ext.sqlalchemy import SQLAlchemy db = SQLAlchemy() class Recorder(db.Model): __tablename__ = 'recoder' record_Time = db.Column(db.Time) current_Url = db.Column(db.Text) full_Url = db.Column(db.Text) sim_Url = db.Column(db.Text) user_Id = db.Column(db.Text) device_Id = db.Column(db.Text) request_Method = db.Column(db.Text) event_Type = db.Column(db.Text) form_Data = db.Column(db.Text) json_Data = db.Column(db.Text) value_Data = db.Column(db.Text) status_Code = db.Column(db.Text) target_Url = db.Column(db.Text) iframe_Url = db.Column(db.Text) button_Text = db.Column(db.Text) db = pymysql.connect(host="192.168.1.202", port=13389, user="root", passwd="zh123456", db="mzinfo", autocommit=True) cursor = db.cursor() """ """ class MYDATABASE: def __init__(self, db_name=None): '''连接数据库,创建游标''' config = dict(zip(['host', 'user', 'port', 'password'], ['192.168.1.202', 'root', 13388, 'zh123456'])) config.update(database=db_name) self.connection = pymysql.connect(**config) self.cursor = self.connection.cursor() def __del__(self): self.connection.close() def create_database(self, db_name: str): '''新建数据库''' sql = f'CREATE DATABASE IF NOT EXISTS {db_name};' self.cursor.execute(sql) def create_table(self, tbl_name: str): '''新建数据库''' sql = f'CREATE TABLE IF NOT EXISTS {tbl_name};' self.cursor.execute(sql) def drop_database(self, db_name: str): '''删除数据库''' sql = f'DROP DATABASE IF EXISTS {db_name};' self.cursor.execute(sql) def drop_table(self, tbl_name: str): '''删除数据库''' sql = f'DROP TABLE IF EXISTS {tbl_name};' self.cursor.execute(sql) def query(self, sql: str): '''以数据框形式返回查询据结果''' self.cursor.execute(sql) data = self.cursor.fetchall() # 以元组形式返回查询数据 header = [t[0] for t in self.cursor.description] df = pd.DataFrame(list(data), columns=header) # pd.DataFrame 对列表具有更好的兼容性 return df def show_databases(self): '''查看服务器上的所有数据库''' sql = 'SHOW DATABASE;' return self.query(sql) def select_database(self): '''查看当前数据库''' sql = 'SELECT DATABASE();' return self.query(sql) def show_tables(self): '''查看当前数据库中所有的表''' sql = 'SHOW TABLES;' return self.query(sql) def insert_table(self, sql): try: # 执行sql语句 cursor.execute(sql) self.connection.commit() except Exception as e: # 发生错误时回滚 self.connection.rollback() mydb = MYDATABASE(db_name="mzinfo") df = mydb.query("SELECT * FROM recoder;") # print(df) df.to_json("temp.json", orient='records', lines=True, force_ascii=False) """ df = pd.read_json(path_or_buf='all_data.json', orient='records', lines=True, encoding='utf-8') # df = pd.read_json(path_or_buf='temp.json', orient='records', lines=True, encoding='utf-8') # print(df['user_Id'].unique()) # print(df.groupby('user_Id').size().sort_values()) df = df[df['user_Id'] == 'liguiwen@mz.gd.csg.cn'] # df = df.sort_values(by='record_Time') # print(df.head) # print(df.iloc[130]['current_Url']) # print(df['current_Url'].unique()) # print(df['current_Url'].str.split("?")) # print(df['value_Data'].unique()) # url = df.iloc[0]['current_Url'] # print(len(parse_url(url))) # print(df['current_Url'].apply(lambda x: parse_url(x))) """ df['current_Url'] = df['current_Url'].apply(lambda x: parse_url(x)) df['target_Url'] = df['target_Url'].apply(lambda x: parse_url(x)) df['iframe_Url'] = df['iframe_Url'].apply(lambda x: parse_url(x)) df['value_Data'] = df['current_Url'].apply(lambda x: x[-1]) """ # print(df.sample(10, random_state=42)['current_Url']) class UrlParser: def __init__(self): self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8') self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8') self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8') self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8') def process(self, row): row['current_Url'] = row['current_Url'].apply(lambda x: parse_url(x)) # print(row.iloc[0]) return self.process_record(row.iloc[0]) def process_record(self, item): i = item['current_Url'] j = item['target_Url'] user = item['user_Id'] # result = item result = None # if (item['request_Method'] == 'GET') and (not item['form_Data']): # print(0) # elif (item['request_Method'] == 'GET') and (item['form_Data'] != 'None'): # print(1, item['form_Data']) # elif (item['request_Method'] == 'POST') and (item['form_Data'] != 'None'): # print(2, item['form_Data']) # elif item['request_Method'] == 'POST' and (not item['form_Data']): # print(3) if i[1] == '10.10.21.23': result = {'domain': '人资域'} if i[5].get("appCode"): task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['record_Time'], 'user': user, 'method': item['request_Method'], 'hasform': True if item['form_Data'] else False, 'domain': '人资域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['button_Text'] if item['button_Text'] else None } elif i[1] == '10.10.21.28': result = {'domain': '资产域'} if i[5].get('appCode'): task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['record_Time'], 'user': user, 'method': item['request_Method'], 'hasform': True if item['form_Data'] else False, 'domain': '资产域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['button_Text'] if item['button_Text'] else None } elif i[1] == 'fms.gmp.cloud.hq.iv.csg': result = {'domain': '财务域'} if i[5].get('appCode'): task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''') result = { 'time': item['record_Time'], 'user': user, 'method': item['request_Method'], 'hasform': True if item['form_Data'] else False, 'domain': '财务域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], 'text': item['button_Text'] if item['button_Text'] else None } elif i[1] == '10.150.23.1:8010': result = {'domain': '营销域'} fd = item['form_Data'] if item['form_Data'] and item['form_Data'] != 'None': try: if item['form_Data'][0] != "{": item["params_Data"] = item['form_Data'] form_data = None else: item['form_Data'] = item['form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[") item['form_Data'] = item['form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}") item['form_Data'] = item['form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"") item['form_Data'] = item['form_Data'].replace("\n", "") form_data = json.loads(item['form_Data']) except Exception as e: logging.error(item['form_Data']) logging.error(fd) form_data = None raise e else: form_data = None if form_data and '_INVOKE_FUNC_TITLE_' in form_data: title = form_data['_INVOKE_FUNC_TITLE_'][0] appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1] logging.debug(appcontext) task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''') if task.empty: task = self.yxdf.query(f'''四级标题 == "{title}"''') if task.empty: task = self.yxdf.query(f'''三级标题 == "{title}"''') try: result = { 'time': item['record_Time'], 'user': user, 'method': item['request_Method'], 'hasform': True, 'domain': '营销域', '一级标题': task['一级标题'].values[0], '二级标题': task['二级标题'].values[0], '三级标题': task['三级标题'].values[0], '四级标题': task['四级标题'].values[0], 'text': item['button_Text'] if item['button_Text'] else None } except Exception as e: logging.error(task) logging.error(item['form_Data']) logging.error(title) logging.error(e) logging.error(form_data['_INVOKE_FUNC_URL_'][0]) else: result = { 'time': item['record_Time'], 'user': user, 'domain': '营销域', 'hasform': False, 'text': item['button_Text'] if item['button_Text'] else None } elif i[1] == '4a.gd.csg.local': result = { 'time': item['record_Time'], 'user': user, '域': '登录门户' } return result # parser = UrlParser() # df['current_Url'] = df['current_Url'].apply(lambda x: parse_url(x)) # # # for _, item in df.sample(1000, random_state=4).iterrows(): # for _, item in df.sample(10, random_state=5).iterrows(): # # for _ in range(5): # # for _, item in df.get_chunk(10).iterrows(): # # for _, item in item.iterrows(): # print(parser.process_record(item)) # # logging.warning(parser.process_record(item)) def main(): current = df.iloc[0]['current_Url'] target = dict() for item in df.itertuples(): if item.current_Url == current: # 当前地址没变,页面无跳转(需判断AJAX) if item.target_Url not in target: # 目标url不在临时目录 target[item.target_Url] = item.Index # 添加目标URL到临时目录 else: # 当前地址发生变化,页面跳转 current = item.current_Url # 当前地址修改 if target.get(current): # 查看是否有请求当前URL的历史记录 print(df.loc[target.get(current)]) # 查看历史记录 else: # 为找到历史记录 print("*********非正常跳转*********") # 非正常跳转 target = dict() # 清除历史记录 target[item.target_Url] = item.Index # 添加当前记录 if __name__ == '__main__': main()