""" 对 Word 中数据库设计表构建图谱 字段名判断依据:1、字段名称不为保留字 2、字母和下划线 3、驼峰命名 4、注意数字 字段类型判断:re.match(r'\w+\(\d+\)', s) 1、判断是否标准表 [依据]-> 判断是否包含列类型 -> 找到列 2、判断列名 [是]-> 判断是否符合列名规范 3、判断列中文名 [规则]-> 找到列中文名 4、判断列类型 [是]-> 判断是否符合列规范 5、判断主键 [未找到主键]-> 判断是否是ID 6、判断是否必填字段 [依据]-> 分类[是否为空|是否必填] -> [未找到字段]-> 设定默认值必填 7、判断列注释 [未找到注释]-> 中文名 [未找到中文名]-> 表名+列名 -> [列类型为bool、时间、流等]-> 添加UUID 8、字段唯一判断条件 <列名,列类型,注释> """ import re import uuid import logging from collections import Counter import docx from docx import Document from docx.oxml.table import CT_Tbl from docx.oxml.text.paragraph import CT_P from docx.table import _Cell, Table from docx.text.paragraph import Paragraph import jieba import pandas as pd import networkx as nx import matplotlib.pyplot as plt # from text2vec import Similarity # sim = Similarity() from py2neo import Node, Graph, Relationship graph = Graph('bolt://192.168.1.150:7687/', user='neo4j', password='password', name="neo4j") graph.delete_all() # # 列类型判断 # with open('dict/columntype.txt', 'r', encoding='utf-8') as fp: # COLUMN_TYPES = {i.strip() for i in fp.readlines()} # # 列名判断 # with open('dict/columnname.txt', 'r', encoding='utf-8') as fp: # COLUMN_DICT = {i.strip(): 'name' for i in fp.readlines()} # # 注释判断 # with open('dict/comment.txt', 'r', encoding='utf-8') as fp: # REMARK_DICT = {i.strip(): 'remark' for i in fp.readlines()} COLUMN_TYPES = {'int', 'bigint', 'tinyint', 'smallint', 'bigint unsigned', 'float', 'double', 'decimal', 'date', 'datetime', 'char', 'varchar', 'text', 'longtext', 'blob', 'bool', 'boolean'} TYPE_DICT = {'类型': 'column_type', '数据类型': 'column_type'} COLUMN_DICT = {'名称': 'name', '字段名': 'name', 'field name': 'name', '字段代码': 'name', '代码': 'name', '物理字段名': 'name'} C_NAME_DICT = {'字段中文名': 'c_name', '中文含义': 'c_name', '名字': 'c_name', '字段名称': 'c_name', '逻辑字段名': 'c_name'} REMARK_DICT = {'Alias': 'remark', 'description': 'remark', '说明': 'remark', '描述': 'remark', '备注': 'remark'} REQUIRED_DICT = {'空/非空': 'required', '可不可以为空': 'required', '是否为空': 'required', '允许空值': 'required', '是否必填': 'required', '空值': 'required'} PRIMARY_KEY_DICT = {'主键': 'primary_key'} FOREIGN_KEY_DICT = {'外键': 'foreign_key'} class LoggerHandler(logging.Logger): def __init__(self, name: str, console_handler_level: str = logging.INFO, fmt: str = '%(levelname)s: %(asctime)s: %(name)s: %(filename)s: %(lineno)d: %(funcName)s: %(message)s'): super().__init__(name) self.setLevel(logging.INFO) self.fmt = logging.Formatter(fmt) self.set_console_handler(console_handler_level) def set_console_handler(self, console_handler_level: str = logging.INFO) -> None: ch = logging.StreamHandler() ch.setLevel(console_handler_level) ch.setFormatter(self.fmt) self.addHandler(ch) logger = LoggerHandler(__name__, fmt='%(levelname)s: %(asctime)s: %(lineno)d: %(funcName)s: %(message)s') class Word: def __init__(self, path: str, draw: bool = False) -> None: self.draw = draw self.doc = Document(path) if draw: self.G = nx.Graph() self.namecount = dict({}) self.all_tables = pd.DataFrame() def iter_block_item(self, parent): if isinstance(parent, docx.document.Document): parent_elm = parent.element.body elif isinstance(parent, _Cell): parent_elm = parent._tc else: raise ValueError("something error") for child in parent_elm.iterchildren(): if isinstance(child, CT_P): yield Paragraph(child, parent) elif isinstance(child, CT_Tbl): yield Table(child, parent) def parse(self) -> tuple: for block in self.iter_block_item(self.doc): if block.style.name == 'Heading 1' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Heading 2' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Heading 3' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Heading 4' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Heading 5' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Heading 6' and block.text: yield ('Heading', block.text.lower()) elif block.style.name == 'Normal' and block.text: yield ('Normal', block.text.lower()) elif block.style.name == 'Table Grid': tables = [] for row in block.rows: rows = [] for cell in row.cells: for paragraph in cell.paragraphs: rows.append(paragraph.text.strip().lower()) tables.append(rows) yield ('Table', tables) elif block.style.name == 'Normal Table': tables = [] for row in block.rows: rows = [] for cell in row.cells: for paragraph in cell.paragraphs: rows.append(paragraph.text.strip().lower()) tables.append(rows) yield ('Table', tables) elif block.text: yield ('Unknow', block) # def clean_table(self, raw_table): # table = [] # dirty_table = [] # while raw_table and '' in raw_table[0]: # raw_table = raw_table[1:] # # 表格预处理 # rowslen = [len(row) for row in raw_table] # if not rowslen: # return None # rowlen = Counter(rowslen).most_common(1)[0][0] # for i,l in enumerate(rowslen): # if l == rowlen: # table.append(raw_table[i]) # else: # dirty_table.append(raw_table[i]) # return table, dirty_table def predict(self): for r in self.parse(): if r[0] in ['Heading', 'Normal'] and r[1]: tablename = r[1] logger.debug(tablename) elif r[0] == 'Table': # table = r[1] # table, dirty_table = self.clean_table(r[1]) table, dirty_table = self.get_table(r[1]) if not table: continue # 判断表是否为需要解析的表 if any({'fulltype', 'type'} & {self.detect_type(i) for i in table[1]}): ############################### 数据库表名解析 ############################## if re.search("[a-zA-Z_\d]+", tablename): table_name = re.search("[a-zA-Z_]+", tablename).group() try: table_c_name = re.search('[\u4e00-\u9fa5]{3,}', tablename).group() except Exception as e: table_c_name = "未知表" logger.info(f"得到数据库表,表名:{table_name}\t猜测中文表名:{table_c_name}") else: table_name = "UnknowTable" table_c_name = "未知表" ############################### 表名解析结束 ############################### ############################# 表字段在此修改 ############################ df = pd.DataFrame(table) df.columns = df.values.tolist()[0] df.drop([0], inplace=True) # # 表字段修正 df.rename(columns={**TYPE_DICT, **COLUMN_DICT, **C_NAME_DICT, **REMARK_DICT, **REQUIRED_DICT, **PRIMARY_KEY_DICT, **FOREIGN_KEY_DICT}, inplace=True) df['table_name'] = table_name df['table_c_name'] = table_c_name for i in df.columns: if self.detect_type(df.loc[1, i]) != 'unknow': df.rename(columns={i: 'column_type'}, inplace=True) break # 判断字段是否允许为空,允许为空值的不可连接 # for i in df.columns: # if sim.get_score(i, '允许值为空') > 0.7: # df['unique'] = df[i].apply(lambda x: str(uuid.uuid1()) if x != 'n' else '') # break # elif sim.get_score(i, '必填') > 0.7: # df['unique'] = df[i].apply(lambda x: '' if x != 'n' else str(uuid.uuid1())) # break # if 'unique' not in df.columns: # print("无法判断字段是否必填") # df['unique'] = '' ############################### 必填字段判断 ############################### if 'required' not in df.columns: logger.warning("无法判断字段是否必填,设定默认值必填,所有字段皆可关联!") df['required'] = '' ############################### 必填字段判断 ############################### # 注释字段必填 if 'remark' not in df.columns: if 'c_name' not in df.columns: logger.warning(f"未找到注释字段,当前字段包含:{df.columns}") df['remark'] = '' else: logger.warning(f"未找到注释字段,使用字段中文名代替!,当前字段包含:{df.columns}") df['remark'] = df['c_name'] ############################### 指定字段不可关联 ############################### df['remark'] = df['remark'] + df['column_type'].apply(lambda x: str(uuid.uuid1()) if x in ['date', 'datetime', 'blob', 'text'] else '') df['remark'] = df['remark'] + df['name'].apply(lambda x: str(uuid.uuid1()) if x in ['create_time', 'update_time'] else '') df['remark'] = df.apply(lambda x: x['name'] if not x['remark'] else x['remark'], axis=1) ############################### 指定字段不可关联 ############################### ############################### 为空字段不可关联 ############################### pass ############################### 为空字段不可关联 ############################### if not df.query(' name in ["id", "ID", "Id"] ').empty: idx = df.query(' name in ["id", "ID", "Id"] ').index[0] df.loc[idx, 'remark'] = ''.join([table_name, '_id']) # 判断唯一的依据 logger.debug(f'''remark type: {type(df.loc[1, 'remark'])}''') logger.debug(f'''column_type type: {type(df.loc[1, 'column_type'])}''') df['vec'] = df['name'] + '_' + df['column_type'] + '_' + df['remark'] # ############################### 表字段结束修改 ############################## # if self.draw: # self.G.add_node(table_name) # nodelist = [ # (uuid.uuid1(), item) for item in df.to_dict(orient ='records') # ] # self.G.add_nodes_from(nodelist) # edgelist = [ # (table_name, node[0]) for node in nodelist # ] # self.G.add_edges_from(edgelist) # 创建表节点 result = graph.nodes.match("表", name = table_name, c_name=table_c_name) if len(result) > 0: start_node = result.first() else: start_node = Node("表", name=table_name, c_name=table_c_name) graph.create(start_node) # 迭代表字段 for item in df.to_dict(orient ='records'): # 确保属性插入正常,删除非字符串键值 for key in set(item.keys()): if not isinstance(key, str): del item[key] ############################# 在此修改 ############################ # 字段名设置合并条件 name = item['name'] if not item['vec']: item['vec'] = table_name + '_' + name ############################# 结束修改 ############################ # 创建字段节点 result = graph.nodes.match("字段", vec = item['vec']) if len(result) > 0: # 查询到相关字段,使用旧字段 end_node = result.first() relationship = Relationship(start_node, "related", end_node, **{"name": item['name']}) else: # 未查询到相关字段,创建节点 end_node = Node("字段", **item) graph.create(end_node) relationship = Relationship(start_node, "has", end_node) # 创建表字段关系 graph.merge(relationship) else: print("非标准表格", table) else: print(r) print(self.all_tables.columns) print(self.all_tables) def detect_type(self, text: str): fulltype = re.match(r'(\w+)\(\d+\)', text) if fulltype and (fulltype.group(1) in COLUMN_TYPES): return 'fulltype' elif text in COLUMN_TYPES: return 'type' else: return 'unknow' def get_table(self, raw_table): table = [] dirty_table = [] has_head = False for row in raw_table: if has_head: table.append(row) elif set(row) & set({**TYPE_DICT, **COLUMN_DICT, **C_NAME_DICT, **REMARK_DICT, **REQUIRED_DICT, **FOREIGN_KEY_DICT}.keys()): head = row has_head = True else: dirty_table.append(row) # for row in raw_table: # if get_head: # table.append(row) # continue # for col in row: # fulltype = re.match(r'(\w+)\(\d+\)', col) # if fulltype and (fulltype.group(1) in COLUMN_TYPES): # table.append(row) # get_head = True # break # elif col in COLUMN_TYPES: # table.append(row) # get_head = True # break # else: # head = row if table and (len(head) == len(table[0])) and (len(Counter([len(_) for _ in table]).keys()) == 1): table.insert(0, head) return table, dirty_table else: return None, dirty_table def draw(self): if self.draw: nx.draw(self.G, with_labels = True) plt.show() else: return "Draw is not enabled" if __name__ == '__main__': # path = '''data/数据库设计说明书.docx''' path = '''data/数据库设计文档.docx''' path = '''data/数据库设计(1).docx''' path = '''data/数据库设计(2).docx''' path = '''data/国家电投人才猎头智能人才库项目-数据库设计说明书.docx''' path = '''data/FJS-OCR 富士通识别平台 数据库设计说明书.docx''' path = '''data/中国跳水队智能辅助教练系统-国际比赛数据 数据库设计说明书.docx''' path = '''data/租房查询系统_数据库设计说明书_2.0.docx''' path = '''data/url-ukWkMKhnRgCvxVZt.docx''' path = '''data/url-qqp17mI32jTyozQt.docx''' path = '''data/电商-数据库详细设计说明书V0.4.docx''' word = Word(path) word.predict()