""" 对 Word 中数据库设计表构建图谱 字段名判断依据:1、字段名称不为保留字 2、字母和下划线 3、驼峰命名 4、注意数字 """ import re import logging import docx from docx import Document from docx.oxml.table import CT_Tbl from docx.oxml.text.paragraph import CT_P from docx.table import _Cell, Table from docx.text.paragraph import Paragraph import uuid import pandas as pd import networkx as nx import matplotlib.pyplot as plt from py2neo import Node, Graph, Relationship graph = Graph('bolt://192.168.1.150:7687/', user='neo4j', password='password', name="neo4j") graph.delete_all() coltypes = {'int', 'bigint', 'float', 'double', 'decimal', 'date', 'datetime', 'char', 'varchar', 'text', 'longtext', 'blob', 'bool', 'boolean'} coldict = {'名称': 'colname', '字段名': 'colname', 'Field Name': 'colname', '关联字段': 'alias', 'Alias': 'alias'} class LoggerHandler(logging.Logger): def __init__(self, name: str, console_handler_level: str = logging.INFO, fmt: str = '%(levelname)s: %(asctime)s: %(name)s: %(filename)s: %(lineno)d: %(funcName)s: %(message)s'): super().__init__(name) self.setLevel(logging.INFO) self.fmt = logging.Formatter(fmt) self.set_console_handler(console_handler_level) def set_console_handler(self, console_handler_level: str = logging.INFO) -> None: ch = logging.StreamHandler() ch.setLevel(console_handler_level) ch.setFormatter(self.fmt) self.addHandler(ch) logger = LoggerHandler(__name__, fmt='%(levelname)s: %(asctime)s: %(lineno)d: %(funcName)s: %(message)s') class Word: def __init__(self, path: str, draw: bool = False) -> None: self.draw = draw self.doc = Document(path) if draw: self.G = nx.Graph() def iter_block_item(self, parent): if isinstance(parent, docx.document.Document): parent_elm = parent.element.body elif isinstance(parent, _Cell): parent_elm = parent._tc else: raise ValueError("something error") for child in parent_elm.iterchildren(): if isinstance(child, CT_P): yield Paragraph(child, parent) elif isinstance(child, CT_Tbl): yield Table(child, parent) def parse(self) -> tuple: for block in self.iter_block_item(self.doc): if block.style.name == 'Heading 1' and block.text: yield ('Heading', block.text) elif block.style.name == 'Heading 2' and block.text: yield ('Heading', block.text) elif block.style.name == 'Heading 3' and block.text: yield ('Heading', block.text) elif block.style.name == 'Heading 4' and block.text: yield ('Heading', block.text) elif block.style.name == 'Heading 5' and block.text: yield ('Heading', block.text) elif block.style.name == 'Heading 6' and block.text: yield ('Heading', block.text) elif block.style.name == 'Normal' and block.text: yield ('Normal', block.text) elif block.style.name == 'Table Grid': tables = [] for row in block.rows: rows = [] for cell in row.cells: for paragraph in cell.paragraphs: rows.append(paragraph.text.strip()) tables.append(rows) yield ('Table', tables) elif block.style.name == 'Normal Table': tables = [] for row in block.rows: rows = [] for cell in row.cells: for paragraph in cell.paragraphs: rows.append(paragraph.text.strip()) tables.append(rows) yield ('Table', tables) def predict(self): for r in self.parse(): if r[0] in ['Heading', 'Normal']: tablename = r[1] logger.debug(tablename) if r[0] == 'Table': # 判断表是否为需要解析的表 if any(coltypes & {i.lower() for i in r[1][1]}): # 数据库表名解析 if re.search("[a-zA-Z_]+", tablename): tablename = re.search("[a-zA-Z_]+", tablename).group() logger.info(f"得到数据库表,表名:{tablename}") df = pd.DataFrame(r[1]) df.columns = df.values.tolist()[0] df.drop([0], inplace=True) if self.draw: self.G.add_node(tablename) nodelist = [ (uuid.uuid1(), item) for item in df.to_dict(orient ='records') ] self.G.add_nodes_from(nodelist) edgelist = [ (tablename, node[0]) for node in nodelist ] self.G.add_edges_from(edgelist) # 创建表节点 start_node = Node("表", name=tablename) graph.merge(start_node, "表", "name") # 表字段修正 df.rename(columns=coldict, inplace=True) # 别名字段必填 if 'alias' not in df.columns: logger.warning(f"未找到Alias字段,当前字段包含:{df.columns}") df['alias'] = '' # 迭代表字段 for item in df.to_dict(orient ='records'): # 确保属性插入正常 for key in set(item.keys()): if not isinstance(key, str): del item[key] # 字段名设置合并条件 colname = item['colname'] if not item['alias']: item['alias'] = tablename + '_' + colname # 创建字段节点 end_node = Node("字段", name=colname, **item) graph.merge(end_node, "字段", "alias") # 创建表字段关系 # relation = Relationship(start_node, 'has', end_node) HAS = Relationship.type("has") graph.merge(HAS(start_node, end_node)) def draw(self): if self.draw: nx.draw(self.G, with_labels = True) plt.show() else: return "Draw is not enabled" if __name__ == '__main__': path = '''数据库设计文档.docx''' path = '''数据库设计(1).docx''' path = '''数据库设计(2).docx''' # path = '''国家电投人才猎头智能人才库项目-数据库设计说明书.docx''' # path = '''FJS-OCR 富士通识别平台 数据库设计说明书.docx''' # path = '''中国跳水队智能辅助教练系统-国际比赛数据 数据库设计说明书.docx''' word = Word(path) word.predict()