# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2024-06-11 13:43:14
# @Last Modified by: privacy
# @Last Modified time: 2024-09-25 17:34:06

# Standard library imports
import os
import re
import json
from io import BytesIO
from pprint import pprint
from typing import Optional, List

# Third-party imports
import cv2
import numpy as np
import pandas as pd
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTRect, LTTextBoxHorizontal, LTLine, LTFigure, LTCurve, LTImage, LTChar
from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
from pdfminer.pdftypes import (
    LITERALS_DCT_DECODE,
    LITERALS_JBIG2_DECODE,
    LITERALS_JPX_DECODE,
    LITERALS_FLATE_DECODE,
)
from pdfminer.pdfparser import PDFParser, PDFSyntaxError
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
import pdfplumber
import camelot

# Local imports
from celery_tasks.tools import RefPageNumberResolver

PIL_ERROR_MESSAGE = "No module named 'PIL', please run 'pip install pillow'"

# Cell values treated as table-header keywords.
HEADERS = {'序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价', '合价', '备注',
           '主材名称', '规格型号', '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项', '数量',
           '含税单价(元)', '含税合价(元)', '条款号', '评分因素', '评分标准', '页码'}

# Bare numberings / percentages, e.g. "1.2.3" or "15%" -- never headings.
pattern_1 = re.compile(r'^\d(\d*\.?\d*)+\d(%)?')
# Chapter-style prefixes, e.g. "第一", "三、", "(二)".
pattern_2 = re.compile(r'^第[一二三四五六七八九十]+|^[一二三四五六七八九十\d]+、|^[\(\(][一二三四五六七八九十]+[\)\)]')
# Appendix / reference headings, e.g. "附录", "参考文献", "附表", "附件一".
pattern_3 = re.compile(r'^附录|^参考文献|^附表|附件[一二三四五六七八九十\d]+')


def is_title(line: str) -> bool:
    """Determine whether a line of text is a heading.

    Args:
        line: a line of text

    Returns:
        Whether the line is a heading.
    """
    stripped = line.strip()
    # Bare numbering or a percentage is never a heading.
    if pattern_1.fullmatch(stripped):
        return False
    # Chapter-style or appendix-style prefixes mark a heading.
    if pattern_2.findall(stripped):
        return True
    if pattern_3.findall(stripped):
        return True
    return False
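
# Minimal usage sketch for is_title (illustrative only, not part of the
# original pipeline): chapter-style prefixes are accepted, bare numbering
# and percentages are rejected.
def _demo_is_title() -> None:
    assert is_title('第一章 总则')      # chapter prefix -> heading
    assert is_title('三、评标办法')     # "三、" enumeration -> heading
    assert is_title('附录A')            # appendix heading -> heading
    assert not is_title('1.2.3')        # bare numbering -> not a heading
    assert not is_title('15%')          # percentage -> not a heading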

def export_image(image: LTImage, path: str) -> Optional[str]:
    """Save an LTImage to disk and return the written path (None if the
    stream carries no data). The extension is chosen from the stream's
    filters or, failing that, from the magic bytes of its payload.
    """
    filters = image.stream.get_filters()

    if len(filters) == 1 and filters[0][0] in LITERALS_DCT_DECODE:
        return _save_jpeg(image, path)
    elif len(filters) == 1 and filters[0][0] in LITERALS_JPX_DECODE:
        return _save_jpeg2000(image, path)

    data = image.stream.get_data()
    raw_data = image.stream.get_rawdata()

    def _sniff_and_write(buffer: bytes, allow_j2k: bool) -> str:
        """Pick an extension from the magic bytes and write the buffer."""
        if buffer[:2] == b'\xff\xd8' and buffer[-2:] == b'\xff\xd9':
            ext = '.jpg'
        elif buffer[:8] == b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a':
            ext = '.png'
        elif buffer[:2] == b'\x42\x4d':
            ext = '.bmp'
        elif buffer[:6] in (b'\x47\x49\x46\x38\x37\x61', b'\x47\x49\x46\x38\x39\x61'):
            ext = '.gif'
        elif buffer[:2] in (b'\x4d\x4d', b'\x49\x49'):
            ext = '.tiff'
        elif allow_j2k and buffer[:8] == b'\xffO\xffQ\x00/\x00\x00':
            return _save_j2k(image, path)
        else:
            ext = '.unk'
        out_path = path + ext
        with open(out_path, 'wb') as file:
            file.write(buffer)
        return out_path

    if data:
        return _sniff_and_write(data, allow_j2k=True)
    elif raw_data:
        return _sniff_and_write(raw_data, allow_j2k=False)
    else:
        return None


def _save_j2k(image: LTImage, path: str) -> str:
    """Save a JPEG 2000 codestream by round-tripping it through Pillow."""
    try:
        from PIL import Image
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    path = path + '.png'
    data = image.stream.get_data()
    assert data is not None
    roi_img = Image.open(BytesIO(data))
    roi_img.save(path)
    return path


def _save_jpeg(image: LTImage, path: str) -> str:
    """Save a JPEG encoded image."""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + '.jpg'
    with open(path, 'wb') as fp:
        if LITERAL_DEVICE_CMYK in image.colorspace:
            try:
                from PIL import Image, ImageChops  # type: ignore[import]
            except ImportError:
                raise ImportError(PIL_ERROR_MESSAGE)
            ifp = BytesIO(raw_data)
            i = Image.open(ifp)
            i = ImageChops.invert(i)
            i = i.convert('RGB')
            i.save(fp, 'JPEG')
        else:
            fp.write(raw_data)
    return path


def _save_jpeg2000(image: LTImage, path: str) -> str:
    """Save a JPEG 2000 encoded image."""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + '.png'
    try:
        from PIL import Image  # type: ignore[import]
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    # If we write only the raw data, most image viewers cannot open the file.
    # Re-encoding through OpenCV produces a file that opens everywhere.
    ifp = BytesIO(raw_data)
    i = Image.open(ifp)
    opencv_image = cv2.cvtColor(np.array(i), cv2.COLOR_RGB2BGR)
    cv2.imwrite(path, opencv_image)
    return path


def _save_bmp(image: LTImage, width: int, height: int, bytes_per_line: int, bits: int, path: str) -> str:
    """Save a BMP encoded image."""
    data = image.stream.get_data()
    path = path + '.bmp'
    with open(path, 'wb') as fp:
        fp.write(data)
    return path
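
# Minimal usage sketch for export_image (illustrative only; 'demo_images' is
# a placeholder directory): callers pass a path *without* an extension and
# keep the returned path, since export_image picks the extension itself.
def _demo_export_image(pdf_path: str, out_dir: str = 'demo_images') -> None:
    os.makedirs(out_dir, exist_ok=True)
    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
        for element in page_layout:
            if isinstance(element, LTFigure):
                for obj in element:
                    if isinstance(obj, LTImage):
                        saved = export_image(obj, os.path.join(out_dir, f'p{page_number}_{obj.name}'))
                        print(page_number, saved)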

def table_parse(pdf_path: str,
                title_path: str,
                start_title: str = '六、已标价工程量清单',
                end_title: str = '七、施工组织设计',
                table_path: str = 'table.json',
                start_page_number: Optional[int] = None,
                end_page_number: Optional[int] = None) -> list:
    """Parse the tables of a PDF between two headings.

    @pdf_path: PDF file path
    @title_path: heading JSON produced by the title parser
    @start_title: heading that opens the table section
    @end_title: heading that closes the table section
    @table_path: where the parsed tables are dumped as JSON
    @start_page_number: first page to scan (looked up from title_path if None)
    @end_page_number: last page to scan (looked up from title_path if None)
    """
    tables = []
    if (start_page_number is None) or (end_page_number is None):
        df = pd.read_json(title_path)
        start_page_number = df[df['text'] == start_title].page_number.max()
        end_page_number = df[df['text'] == end_title].page_number.max()

    def concat_table(tables: list, table: list, page_number: int) -> list:
        """Append a table to the results. Two cases: start a new table, or
        extend the previous one when it continues across pages.
        """
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if len(HEADERS & set(first)) > 2:
            # Many header keywords: standalone header row, start a new table.
            tables.append({"page_numbers": [page_number], "title_len": len(first),
                           "col_len": len(table[-1]), "table": table, "confidence": 1})
        elif tables and ((page_number - 1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
            # Same column count as the previous page's table: merge directly.
            tables[-1]['page_numbers'].append(page_number)
            tables[-1]['table'].extend(table)
        else:
            tables.append({"page_numbers": [page_number], "title_len": len(first),
                           "col_len": len(table[-1]), "table": table, "confidence": 0})
        return tables

    with pdfplumber.open(pdf_path) as pdf:
        for i in range(start_page_number, end_page_number):
            for table in pdf.pages[i].extract_tables():
                tables = concat_table(tables, table, page_number=i)

    with open(table_path, 'w', encoding='utf-8') as fp:
        json.dump(tables, fp, indent=4, ensure_ascii=False)
    return tables
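
# Minimal usage sketch for table_parse (illustrative only; the paths are
# placeholders, not files from this project). When the page window is not
# given explicitly, it is looked up from the heading JSON that
# PdfExtractAttr.parse_title dumps.
def _demo_table_parse() -> None:
    tables = table_parse(
        pdf_path='sample.pdf',
        title_path='titles.json',
        table_path='table.json',
    )
    print(f'parsed {len(tables)} table(s)')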

class PdfExtractAttr(object):

    def __init__(self, file_path: str):
        """PDF file parser.

        Args:
            file_path: path of the PDF file
        """
        super(PdfExtractAttr, self).__init__()
        self.file_path = file_path
        self.tables = []
        self.content = []
        self.chapters = []
        self.references = []
        self.detail_df = None
        self.outlines = None
        self.left = 0
        self.right = 0

    def can_merge_lines(self, line1: LTTextBoxHorizontal, line2: LTTextBoxHorizontal) -> bool:
        """Decide whether two lines can be merged into one paragraph."""
        # line1 ends early, or line2 is indented: paragraph break.
        if (line1.x1 < self.right) or (line2.x0 > self.left):
            return False
        return True

    def parse_title(self, title_path: Optional[str] = None) -> list:
        """Parse headings (used to uniquely locate quotation sections).

        Args:
            title_path: save path

        Returns:
            results: list of headings
        """
        results = []
        seq_num = 0
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            title_index = 0
            for element in page_layout:
                if isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                    text = element.get_text().strip()
                    # A heading is assumed to be a single tall line or to match the title patterns.
                    if text and (is_title(text) or element.height > 15):
                        results.append({
                            'index': title_index,
                            'page_number': page_number,
                            'bbox': element.bbox,
                            'text': text,
                            'title': text,
                            'seq_num': seq_num,
                        })
                        seq_num += 1
                        title_index += 1
        if title_path:
            with open(title_path, 'w', encoding='utf-8') as fp:
                json.dump(results, fp, indent=4, ensure_ascii=False)
        return results

    def parse_image(self, image_dir: str, image_meta_path: Optional[str] = None) -> List[dict]:
        """Extract the images embedded in the PDF.

        Args:
            image_dir: directory the images are written to
            image_meta_path: optional JSON save path for the image metadata

        Returns:
            image_list: list of image records
        """
        image_list = []
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            image_index = 0
            for element in page_layout:
                if isinstance(element, LTFigure):
                    for e_obj in element._objs:
                        if isinstance(e_obj, LTImage):
                            # Export the image data.
                            image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
                            image_file = export_image(e_obj, image_file)
                            image_list.append({
                                "image_index": image_index,
                                "page_number": page_number,
                                "image_name": image_file,
                            })
                            image_index += 1
        if image_meta_path:
            with open(image_meta_path, 'w', encoding='utf-8') as fp:
                json.dump(image_list, fp, indent=4, ensure_ascii=False)
        return image_list

    def main_parse(self, title_path: str = None, section_path: str = None) -> list:
        """Parse the PDF body text, grouping it under the outline entries.

        Args:
            title_path: heading save path
            section_path: section (body text) save path
        """
        # The outline dataframe is required; build it on demand.
        if self.outlines is None:
            self.parse_outline()
        self.outlines['text'] = ''
        # Headings
        title_list = []
        # Read the PDF and walk its pages.
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            # Locate the deepest outline entry starting at or before this page.
            max_start_row = self.outlines.query(
                f'page_number <= {page_number + 1}'
            ).query(
                'page_number == page_number.max()'
            ).query(
                'level == level.max()'
            )
            if not max_start_row.empty:
                idx = max_start_row.index.values[0]
            else:
                idx = len(self.outlines.index)
                self.outlines.loc[idx] = {'level': 6, 'title': '', 'page_number': 0, 'text': ''}

            # Left / right x coordinates of all text boxes on the page.
            x0s = []
            x1s = []
            title_index = 0
            for element in page_layout:
                if isinstance(element, LTTextBoxHorizontal):
                    x0s.append(element.x0)
                    x1s.append(element.x1)
            if x0s and x1s:
                # Left margin
                self.left = min(x0s) + 15
                # Right margin
                self.right = max(x1s) - 15

            current = None
            for element in page_layout:
                if isinstance(element, LTLine):
                    pass
                elif isinstance(element, LTRect):
                    pass
                elif isinstance(element, LTTextBoxHorizontal):
                    text = element.get_text().strip()
                    # A heading is assumed to be a single line with a large font.
                    if len(element._objs) == 1 and text and (is_title(text) or element.height > 15):
                        title_list.append({'index': title_index, 'page_number': page_number, 'bbox': element.bbox, 'text': text})
                        title_index += 1
                        self.outlines.at[idx, 'text'] += '\n'
                        self.outlines.at[idx, 'text'] += text
                    # Body text
                    elif not current or self.can_merge_lines(current, element):
                        # Lines can be merged into the running paragraph.
                        current = element
                        for line in element:
                            self.outlines.at[idx, 'text'] += line.get_text().strip()
                    else:
                        # Lines cannot be merged: start a new paragraph.
                        for line in element:
                            self.outlines.at[idx, 'text'] += '\n'
                            self.outlines.at[idx, 'text'] += line.get_text().strip()
        if title_path:
            with open(title_path, 'w', encoding='utf-8') as fp:
                json.dump(title_list, fp, indent=4, ensure_ascii=False)
        if section_path:
            self.outlines.to_json(section_path, orient='records', lines=True, force_ascii=False)
        return title_list

    def extract_toc(self) -> list:
        """Parse the PDF outline from the printed table of contents."""
        results = []
        for page_number, page in enumerate(extract_pages(self.file_path)):
            is_outline = False
            if page_number < 1:
                continue
            if page_number > 20:
                break
            lines = []
            for element in page:
                if isinstance(element, LTTextBoxHorizontal):
                    for line in element:
                        lines.append(line.get_text().strip())
            for line in lines:
                # Check whether the line looks like a TOC entry.
                if line and '......' in line and (line[0].isdigit() or '\u4e00' <= line[0] <= '\u9fff') and line[-1].isdigit():
                    is_outline = True
                    # Indentation level (not recoverable here, default to 1).
                    indent_level = 1
                    # Entry title
                    title = re.findall(r'^[\d\.、]{0,}[\u4e00-\u9fff、()\s]+', line).pop()
                    # Entry page number
                    page_n = int(re.findall(r'\d+$', line).pop())
                    # Append to the TOC structure.
                    results.append({
                        "level": indent_level,
                        "title": title,
                        "page_number": page_n,
                    })
            # TOC pages are contiguous; stop at the first non-TOC page.
            if not is_outline:
                break
        return results

    def extract_content(self, content_path: str = None) -> list:
        """Extract the plain text of every page with pdfplumber."""
        self.content = []
        with pdfplumber.open(self.file_path) as pdf:
            for page in pdf.pages:
                self.content.append({
                    'page_number': page.page_number - 1,
                    'text': page.extract_text(),
                })
        # Early pages dominated by dotted leaders are assumed to be the TOC.
        for i in range(1, min(5, len(self.content))):
            if len(re.findall(r'\.\.\.', self.content[i]['text'])) > 10:
                self.content[i]['text'] = '目录'
        if content_path:
            with open(content_path, 'w', encoding='utf-8') as fp:
                json.dump(self.content, fp, indent=4, ensure_ascii=False)
        return self.content

    def parse_outline(self, outline_path: str = None) -> list:
        """Parse the PDF outline from its metadata; fall back to content-based
        parsing (extract_toc) when the metadata is missing.
        """
        results = []
        with open(self.file_path, "rb") as fp:
            try:
                parser = PDFParser(fp)
                document = PDFDocument(parser)
                ref_pagenum_resolver = RefPageNumberResolver(document)
                outlines = document.get_outlines()
                for (level, title, dest, a, se) in outlines:
                    if dest:
                        page_num = ref_pagenum_resolver.resolve(dest)
                    elif a:
                        page_num = ref_pagenum_resolver.resolve(a)
                    elif se:
                        page_num = ref_pagenum_resolver.resolve(se)
                    else:
                        page_num = None
                    results.append({'level': level, 'title': title, 'page_number': page_num})
            except PDFNoOutlines:
                print("No outlines found.")
            except PDFSyntaxError:
                print("Corrupted PDF or non-PDF file.")
            finally:
                parser.close()
        if not results:
            results = self.extract_toc()
        if outline_path:
            with open(outline_path, 'w', encoding='utf-8') as op:
                json.dump(results, op, indent=4, ensure_ascii=False)
        self.outlines = pd.DataFrame(results)
        return results
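
    # Usage sketch for the outline path (illustrative only; 'sample.pdf' and
    # 'outline.json' are placeholder names, not project files).
    @staticmethod
    def _demo_parse_outline(pdf_path: str = 'sample.pdf') -> None:
        """parse_outline reads the embedded outline metadata and falls back
        to extract_toc when the PDF carries none.
        """
        agent = PdfExtractAttr(file_path=pdf_path)
        outline = agent.parse_outline(outline_path='outline.json')
        pprint(outline[:5])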

    def parse_text(self, text_path: Optional[str] = None) -> List[dict]:
        """Parse the text blocks.

        Args:
            text_path: save path for the text blocks

        Returns:
            List of text blocks.
        """
        seq_num = -1
        text_line = []
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            title_index = 0
            for element in page_layout:
                if isinstance(element, LTTextBoxHorizontal):
                    # Distance from the left edge
                    left = element.x0
                    # Distance from the right edge
                    right = (page_layout.width - element.x1)
                    # Text width
                    width = element.width
                    # Infer the alignment from the margins.
                    if (left > right) and (abs(left - right) > 100):
                        alignment = 'right'
                    elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
                        alignment = 'center'
                    else:
                        alignment = 'left'
                    text = element.get_text().strip()
                    # Heading?
                    if text and (is_title(text) or element.height > 15) and (len(element._objs) == 1):
                        title_index += 1
                        seq_num += 1
                        text_type = True
                    else:
                        text_type = False
                    # Table caption?
                    is_table_name = bool(text and (text.startswith(('表', '清单')) or text.endswith(('表', '清单'))))
                    text_line.append({
                        'page_number': page_number,
                        'seq_num': seq_num,
                        'index': element.index,
                        'title_index': title_index,
                        'text': text,
                        'is_title': text_type,
                        'lines': len(element._objs),
                        'is_table_name': is_table_name,
                        'x0': element.bbox[0],
                        'y0': element.bbox[1],
                        'x1': element.bbox[2],
                        'y1': element.bbox[3],
                        'alignment': alignment,
                    })
        if text_path:
            with open(text_path, 'w', encoding='utf-8') as fp:
                json.dump(text_line, fp, indent=4, ensure_ascii=False)
        self.detail_df = pd.DataFrame(text_line)
        return text_line

    def concat_table(self, table: list, page_number: int, table_name: str = None, new: bool = False) -> None:
        """Append a table to the results. Two cases: start a new table, or
        extend the previous one when it continues across pages.

        Args:
            table: table rows from the current page
            page_number: page the rows were found on
            table_name: optional caption for the table
            new: force the rows to start a new table
        """
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if new:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]),
                                "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
            return
        second = [''.join(cell.split()) if cell else cell for cell in table[1]] if len(table) > 1 else None
        if not self.tables or len(HEADERS & set(first)) > 2:
            # Many header keywords in the first row: standalone header, start a new table.
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]),
                                "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
        elif second and (len(HEADERS & set(second)) > 2):
            # Header keywords in the second row: the first row is the caption.
            if not table_name:
                first = [i for i in first if i]
                if len(first) == 1:
                    table_name = "".join(first)
            self.tables.append({"page_numbers": [page_number], "title_len": len(second), "col_len": len(table[-1]),
                                "table": table[1:], "confidence": 1, "table_name": table_name if table_name else ""})
        elif ((page_number - 1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
            # Same column count as the previous page's table: merge as a continuation.
            self.tables[-1]['page_numbers'].append(page_number)
            self.tables[-1]['table'].extend(table)
        else:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]),
                                "table": table, "confidence": 0, "table_name": table_name if table_name else ""})
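
    # Illustrative sketch of the merge heuristic (not part of the pipeline;
    # 'unused.pdf' is a placeholder, __init__ performs no I/O): a row sharing
    # more than two cells with HEADERS starts a new table, and a row with the
    # same column count on the following page is appended as a continuation.
    @staticmethod
    def _demo_concat_table() -> None:
        agent = PdfExtractAttr(file_path='unused.pdf')
        header_row = ['序号', '项目名称', '单位', '工程量']
        agent.concat_table([header_row, ['1', '土建', '项', '10']], page_number=3)
        # Same column count on the next page -> rows join the last table.
        agent.concat_table([['2', '安装', '项', '5']], page_number=4)
        assert len(agent.tables) == 1 and len(agent.tables[0]['table']) == 3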

    def parse_table_pro(self, table_path: str = 'all_tables.json') -> list:
        """Parse tables, cross-checking pdfplumber detections against camelot."""
        self.tables = []
        if self.detail_df is None:
            self.parse_text()
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                # Does the page contain any tables?
                tables = page_layout.find_tables()
                if not tables:
                    continue
                tables_pro = camelot.read_pdf(
                    self.file_path,
                    # flavor='stream',
                    pages=str(page_number + 1),
                    # edge_tol=200,
                )
                if not tables_pro:
                    continue
                print(len(tables), len(tables_pro))
                if (len(tables) != 0) and (len(tables_pro) == 1):
                    # Exactly one table on the page: decide whether to merge it
                    # with the table carried over from the previous page.
                    print(f"Parsing the table on page {page_number} of the PDF")
                    table = tables[0]
                    table_pro = tables_pro[0].df.to_dict(orient='split')['data']
                    x0, y0, x1, y1 = table.bbox
                    table_title_df = self.detail_df.query(f'page_number == {page_number} and is_table_name == True')
                    if table_title_df.empty:
                        self.concat_table(table_pro, page_number=page_number)
                    else:
                        table_title_name = table_title_df.iloc[0]['text']
                        self.concat_table(table_pro, page_number=page_number, table_name=table_title_name)
                elif len(tables_pro) > 1:
                    # Several tables on the page: only the first may continue the
                    # previous page's table; the rest are necessarily new tables.
                    print(f"Parsing the tables on page {page_number} of the PDF")
                    first_table = tables_pro[0]
                    self.concat_table(first_table.df.to_dict(orient='split')['data'], page_number=page_number)
                    for table_index in range(1, len(tables_pro)):
                        self.concat_table(tables_pro[table_index].df.to_dict(orient='split')['data'],
                                          page_number=page_number, new=True)
        if table_path:
            with open(table_path, 'w', encoding='utf-8') as fp:
                json.dump(self.tables, fp, indent=4, ensure_ascii=False)
        return self.tables


if __name__ == '__main__':
    # pdf_path = './投标文件-修改版9-5-1-1.pdf'
    # pdf_path = './2022年度工程类-公招采购资料/三峡左岸及地下电站地坪整治/三峡左岸及地下电站地坪整治招标文件(发售版).pdf'
    # table_path = './2022年度工程类-公招采购资料/三峡左岸及地下电站地坪整治/三峡左岸及地下电站地坪整治招标文件(发售版)-table.json'
    pdf_path = 'D:/desktop/三峡水利/data/0预审查初审详审测试数据/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购.pdf'
    text_path = 'D:/desktop/三峡水利/data/0预审查初审详审测试数据/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购-text.json'
    table_path = 'D:/desktop/三峡水利/data/0预审查初审详审测试数据/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购/水电站物理场应用研究及关键设备物理场在线监测应用开发设备采购-table.json'

    agent = PdfExtractAttr(file_path=pdf_path)
    agent.parse_text(text_path=text_path)
    agent.parse_table_pro(table_path=table_path)
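
# Reading the dumped artifacts back (illustrative only; 'all_tables.json' is
# the default path used by parse_table_pro above). Every writer in this
# module emits plain JSON, so the outputs reload with json/pandas for
# downstream checks.
def _demo_load_outputs() -> None:
    with open('all_tables.json', encoding='utf-8') as fp:
        tables = json.load(fp)
    for record in tables:
        # First row is the header, the rest are data rows.
        df = pd.DataFrame(record['table'][1:], columns=record['table'][0])
        print(record['page_numbers'], record.get('table_name', ''), df.shape)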