123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- """
- 对 Word 中数据库设计表构建图谱
- 字段名判断依据:1、字段名称不为保留字 2、字母和下划线 3、驼峰命名 4、注意数字
- 字段类型判断:re.match(r'\w+\(\d+\)', s)
- """
- import re
- import logging
- import docx
- from docx import Document
- from docx.oxml.table import CT_Tbl
- from docx.oxml.text.paragraph import CT_P
- from docx.table import _Cell, Table
- from docx.text.paragraph import Paragraph
- import uuid
- import pandas as pd
- import networkx as nx
- import matplotlib.pyplot as plt
- from py2neo import Node, Graph, Relationship
- graph = Graph('bolt://192.168.1.150:7687/', user='neo4j', password='password', name="neo4j")
- graph.delete_all()
- # 列类型判断
- with open('dict/columntype.txt', 'r', encoding='utf-8') as fp:
- coltypes = {i.strip() for i in fp.readlines()}
- # 列名判断
- with open('dict/columnname.txt', 'r', encoding='utf-8') as fp:
- coldict = {i.strip(): 'colname' for i in fp.readlines()}
- # 注释判断
- with open('dict/comment.txt', 'r', encoding='utf-8') as fp:
- aliasdict = {i.strip(): 'alias' for i in fp.readlines()}
- class LoggerHandler(logging.Logger):
- def __init__(self, name: str, console_handler_level: str = logging.INFO, fmt: str = '%(levelname)s: %(asctime)s: %(name)s: %(filename)s: %(lineno)d: %(funcName)s: %(message)s'):
- super().__init__(name)
- self.setLevel(logging.INFO)
- self.fmt = logging.Formatter(fmt)
- self.set_console_handler(console_handler_level)
- def set_console_handler(self, console_handler_level: str = logging.INFO) -> None:
- ch = logging.StreamHandler()
- ch.setLevel(console_handler_level)
- ch.setFormatter(self.fmt)
- self.addHandler(ch)
- logger = LoggerHandler(__name__, fmt='%(levelname)s: %(asctime)s: %(lineno)d: %(funcName)s: %(message)s')
- class Word:
- def __init__(self, path: str, draw: bool = False) -> None:
- self.draw = draw
- self.doc = Document(path)
- if draw:
- self.G = nx.Graph()
- def iter_block_item(self, parent):
- if isinstance(parent, docx.document.Document):
- parent_elm = parent.element.body
- elif isinstance(parent, _Cell):
- parent_elm = parent._tc
- else:
- raise ValueError("something error")
- for child in parent_elm.iterchildren():
- if isinstance(child, CT_P):
- yield Paragraph(child, parent)
- elif isinstance(child, CT_Tbl):
- yield Table(child, parent)
- def parse(self) -> tuple:
- for block in self.iter_block_item(self.doc):
- if block.style.name == 'Heading 1' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Heading 2' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Heading 3' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Heading 4' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Heading 5' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Heading 6' and block.text:
- yield ('Heading', block.text)
- elif block.style.name == 'Normal' and block.text:
- yield ('Normal', block.text)
- elif block.style.name == 'Table Grid':
- tables = []
- for row in block.rows:
- rows = []
- for cell in row.cells:
- for paragraph in cell.paragraphs:
- rows.append(paragraph.text.strip())
- tables.append(rows)
- yield ('Table', tables)
- elif block.style.name == 'Normal Table':
- tables = []
- for row in block.rows:
- rows = []
- for cell in row.cells:
- for paragraph in cell.paragraphs:
- rows.append(paragraph.text.strip())
- tables.append(rows)
- yield ('Table', tables)
- def predict(self):
- for r in self.parse():
- if r[0] in ['Heading', 'Normal']:
- tablename = r[1]
- logger.debug(tablename)
- elif r[0] == 'Table':
- # 判断表是否为需要解析的表
- if any(coltypes & {i.lower() for i in r[1][1]}):
- # 数据库表名解析
- if re.search("[a-zA-Z_\d]+", tablename):
- tablename = re.search("[a-zA-Z_]+", tablename).group()
- logger.info(f"得到数据库表,表名:{tablename}")
- ############################# 在此修改 ############################
- df = pd.DataFrame(r[1])
- df.columns = df.values.tolist()[0]
- df.drop([0], inplace=True)
- ############################# 结束修改 ############################
- if self.draw:
- self.G.add_node(tablename)
- nodelist = [
- (uuid.uuid1(), item) for item in df.to_dict(orient ='records')
- ]
- self.G.add_nodes_from(nodelist)
- edgelist = [
- (tablename, node[0]) for node in nodelist
- ]
- self.G.add_edges_from(edgelist)
- # 创建表节点
- start_node = Node("表", name=tablename)
- graph.create(start_node)
- ############################### 在此修改 ##############################
- # 表字段修正
- df.rename(columns=coldict, inplace=True)
- df.rename(columns=aliasdict, inplace=True)
- df['tablename'] = tablename
- df['fullname'] = df['tablename'] + '_' + df['colname']
- ############################### 结束修改 ##############################
- # 别名字段必填
- if 'alias' not in df.columns:
- logger.warning(f"未找到注释字段,当前字段包含:{df.columns}")
- df['alias'] = ''
- # 迭代表字段
- for item in df.to_dict(orient ='records'):
- # 确保属性插入正常,删除非字符串键值
- for key in set(item.keys()):
- if not isinstance(key, str):
- del item[key]
- ############################# 在此修改 ############################
- # 字段名设置合并条件
- colname = item['colname']
- if not item['alias']:
- item['alias'] = tablename + '_' + colname
- ############################# 结束修改 ############################
- # 创建字段节点
- result = graph.nodes.match("字段", alias = item['alias'])
- if len(result) > 0:
- # 查询到相关字段,使用旧字段
- end_node = result.first()
- relationship = Relationship(start_node, "foreignkey", end_node, **{"name": item['colname']})
- else:
- # 未查询到相关字段,创建节点
- end_node = Node("字段", name=colname, **item)
- graph.create(end_node)
- relationship = Relationship(start_node, "has", end_node)
- # 创建表字段关系
- graph.merge(relationship)
- def draw(self):
- if self.draw:
- nx.draw(self.G, with_labels = True)
- plt.show()
- else:
- return "Draw is not enabled"
- if __name__ == '__main__':
- path = '''data/数据库设计文档.docx'''
- path = '''data/数据库设计(1).docx'''
- # path = '''data/数据库设计(2).docx'''
- # path = '''data/国家电投人才猎头智能人才库项目-数据库设计说明书.docx'''
- # path = '''data/FJS-OCR 富士通识别平台 数据库设计说明书.docx'''
- # path = '''data/中国跳水队智能辅助教练系统-国际比赛数据 数据库设计说明书.docx'''
- word = Word(path)
- word.predict()
|