#!/usr/bin/python
# -*- coding=utf-8 -*-
# @Create Time: 2024-01-12 13:25:10
# @Last Modified time: 2024-02-26 16:30:22
"""MySQL helper class and a script exporting per-user ``recoder`` rows.

The module defines :class:`MYDATABASE`, a small wrapper around a PyMySQL
connection, and then (at import/run time) dumps the rows of the ``recoder``
table for every address in ``user_list`` to ``<user>_data.json``.
"""

import logging
from typing import Optional

import pandas as pd
import pymysql

logger = logging.getLogger(__name__)

# Connection settings shared by MYDATABASE and the export script below.
# NOTE(review): credentials are committed in source; consider moving them to
# environment variables or a config file that is not version-controlled.
DB_CONFIG = {
    'host': '192.168.1.202',
    'user': 'root',
    'port': 13388,
    'password': 'zh123456',
}


class MYDATABASE:
    """Wrap one PyMySQL connection together with one cursor."""

    def __init__(self, db_name=None):
        """Connect to the server and create a cursor.

        Args:
            db_name: Forwarded to ``pymysql.connect`` as ``database``.
        """
        config = dict(DB_CONFIG, database=db_name)
        self.connection = pymysql.connect(**config)
        self.cursor = self.connection.cursor()

    def __del__(self):
        """Close the connection when the wrapper is garbage-collected."""
        # __init__ may have failed before the attribute was assigned.
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        try:
            connection.close()
        except Exception:
            # A finalizer must never raise (e.g. connection already closed).
            pass

    def create_database(self, db_name: str):
        """Create database ``db_name`` if it does not exist yet."""
        sql = f'CREATE DATABASE IF NOT EXISTS {db_name};'
        self.cursor.execute(sql)

    def create_table(self, tbl_name: str, columns: Optional[str] = None):
        """Create table ``tbl_name`` if it does not exist yet.

        Args:
            tbl_name: Name of the table to create.
            columns: Optional column/constraint definitions, e.g.
                ``"id INT PRIMARY KEY, name VARCHAR(64)"``. When omitted the
                statement is sent without a column list, exactly as before;
                MySQL requires at least one column, so the server is expected
                to reject that form.
        """
        if columns:
            sql = f'CREATE TABLE IF NOT EXISTS {tbl_name} ({columns});'
        else:
            sql = f'CREATE TABLE IF NOT EXISTS {tbl_name};'
        self.cursor.execute(sql)

    def drop_database(self, db_name: str):
        """Drop database ``db_name`` if it exists."""
        sql = f'DROP DATABASE IF EXISTS {db_name};'
        self.cursor.execute(sql)

    def drop_table(self, tbl_name: str):
        """Drop table ``tbl_name`` if it exists."""
        sql = f'DROP TABLE IF EXISTS {tbl_name};'
        self.cursor.execute(sql)

    def query(self, sql: str) -> pd.DataFrame:
        """Execute ``sql`` and return the result set as a DataFrame.

        Column names are taken from the cursor description. A statement that
        produces no result set yields an empty DataFrame.
        """
        self.cursor.execute(sql)
        description = self.cursor.description
        if description is None:
            # Nothing to fetch (statement returned no result set).
            return pd.DataFrame()
        rows = self.cursor.fetchall()  # rows come back as a tuple of tuples
        header = [column[0] for column in description]
        # pd.DataFrame handles a list of rows more reliably than a tuple.
        return pd.DataFrame(list(rows), columns=header)

    def show_databases(self) -> pd.DataFrame:
        """List all databases on the server."""
        # The statement keyword is plural; ``SHOW DATABASE`` is a syntax error.
        sql = 'SHOW DATABASES;'
        return self.query(sql)

    def select_database(self) -> pd.DataFrame:
        """Return the currently selected database."""
        sql = 'SELECT DATABASE();'
        return self.query(sql)

    def show_tables(self) -> pd.DataFrame:
        """List all tables in the current database."""
        sql = 'SHOW TABLES;'
        return self.query(sql)

    def insert_table(self, sql):
        """Execute a write statement and commit it, rolling back on failure.

        Errors are logged rather than raised, keeping the best-effort
        behaviour callers rely on.

        Returns:
            ``True`` if the statement was committed, ``False`` if it failed
            and the transaction was rolled back.
        """
        try:
            # Must go through the instance cursor; a bare ``cursor`` is
            # undefined here and made every call fail silently.
            self.cursor.execute(sql)
            self.connection.commit()
        except Exception:
            logger.exception('Statement failed, rolling back: %s', sql)
            self.connection.rollback()
            return False
        return True


# Example usage of the helper class:
# mydb = MYDATABASE(db_name="mzinfo")
# df = mydb.query("SELECT * FROM recoder;")
# print(df)

# Alternative: read through a SQLAlchemy engine instead of a raw connection.
# from sqlalchemy import create_engine
# connect_info = 'mysql+pymysql://{}:{}@{}:{}?charset=utf8'
# engine = create_engine(connect_info)

# Accounts whose records are exported; commented-out entries are skipped.
user_list = [
    'liguiwen@mz.gd.csg.cn',
    'houhuiteng@mz.gd.csg.cn',
    'lijie1013@mz.gd.csg.cn',
    'zengjiyi@mz.gd.csg.cn',
    'fuli@mz.gd.csg.cn',
    'zengjunhao@mz.gd.csg.cn',
    'houyue@mz.gd.csg.cn',
    'longruifan@mz.gd.csg.cn',
    'wangyongqiang@mz.gd.csg.cn',
    'liuhuan0828@mz.gd.csg.cn',
    'heweiting@mz.gd.csg.cn',
    'ganwenfeng@mz.gd.csg.cn',
    'chenweichang0607@mz.gd.csg.cn',
    'caiwujiancha1@mz.gd.csg.cn',
    'wenyongxian@mz.gd.csg.cn',
    'dengrong@mz.gd.csg.cn',
    'huanghongtao@mz.gd.csg.cn',
    'caiwujiancha2@mz.gd.csg.cn',
    'yangying@mz.gd.csg.cn',
    # 'wenrui@mz.gd.csg.cn',
    # 'huangshisong@mz.gd.csg.cn',
    # 'wangwenchao@mz.gd.csg.cn',
    # 'fangxianhui@mz.gd.csg.cn',
    # 'lingyuanfeng@mz.gd.csg.cn',
    # 'zengshuxun@mz.gd.csg.cn',
    # 'yujunwen@mz.gd.csg.cn',
    # 'lifuzhi@mz.gd.csg.cn',
    # 'zhangpeijun@mz.gd.csg.cn',
    # 'yuanfengwen@mz.gd.csg.cn',
    # 'liyiwen@mz.gd.csg.cn',
    # 'liangxinrong@mz.gd.csg.cn',
]

con = pymysql.connect(
    **DB_CONFIG,
    database='mzinfo',
    charset='utf8',
    use_unicode=True,
)

# Built once: the user id is bound by the driver (%s placeholder) instead of
# being interpolated into the SQL text on every iteration.
sql_cmd = (
    "SELECT record_Time, current_Url, user_Id, request_Method, event_Type, "
    "form_Data, target_Url, iframe_Url, button_Text "
    "FROM recoder WHERE user_Id = %s ORDER BY record_Time;"
)

try:
    for user in user_list:
        # df = pd.read_sql(sql_cmd, con=engine, index_col="record_Time")
        df = pd.read_sql(sql_cmd, con, params=(user,))
        df['record_Time'] = pd.to_datetime(df['record_Time'])
        # Call head(); printing the bound method dumps the whole frame repr.
        print(df.head())
        # One JSON object per line, non-ASCII text kept readable.
        df.to_json(
            f"{user}_data.json",
            orient='records',
            lines=True,
            force_ascii=False,
        )
finally:
    # Release the connection even if one of the exports fails.
    con.close()