Ver código fonte

second commit

xzc 1 ano atrás
pai
commit
b8dde7f56d

+ 110 - 0
README.md

@@ -0,0 +1,110 @@
+梅州电网IT终端行为采集分析
+===
+
+环境
+---
+```
+docker 23.02
+python 3.7
+rocketmq 5.1.4
+rocketmq-client-python 2.0.0
+dmPython 2.5.5
+```
+
+数据库安装
+---
+```bash
+docker load -i dm8_20230808_rev197096_x86_rh6_64_single.tar
+
+docker run -d -p 30236:5236 \
+    --name dm8_test \
+    --privileged=true \
+    -e PAGE_SIZE=16 \
+    -e LD_LIBRARY_PATH=/opt/dmdbms/bin \
+    -e EXTENT_SIZE=32 \
+    -e BLANK_PAD_MODE=1 \
+    -e LOG_SIZE=1024 \
+    -e UNICODE_FLAG=1 \
+    -e LENGTH_IN_CHAR=1 \
+    -e INSTANCE_NAME=dm8_test \
+    -v /data/dm8_test:/opt/dmdbms/data \
+    dm8_single:dm8_20230808_rev197096_x86_rh6_64
+```
+
+
+
+
+原则:
+---
+1. 浏览器插件数据传到服务器前存在缓存时间,日志记录的时间以前端记录时间为准
+
+1. 时间片最小单位为 `1s`(单个用户在一秒内操作的频率不大于一)
+2. 时间片内的多次网络请求可以合并为一个用户操作(由原则一推理得出)
+3. 用户操作不可划分(由原则二推理得出)
+4. 业务操作由多个用户操作组成
+5. 底级菜单不完全等于业务操作
+
+### 流式日志
+
+### 用户操作的域(完成)
+根据 Host 判断
+
+### 用户操作菜单(完成)
+根据 current_Url 切分判断
+根据 form_data 切分判断
+
+### 判断用户登录系统起始时间和结束时间
+如果当前 url 在 10 分钟内变化率小于 5,则判断为登出状态
+如果 tSession 变化,则判断为登出状态
+
+### 判断业务进入和完成时间
+判断当前 url 和 target_url
+
+一般页面跳转条件判断:
+1、设定全局 Url
+2、当前 current_Url 与全局 Url 不一致
+3、日志中找到最近一条 请求地址 和当前 current_Url 相同的日志
+4、该日志为页面跳转点击时间发生记录
+无法判断的情况:
+1、Ajax页面请求,URL不变
+2、请求的url和最终显示的url不完全相同(去除session,去除cookie,如何判断是否可删?)
+3、去除query不可行,页面的区分度就为query字段
+
+业务项起始条件判断:
+1、可以判断请求业务目录的起始时间(由请求参数解析),但无法判断是否开始执行具体业务
+2、期间多次页面跳转发生
+3、URL三级标题发生变化,业务目录终止
+无法判断的情况:
+1、Ajax页面的业务目录由表单体现,无法完成页面跳转判断
+2、前端插件暂时无法判断请求是否由人工点击按钮产生(判断表单是否是人工提交困难,行为路径需判断人工提交表单的标志)
+
+### 用户操作流程
+根据 业务进入和完成时间 的结果进行统计
+
+### 用户其他设备操作(暂无技术方案)
+
+
+
+```Dockerfile
+FROM python:3.8.10-alpine
+
+RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apk/repositories
+
+RUN apk add linux-headers build-base cmake automake autoconf libtool bzip2-dev zlib-dev boost-dev 
+
+ADD 2.1.0.tar.gz /workspace
+
+WORKDIR /workspace
+
+RUN ./build.sh
+```
+
+
+```Dockerfile
+FROM python:3.8.10-alpine
+RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apk/repositories
+RUN apk add build-base py3-grpcio
+```
+
+
+

+ 1 - 1
V1/db.py

@@ -7,7 +7,7 @@ class DB():
 		print(url_dict)
 		self.db = pymysql.connect(host=url_dict['host'], port=url_dict['port'], 
 		user=url_dict['user'], passwd=url_dict['passwd'], 
-		db=url_dict['db_name'],autocommit=True)
+		db=url_dict['db_name'], autocommit=True)
 
 	def __del__(self):
 		self.db.close()

Diferenças do arquivo suprimidas por serem muito extensas
+ 78 - 0
V2/all_data.json


+ 26 - 0
V2/app/__init__.py

@@ -0,0 +1,26 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-08 10:17:43
+# @Last Modified time: 2024-01-17 17:43:47
+from flask import Flask
+from config import config, Config
+
+
+
+def create_app(config_name):
+    app = Flask(__name__)
+    app.config.from_object(config[config_name])
+
+    config[config_name].init_app(app)
+
+    from . import models
+    models.db.init_app(app)
+    models.init_app(app)
+
+    from .api import api as api_blueprint
+    app.register_blueprint(api_blueprint, url_prefix='/api')
+
+    from .main import main as main_blueprint
+    app.register_blueprint(main_blueprint)
+
+    return app

+ 8 - 0
V2/app/api/__init__.py

@@ -0,0 +1,8 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-12 11:25:13
+# @Last Modified time: 2024-01-12 16:42:18
+
+from flask import Blueprint
+
+api = Blueprint('api', __name__)

+ 11 - 0
V2/app/main/__init__.py

@@ -0,0 +1,11 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-08 15:46:36
+# @Last Modified time: 2024-01-16 11:42:28
+from flask import Blueprint
+from flask_cors import CORS
+
+main = Blueprint('main', __name__)
+cors = CORS(main)
+
+from . import views

+ 177 - 0
V2/app/main/preprocess.py

@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2023-12-25 10:19:57
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-01-12 14:26:31
+import pandas as pd
+from urllib import parse
+
+all_keys = []
+
+def parse_url(url):
+    query_string = parse.urlparse(url)
+    return query_string.scheme, query_string.netloc, query_string.path, query_string.params, query_string.fragment, parse.parse_qs(query_string.query)
+
+def parse_path(url):
+    path_string = parse.urlparse(url).path
+    return path_string
+
+def parse_appCode(url):
+    query_string = parse.urlparse(url).query
+    appCode = parse.parse_qs(query_string).get('appCode')
+    if appCode:
+        return appCode[0]
+
+def parse_iframeUrl(url):
+    fragment_string = parse.urlparse(url).fragment
+    iframe_url = parse.parse_qs(fragment_string).get('url')
+    iframe_from = parse.parse_qs(fragment_string).get('from')
+    if iframe_url:
+        path = parse.urlparse(iframe_url[0]).path
+        if path and iframe_from:
+            return path, iframe_from[0]
+        elif path:
+            return path, None
+        elif iframe_from:
+            return None, iframe_from[0]
+
+def parse_appcontext(url):
+    query_string = parse.urlparse(url).query
+    appcontext = parse.parse_qs(query_string).get('appcontext')
+    if appcontext:
+        return appcontext[0]
+
+def parse_Edit(url):
+    query_string = parse.urlparse(url).query
+    isEdit = parse.parse_qs(query_string).get('isEdit')
+    if isEdit:
+        return isEdit[0]
+
+def parse_Query(url):
+    query_string = parse.urlparse(url).query
+    isQuery = parse.parse_qs(query_string).get('isQuery')
+    if isQuery:
+        return isQuery[0]
+
+def parse_editFlag(url):
+    query_string = parse.urlparse(url).query
+    editFlag = parse.parse_qs(query_string).get('editFlag')
+    if editFlag:
+        return editFlag[0]
+
+
+"""
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基建管理应用")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="数字供应链")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="合同管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="安全生产")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="创新管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基建智慧工程")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="并网服务管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基础应用")
+
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="计划预算管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="成本管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="资金管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="核算管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="报账管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="工程财务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="资产价值管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="物资财务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="价格管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="税务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="会计档案")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="共享服务")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="报表管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="综合管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="基础应用")
+
+df['tag'] = df['Unnamed: 1'].apply(lambda x: x.split()[2])
+df['url'] = df['Unnamed: 1'].apply(lambda x: x.split()[1])
+
+del df['Unnamed: 0']
+del df['Unnamed: 1']
+
+df.drop(df[df['tag'] == 'undefined'].index, inplace=True)
+
+df.to_excel("temp.xlsx", sheet_name='Sheet1')
+"""
+
+
+# df['tag'] = df['url'].apply(lambda x: x.split("/")[-1].split("=")[-1])
+
+
+"""
+df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="资产域")
+del df['域']
+df['domain'] = '资产域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('资产域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="财务域")
+df['domain'] = '财务域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('财务域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("营销域关联字段查询表.xlsx", sheet_name="营销域")
+df['一级标题'] = df['一级标题'].apply(lambda x: x.strip('\''))
+df['二级标题'] = df['二级标题'].apply(lambda x: x.strip('\''))
+df['三级标题'] = df['三级标题'].apply(lambda x: x.strip('\''))
+df['四级标题'] = df['四级标题'].apply(lambda x: x.strip('\'') if isinstance(x, str) else None)
+df['url'] = df['url'].apply(lambda x: x.strip('\''))
+df['domain'] = '营销域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appcontext'] = df['url'].apply(lambda x: parse_appcontext(x))
+
+df['isEdit'] = df['url'].apply(lambda x: parse_Edit(x))
+df['editFlag'] = df['url'].apply(lambda x: parse_editFlag(x))
+df['isQuery'] = df['url'].apply(lambda x: parse_Query(x))
+
+print(df)
+
+df.to_json('营销域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("人资域关联字段查询表.xlsx", sheet_name="Sheet1")
+df['domain'] = '人资域'
+
+df['netloc'] = '10.10.21.23'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+df['iframe'] = df['url'].apply(lambda x: parse_iframeUrl(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('人资域.json', orient='records', lines=True, force_ascii=False)
+"""

+ 154 - 0
V2/app/main/views.py

@@ -0,0 +1,154 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-08 15:46:48
+# @Last Modified time: 2024-01-23 16:31:07
+import threading
+import pandas as pd
+from typing import Optional
+from datetime import datetime
+from pydantic import BaseModel
+from flask import request, jsonify
+from flask_pydantic import validate
+
+from . import main
+from .preprocess import parse_url
+
+from ..models import *
+from ..tasks import tasks
+
+
+g = dict()
+
+class RecodModel(BaseModel):
+    record_Time: datetime
+    current_Url: str
+    full_Url: Optional[str] = None
+    sim_Url: Optional[str] = None
+    user_Id: str
+    device_Id: str
+    request_Method: str
+    event_Type: str
+    form_Data: Optional[str] = None
+    json_Data: Optional[str] = None
+    value_Data: Optional[str] = None
+    status_Code: Optional[int] = None
+    target_Url: Optional[str] = None
+    iframe_Url: Optional[str] = None
+    button_Text: Optional[str] = None
+
+
+
+@main.route('/record', methods=['GET', 'POST'])
+@validate()
+def record(body: RecodModel):
+    """接收采集数据,记录临时表
+    """
+    try:
+        # --- 采集原始表 --- #
+        record = Record()
+        record.Record_Time = body.record_Time
+        record.Current_Url = body.current_Url
+        record.Full_Url = body.full_Url
+        record.User_Id = body.user_Id
+        record.Request_Method = body.request_Method
+        record.Event_Type = body.event_Type
+        record.Target_Url = body.target_Url
+        record.Button_Text = body.button_Text
+        db.session.add(record)
+        db.session.commit()
+
+        # print(body.dict())
+        tasks.predict(g, item=body.dict())
+
+        # # --- 采集总量 +1 --- #
+        # try:
+        #     tc = TotalCount.query.filter_by(Name = '采集量').one()
+        #     tc.add_one()
+        #     db.session.add(tc)
+        #     db.session.commit()
+        # except Exception as e:
+        #     db.session.rollback()
+        #     print('--->', e)
+
+        # # --- 登录人数 +1 --- #
+        # pl = PersonLogin.query.filter_by(User_Id = body.user_Id)
+        # if pl.count() == 0:
+        #     try:
+        #         db.session.add(PersonLogin(user_id = body.user_Id))
+        #         db.session.commit()
+        #     except Exception as e:
+        #         db.session.rollback()
+        #         print('--->', e)
+
+        # # --- 解析访问域 --- #
+        # df = pd.DataFrame([body.dict()])
+        # result = parser.process(df)
+        # try:
+        #     if result.get("domain") == "人资域":
+        #         PersonLogin.query.filter_by(User_Id = body.user_Id).update({"RZ_Visit": True})
+        #     elif result.get("domain") == "资产域":
+        #         PersonLogin.query.filter_by(User_Id = body.user_Id).update({"ZC_Visit": True})
+        #     elif result.get("domain") == "财务域":
+        #         PersonLogin.query.filter_by(User_Id = body.user_Id).update({"CW_Visit": True})
+        #     elif result.get("domain") == "营销域":
+        #         PersonLogin.query.filter_by(User_Id = body.user_Id).update({"YX_Visit": True})
+        #     db.session.commit()
+        # except Exception as e:
+        #     db.session.rollback()
+        #     print('--->', e)
+
+    except Exception as e:
+        db.session.rollback()
+        print('--->', e)
+        return jsonify({"status": "failed"})
+    return jsonify({"status": "success"})
+
+@main.route('/screen1', methods=['GET', 'POST'])
+def screen1():
+    # tc = db.session.execute(db.select(TotalCount.Count).where(TotalCount.Name == '采集总量')).one()
+    result = {
+        "table_1": [i.to_json() for i in db.session.query(TotalCount).filter().all()],
+        "table_2": {},
+        "table_3": {},
+        "table_4": {},
+        "table_5": [i.to_json() for i in db.session.query(PersonLogin).filter().all()],
+        "table_6": {},
+        "table_7": {}
+    }
+    return jsonify(result)
+
+@main.route('/screen2', methods=['GET', 'POST'])
+def screen2():
+    result = {
+        "table_1": {},
+        "table_2": {},
+        "table_3": {},
+        "table_4": {}
+    }
+    return jsonify({})
+
+@main.route('/screen3', methods=['GET', 'POST'])
+def screen3():
+    for item in g['fuli@mz.gd.csg.cn'].getActionByDateRange().itertuples():
+        print(f"""
+            当前时间: {item[0]}
+            起始URL:{item.cur_url[:50]}
+            终点URL:{item.tar_url[:50]}
+            """)
+    result = {
+        "table_1": {},
+        "table_2": {},
+        "table_3": {},
+        "table_4": {},
+        "table_5": {},
+        "table_6": {}
+    }
+    return jsonify({})
+
+@main.route('/show')
+def show():
+    print(g)
+    record = db.session.query(Record).filter().all()
+    # record = db.session.execute(db.select(Record).where(Record.Request_Method == 'GET')).all()
+    print(record)
+    return [r.to_json() for r in record]

+ 229 - 0
V2/app/models.py

@@ -0,0 +1,229 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-12 15:23:13
+# @Last Modified time: 2024-01-22 15:10:32
+from datetime import datetime
+import click
+from flask import current_app
+from flask.cli import with_appcontext
+from flask_sqlalchemy import SQLAlchemy
+
+
+db = SQLAlchemy()
+
+
+class Record(db.Model):
+    # 原始记录表
+    __tablename__ = 'recorder'
+    # 表的结构:
+    id             = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='记录id')
+    Record_Time    = db.Column(db.DateTime, nullable=False, doc='记录时间')
+    Current_Url    = db.Column(db.UnicodeText, nullable=False, doc='当前url')
+    Full_Url       = db.Column(db.UnicodeText, nullable=False, doc='')
+    Sim_Url        = db.Column(db.UnicodeText, nullable=True, doc='')
+    User_Id        = db.Column(db.String, nullable=False, doc='用户id')
+    Device_Id      = db.Column(db.String, nullable=True, doc='设备id')
+    Request_Method = db.Column(db.String, nullable=False, doc='请求方法')
+    Event_Type     = db.Column(db.String, nullable=True, doc='事件类型')
+    Form_Data      = db.Column(db.UnicodeText, nullable=True, doc='表单数据')
+    Json_Data      = db.Column(db.UnicodeText, nullable=True, doc='')
+    Value_Data     = db.Column(db.UnicodeText, nullable=True, doc='')
+    Status_Code    = db.Column(db.String, nullable=True, doc='状态码')
+    Target_Url     = db.Column(db.UnicodeText, nullable=False, doc='请求地址')
+    Iframe_Url     = db.Column(db.UnicodeText, nullable=True, doc='内联url')
+    Button_Text    = db.Column(db.String, nullable=True, doc='按钮文字')
+
+    def to_json(self):
+        json_data = {
+            'record_time': self.Record_Time,
+            'current_url': self.Current_Url,
+            'full_url': self.Full_Url,
+            'user_id': self.User_Id,
+            'device_id': self.Device_Id,
+            'request_method': self.Request_Method,
+            'event_type': self.Event_Type,
+            'form_data': self.Form_Data,
+            'status_code': self.Status_Code,
+            'target_url': self.Target_Url,
+            'iframe_url': self.Iframe_Url,
+            'button_text': self.Button_Text
+        }
+        return json_data
+
+
+class Fragment(db.Model):
+    # 临时表(屏一表三 | 屏一表四)
+    __tablename__ = 'fragment'
+    # 表的结构:
+    id       = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='统计id')
+    Time     = db.Column(db.DateTime, nullable=False, doc='记录时间')
+    User_Id  = db.Column(db.String, nullable=False, doc='用户id')
+    Method   = db.Column(db.String, nullable=False, doc='请求方法')
+    Has_Form = db.Column(db.Boolean, nullable=False, default=False, doc='是否有表单')
+    Dom      = db.Column(db.String, nullable=False, doc='访问域')
+    Title1   = db.Column(db.String, nullable=False, doc='一级标题')
+    Title2   = db.Column(db.String, nullable=False, doc='二级标题')
+    Title3   = db.Column(db.String, nullable=False, doc='三级标题')
+    Title4   = db.Column(db.String, nullable=True, doc='四级标题')
+    Behavior = db.Column(db.String, nullable=False, doc='行为内容')
+    Stay     = db.Column(db.Integer, nullable=False, default=0, doc='持续时间')
+
+
+class TotalCount(db.Model):
+    # 统计表(屏一表一 | 屏一表二)
+    __tablename__ = 'totalcount'
+    # 表的结构: 采集总量,偏离总量,登录人数,四域偏离量
+    id      = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='统计id')
+    Name    = db.Column(db.String,  nullable=False, unique=True, doc='统计名')
+    Count   = db.Column(db.Integer, nullable=False, default=0,   doc='统计量')
+
+    def __init__(self, name, count = 0):
+        self.Name = name
+        self.Count = count
+
+    def to_json(self):
+        json_data = {
+            self.Name: self.Count
+        }
+        return json_data
+
+    def add_one(self):
+        self.Count += 1
+
+
+class PersonLogin(db.Model):
+    # 用户登录统计表(屏二表二 | 屏一表五)
+    __tablename__ = 'personlogin'
+    # 表的结构: 用户在线时长
+    id              = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='id')
+    User_Id         = db.Column(db.String,  nullable=False, unique=True, doc='业务用户id')
+    Online_Duration = db.Column(db.Integer, nullable=False, default=0, doc='在线时长')
+    ZC_Visit        = db.Column(db.Boolean, nullable=False, default=False, doc='是否访问资产域')
+    CW_Visit        = db.Column(db.Boolean, nullable=False, default=False, doc='是否访问财务域')
+    RZ_Visit        = db.Column(db.Boolean, nullable=False, default=False, doc='是否访问人资域')
+    YX_Visit        = db.Column(db.Boolean, nullable=False, default=False, doc='是否访问财务域')
+
+    def __init__(self, user_id):
+        self.User_Id = user_id
+
+    def visit_zc(self):
+        self.ZC_Visit = True
+
+    def visit_cw(self):
+        self.CW_Visit = True
+
+    def visit_rz(self):
+        self.RZ_Visit = True
+
+    def visit_yx(self):
+        self.YX_Visit = True
+
+    def to_json(self):
+        json_data = {
+            "User_Id": self.User_Id        ,
+            "Online_Duration": self.Online_Duration,
+            "ZC_Visit": self.ZC_Visit       ,
+            "CW_Visit": self.CW_Visit       ,
+            "RZ_Visit": self.RZ_Visit       ,
+            "YX_Visit": self.YX_Visit
+        }
+        return json_data
+
+
+class Business(db.Model):
+    # 业务操作统计表(屏二表一 | 屏二表三)
+    __tablename__ = 'business'
+    # 表的结构: 用户操作数量,业务数量,业务时长,业务偏离统计量
+    id        = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='行为id')
+    User_Id   = db.Column(db.String, unique=True, nullable=False, doc='业务用户id')
+    Be_Num    = db.Column(db.Integer, nullable=False, doc='操作统计量')
+    Bu_Num    = db.Column(db.Integer, nullable=False, doc='业务统计量')
+    Time      = db.Column(db.DateTime, nullable=False, doc='业务时间')
+    Bias_Num  = db.Column(db.Integer, nullable=False, doc='业务偏离统计量')
+
+
+class TotalVisit(db.Model):
+    # 域访问量统计表
+    __tablename__ = 'totalvisit'
+    # 表的结构:
+    id    = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='域id')
+    Name  = db.Column(db.String, unique=True, nullable=False, doc='域名')
+    Count = db.Column(db.Integer, nullable=False, doc='访问量')
+
+
+class TotalBusiness(db.Model):
+    # 业务统计表
+    __tablename__ = 'totalbusiness'
+    # 表的结构:
+    id     = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='业务id')
+    Name   = db.Column(db.String, nullable=False, doc='业务名称')
+    Count  = db.Column(db.Integer, nullable=False, doc='访问量')
+
+
+class PersonTotalVisit(db.Model):
+    # 个人域访问量统计表
+    __tablename__ = 'persontotalvisit'
+    # 表的结构:
+    id      = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='域id')
+    User_Id = db.Column(db.String, nullable=False, doc='用户id')
+    Name    = db.Column(db.String, nullable=False, doc='域名')
+    Count   = db.Column(db.Integer, nullable=False, doc='访问量')
+
+
+class PersonPortrayal(db.Model):
+    # 用户画像表(屏三表四)
+    __tablename__ = 'personportrayal'
+    # 表的结构:
+    id      = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='画像id')
+    User_Id = db.Column(db.String, nullable=False, doc='用户id')
+    Attr1   = db.Column(db.String, nullable=False, doc='用户属性1')
+    Attr2   = db.Column(db.String, nullable=False, doc='用户属性2')
+    Attr3   = db.Column(db.String, nullable=False, doc='用户属性3')
+    Attr4   = db.Column(db.String, nullable=False, doc='用户属性4')
+    Attr5   = db.Column(db.String, nullable=False, doc='用户属性5')
+    Attr6   = db.Column(db.String, nullable=False, doc='用户属性6')
+    Attr7   = db.Column(db.String, nullable=False, doc='用户属性7')
+    Attr8   = db.Column(db.String, nullable=False, doc='用户属性8')
+
+
+class PersonDeparture(db.Model):
+    # 个人行为偏离统计
+    __tablename__ = 'persondeparture'
+    # 表的结构:
+    id      = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='统计id')
+    User_Id = db.Column(db.String, nullable=False, doc='用户id')
+    Count   = db.Column(db.Integer, nullable=False, doc='偏离量')
+
+
+class PersonBehavior(db.Model):
+    # 个人行为表
+    __tablename__ = 'personbehavior'
+    # 表的结构:
+    id      = db.Column(db.Integer, primary_key=True, autoincrement=True, doc='行为id')
+    User_Id = db.Column(db.String, nullable=False, unique=True, doc='行为用户id')
+    Time    = db.Column(db.DateTime, nullable=False, doc='行为时间')
+    BGroup  = db.Column(db.String, doc='行为组')
+
+
+
+@click.command('init-db')
+@with_appcontext
+def init_db_command():
+    db.create_all()
+    tc1 = TotalCount(name='采集量')
+    tc2 = TotalCount(name='发现行为偏离')
+    tc3 = TotalCount(name='资产域行为偏离')
+    tc4 = TotalCount(name='财务域行为偏离')
+    tc5 = TotalCount(name='人资域行为偏离')
+    tc6 = TotalCount(name='营销管理行为偏离')
+    db.session.add_all([tc1,tc2,tc3,tc4,tc5,tc6])
+    db.session.commit()
+
+@click.command('drop-db')
+@with_appcontext
+def drop_db_command():
+    db.drop_all()
+
+def init_app(app):
+    app.cli.add_command(init_db_command)
+    app.cli.add_command(drop_db_command)

+ 108 - 0
V2/app/tasks/UserActivateDegreeAnalyze.py

@@ -0,0 +1,108 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-22 13:47:11
+# @Last Modified time: 2024-01-23 16:46:12
+import pandas as pd
+from pprint import pprint
+
+# 用户活跃度分析
+# 第一个功能:统计指定时间范围内的访问次数最多的10个用户
+# 第二个功能:统计最近一个周期相对上一个周期访问次数增长最多的10个用户
+
+class UserActivateDegreeAnalyzeSpace(object):
+    """用户访问分析
+    接收用户创建的分析任务
+    时间范围:起始时间~结束时间
+    在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
+    执行submit,将taskid作为参数传递给submit
+    """
+    def __init__(self, user: str):
+        self.user = user
+        self.pool_size = 10
+        self.DataStream = pd.DataFrame(columns=['time', 'cur_url', 'tar_url', 'dev_id', 'req_met', 'eve_typ', 'for_dat', 'sta_cod', 'ifr_url', 'but_txt'])
+
+    def push(self, item: dict) -> None:
+        self.DataStream = self.DataStream.append({
+                "time": item['Record_Time'],
+                "cur_url": item['Current_Url'],
+                "tar_url": item['Target_Url'],
+            },
+            ignore_index=True)
+
+    def getActionByDateRange(self) -> pd.DataFrame:
+        """获取指定日期范围内的用户访问行为数据
+        """
+        current = self.DataStream.iloc[0]['cur_url']
+        target = dict()
+        result = pd.DataFrame()
+        # 拷贝数据,清除数据
+        CopyDataStream = self.DataStream.copy(deep=True)
+        self.DataStream.drop(self.DataStream.index, inplace=True)
+        # 设置时间格式
+        CopyDataStream['time'] = pd.to_datetime(CopyDataStream['time'])
+        # 时间排序
+        CopyDataStream = CopyDataStream.set_index('time')
+        # 遍历数据
+        for idx, item in enumerate(CopyDataStream.itertuples()):
+            if item.cur_url == current:                     # 当前地址没变,页面无跳转(需判断AJAX)
+                if item.tar_url not in target:              # 目标url不在临时目录
+                    target[item.tar_url] = idx              # 添加目标URL到临时目录
+            else:                                           # 当前地址发生变化,页面跳转
+                current = item.cur_url                      # 当前地址修改
+                if target.get(current):                     # 查看是否有请求当前URL的历史记录
+                    print(target.get(current))
+                    # pprint({
+                    #     "源地址": CopyDataStream.loc[target.get(current)]['cur_url'],
+                    #     "跳转地": CopyDataStream.loc[target.get(current)]['tar_url']
+                    #     })      # 查看历史记录
+                    result = result.append([CopyDataStream.iloc[target.get(current)]])
+                else:                                       # 为找到历史记录
+                    print("*********非正常跳转*********")    # 非正常跳转
+                target = dict()                             # 清除历史记录
+                target[item.tar_url] = idx                  # 添加当前记录
+        return result
+
+    def getSession2Action(self):
+        """获取sessionid到访问行为数据的映射的
+        """
+        pass
+
+    def aggregateBySession(self):
+        """对行为数据按session粒度进行聚合,并将用户信息join后聚合
+        """
+        pass
+
+    def filterSessionAndAggrStat(self):
+        """过滤session数据,并进行计数器值累加
+        """
+        pass
+
+    def getSession2detail(self):
+        """获取通过筛选条件的session的访问明细数据
+        """
+        pass
+
+    def getTopKCategory(self):
+        """获取topK热门分类
+        第一步:获取符合条件的访问过的所有品类
+        第二步:计算各品类的点击的次数
+        第三步:自定义二次排序key
+        第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
+        第五步:取出top10热门品类,并写入DM
+        """
+        pass
+
+    def getClickCategory2Count(self):
+        """获取各分类点击次数
+        1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
+        """
+        pass
+
+    def getTopKSession(self):
+        """获取每个分类top10活跃用户
+        1、将topK热门分类的id,生成DF
+        2、计算topK分类被各用户访问的次数
+        3、分组取TopK算法实现,获取每个分类的topK活跃用户
+        4、获取topK活跃用户的明细数据,并写入DM
+        """
+        pass

+ 0 - 0
V2/app/tasks/__init__.py


+ 177 - 0
V2/app/tasks/preprocess.py

@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+# @Author: privacy
+# @Date:   2023-12-25 10:19:57
+# @Last Modified by:   privacy
+# @Last Modified time: 2024-01-12 14:26:31
+import pandas as pd
+from urllib import parse
+
+all_keys = []
+
+def parse_url(url):
+    query_string = parse.urlparse(url)
+    return query_string.scheme, query_string.netloc, query_string.path, query_string.params, query_string.fragment, parse.parse_qs(query_string.query)
+
+def parse_path(url):
+    path_string = parse.urlparse(url).path
+    return path_string
+
+def parse_appCode(url):
+    query_string = parse.urlparse(url).query
+    appCode = parse.parse_qs(query_string).get('appCode')
+    if appCode:
+        return appCode[0]
+
+def parse_iframeUrl(url):
+    fragment_string = parse.urlparse(url).fragment
+    iframe_url = parse.parse_qs(fragment_string).get('url')
+    iframe_from = parse.parse_qs(fragment_string).get('from')
+    if iframe_url:
+        path = parse.urlparse(iframe_url[0]).path
+        if path and iframe_from:
+            return path, iframe_from[0]
+        elif path:
+            return path, None
+        elif iframe_from:
+            return None, iframe_from[0]
+
+def parse_appcontext(url):
+    query_string = parse.urlparse(url).query
+    appcontext = parse.parse_qs(query_string).get('appcontext')
+    if appcontext:
+        return appcontext[0]
+
+def parse_Edit(url):
+    query_string = parse.urlparse(url).query
+    isEdit = parse.parse_qs(query_string).get('isEdit')
+    if isEdit:
+        return isEdit[0]
+
+def parse_Query(url):
+    query_string = parse.urlparse(url).query
+    isQuery = parse.parse_qs(query_string).get('isQuery')
+    if isQuery:
+        return isQuery[0]
+
+def parse_editFlag(url):
+    query_string = parse.urlparse(url).query
+    editFlag = parse.parse_qs(query_string).get('editFlag')
+    if editFlag:
+        return editFlag[0]
+
+
+"""
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基建管理应用")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="数字供应链")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="合同管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="安全生产")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="创新管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基建智慧工程")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="并网服务管理")
+# df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="基础应用")
+
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="计划预算管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="成本管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="资金管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="核算管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="报账管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="工程财务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="资产价值管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="物资财务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="价格管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="税务管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="会计档案")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="共享服务")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="报表管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="综合管理")
+# df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="基础应用")
+
+df['tag'] = df['Unnamed: 1'].apply(lambda x: x.split()[2])
+df['url'] = df['Unnamed: 1'].apply(lambda x: x.split()[1])
+
+del df['Unnamed: 0']
+del df['Unnamed: 1']
+
+df.drop(df[df['tag'] == 'undefined'].index, inplace=True)
+
+df.to_excel("temp.xlsx", sheet_name='Sheet1')
+"""
+
+
+# df['tag'] = df['url'].apply(lambda x: x.split("/")[-1].split("=")[-1])
+
+
+"""
+df = pd.read_excel("资产域关联字段查询表.xlsx", sheet_name="资产域")
+del df['域']
+df['domain'] = '资产域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('资产域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("财务域关联字段查询表.xlsx", sheet_name="财务域")
+df['domain'] = '财务域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('财务域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("营销域关联字段查询表.xlsx", sheet_name="营销域")
+df['一级标题'] = df['一级标题'].apply(lambda x: x.strip('\''))
+df['二级标题'] = df['二级标题'].apply(lambda x: x.strip('\''))
+df['三级标题'] = df['三级标题'].apply(lambda x: x.strip('\''))
+df['四级标题'] = df['四级标题'].apply(lambda x: x.strip('\'') if isinstance(x, str) else None)
+df['url'] = df['url'].apply(lambda x: x.strip('\''))
+df['domain'] = '营销域'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appcontext'] = df['url'].apply(lambda x: parse_appcontext(x))
+
+df['isEdit'] = df['url'].apply(lambda x: parse_Edit(x))
+df['editFlag'] = df['url'].apply(lambda x: parse_editFlag(x))
+df['isQuery'] = df['url'].apply(lambda x: parse_Query(x))
+
+print(df)
+
+df.to_json('营销域.json', orient='records', lines=True, force_ascii=False)
+"""
+
+
+"""
+df = pd.read_excel("人资域关联字段查询表.xlsx", sheet_name="Sheet1")
+df['domain'] = '人资域'
+
+df['netloc'] = '10.10.21.23'
+
+df['path'] = df['url'].apply(lambda x: parse_path(x))
+
+df['appCode'] = df['url'].apply(lambda x: parse_appCode(x))
+
+df['iframe'] = df['url'].apply(lambda x: parse_iframeUrl(x))
+
+del df['url']
+
+print(df)
+
+df.to_json('人资域.json', orient='records', lines=True, force_ascii=False)
+"""

+ 557 - 0
V2/app/tasks/tasks.py

@@ -0,0 +1,557 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-18 09:47:28
+# @Last Modified time: 2024-01-23 11:53:14
+
+import re
+import datetime
+import dmPython
+import pandas as pd
+from pandas import DataFrame
+from threading import Thread
+from .preprocess import parse_url
+from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace
+
+def func(item):
+    print(item)
+    session = DM8Session()
+    try:
+        result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Method", "Has_Form", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?,?,?,?)""",
+                        (item['time'],item['user'],item['method'],item['hasform'],item['domain'],item['一级标题'],item['二级标题'],item['三级标题'],item['四级标题'],item['text'], item['stay']))
+    except Exception as e:
+        print(e)
+
+
+# class filteredRealTimeLogDStream(object):
+#     time: str
+#     user_id: str
+#     method: str
+#     hasform: bool
+#     domain: str
+#     title1: str
+#     title2: str
+#     title3: str
+#     title4: str
+#     behavior: str
+#     stay: int = 0
+
+
+class DM8Session(object):
+    def __init__(self):
+        self.conn = dmPython.connect(user='SYSDBA', password='SYSDBA001', server='192.168.1.202', port=30236)
+
+    def query(self, sql_cmd: str, args=None):
+        '''以数据框形式返回查询据结果'''
+        try:
+            self.cursor = self.conn.cursor()
+            if args:
+                self.cursor.execute(sql_cmd, args)
+            else:
+                self.cursor.execute(sql_cmd)
+            data = self.cursor.fetchall()  # 以元组形式返回查询数据
+            header = [t[0] for t in self.cursor.description]
+            df = pd.DataFrame(list(data), columns=header)  # pd.DataFrame 对列表具有更好的兼容性
+        except Exception as e:
+            print(e)
+        finally:
+            self.cursor.close()
+        return df
+
+    def insert(self, sql_cmd: str, args):
+        try:
+            self.cursor = self.conn.cursor()
+            # 执行sql语句
+            self.cursor.execute(sql_cmd, args)
+            self.conn.commit()
+            status = True
+        except Exception as e:
+            # 发生错误时回滚
+            self.conn.rollback()
+            print(e)
+            status = False
+        finally:
+            self.cursor.close()
+        return status
+
+    def update(self, sql_cmd: str, args):
+        try:
+            self.cursor = self.conn.cursor()
+            self.cursor.execute(sql_cmd, args)
+            self.conn.commit()
+            status = True
+        except Exception as e:
+            self.conn.rollback()
+            print(e)
+            status = False
+        finally:
+            self.cursor.close()
+        return status
+
+    def __del__(self):
+        self.conn.close()
+
+
+class UrlParser(object):
+    def __init__(self):
+        self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8')
+        self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8')
+        self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8')
+        self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8')
+
+    def process(self, row: dict):
+        # row['Current_Url'] = row['Current_Url'].apply(lambda x: (x))
+        row['Current_Url'] = parse_url(row['Current_Url'])
+        row['Target_Url'] = parse_url(row['Target_Url'])
+        # return self.process_record(row.iloc[0])
+        return self.process_record(row)
+
+    def process_record(self, item: dict):
+        i = item['Current_Url']
+        j = item['Target_Url']
+
+        # result = item
+        result = None
+
+        if i[1] == '10.10.21.23':
+            result = {'domain': '人资域'}
+            if i[5].get("appCode"):
+                task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['Record_Time'],
+                    'user': item['User_Id'],
+                    'method': item['Request_Method'],
+                    'hasform': True if item['Form_Data'] else False,
+                    'domain': '人资域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['Button_Text'] if item['Button_Text'] else None
+                }
+        elif i[1] == '10.10.21.28':
+            result = {'domain': '资产域'}
+            if i[5].get('appCode'):
+                task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['Record_Time'],
+                    'user': item['User_Id'],
+                    'method': item['Request_Method'],
+                    'hasform': True if item['Form_Data'] else False,
+                    'domain': '资产域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['Button_Text'] if item['Button_Text'] else None
+                }
+        elif i[1] == 'fms.gmp.cloud.hq.iv.csg':
+            result = {'domain': '财务域'}
+            if i[5].get('appCode'):
+                task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['Record_Time'],
+                    'user': item['User_Id'],
+                    'method': item['Request_Method'],
+                    'hasform': True if item['Form_Data'] else False,
+                    'domain': '财务域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['Button_Text'] if item['Button_Text'] else None
+                }
+        elif i[1] == '10.150.23.1:8010':
+            result = {'domain': '营销域'}
+            fd = item['Form_Data']
+            if item['Form_Data'] and item['Form_Data'] != 'None':
+                try:
+                    if item['Form_Data'][0] != "{":
+                        item["Params_Data"] = item['Form_Data']
+                        form_data = None
+                    else:
+                        item['Form_Data'] = item['Form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[")
+                        item['Form_Data'] = item['Form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
+                        item['Form_Data'] = item['Form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"")
+                        item['Form_Data'] = item['Form_Data'].replace("\n", "")
+                        form_data = json.loads(item['Form_Data'])
+                except Exception as e:
+                    logging.error(item['Form_Data'])
+                    logging.error(fd)
+                    form_data = None
+                    raise e
+            else:
+                form_data = None
+
+            if form_data and '_INVOKE_FUNC_TITLE_' in form_data:
+                title = form_data['_INVOKE_FUNC_TITLE_'][0]
+                appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
+                logging.debug(appcontext)
+                task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''四级标题 == "{title}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''三级标题 == "{title}"''')
+                try:
+                    result = {
+                        'time':    item['Record_Time'],
+                        'user':    item['User_Id'],
+                        'method':  item['Request_Method'],
+                        'hasform': True,
+                        'domain': '营销域',
+                        '一级标题': task['一级标题'].values[0],
+                        '二级标题': task['二级标题'].values[0],
+                        '三级标题': task['三级标题'].values[0],
+                        '四级标题': task['四级标题'].values[0],
+                        'text':    item['Button_Text'] if item['Button_Text'] else None
+                    }
+                except Exception as e:
+                    logging.error(task)
+                    logging.error(item['Form_Data'])
+                    logging.error(title)
+                    logging.error(e)
+                    logging.error(form_data['_INVOKE_FUNC_URL_'][0])
+            else:
+                result = {
+                    'time': item['Record_Time'],
+                    'user': item['User_Id'],
+                    'domain': '营销域',
+                    'hasform': False,
+                    'text': item['Button_Text'] if item['Button_Text'] else None
+                }
+        elif i[1] == '4a.gd.csg.local':
+            result = {
+                'time': item['Record_Time'],
+                'user': item['User_Id'],
+                '域': '登录门户'
+            }
+        return result
+
+
+class UserVisitAnalyzeSpace(object):
+    """用户访问分析
+    接收用户创建的分析任务
+    时间范围:起始时间~结束时间
+    在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
+    执行submit,将taskid作为参数传递给submit
+    """
+    def __init__(self, user: str):
+        self.user = user
+        self.pool_size = 10
+        self.start_point = 0
+        self.end_point = 0
+        # 一级缓存
+        self.cache_item = dict()
+        # 二级缓存数据
+        self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])
+
+    def __repr__(self):
+        return f'<User: {self.user} DF: {self.df_queue}>'
+
+    def push(self, item: dict):
+        # 消息进队列
+        self.df_queue.loc[self.end_point] = item
+        self.end_point = (self.end_point + 1) % self.pool_size
+
+    def pop(self):
+        # 消息出队列
+        item = self.df_queue.loc[self.start_point]
+        self.start_point = (self.start_point + 1) % self.pool_size
+        return item
+
+    def send(self, item: dict):
+        # 当前缓存无数据
+        if not self.cache_item:
+            self.cache_item = item
+        else:
+            # 判断缓存内数据和新数据哪个更新
+            if item['time'] > self.cache_item['time']:
+                # 当前数据较新,计算页面持续时间
+                delay = (item['time'] - self.cache_item['time']).seconds
+                if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']):
+                    self.cache_item['stay'] += delay
+                else:
+                    self.push(self.cache_item)
+                    self.cache_item = item
+                    t = Thread(target=func, args=(self.pop(),))
+                    t.start()
+            else:
+                # 老数据较新,数据发送产生异常,数据产生积压
+                pass
+
+
+        # self.df = pd.concat([self.df, pd.DataFrame([item])])
+        # if self.df.shape[0] >= self.pool_size:
+        #     # 新建临时表
+        #     dt = self.df[0:1]
+        #     index = 0
+        #     for i in range(1, self.pool_size):
+        #         d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
+        #         if isinstance(d2, pd.DataFrame):
+        #             dt.iloc[index] = d1
+        #             index += 1
+        #             dt.iloc[index] = d2
+        #             session = DM8Session()
+        #             result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
+        #                     args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
+        #         else:
+        #             dt.iloc[index] = d1
+        #     self.df = pd.concat([dt, self.df[self.pool_size:]])
+
+    def getActionByDateRange(self):
+        """获取指定日期范围内的用户访问行为数据
+        """
+        pass
+
+    def getSession2Action(self) -> DataFrame:
+        """获取sessionid到访问行为数据的映射的
+        """
+        pass
+
+    def aggregateBySession(self):
+        """对行为数据按session粒度进行聚合,并将用户信息join后聚合
+        """
+        pass
+
+    def filterSsessionAndAggrStat(self):
+        """过滤session数据,并进行计数器值累加
+        """
+        pass
+
+    def getSession2detail(self):
+        """获取通过筛选条件的session的访问明细数据
+        """
+        pass
+
+    def getTopKCategory(self):
+        """获取topK热门分类
+        第一步:获取符合条件的访问过的所有品类
+        第二步:计算各品类的点击的次数
+        第三步:自定义二次排序key
+        第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
+        第五步:取出top10热门品类,并写入DM
+        """
+        pass
+
+    def getClickCategory2Count(self):# -> DataFrame
+        """获取各分类点击次数
+        1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
+        """
+        pass
+
+    def getTopKSession(self):
+        """获取每个分类top10活跃用户
+        1、将topK热门分类的id,生成DF
+        2、计算topK分类被各用户访问的次数
+        3、分组取TopK算法实现,获取每个分类的topK活跃用户
+        4、获取topK活跃用户的明细数据,并写入DM
+        """
+        pass
+
+    # --- 池化 --- #
+    def pooling(self, line1: dict, line2: dict):
+        if line1['time'] > line2['time']:
+            deltat = (line1['time'] - line2['time']).seconds
+            if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
+                line2['stay'] += deltat
+                return line2, None
+            else:
+                return line2, line1
+        else:
+            deltat = (line2['time'] - line1['time']).seconds
+            if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
+                line1['stay'] += deltat
+                return line1, None
+            else:
+                return line1, line2 
+
+
+class BusinessClickRealTimeSpace(object):
+    def __init__(self):
+        self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])
+
+    def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame:
+        """计算业务访问流量实时统计
+        计算每天业务的访问量
+        设计维度:日期时间、用户、业务、访问次数
+        可以看到当天所有的实时数据,
+        通过DF,直接统计出全局的访问次数,在DF缓存一份,在DM保存一份
+        """
+
+        """
+        我们要对原始数据进行map,映射成<date_user_domain,1>格式
+        然后,对上述格式的数据,执行updateStateByKey算子
+        """
+        def _filter(data: dict) -> tuple:
+            return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"], "count": 1}
+
+        mappedDStream = _filter(sourceDStream)
+
+        self.aggregatedDStream = self.aggregatedDStream.append(mappedDStream, ignore_index=True)
+
+        self.aggregatedDStream = self.aggregatedDStream.groupby('word').apply(lambda x: sum(x['count'])).reset_index(name="count")
+        # 将计算出来的结果同步一份到DM
+        session = DM8Session()
+        for row in self.aggregatedDStream.itertuples():
+            try:
+                result = session.insert("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (row.word, row.count))
+            except Exception as e:
+                print(e)
+        return self.aggregatedDStream
+
+    def calculateTopKBusiness(self):
+        """计算每天用户的TopK访问业务
+        计算出每天各业务的点击量
+        """
+        pass
+
+    def calculateBusinessCountByWindow(self):
+        """计算最近1小时滑动窗口内的业务访问趋势
+        """
+        pass
+
+
+# class PageOneStepSpace(object):
+#     def __init__(self):
+#         pass
+
+#     def getSession2Action(self):
+#         """获取<session,用户访问行为>格式的数据
+#         """
+#         pass
+
+#     def persistConvertRate(self):
+#         """持久转化率
+#         """
+#         pass
+
+
+def capital_to_higher(dict_info):
+    new_dict = {}
+    for i, j in dict_info.items():
+        new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j
+    return new_dict
+
+
+def predict(g: dict, item: dict):
+    item = capital_to_higher(item)
+    if g.get(item['User_Id']):
+        g[item['User_Id']].push(item)
+    else:
+        g[item['User_Id']] = UserActivateDegreeAnalyzeSpace(item['User_Id'])
+        g[item['User_Id']].push(item)
+
+
+    # session = DM8Session()
+
+    # # 添加采集量统计
+    # try:
+    #     df = session.query("""SELECT "ID", "Count" FROM TOTALCOUNT WHERE "Name" = ?""", args="采集量")
+    #     if df.shape[0] == 1:
+    #         result = session.update("""UPDATE TOTALCOUNT SET "Count"=? WHERE TOTALCOUNT.ID = ?""", (int(df['Count'])+1, int(df['ID'])))
+    # except Exception as e:
+    #     print(e)
+
+    # # URL预处理存储
+    # parser = UrlParser()
+    # frag = parser.process(item)
+    # frag['stay'] = 0
+
+    # 添加序列
+    # if g.get(item['User_Id']):
+    #     # g[item['User_Id']].push(item)
+    #     g[item['User_Id']].send(frag)
+    # else:
+    #     g[item['User_Id']] = UserSpace(item['User_Id'])
+    #     # g[item['User_Id']].push(item)
+    #     g[item['User_Id']].send(frag)
+
+    # if g.get('BusinessClickRealTimeSpace'):
+    #     result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
+    # else:
+    #     g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace()
+    #     result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
+    # print(result)
+
+
+
+"""
+from sqlalchemy import Column, Integer, String, Date, Numeric, Text
+from sqlalchemy.ext.declarative import declarative_base
+
+# 创建对象的基类:
+Base = declarative_base()
+
+class Product(Base):
+    # 表的名字:
+    __tablename__ = 'product'
+    # 表的结构:
+    PRODUCTID = Column(Integer,autoincrement=True, primary_key=True)
+
+
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from Product import Base, Product
+
+
+def fun_select_all(DBSession):
+    # 创建Session
+    session = DBSession()
+    # 查询所有的
+    list_product = session.query(Product).all()
+    print('查询所有结果:')
+    for product in list_product:
+        print(product.NAME)# , product.AUTHOR, product.PUBLISHER )
+    print('')
+    session.close()
+
+def fun_insert(DBSession):
+    # 创建Session
+    session = DBSession()
+    new_product = Product()
+    new_product.NAME = '水浒传'
+    session.add(new_product)
+    session.commit()
+    print('插入成功')
+    session.close()
+
+def fun_update(DBSession):
+    # 创建Session
+    session = DBSession()
+    product = session.query(Product).filter(Product.NAME == '水浒传').one()
+    product.NAME = '水浒'
+    session.commit()
+    print('更新成功')
+    session.close()
+
+def fun_delete(DBSession):
+    # 创建Session
+    session = DBSession()
+    session.query(Product).filter(Product.NAME == '水浒').delete()
+    session.commit()
+    print('删除成功')
+    session.close()
+
+
+def main():
+    #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。
+    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236'
+    #创建Engine对象
+    engine = create_engine(conn_url)
+    #创建DBSession对象
+    DBSession = sessionmaker(bind=engine)
+
+    Base.metadata.create_all(engine) # 创建表结构
+
+    fun_select_all(DBSession)
+    # # 插入
+    fun_insert(DBSession)
+    fun_select_all(DBSession)
+    # # 插入
+    fun_insert1(DBSession)
+    fun_select_all(DBSession)
+    # # 更新
+    fun_update(DBSession)
+    fun_select_all(DBSession)
+    # # 删除
+    fun_delete(DBSession)
+    fun_select_all(DBSession)
+"""

+ 46 - 0
V2/config.py

@@ -0,0 +1,46 @@
+import os
+basedir = os.path.abspath(os.path.dirname(__file__))
+
+
+class Config:
+    SECRET_KEY = os.environ.get('SECRET_KEY') or '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'
+    SSL_DISABLE = False
+    SQLALCHEMY_RECORD_QUERIES = True
+    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
+    # SQLALCHEMY_ECHO = True
+
+    FLASKY_ADMIN = os.environ.get('FLASKY_ADMIN')
+    FLASKY_POSTS_PER_PAGE = 20
+    FLASKY_COMMENTS_PER_PAGE = 30
+    FLASKY_FOLLOWERS_PER_PAGE = 50
+    FLASKY_SLOW_DB_QUERY_TIME = 0.5
+
+    @staticmethod
+    def init_app(app):
+        pass
+
+
+class DevelopmentConfig(Config):
+    DEBUG = True
+    SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or 'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
+
+
+class TestingConfig(Config):
+    TESTING = True
+    SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or 'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
+
+
+class ProductionConfig(Config):
+    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
+
+    @classmethod
+    def init_app(cls, app):
+        Config.init_app(app)
+
+
+config = {
+	'development': DevelopmentConfig,
+	'testing': TestingConfig,
+	'production': ProductionConfig,
+    'default': DevelopmentConfig
+}

+ 91 - 0
V2/downdata.py

@@ -0,0 +1,91 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-12 13:25:10
+# @Last Modified time: 2024-01-23 16:38:29
+import pymysql
+import pandas as pd
+
+class MYDATABASE:
+    def __init__(self, db_name=None):
+        '''连接数据库,创建游标'''
+        config = dict(zip(['host', 'user', 'port', 'password'],
+                          ['192.168.1.202', 'root', 13388, 'zh123456']))
+        config.update(database=db_name)
+        self.connection = pymysql.connect(**config)
+        self.cursor = self.connection.cursor()
+
+    def __del__(self):
+        self.connection.close()
+
+    def create_database(self, db_name: str):
+        '''新建数据库'''
+        sql = f'CREATE DATABASE IF NOT EXISTS {db_name};'
+        self.cursor.execute(sql)
+
+    def create_table(self, tbl_name: str):
+        '''新建数据库'''
+        sql = f'CREATE TABLE IF NOT EXISTS {tbl_name};'
+        self.cursor.execute(sql)
+
+    def drop_database(self, db_name: str):
+        '''删除数据库'''
+        sql = f'DROP DATABASE IF EXISTS {db_name};'
+        self.cursor.execute(sql)
+
+    def drop_table(self, tbl_name: str):
+        '''删除数据库'''
+        sql = f'DROP TABLE IF EXISTS {tbl_name};'
+        self.cursor.execute(sql)
+
+    def query(self, sql: str):
+        '''以数据框形式返回查询据结果'''
+        self.cursor.execute(sql)
+        data = self.cursor.fetchall()  # 以元组形式返回查询数据
+        header = [t[0] for t in self.cursor.description]
+        df = pd.DataFrame(list(data), columns=header)  # pd.DataFrame 对列表具有更好的兼容性
+        return df
+
+    def show_databases(self):
+        '''查看服务器上的所有数据库'''
+        sql = 'SHOW DATABASE;'
+        return self.query(sql)
+
+    def select_database(self):
+        '''查看当前数据库'''
+        sql = 'SELECT DATABASE();'
+        return self.query(sql)
+
+    def show_tables(self):
+        '''查看当前数据库中所有的表'''
+        sql = 'SHOW TABLES;'
+        return self.query(sql)
+
+    def insert_table(self, sql):
+        try:
+            # 执行sql语句
+            cursor.execute(sql)
+            self.connection.commit()
+        except Exception as e:
+            # 发生错误时回滚
+            self.connection.rollback()
+
+
+# mydb = MYDATABASE(db_name="mzinfo")
+
+# df = mydb.query("SELECT * FROM recoder;")
+
+# print(df)
+
+# from sqlalchemy import create_engine
+# connect_info = 'mysql+pymysql://{}:{}@{}:{}?charset=utf8'
+# engine = create_engine(connect_info)
+
+sql_cmd = "SELECT record_Time, current_Url, user_Id, request_Method, event_Type, form_Data, target_Url, iframe_Url, button_Text FROM recoder WHERE user_Id = 'fuli@mz.gd.csg.cn' LIMIT 1000;"
+# df = pd.read_sql(sql_cmd, con=engine, index_col="record_Time")
+
+con = pymysql.connect(host='192.168.1.202', user='root', port=13388,  password='zh123456', database='mzinfo', charset='utf8', use_unicode=True)
+df = pd.read_sql(sql_cmd, con)
+df['record_Time'] = pd.to_datetime(df['record_Time'])
+df = df.set_index('record_Time')
+print(df.head)
+df.to_json("fuli_data.json", orient='records', lines=True, force_ascii=False)

Diferenças do arquivo suprimidas por serem muito extensas
+ 0 - 0
V2/fuli_data.json


+ 66 - 0
V2/manage.py

@@ -0,0 +1,66 @@
+import os
+COV = None
+if os.environ.get('FLASK_COVERAGE'):
+    import coverage
+    COV = coverage.coverage(branch=True, include='app/*')
+    COV.start()
+
+if os.path.exists('.env'):
+    print('Importing environment from .env...')
+    for line in open('.env'):
+        var = line.strip().split('=')
+        if len(var) == 2:
+            os.environ[var[0]] = var[1]
+
+from app import create_app, db
+from app.models import *
+from flask_script import Manager, Shell
+from flask_migrate import Migrate, MigrateCommand
+
+app = create_app(os.getenv('FLASK_CONFIG') or 'default')
+manager = Manager(app)
+migrate = Migrate(app, db)
+
+def make_shell_context():
+    return dict(app=app, db=db)
+
+manager.add_command('shell', Shell(make_context=make_shell_context))
+manager.add_command('db', MigrateCommand)
+
+@manager.command
+def test(coverage=False):
+    if coverage and not os.environ.get('FLASK_COVERAGE'):
+        import sys
+        os.environ['FLASK_COVERAGE'] = '1'
+        os.execvp(sys.executable, [sys.executable] + sys.argv)
+    import unittest
+    tests = unittest.TestLoader().discover('test')
+    unittest.TextTestRunner(verbosity=2).run(tests)
+    if COV:
+        COV.stop()
+        COV.save()
+        print('Coverage Summary:')
+        COV.report()
+        basedir = os.path.abspath(os.path.dirname(__file__))
+        covdir = os.path.join(basedir, 'tmp/coverage')
+        COV.html_report(directory=covdir)
+        print(f'HTML version: file://{covdir}/index.html')
+        COV.erase()
+
+@manager.command
+def profile(length=25, profile_dir=None):
+    from werkzeug.contrib.profiler import ProfilerMiddleware
+    app.wsgi_app = ProfilerMiddleware(app.wsgi_app, restrictions=[length],
+                                      profile_dir=profile_dir)
+    app.run()
+
+@manager.command
+def deploy():
+    from flask.ext.migrate import upgrade
+    from app.models import Role, User
+    upgrade()
+    Role.insert_roles()
+    User.add_self_follows()
+
+if __name__ == '__main__':
+    manager.run()

+ 355 - 0
V2/record_parser.py

@@ -0,0 +1,355 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:     2024-01-04 11:57:15
+# @Last Modified time: 2024-01-23 11:09:55
+from datetime import datetime
+import json
+import urllib
+# import pymysql
+import pandas as pd
+
+# pd.set_option('display.width', 1000)
+# pd.set_option('display.max_rows', None)
+# pd.set_option('display.max_columns', None)
+
+pd.set_option('max_colwidth', 1000)
+
+import logging
+
+from app.main.preprocess import parse_url
+
+"""
+rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8')
+
+zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8')
+
+cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8')
+
+yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8')
+"""
+
+"""
+from flask.ext.sqlalchemy import SQLAlchemy
+
+db = SQLAlchemy()
+
+class Recorder(db.Model):
+    __tablename__ = 'recoder'
+    record_Time = db.Column(db.Time)
+    current_Url = db.Column(db.Text)
+    full_Url    = db.Column(db.Text)
+    sim_Url     = db.Column(db.Text)
+    user_Id     = db.Column(db.Text)
+    device_Id   = db.Column(db.Text)
+    request_Method = db.Column(db.Text)
+    event_Type  = db.Column(db.Text)
+    form_Data   = db.Column(db.Text)
+    json_Data   = db.Column(db.Text)
+    value_Data  = db.Column(db.Text)
+    status_Code = db.Column(db.Text)
+    target_Url  = db.Column(db.Text)
+    iframe_Url  = db.Column(db.Text)
+    button_Text = db.Column(db.Text)
+
+
+db = pymysql.connect(host="192.168.1.202", port=13389, 
+        user="root", passwd="zh123456", 
+        db="mzinfo", autocommit=True)
+
+cursor = db.cursor()
+"""
+
+
+"""
+class MYDATABASE:
+    def __init__(self, db_name=None):
+        '''连接数据库,创建游标'''
+        config = dict(zip(['host', 'user', 'port', 'password'],
+                          ['192.168.1.202', 'root', 13388, 'zh123456']))
+        config.update(database=db_name)
+        self.connection = pymysql.connect(**config)
+        self.cursor = self.connection.cursor()
+
+    def __del__(self):
+        self.connection.close()
+
+    def create_database(self, db_name: str):
+        '''新建数据库'''
+        sql = f'CREATE DATABASE IF NOT EXISTS {db_name};'
+        self.cursor.execute(sql)
+
+    def create_table(self, tbl_name: str):
+        '''新建数据库'''
+        sql = f'CREATE TABLE IF NOT EXISTS {tbl_name};'
+        self.cursor.execute(sql) 
+
+    def drop_database(self, db_name: str):
+        '''删除数据库'''
+        sql = f'DROP DATABASE IF EXISTS {db_name};'
+        self.cursor.execute(sql)
+
+    def drop_table(self, tbl_name: str):
+        '''删除数据库'''
+        sql = f'DROP TABLE IF EXISTS {tbl_name};'
+        self.cursor.execute(sql)
+
+    def query(self, sql: str):
+        '''以数据框形式返回查询据结果'''
+        self.cursor.execute(sql)
+        data = self.cursor.fetchall()  # 以元组形式返回查询数据
+        header = [t[0] for t in self.cursor.description]
+        df = pd.DataFrame(list(data), columns=header)  # pd.DataFrame 对列表具有更好的兼容性
+        return df
+
+    def show_databases(self):
+        '''查看服务器上的所有数据库'''
+        sql = 'SHOW DATABASE;'
+        return self.query(sql)
+
+    def select_database(self):
+        '''查看当前数据库'''
+        sql = 'SELECT DATABASE();'
+        return self.query(sql)
+
+    def show_tables(self):
+        '''查看当前数据库中所有的表'''
+        sql = 'SHOW TABLES;'
+        return self.query(sql)
+
+    def insert_table(self, sql):
+        try:
+            # 执行sql语句
+            cursor.execute(sql)
+            self.connection.commit()
+        except Exception as e:
+            # 发生错误时回滚
+            self.connection.rollback()
+
+
+mydb = MYDATABASE(db_name="mzinfo")
+
+df = mydb.query("SELECT * FROM recoder;")
+
+# print(df)
+
+df.to_json("temp.json", orient='records', lines=True, force_ascii=False)
+"""
+
+
+df = pd.read_json(path_or_buf='all_data.json', orient='records', lines=True, encoding='utf-8')
+# df = pd.read_json(path_or_buf='temp.json', orient='records', lines=True, encoding='utf-8')
+
+# print(df['user_Id'].unique())
+# print(df.groupby('user_Id').size().sort_values())
+
+df = df[df['user_Id'] == 'liguiwen@mz.gd.csg.cn']
+
+# df = df.sort_values(by='record_Time')
+
+# print(df.head)
+
+# print(df.iloc[130]['current_Url'])
+
+# print(df['current_Url'].unique())
+
+# print(df['current_Url'].str.split("?"))
+
+# print(df['value_Data'].unique())
+
+# url = df.iloc[0]['current_Url']
+
+# print(len(parse_url(url)))
+
+# print(df['current_Url'].apply(lambda x: parse_url(x)))
+
+"""
+df['current_Url'] = df['current_Url'].apply(lambda x: parse_url(x))
+
+df['target_Url'] = df['target_Url'].apply(lambda x: parse_url(x))
+
+df['iframe_Url'] = df['iframe_Url'].apply(lambda x: parse_url(x))
+
+df['value_Data'] = df['current_Url'].apply(lambda x: x[-1])
+"""
+
+# print(df.sample(10, random_state=42)['current_Url'])
+
+
+class UrlParser:
+    def __init__(self):
+        self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8')
+        self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8')
+        self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8')
+        self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8')
+
+    def process(self, row):
+        row['current_Url'] = row['current_Url'].apply(lambda x: parse_url(x))
+        # print(row.iloc[0])
+        return self.process_record(row.iloc[0])
+
+
+    def process_record(self, item):
+        i = item['current_Url']
+        j = item['target_Url']
+        user = item['user_Id']
+
+        # result = item
+        result = None
+    
+        # if (item['request_Method'] == 'GET') and (not item['form_Data']):
+        #     print(0)
+        # elif (item['request_Method'] == 'GET') and (item['form_Data'] != 'None'):
+        #     print(1, item['form_Data'])
+        # elif (item['request_Method'] == 'POST') and (item['form_Data'] != 'None'):
+        #     print(2, item['form_Data'])
+        # elif item['request_Method'] == 'POST' and (not item['form_Data']):
+        #     print(3)
+    
+        if i[1] == '10.10.21.23':
+            result = {'domain': '人资域'}
+            if i[5].get("appCode"):
+                task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['record_Time'],
+                    'user': user,
+                    'method': item['request_Method'],
+                    'hasform': True if item['form_Data'] else False,
+                    'domain': '人资域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['button_Text'] if item['button_Text'] else None
+                }
+        elif i[1] == '10.10.21.28':
+            result = {'domain': '资产域'}
+            if i[5].get('appCode'):
+                task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['record_Time'],
+                    'user': user,
+                    'method': item['request_Method'],
+                    'hasform': True if item['form_Data'] else False,
+                    'domain': '资产域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['button_Text'] if item['button_Text'] else None
+                }
+        elif i[1] == 'fms.gmp.cloud.hq.iv.csg':
+            result = {'domain': '财务域'}
+            if i[5].get('appCode'):
+                task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
+                result = {
+                    'time': item['record_Time'],
+                    'user': user,
+                    'method': item['request_Method'],
+                    'hasform': True if item['form_Data'] else False,
+                    'domain': '财务域',
+                    '一级标题': task['一级标题'].values[0],
+                    '二级标题': task['二级标题'].values[0],
+                    '三级标题': task['三级标题'].values[0],
+                    'text': item['button_Text'] if item['button_Text'] else None
+                }
+        elif i[1] == '10.150.23.1:8010':
+            result = {'domain': '营销域'}
+            fd = item['form_Data']
+            if item['form_Data'] and item['form_Data'] != 'None':
+                try:
+                    if item['form_Data'][0] != "{":
+                        item["params_Data"] = item['form_Data']
+                        form_data = None
+                    else:
+                        item['form_Data'] = item['form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[")
+                        item['form_Data'] = item['form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
+                        item['form_Data'] = item['form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"")
+                        item['form_Data'] = item['form_Data'].replace("\n", "")
+                        form_data = json.loads(item['form_Data'])
+                except Exception as e:
+                    logging.error(item['form_Data'])
+                    logging.error(fd)
+                    form_data = None
+                    raise e
+            else:
+                form_data = None
+
+            if form_data and '_INVOKE_FUNC_TITLE_' in form_data:
+                title = form_data['_INVOKE_FUNC_TITLE_'][0]
+                appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
+                logging.debug(appcontext)
+                task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''四级标题 == "{title}"''')
+                if task.empty:
+                    task = self.yxdf.query(f'''三级标题 == "{title}"''')
+                try:
+                    result = {
+                        'time': item['record_Time'],
+                        'user': user,
+                        'method': item['request_Method'],
+                        'hasform': True,
+                        'domain': '营销域',
+                        '一级标题': task['一级标题'].values[0],
+                        '二级标题': task['二级标题'].values[0],
+                        '三级标题': task['三级标题'].values[0],
+                        '四级标题': task['四级标题'].values[0],
+                        'text': item['button_Text'] if item['button_Text'] else None
+                    }
+                except Exception as e:
+                    logging.error(task)
+                    logging.error(item['form_Data'])
+                    logging.error(title)
+                    logging.error(e)
+                    logging.error(form_data['_INVOKE_FUNC_URL_'][0])
+            else:
+                result = {
+                    'time': item['record_Time'],
+                    'user': user,
+                    'domain': '营销域',
+                    'hasform': False,
+                    'text': item['button_Text'] if item['button_Text'] else None
+                }
+        elif i[1] == '4a.gd.csg.local':
+            result = {
+                'time': item['record_Time'],
+                'user': user,
+                '域': '登录门户'
+            }
+        return result
+
+
+
+# parser = UrlParser()
+# df['current_Url'] = df['current_Url'].apply(lambda x: parse_url(x))
+# # # for _, item in df.sample(1000, random_state=4).iterrows():
+# for _, item in df.sample(10, random_state=5).iterrows():
+# # for _ in range(5):
+#     # for _, item in df.get_chunk(10).iterrows():
+#     # for _, item in item.iterrows():
+#     print(parser.process_record(item))
+#     # logging.warning(parser.process_record(item))
+
+
+def main():
+    current = df.iloc[0]['current_Url']
+    target  = dict()
+
+    for item in df.itertuples():
+
+        if item.current_Url == current:                 # 当前地址没变,页面无跳转(需判断AJAX)
+            if item.target_Url not in target:           # 目标url不在临时目录
+                target[item.target_Url] = item.Index    # 添加目标URL到临时目录
+        else:                                           # 当前地址发生变化,页面跳转
+            current = item.current_Url                  # 当前地址修改
+            if target.get(current):                     # 查看是否有请求当前URL的历史记录
+                print(df.loc[target.get(current)])     # 查看历史记录
+            else:                                       # 为找到历史记录
+                print("*********非正常跳转*********")    # 非正常跳转
+
+            target = dict()                             # 清除历史记录
+            target[item.target_Url] = item.Index        # 添加当前记录
+
+
+if __name__ == '__main__':
+    main()

+ 8 - 0
V2/requirements/common.txt

@@ -0,0 +1,8 @@
+dmPython==2.5.5
+Flask==2.2.5
+Flask-Pydantic==0.12.0
+Flask-Cors==4.0.0
+Flask-SQLAlchemy==3.0.5
+pandas==1.3.5
+SQLAlchemy==2.0.25
+sqlalchemy-dm==2.0.0

+ 107 - 0
V2/stream_parser.py

@@ -0,0 +1,107 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-15 16:43:04
+# @Last Modified time: 2024-01-22 11:30:31
+# from app.models import Record
+# import pandas as pd
+
+
+# class UserSpace(object):
+#     def __init__(self, user: str):
+#         self.user = user
+#         self.df = pd.DataFrame()
+
+#     def __repr__(self):
+#         return f'<User: {self.user} DF: {self.df}>'
+
+#     def push(self, item: dict):
+#         self.df = pd.concat([self.df, pd.DataFrame([item])])
+
+#     def pop(self):
+#         pass
+
+
+# def predict(item):
+#     if g.get(item['user']):
+#         g[item['user']].push(item)
+#     else:
+#         g[item['user']] = UserSpace(item['user'])
+#         g[item['user']].push(item)
+
+
+
+# if __name__ == '__main__':
+#     g = dict()
+#     predict({'user': '123', 'date': 123, 'domain': '人资域'})
+#     predict({'user': '456', 'date': 456, 'domain': '营销域'})
+#     predict({'user': '123', 'date': 789, 'domain': '营销域'})
+#     print(g)
+
+"""
+import time
+import logging
+import multiprocessing
+from datetime import datetime
+from multiprocessing.connection import Listener
+
+import pandas as pd
+
+logging.basicConfig(
+    format='%(asctime)s %(levelname)s %(message)s',
+    level=logging.INFO
+)
+
+logger = logging.getLogger('multilog')
+logger.setLevel(logging.DEBUG)
+
+class ProcessServer(object):
+    DataFrame = pd.DataFrame(columns=['user', 'time', 'domain'])
+
+    def __init__(self, host: str = "localhost", port: int = 5000):
+        self.host = host
+        self.port = port
+        self.run_server()
+
+    def do_socket(self, conn, addr, ):
+        try:
+            while True:
+                if conn.poll(1) == False:
+                    time.sleep(0.5)
+                    continue
+
+                data = conn.recv()  # 等待接受数据
+                conn.send('sucess')
+                # ***********************
+                # 要执行的程序写在这里
+                # ***********************
+                self.df = pd.concat([self.df, pd.DataFrame([data])])
+                logger.info(data)
+                logger.info(self.df)
+    
+        except Exception as e:
+            logger.error(f'Socket Error {e}')
+        finally:
+            try:
+                conn.close()
+                logger.info(f'Connection close. {addr}')
+            except:
+                logger.error('close except')
+
+    def run_server(self):
+        server_sock = Listener((self.host, self.port))
+        logger.info(f"Sever running... {self.host} {self.port}")
+
+        pool = multiprocessing.Pool(10)
+        while True:
+            conn = server_sock.accept()
+            addr = server_sock.last_accepted
+            logger.info(f'Accept new connection {addr}')
+
+            # 创建进程来处理TCP连接:
+            pool.apply_async(func=self.do_socket, args=(conn, addr,))
+
+
+if __name__ == '__main__':
+    server = ProcessServer(port=35010)
+    server.run_server()
+"""

Diferenças do arquivo suprimidas por serem muito extensas
+ 0 - 0
V2/temp.json


+ 35 - 0
V2/test.py

@@ -0,0 +1,35 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-18 10:51:09
+# @Last Modified time: 2024-01-18 15:56:47
+# import pandas as pd
+# from record_parser import UrlParser
+
+# df = pd.DataFrame([{  "record_Time": "2024-01-17T05:14:53.632Z",  "current_Url": "http://10.10.21.28/gmp/static/gmpweb/workbench/menu/AppNavExpand.html?appCode=eqpitassets&tSession=1703234778559",  "full_Url": "http://domain/path",  "user_Id": "zhangchijun@mz.gd.csg.cn",  "device_Id": "string",  "request_Method": "GET",  "event_Type": "click",  "form_Data": "{}",  "value_Data": "string",  "status_Code": "",  "target_Url": "http:\/\/10.10.21.28\/api\/jadp\/workbench\/todos\/todoCountByUserId?_=1703234767322", "button_Text": "申领管理"}])
+
+# parser = UrlParser()
+# result = parser.process(df)
+# print(result)
+
+
+# import time
+# import json
+# from multiprocessing.connection import Client
+
+# client = Client(('127.0.0.1', 35010))
+
+
+# # while True:
+# if True:
+#     data = {'user': '001', 'time': 1, 'domain': '001'}
+#     client.send(data)
+#     response = client.recv()  # 等待接受数据
+#     time.sleep(1)
+#     client.send(data)
+#     response = client.recv()  # 等待接受数据
+#     client.send(data)
+#     time.sleep(1)
+#     response = client.recv()  # 等待接受数据
+#     client.close()
+#     print(response)
+#     time.sleep(1)

+ 27 - 0
V2/tests/Product.py

@@ -0,0 +1,27 @@
+
+# coding: utf-8
+from sqlalchemy import Column, Integer, String,Date,Numeric,Text
+from sqlalchemy.ext.declarative import declarative_base
+# 创建对象的基类:
+Base = declarative_base()
+
+class Product(Base):
+    # 表的名字:
+    __tablename__ = 'product'
+    # 表的结构:
+    PRODUCTID = Column(Integer,autoincrement=True, primary_key=True)
+    NAME = Column(String(100))
+    # AUTHOR = Column(String(25))
+    # PUBLISHER = Column(String(50))
+    # PUBLISHTIME = Column(Date)
+    # PRODUCTNO = Column(String(25))
+    # SATETYSTOCKLEVEL = Column(Integer)
+    # ORIGINALPRICE = Column(Numeric(19,4))
+    # NOWPRICE = Column(Numeric(19,4))
+    # DISCOUNT = Column(Numeric(2,1))
+    # DESCRIPTION = Column(Text)
+    # TYPE = Column(String(5))
+    # PAPERTOTAL = Column(Integer)
+    # WORDTOTAL = Column(Integer)
+    # SELLSTARTTIME = Column(Date)
+    # SELLENDTIME = Column(Date)

+ 39 - 0
V2/tests/c.py

@@ -0,0 +1,39 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-10 16:16:57
+# @Last Modified time:2024-01-10 16:16:57
+import sys, time
+from rocketmq.client import PushConsumer, ConsumeStatus
+
+topicName = 'test'
+groupName = 'log'
+nameserver = 'localhost:9876'
+TAGS = 'XXX'
+
+# 消息处理回调
+def callback(msg):
+    # 模拟业务
+    print('Received message. messageId: ', msg.id, ' body: ', msg.body, msg.get_property('property'))
+    # 消费成功回复CONSUME_SUCCESS
+    return ConsumeStatus.CONSUME_SUCCESS
+    # 消费成功回复消息状态
+    # return ConsumeStatus.RECONSUME_LATER
+
+def start_consume_message():
+    # 初始化消费者,并设置消费者组信息
+    consumer = PushConsumer(groupName)
+    # 设置服务地址
+    consumer.set_name_server_address(nameserver)
+    # 订阅topic
+    consumer.subscribe(topicName, callback, TAGS)
+    print(' [Consumer] Waiting for messages.')
+    # 启动消费者
+    consumer.start()
+
+    while True:
+        time.sleep(30)
+    # 资源释放
+    consumer.shutdown()
+
+if __name__ == '__main__':
+    start_consume_message()

+ 94 - 0
V2/tests/dmSQLAlchemy.py

@@ -0,0 +1,94 @@
+
+# coding: utf-8
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from Product import Base, Product
+
+def main():
+    #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。
+    conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236'
+    #创建Engine对象
+    engine = create_engine(conn_url)
+    #创建DBSession对象
+    DBSession = sessionmaker(bind=engine)
+
+    Base.metadata.create_all(engine) # 创建表结构
+
+    fun_select_all(DBSession)
+    # # 插入
+    fun_insert(DBSession)
+    fun_select_all(DBSession)
+    # # 插入
+    fun_insert1(DBSession)
+    fun_select_all(DBSession)
+    # # 更新
+    fun_update(DBSession)
+    fun_select_all(DBSession)
+    # # 删除
+    fun_delete(DBSession)
+    fun_select_all(DBSession)
+
+def fun_select_all(DBSession):
+    # 创建Session
+    session = DBSession()
+    # 查询所有的
+    list_product = session.query(Product).all()
+    print('查询所有结果:')
+    for product in list_product:
+        print(product.NAME)# , product.AUTHOR, product.PUBLISHER )
+    print('')
+    session.close()
+
+def fun_insert(DBSession):
+    # 创建Session
+    session = DBSession()
+    new_product = Product()
+    new_product.NAME = '水浒传'
+    # new_product.AUTHOR = '施耐庵,罗贯中'
+    # new_product.PUBLISHER = '中华书局'
+    # new_product.PUBLISHTIME = '2005-4-1'
+    # new_product.PRODUCTNO = '9787101046137'
+    # new_product.SATETYSTOCKLEVEL = '10'
+    # new_product.ORIGINALPRICE = '19'
+    # new_product.NOWPRICE = '14.3'
+    # new_product.DISCOUNT = '7.5'
+    # new_product.DESCRIPTION = '''  《水浒传》是宋江起义故事在民间长期流传基础上产生出来的,吸收了民间文学的营养。'''
+    # new_product.PHOTO = ''
+    # new_product.TYPE = '16'
+    # new_product.PAPERTOTAL = '922'
+    # new_product.WORDTOTAL = '912000'
+    # new_product.SELLSTARTTIME = '2006-03-20'
+    # new_product.SELLENDTIME = ''
+
+    session.add(new_product)
+    session.commit()
+    print('插入成功')
+    session.close()
+
+def fun_insert1(DBSession):
+    session = DBSession()
+    new_product = Product()
+    new_product.NAME = '红楼梦'
+    session.add(new_product)
+    session.commit()
+    session.close()
+
+def fun_update(DBSession):
+    # 创建Session
+    session = DBSession()
+    product = session.query(Product).filter(Product.NAME == '水浒传').one()
+    product.NAME = '水浒'
+    session.commit()
+    print('更新成功')
+    session.close()
+
+def fun_delete(DBSession):
+    # 创建Session
+    session = DBSession()
+    session.query(Product).filter(Product.NAME == '水浒').delete()
+    session.commit()
+    print('删除成功')
+    session.close()
+
+if __name__ == '__main__':
+    main()

+ 42 - 0
V2/tests/p.py

@@ -0,0 +1,42 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-10 16:16:18
+# @Last Modified time:2024-01-10 16:16:18
+import sys, time
+from rocketmq.client import Producer, Message
+
+topicName = 'test'
+groupName = 'log'
+nameserver = 'localhost:9876'
+KEYS = 'XXX'
+TAGS = 'XXX'
+
+def create_message():
+    # 组装消息
+    msg = Message(topicName)
+    # 设置keys
+    msg.set_tags(KEYS)
+    # 设置tags
+    msg.set_tags(TAGS)
+    msg.set_property('property', 'test')
+    # 消息内容
+    msg.set_body('message body')
+    return msg
+
+def send_message_sync(count):
+    # 初始化生产者,并设置生产组消息,组名称使用全称
+    producer = Producer(groupName)
+    # 设置服务器地址
+    producer.set_name_server_address(nameserver)
+    # 启动生产者
+    producer.start()
+    for n in range(count):
+        msg = create_message()
+        # 发送同步消息
+        ret = producer.send_sync(msg)
+        print(ret.status, ret.msg_id, ret.offset)
+    # 资源释放
+    producer.shutdown()
+
+if __name__ == '__main__':
+    send_message_sync(10)

+ 289 - 0
V2/tests/test_api.py

@@ -0,0 +1,289 @@
+#!/usr/bin/python
+# -*- coding=utf-8 -*-
+# @Create Time:		2024-01-18 15:29:28
+# @Last Modified time: 2024-01-23 15:54:23
+# import re
+# import json
+# import unittest
+# from base64 import b64encode
+# from flask import url_for
+# from app import create_app, db
+# from app.models import *
+
+# class APITestCase(unittest.TestCase):
+#     def setUp(self):
+#         self.app = create_app('testing')
+#         self.app_context = self.app.app_context()
+#         self.app_context.push()
+#         db.create_all()
+#         self.client = self.app.test_client()
+
+#     def tearDown(self):
+#         db.session.remove()
+#         db.drop_all()
+#         self.app_context.pop()
+
+    # def get_api_headers(self, username, password):
+    #     return {
+    #         'Authorization': 'Basic ' + b64encode(
+    #             (username + ':' + password).encode('utf-8')).decode('utf-8'),
+    #         'Accept': 'application/json',
+    #         'Content-Type': 'application/json'
+    #     }
+
+    # def test_404(self):
+    #     response = self.client.get(
+    #         '/wrong/url',
+    #         headers=self.get_api_headers('email', 'password'))
+    #     self.assertTrue(response.status_code == 404)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['error'] == 'not found')
+
+    # def test_no_auth(self):
+    #     response = self.client.get(url_for('api.get_posts'),
+    #                                content_type='application/json')
+    #     self.assertTrue(response.status_code == 401)
+
+    # def test_bad_auth(self):
+    #     # add a user
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u = User(email='john@example.com', password='cat', confirmed=True,
+    #              role=r)
+    #     db.session.add(u)
+    #     db.session.commit()
+
+    #     # authenticate with bad password
+    #     response = self.client.get(
+    #         url_for('api.get_posts'),
+    #         headers=self.get_api_headers('john@example.com', 'dog'))
+    #     self.assertTrue(response.status_code == 401)
+
+    # def test_token_auth(self):
+    #     # add a user
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u = User(email='john@example.com', password='cat', confirmed=True,
+    #              role=r)
+    #     db.session.add(u)
+    #     db.session.commit()
+
+    #     # issue a request with a bad token
+    #     response = self.client.get(
+    #         url_for('api.get_posts'),
+    #         headers=self.get_api_headers('bad-token', ''))
+    #     self.assertTrue(response.status_code == 401)
+
+    #     # get a token
+    #     response = self.client.get(
+    #         url_for('api.get_token'),
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertIsNotNone(json_response.get('token'))
+    #     token = json_response['token']
+
+    #     # issue a request with the token
+    #     response = self.client.get(
+    #         url_for('api.get_posts'),
+    #         headers=self.get_api_headers(token, ''))
+    #     self.assertTrue(response.status_code == 200)
+
+    # def test_anonymous(self):
+    #     response = self.client.get(
+    #         url_for('api.get_posts'),
+    #         headers=self.get_api_headers('', ''))
+    #     self.assertTrue(response.status_code == 200)
+
+
+    # def test_unconfirmed_account(self):
+    #     # add an unconfirmed user
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u = User(email='john@example.com', password='cat', confirmed=False,
+    #              role=r)
+    #     db.session.add(u)
+    #     db.session.commit()
+
+    #     # get list of posts with the unconfirmed account
+    #     response = self.client.get(
+    #         url_for('api.get_posts'),
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 403)
+
+
+    # def test_posts(self):
+    #     # add a user
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u = User(email='john@example.com', password='cat', confirmed=True,
+    #              role=r)
+    #     db.session.add(u)
+    #     db.session.commit()
+
+    #     # write an empty post
+    #     response = self.client.post(
+    #         url_for('api.new_post'),
+    #         headers=self.get_api_headers('john@example.com', 'cat'),
+    #         data=json.dumps({'body': ''}))
+    #     self.assertTrue(response.status_code == 400)
+
+    #     # write a post
+    #     response = self.client.post(
+    #         url_for('api.new_post'),
+    #         headers=self.get_api_headers('john@example.com', 'cat'),
+    #         data=json.dumps({'body': 'body of the *blog* post'}))
+    #     self.assertTrue(response.status_code == 201)
+    #     url = response.headers.get('Location')
+    #     self.assertIsNotNone(url)
+
+    #     # get the new post
+    #     response = self.client.get(
+    #         url,
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['url'] == url)
+    #     self.assertTrue(json_response['body'] == 'body of the *blog* post')
+    #     self.assertTrue(json_response['body_html'] ==
+    #                     '<p>body of the <em>blog</em> post</p>')
+    #     json_post = json_response
+
+    #     # get the post from the user
+    #     response = self.client.get(
+    #         url_for('api.get_user_posts', id=u.id),
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertIsNotNone(json_response.get('posts'))
+    #     self.assertTrue(json_response.get('count', 0) == 1)
+    #     self.assertTrue(json_response['posts'][0] == json_post)
+
+    #     # get the post from the user as a follower
+    #     response = self.client.get(
+    #         url_for('api.get_user_followed_posts', id=u.id),
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertIsNotNone(json_response.get('posts'))
+    #     self.assertTrue(json_response.get('count', 0) == 1)
+    #     self.assertTrue(json_response['posts'][0] == json_post)
+
+    #     # edit post
+    #     response = self.client.put(
+    #         url,
+    #         headers=self.get_api_headers('john@example.com', 'cat'),
+    #         data=json.dumps({'body': 'updated body'}))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['url'] == url)
+    #     self.assertTrue(json_response['body'] == 'updated body')
+    #     self.assertTrue(json_response['body_html'] == '<p>updated body</p>')
+
+    # def test_users(self):
+    #     # add two users
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u1 = User(email='john@example.com', username='john',
+    #               password='cat', confirmed=True, role=r)
+    #     u2 = User(email='susan@example.com', username='susan',
+    #               password='dog', confirmed=True, role=r)
+    #     db.session.add_all([u1, u2])
+    #     db.session.commit()
+
+    #     # get users
+    #     response = self.client.get(
+    #         url_for('api.get_user', id=u1.id),
+    #         headers=self.get_api_headers('susan@example.com', 'dog'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['username'] == 'john')
+    #     response = self.client.get(
+    #         url_for('api.get_user', id=u2.id),
+    #         headers=self.get_api_headers('susan@example.com', 'dog'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['username'] == 'susan')
+
+    # def test_comments(self):
+    #     # add two users
+    #     r = Role.query.filter_by(name='User').first()
+    #     self.assertIsNotNone(r)
+    #     u1 = User(email='john@example.com', username='john',
+    #               password='cat', confirmed=True, role=r)
+    #     u2 = User(email='susan@example.com', username='susan',
+    #               password='dog', confirmed=True, role=r)
+    #     db.session.add_all([u1, u2])
+    #     db.session.commit()
+
+    #     # add a post
+    #     post = Post(body='body of the post', author=u1)
+    #     db.session.add(post)
+    #     db.session.commit()
+
+    #     # write a comment
+    #     response = self.client.post(
+    #         url_for('api.new_post_comment', id=post.id),
+    #         headers=self.get_api_headers('susan@example.com', 'dog'),
+    #         data=json.dumps({'body': 'Good [post](http://example.com)!'}))
+    #     self.assertTrue(response.status_code == 201)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     url = response.headers.get('Location')
+    #     self.assertIsNotNone(url)
+    #     self.assertTrue(json_response['body'] ==
+    #                     'Good [post](http://example.com)!')
+    #     self.assertTrue(
+    #         re.sub('<.*?>', '', json_response['body_html']) == 'Good post!')
+
+    #     # get the new comment
+    #     response = self.client.get(
+    #         url,
+    #         headers=self.get_api_headers('john@example.com', 'cat'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertTrue(json_response['url'] == url)
+    #     self.assertTrue(json_response['body'] ==
+    #                     'Good [post](http://example.com)!')
+
+    #     # add another comment
+    #     comment = Comment(body='Thank you!', author=u1, post=post)
+    #     db.session.add(comment)
+    #     db.session.commit()
+
+    #     # get the two comments from the post
+    #     response = self.client.get(
+    #         url_for('api.get_post_comments', id=post.id),
+    #         headers=self.get_api_headers('susan@example.com', 'dog'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertIsNotNone(json_response.get('posts'))
+    #     self.assertTrue(json_response.get('count', 0) == 2)
+
+    #     # get all the comments
+    #     response = self.client.get(
+    #         url_for('api.get_comments', id=post.id),
+    #         headers=self.get_api_headers('susan@example.com', 'dog'))
+    #     self.assertTrue(response.status_code == 200)
+    #     json_response = json.loads(response.data.decode('utf-8'))
+    #     self.assertIsNotNone(json_response.get('posts'))
+    #     self.assertTrue(json_response.get('count', 0) == 2)
+
+
+
+import json, time
+from datetime import datetime
+import requests
+
+with open("../fuli_data.json", "r", encoding="utf-8") as fp:
+    num = 0
+    for line in fp.readlines():
+        json_obj = json.loads(line)
+        # json_obj['record_Time'] = json_obj['record_Time']/1000
+        json_obj['device_Id'] = 'did'
+        json_obj['full_Url'] = json_obj['current_Url']
+        response = requests.post('http://localhost:7788/record', json=json_obj)
+        print(response.text)
+        time.sleep(0.2)
+        num += 1
+        if num > 40:
+            break

+ 50 - 1
compose.yaml

@@ -1,5 +1,19 @@
-version: "3"
+version: "3.7"
 services:
+  dm8:
+    image: "dm8_single:dm8_20230808_rev197096_x86_rh6_64"
+    environment:
+      PAGE_SIZE: 16
+      LD_LIBRARY_PATH: "/opt/dmdbms/bin"
+      EXTENT_SIZE: 32
+      BLANK_PAD_MODE: 1
+      LOG_SIZE: 1024
+      UNICODE_FLAG: 1
+      LENGTH_IN_CHAR: 1
+      INSTANCE_NAME: "dm8"
+    volumes:
+      - "$PWD/dm8/data:/opt/dmdbms/data"
+    restart: always
   db:
     image: "mysql:latest"
     environment:
@@ -9,6 +23,35 @@ services:
     volumes:
       - "/data/mysql/data:/var/lib/mysql"
     restart: always
+  # rmqnamesrv:
+  #   image: "apache/rocketmq:latest"
+  #   environment:
+  #     JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms256M -Xmx256M -Xmn128m"
+  #   command: ["sh", "mqnamesrv"]
+  #   volumes:
+  #     - "$PWD/rocketmq/logs:/home/rocketmq/logs"
+  #   ports:
+  #     - 9876:9876
+  # broker:
+  #   image: "apache/rocketmq:latest"
+  #   environment:
+  #     NAMESRV_ADDR: "rmqnamesrv:9876"
+  #     JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms256M -Xmx256M -Xmn128m"
+  #   command: ["sh", "mqbroker", "-c", "/home/rocketmq/broker.conf", "autoCreateTopicEnable=true"]
+  #   depends_on:
+  #     - "rmqnamesrv"
+  #   volumes:
+  #     - "$PWD/rocketmq/logs:/root/logs"
+  #     - "$PWD/rocketmq/store:/root/store"
+  #     - "$PWD/rocketmq/conf/broker.conf:/home/rocketmq/broker.conf"
+  # rmqdashboard:
+  #   image: "apacherocketmq/rocketmq-dashboard:latest"
+  #   environment:
+  #     JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876"
+  #   depends_on:
+  #     - "broker"
+  #   ports:
+  #     - 8080:8080
   backend:
     image: "mzinfo:v1"
     environment:
@@ -26,3 +69,9 @@ services:
       options:
         max-size: "1G"
         max-file: "10"
+  testing:
+    environment:
+      FLASKY_ADMIN: "admin"
+      DATABASE_URL: "dm+dmPython://SYSDBA:SYSDBA001@dm-database:5236"
+      DEV_DATABASE_URL: "dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236"
+      TEST_DATABASE_URL: "dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236"

Alguns arquivos não foram mostrados porque muitos arquivos mudaram nesse diff