123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- """
- 对 Word 中数据库设计表构建图谱
- 字段名判断依据:1、字段名称不为保留字 2、字母和下划线 3、驼峰命名 4、注意数字
- 字段类型判断:re.match(r'\w+\(\d+\)', s)
- 1、判断是否标准表 [依据]-> 判断是否包含列类型 -> 找到列
- 2、判断列名 [是]-> 判断是否符合列名规范
- 3、判断列中文名 [规则]-> 找到列中文名
- 4、判断列类型 [是]-> 判断是否符合列规范
- 5、判断主键 [未找到主键]-> 判断是否是ID
- 6、判断是否必填字段 [依据]-> 分类[是否为空|是否必填] -> [未找到字段]-> 设定默认值必填
- 7、判断列注释 [未找到注释]-> 中文名 [未找到中文名]-> 表名+列名 -> [列类型为bool、时间、流等]-> 添加UUID
- 8、字段唯一判断条件 <列名,列类型,注释>
- """
- import re
- import uuid
- import logging
- from collections import Counter
- import docx
- from docx import Document
- from docx.oxml.table import CT_Tbl
- from docx.oxml.text.paragraph import CT_P
- from docx.table import _Cell, Table
- from docx.text.paragraph import Paragraph
- import jieba
- import pandas as pd
- import networkx as nx
- import matplotlib.pyplot as plt
- # from text2vec import Similarity
- # sim = Similarity()
- from py2neo import Node, Graph, Relationship
- graph = Graph('bolt://192.168.1.150:7687/', user='neo4j', password='password', name="neo4j")
- graph.delete_all()
- # # 列类型判断
- # with open('dict/columntype.txt', 'r', encoding='utf-8') as fp:
- # COLUMN_TYPES = {i.strip() for i in fp.readlines()}
- # # 列名判断
- # with open('dict/columnname.txt', 'r', encoding='utf-8') as fp:
- # COLUMN_DICT = {i.strip(): 'name' for i in fp.readlines()}
- # # 注释判断
- # with open('dict/comment.txt', 'r', encoding='utf-8') as fp:
- # REMARK_DICT = {i.strip(): 'remark' for i in fp.readlines()}
- COLUMN_TYPES = {'int', 'bigint', 'tinyint', 'smallint', 'bigint unsigned', 'float', 'double', 'decimal', 'date', 'datetime', 'char', 'varchar', 'text', 'longtext', 'blob', 'bool', 'boolean'}
- TYPE_DICT = {'类型': 'column_type', '数据类型': 'column_type'}
- COLUMN_DICT = {'名称': 'name', '字段名': 'name', 'field name': 'name', '字段代码': 'name', '代码': 'name', '物理字段名': 'name'}
- C_NAME_DICT = {'字段中文名': 'c_name', '中文含义': 'c_name', '名字': 'c_name', '字段名称': 'c_name', '逻辑字段名': 'c_name'}
- REMARK_DICT = {'Alias': 'remark', 'description': 'remark', '说明': 'remark', '描述': 'remark', '备注': 'remark'}
- REQUIRED_DICT = {'空/非空': 'required', '可不可以为空': 'required', '是否为空': 'required', '允许空值': 'required', '是否必填': 'required', '空值': 'required'}
- PRIMARY_KEY_DICT = {'主键': 'primary_key'}
- FOREIGN_KEY_DICT = {'外键': 'foreign_key'}
- class LoggerHandler(logging.Logger):
- def __init__(self, name: str, console_handler_level: str = logging.INFO, fmt: str = '%(levelname)s: %(asctime)s: %(name)s: %(filename)s: %(lineno)d: %(funcName)s: %(message)s'):
- super().__init__(name)
- self.setLevel(logging.INFO)
- self.fmt = logging.Formatter(fmt)
- self.set_console_handler(console_handler_level)
- def set_console_handler(self, console_handler_level: str = logging.INFO) -> None:
- ch = logging.StreamHandler()
- ch.setLevel(console_handler_level)
- ch.setFormatter(self.fmt)
- self.addHandler(ch)
- logger = LoggerHandler(__name__, fmt='%(levelname)s: %(asctime)s: %(lineno)d: %(funcName)s: %(message)s')
- class Word:
- def __init__(self, path: str, draw: bool = False) -> None:
- self.draw = draw
- self.doc = Document(path)
- if draw:
- self.G = nx.Graph()
- self.namecount = dict({})
- self.all_tables = pd.DataFrame()
- def iter_block_item(self, parent):
- if isinstance(parent, docx.document.Document):
- parent_elm = parent.element.body
- elif isinstance(parent, _Cell):
- parent_elm = parent._tc
- else:
- raise ValueError("something error")
- for child in parent_elm.iterchildren():
- if isinstance(child, CT_P):
- yield Paragraph(child, parent)
- elif isinstance(child, CT_Tbl):
- yield Table(child, parent)
- def parse(self) -> tuple:
- for block in self.iter_block_item(self.doc):
- if block.style.name == 'Heading 1' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Heading 2' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Heading 3' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Heading 4' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Heading 5' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Heading 6' and block.text:
- yield ('Heading', block.text.lower())
- elif block.style.name == 'Normal' and block.text:
- yield ('Normal', block.text.lower())
- elif block.style.name == 'Table Grid':
- tables = []
- for row in block.rows:
- rows = []
- for cell in row.cells:
- for paragraph in cell.paragraphs:
- rows.append(paragraph.text.strip().lower())
- tables.append(rows)
- yield ('Table', tables)
- elif block.style.name == 'Normal Table':
- tables = []
- for row in block.rows:
- rows = []
- for cell in row.cells:
- for paragraph in cell.paragraphs:
- rows.append(paragraph.text.strip().lower())
- tables.append(rows)
- yield ('Table', tables)
- elif block.text:
- yield ('Unknow', block)
- # def clean_table(self, raw_table):
- # table = []
- # dirty_table = []
- # while raw_table and '' in raw_table[0]:
- # raw_table = raw_table[1:]
- # # 表格预处理
- # rowslen = [len(row) for row in raw_table]
- # if not rowslen:
- # return None
- # rowlen = Counter(rowslen).most_common(1)[0][0]
- # for i,l in enumerate(rowslen):
- # if l == rowlen:
- # table.append(raw_table[i])
- # else:
- # dirty_table.append(raw_table[i])
- # return table, dirty_table
- def predict(self):
- for r in self.parse():
- if r[0] in ['Heading', 'Normal'] and r[1]:
- tablename = r[1]
- logger.debug(tablename)
- elif r[0] == 'Table':
- # table = r[1]
- # table, dirty_table = self.clean_table(r[1])
- table, dirty_table = self.get_table(r[1])
- if not table:
- continue
- # 判断表是否为需要解析的表
- if any({'fulltype', 'type'} & {self.detect_type(i) for i in table[1]}):
- ############################### 数据库表名解析 ##############################
- if re.search("[a-zA-Z_\d]+", tablename):
- table_name = re.search("[a-zA-Z_]+", tablename).group()
- try:
- table_c_name = re.search('[\u4e00-\u9fa5]{3,}', tablename).group()
- except Exception as e:
- table_c_name = "未知表"
- logger.info(f"得到数据库表,表名:{table_name}\t猜测中文表名:{table_c_name}")
- else:
- table_name = "UnknowTable"
- table_c_name = "未知表"
- ############################### 表名解析结束 ###############################
- ############################# 表字段在此修改 ############################
- df = pd.DataFrame(table)
- df.columns = df.values.tolist()[0]
- df.drop([0], inplace=True)
- # # 表字段修正
- df.rename(columns={**TYPE_DICT, **COLUMN_DICT, **C_NAME_DICT, **REMARK_DICT, **REQUIRED_DICT, **PRIMARY_KEY_DICT, **FOREIGN_KEY_DICT}, inplace=True)
- df['table_name'] = table_name
- df['table_c_name'] = table_c_name
- for i in df.columns:
- if self.detect_type(df.loc[1, i]) != 'unknow':
- df.rename(columns={i: 'column_type'}, inplace=True)
- break
- # 判断字段是否允许为空,允许为空值的不可连接
- # for i in df.columns:
- # if sim.get_score(i, '允许值为空') > 0.7:
- # df['unique'] = df[i].apply(lambda x: str(uuid.uuid1()) if x != 'n' else '')
- # break
- # elif sim.get_score(i, '必填') > 0.7:
- # df['unique'] = df[i].apply(lambda x: '' if x != 'n' else str(uuid.uuid1()))
- # break
- # if 'unique' not in df.columns:
- # print("无法判断字段是否必填")
- # df['unique'] = ''
- ############################### 必填字段判断 ###############################
- if 'required' not in df.columns:
- logger.warning("无法判断字段是否必填,设定默认值必填,所有字段皆可关联!")
- df['required'] = ''
- ############################### 必填字段判断 ###############################
- # 注释字段必填
- if 'remark' not in df.columns:
- if 'c_name' not in df.columns:
- logger.warning(f"未找到注释字段,当前字段包含:{df.columns}")
- df['remark'] = ''
- else:
- logger.warning(f"未找到注释字段,使用字段中文名代替!,当前字段包含:{df.columns}")
- df['remark'] = df['c_name']
- ############################### 指定字段不可关联 ###############################
- df['remark'] = df['remark'] + df['column_type'].apply(lambda x: str(uuid.uuid1()) if x in ['date', 'datetime', 'blob', 'text'] else '')
- df['remark'] = df['remark'] + df['name'].apply(lambda x: str(uuid.uuid1()) if x in ['create_time', 'update_time'] else '')
- df['remark'] = df.apply(lambda x: x['name'] if not x['remark'] else x['remark'], axis=1)
- ############################### 指定字段不可关联 ###############################
- ############################### 为空字段不可关联 ###############################
- pass
- ############################### 为空字段不可关联 ###############################
- if not df.query(' name in ["id", "ID", "Id"] ').empty:
- idx = df.query(' name in ["id", "ID", "Id"] ').index[0]
- df.loc[idx, 'remark'] = ''.join([table_name, '_id'])
- # 判断唯一的依据
- logger.debug(f'''remark type: {type(df.loc[1, 'remark'])}''')
- logger.debug(f'''column_type type: {type(df.loc[1, 'column_type'])}''')
- df['vec'] = df['name'] + '_' + df['column_type'] + '_' + df['remark']
- # ############################### 表字段结束修改 ##############################
- # if self.draw:
- # self.G.add_node(table_name)
- # nodelist = [
- # (uuid.uuid1(), item) for item in df.to_dict(orient ='records')
- # ]
- # self.G.add_nodes_from(nodelist)
- # edgelist = [
- # (table_name, node[0]) for node in nodelist
- # ]
- # self.G.add_edges_from(edgelist)
- # 创建表节点
- result = graph.nodes.match("表", name = table_name, c_name=table_c_name)
- if len(result) > 0:
- start_node = result.first()
- else:
- start_node = Node("表", name=table_name, c_name=table_c_name)
- graph.create(start_node)
- # 迭代表字段
- for item in df.to_dict(orient ='records'):
- # 确保属性插入正常,删除非字符串键值
- for key in set(item.keys()):
- if not isinstance(key, str):
- del item[key]
- ############################# 在此修改 ############################
- # 字段名设置合并条件
- name = item['name']
- if not item['vec']:
- item['vec'] = table_name + '_' + name
- ############################# 结束修改 ############################
- # 创建字段节点
- result = graph.nodes.match("字段", vec = item['vec'])
- if len(result) > 0:
- # 查询到相关字段,使用旧字段
- end_node = result.first()
- relationship = Relationship(start_node, "related", end_node, **{"name": item['name']})
- else:
- # 未查询到相关字段,创建节点
- end_node = Node("字段", **item)
- graph.create(end_node)
- relationship = Relationship(start_node, "has", end_node)
- # 创建表字段关系
- graph.merge(relationship)
- else:
- print("非标准表格", table)
- else:
- print(r)
- print(self.all_tables.columns)
- print(self.all_tables)
- def detect_type(self, text: str):
- fulltype = re.match(r'(\w+)\(\d+\)', text)
- if fulltype and (fulltype.group(1) in COLUMN_TYPES):
- return 'fulltype'
- elif text in COLUMN_TYPES:
- return 'type'
- else:
- return 'unknow'
-
- def get_table(self, raw_table):
- table = []
- dirty_table = []
- has_head = False
- for row in raw_table:
- if has_head:
- table.append(row)
- elif set(row) & set({**TYPE_DICT, **COLUMN_DICT, **C_NAME_DICT, **REMARK_DICT, **REQUIRED_DICT, **FOREIGN_KEY_DICT}.keys()):
- head = row
- has_head = True
- else:
- dirty_table.append(row)
- # for row in raw_table:
- # if get_head:
- # table.append(row)
- # continue
- # for col in row:
- # fulltype = re.match(r'(\w+)\(\d+\)', col)
- # if fulltype and (fulltype.group(1) in COLUMN_TYPES):
- # table.append(row)
- # get_head = True
- # break
- # elif col in COLUMN_TYPES:
- # table.append(row)
- # get_head = True
- # break
- # else:
- # head = row
- if table and (len(head) == len(table[0])) and (len(Counter([len(_) for _ in table]).keys()) == 1):
- table.insert(0, head)
- return table, dirty_table
- else:
- return None, dirty_table
- def draw(self):
- if self.draw:
- nx.draw(self.G, with_labels = True)
- plt.show()
- else:
- return "Draw is not enabled"
- if __name__ == '__main__':
- # path = '''data/数据库设计说明书.docx'''
- path = '''data/数据库设计文档.docx'''
- path = '''data/数据库设计(1).docx'''
- path = '''data/数据库设计(2).docx'''
- path = '''data/国家电投人才猎头智能人才库项目-数据库设计说明书.docx'''
- path = '''data/FJS-OCR 富士通识别平台 数据库设计说明书.docx'''
- path = '''data/中国跳水队智能辅助教练系统-国际比赛数据 数据库设计说明书.docx'''
- path = '''data/租房查询系统_数据库设计说明书_2.0.docx'''
- path = '''data/url-ukWkMKhnRgCvxVZt.docx'''
- path = '''data/url-qqp17mI32jTyozQt.docx'''
- path = '''data/电商-数据库详细设计说明书V0.4.docx'''
- word = Word(path)
- word.predict()
|