'''Tender document content extraction (招标文件内容提取)'''
import json
import os
import re
from io import BytesIO
from pprint import pprint
from typing import List, Optional

import camelot
import cv2
import numpy as np
import pandas as pd
import pdfplumber
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTRect, LTTextBoxHorizontal, LTLine, LTFigure, LTCurve, LTImage, LTChar
from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
from pdfminer.pdfparser import PDFParser, PDFSyntaxError
from pdfminer.pdftypes import (
    LITERALS_DCT_DECODE,
    LITERALS_JBIG2_DECODE,
    LITERALS_JPX_DECODE,
    LITERALS_FLATE_DECODE,
)

from .tools import RefPageNumberResolver

# Header cells that commonly appear in bill-of-quantities, brand and scoring tables;
# used to decide whether an extracted table row is an independent header row.
HEADERS = {'序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价', '合价', '备注',
           '主材名称', '规格型号', '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项', '数量',
           '含税单价(元)', '含税合价(元)', '条款号', '评分因素', '评分标准', '页码'}
HEADERS |= {'条款号', '评审因素', '评审标准', ''}


def is_title(line: str) -> bool:
    """Heuristic check for chapter/section headings such as "第三章", "(一)", "1." or "附录"."""
    line = line.strip()
    # Numbered headings: "(一)", "1.", "第三章", "一、" and similar patterns.
    title_word = re.findall(r'^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]', line)
    if title_word:
        return True
    # Appendix / reference style headings.
    title_word = re.findall(r'^附录|^参考文献|^附表', line)
    if title_word:
        return True
    return False


PIL_ERROR_MESSAGE = "PIL (Pillow) could not be imported"


def _save_jpeg(image: LTImage, path: str) -> str:
    """Save a JPEG encoded image"""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".jpg"
    with open(path, "wb") as fp:
        if LITERAL_DEVICE_CMYK in image.colorspace:
            try:
                from PIL import Image, ImageChops  # type: ignore[import]
            except ImportError:
                raise ImportError(PIL_ERROR_MESSAGE)
            ifp = BytesIO(raw_data)
            i = Image.open(ifp)
            i = ImageChops.invert(i)
            i = i.convert("RGB")
            i.save(fp, "JPEG")
        else:
            fp.write(raw_data)
    return path


def _save_jpeg2000(image: LTImage, path: str) -> str:
    """Save a JPEG 2000 encoded image"""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".png"
    try:
        from PIL import Image  # type: ignore[import]
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    # Writing the raw JPEG 2000 data directly produces files that most image viewers
    # cannot open; round-tripping through OpenCV yields a PNG that opens reliably.
    ifp = BytesIO(raw_data)
    i = Image.open(ifp)
    opencv_image = cv2.cvtColor(np.array(i), cv2.COLOR_RGB2BGR)
    cv2.imwrite(path, opencv_image)
    return path


def export_image(image: LTImage, path: str) -> Optional[str]:
    """Save an LTImage to disk and return the resulting file path (None if there is no data)."""
    (width, height) = image.srcsize

    filters = image.stream.get_filters()
    if len(filters) == 1 and filters[0][0] in LITERALS_DCT_DECODE:
        return _save_jpeg(image, path)
    elif len(filters) == 1 and filters[0][0] in LITERALS_JPX_DECODE:
        return _save_jpeg2000(image, path)

    # Fall back to sniffing the magic bytes of the decoded stream, then of the raw stream.
    data = image.stream.get_data()
    raw_data = image.stream.get_rawdata()
    payload = data if data else raw_data
    if not payload:
        return None
    if payload[:2] == b'\xff\xd8' and payload[-2:] == b'\xff\xd9':
        path += '.jpg'
    elif payload[:8] == b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a':
        path += '.png'
    elif payload[:2] == b'\x42\x4d':
        path += '.bmp'
    elif payload[:6] in (b'\x47\x49\x46\x38\x37\x61', b'\x47\x49\x46\x38\x39\x61'):
        path += '.gif'
    elif payload[:2] in (b'\x4d\x4d', b'\x49\x49'):
        path += '.tiff'
    else:
        path += '.unk'
    with open(path, 'wb') as file:
        file.write(payload)
    return path
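
# A quick illustration (as comments) of what is_title() accepts; the sample strings
# below are made-up examples, not lines taken from any particular document:
#
#     is_title('第三章 评标办法')        # True  -- "第...章" chapter heading
#     is_title('(一)总体要求')          # True  -- "(一)" style numbering
#     is_title('本项目位于三峡坝区。')    # False -- ordinary body text
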
def main_parse(pdf_path: str, title_path: str, image_dir: str) -> tuple:
    """Extract candidate titles from every page and dump them to title_path."""
    texts = []
    # Walk the PDF page by page and keep single-line text boxes that look like titles.
    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
        title_index = 0
        for element in page_layout:
            if isinstance(element, LTLine):
                pass
            elif isinstance(element, LTRect):
                pass
            elif isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                text = element.get_text().strip()
                # Assume a title is a single line with a larger-than-body font height.
                if text and (is_title(text) or element.height > 15):
                    texts.append({'index': title_index, 'page_number': page_number,
                                  'bbox': element.bbox, 'text': text})
                    title_index += 1
    with open(title_path, 'w', encoding='utf-8') as fp:
        json.dump(texts, fp, indent=4, ensure_ascii=False)
    return title_path, image_dir


def parse_title(file_path: str, title_path: Optional[str] = None) -> List[dict]:
    """Parse headings (used to uniquely identify quotation items).

    Args:
        file_path: PDF to parse.
        title_path: optional path to save the heading list as JSON.

    Returns:
        results: list of heading records (text, page number, bbox, sequence number).
    """
    results = []
    seq_num = 0
    for page_number, page_layout in enumerate(extract_pages(file_path)):
        title_index = 0
        for element in page_layout:
            if isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                text = element.get_text().strip()
                if text and (is_title(text) or element.height > 15):
                    results.append({
                        'index': title_index,
                        'page_number': page_number,
                        'bbox': element.bbox,
                        'text': text,
                        'title': text,
                        'seq_num': seq_num
                    })
                    seq_num += 1
                    title_index += 1
    if title_path:
        with open(title_path, 'w', encoding='utf-8') as fp:
            json.dump(results, fp, indent=4, ensure_ascii=False)
    return results


def parse_image(file_path: str, image_dir: str, image_meta_path: str) -> List[dict]:
    """Extract the images embedded in the PDF.

    Args:
        file_path: PDF to parse.
        image_dir: directory the extracted image files are written to.
        image_meta_path: optional path to save the image metadata as JSON.

    Returns:
        image_list: list of image records (index, page number, file name).
    """
    image_list = []
    for page_number, page_layout in enumerate(extract_pages(file_path)):
        image_index = 0
        for element in page_layout:
            if isinstance(element, LTFigure):
                for e_obj in element._objs:
                    if isinstance(e_obj, LTImage):
                        # Export the image data to image_dir.
                        image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
                        image_file = export_image(e_obj, image_file)
                        image_list.append({
                            "image_index": image_index,
                            "page_number": page_number,
                            "image_name": image_file
                        })
                        image_index += 1
    if image_meta_path:
        with open(image_meta_path, 'w', encoding='utf-8') as fp:
            json.dump(image_list, fp, indent=4, ensure_ascii=False)
    return image_list
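
# A minimal usage sketch for the standalone helpers above; 'tender.pdf', 'titles.json',
# 'images/' and 'images.json' are hypothetical paths used only for illustration:
#
#     os.makedirs('images', exist_ok=True)
#     titles = parse_title('tender.pdf', title_path='titles.json')
#     images = parse_image('tender.pdf', image_dir='images', image_meta_path='images.json')
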
def table_parse(pdf_path: str,
                title_path: str,
                start_title: str = '第三章 评标办法(综合评估法)',
                end_title: str = '第四章 合同条款及格式',
                table_path: str = None,
                start_page_number: int = None,
                end_page_number: int = None) -> str:
    """Extract the tables that appear between two chapter titles of a PDF.

    @pdf_path: PDF to parse
    @title_path: title JSON produced by main_parse/parse_title
    @start_title: heading that opens the page range
    @end_title: heading that closes the page range
    @table_path: output JSON path for the extracted tables
    @start_page_number: explicit start page (overrides start_title)
    @end_page_number: explicit end page (overrides end_title)
    """
    tables = []
    if (start_page_number is None) or (end_page_number is None):
        # Resolve the page range from the previously extracted titles.
        df = pd.read_json(title_path)
        start_page_number = df[df['text'] == start_title].page_number.max()
        end_page_number = df[df['text'] == end_title].page_number.max()

    def concat_table(tables, table):
        """Add a table to the results: either start a new table, or merge it into the last one.

        @tables: accumulated table records
        @table: rows extracted from the current page (note: `i` is the page index of the enclosing loop)
        """
        first = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[0]]
        if not tables or len(HEADERS & set(first)) > 2:
            # Enough known header cells: treat it as an independent table with its own header.
            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]),
                           "table": table, "confidence": 1})
        elif ((i - 1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
            # Same column count as the previous page: treat it as a continuation and merge.
            tables[-1]['page_numbers'].append(i)
            tables[-1]['table'].extend(table)
        else:
            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]),
                           "table": table, "confidence": 0})
        return tables

    with pdfplumber.open(pdf_path) as pdf:
        print(start_page_number, end_page_number)
        for i in range(start_page_number, end_page_number):
            for table in pdf.pages[i].extract_tables():
                tables = concat_table(tables, table)

    with open(table_path, 'w', encoding='utf-8') as fp:
        json.dump(tables, fp, indent=4, ensure_ascii=False)
    return table_path


class PdfExtractAttr_(object):

    def __init__(self, file_path: str):
        """PDF file parser.

        @file_path: path of the PDF to parse
        """
        super(PdfExtractAttr_, self).__init__()
        self.file_path = file_path
        self.details = []
        self.tables = []
        self.content = []
        self.chapters = []
        self.references = []
        self.detail_df = None
        self.outlines = None

    def parse_outline(self):
        """Parse the PDF outline (bookmarks) and dump it to outlines.json."""
        results = []
        with open(self.file_path, "rb") as fp:
            parser = PDFParser(fp)
            try:
                document = PDFDocument(parser)
                ref_pagenum_resolver = RefPageNumberResolver(document)
                outlines = document.get_outlines()
                for (level, title, dest, a, se) in outlines:
                    # Resolve the destination / action / structure element to a page number.
                    if dest:
                        page_num = ref_pagenum_resolver.resolve(dest)
                    elif a:
                        page_num = ref_pagenum_resolver.resolve(a)
                    elif se:
                        page_num = ref_pagenum_resolver.resolve(se)
                    else:
                        page_num = None
                    results.append({'level': level, 'title': title, 'page_number': page_num})
            except PDFNoOutlines:
                print("No outlines found.")
            except PDFSyntaxError:
                print("Corrupted PDF or non-PDF file.")
            finally:
                parser.close()
        self.outlines = results
        with open('outlines.json', 'w', encoding='utf-8') as op:
            json.dump(results, op, indent=4, ensure_ascii=False)

    def extract_content(self, content_path: str = None) -> str:
        """Extract the plain text of every page and dump it to content_path."""
        with pdfplumber.open(self.file_path) as pdf:
            for page in pdf.pages:
                self.content.append({
                    'page_number': page.page_number - 1,
                    'text': page.extract_text()
                })
        if content_path:
            with open(content_path, 'w', encoding='utf-8') as fp:
                json.dump(self.content, fp, indent=4, ensure_ascii=False)
        return content_path

    def parse_text(self) -> None:
        """Parse every text box and estimate its horizontal alignment."""
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            for element in page_layout:
                if isinstance(element, LTTextBoxHorizontal):
                    # Distance to the left edge of the page.
                    left = element.x0
                    # Distance to the right edge.
                    right = (page_layout.width - element.x1)
                    # Distance to the top edge.
                    top = (page_layout.height - element.y1)
                    # Distance to the bottom edge.
                    bottom = element.y0
                    # Width of the text box.
                    width = element.width
                    if (left > right) and (abs(left - right) > 100):
                        alignment = 'right'
                    elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
                        alignment = 'center'
                    else:
                        alignment = 'left'
                    self.details.append({
                        'page_number': page_number,
                        'index': element.index,
                        'x0': element.bbox[0],
                        'y0': element.bbox[1],
                        'x1': element.bbox[2],
                        'y1': element.bbox[3],
                        'alignment': alignment,
                        'lines': len(element._objs),
                        'text': element.get_text().strip(),
                        'is_table_name': element.get_text().strip().endswith('表')
                    })
        self.detail_df = pd.DataFrame(self.details)

    def concat_table(self, table: list, page_number: int, table_name: str = None, new: bool = False) -> None:
        """Add a table to the results: either start a new table, or merge it into the last one.

        @table: list of rows extracted from one page
        @page_number: page the rows came from
        @table_name: optional caption detected above the table
        @new: force the rows to start a new table
        """
        # Normalise the first row (strip whitespace inside cells) for header detection.
        first = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[0]]
        if new:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first),
                                "col_len": len(table[-1]), "table": table, "confidence": 1,
                                "table_name": table_name if table_name else ""})
            return
        if len(table) > 1:
            second = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[1]]
        else:
            second = None

        if not self.tables or len(HEADERS & set(first)) > 2:
            # Enough known header cells in the first row: independent table with its own header.
            self.tables.append({"page_numbers": [page_number], "title_len": len(first),
                                "col_len": len(table[-1]), "table": table, "confidence": 1,
                                "table_name": table_name if table_name else ""})
        elif second and (len(HEADERS & set(second)) > 2):
            # The header sits in the second row; the first row is probably the table caption.
            if not table_name:
                first = [i for i in first if i]
                if len(first) == 1:
                    table_name = "".join(first)
            self.tables.append({"page_numbers": [page_number], "title_len": len(second),
                                "col_len": len(table[-1]), "table": table[1:], "confidence": 1,
                                "table_name": table_name if table_name else ""})
        elif ((page_number - 1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
            # Same column count as the table on the previous page: merge as a continuation.
            self.tables[-1]['page_numbers'].append(page_number)
            self.tables[-1]['table'].extend(table)
        else:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first),
                                "col_len": len(table[-1]), "table": table, "confidence": 0,
                                "table_name": table_name if table_name else ""})

    def parse_table(self) -> None:
        """Table parsing based on pdfplumber only."""
        if self.detail_df is None:
            self.parse_text()
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                # Check whether the page contains any tables.
                tables = page_layout.find_tables()
                # Exactly one table on the page: decide whether it continues the previous table.
                if len(tables) == 1:
                    table = tables[0]
                    x0, y0, x1, y1 = table.bbox
                    # Look for a centred "...表" caption on the same page.
                    table_title_df = self.detail_df.query(f'''page_number == {page_number} and is_table_name == True and alignment == "center"''')
                    if table_title_df.empty:
                        self.concat_table(table.extract(), page_number=page_number)
                    else:
                        table_title_name = table_title_df.iloc[0]['text']
                        self.concat_table(table.extract(), page_number=page_number, table_name=table_title_name)
                # Multiple tables on the page: only the first could continue the previous table and
                # the rest are independent; this pdfplumber-only variant does not handle that case.
                elif len(tables) > 1:
                    pass
    def parse_table_pro(self, table_path: str = 'all_tables.json') -> str:
        """Table parsing that cross-checks pdfplumber table detection with camelot extraction."""
        if self.detail_df is None:
            self.parse_text()
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                # Check whether the page contains any tables.
                tables = page_layout.find_tables()
                if not tables:
                    continue
                tables_pro = camelot.read_pdf(
                    self.file_path,
                    # flavor='stream',
                    pages=str(page_number + 1),
                    # edge_tol=200,
                )
                if not tables_pro:
                    continue
                print(len(tables), len(tables_pro))
                # Exactly one camelot table on the page: decide whether it continues the previous table.
                if (len(tables) != 0) and (len(tables_pro) == 1):
                    print(f"Parsing the table on page {page_number} of the PDF")
                    table = tables[0]
                    table_pro = tables_pro[0].df.to_dict(orient='split')['data']
                    x0, y0, x1, y1 = table.bbox
                    # Look for a centred "...表" caption on the same page.
                    table_title_df = self.detail_df.query(f'''page_number == {page_number} and is_table_name == True and alignment == "center"''')
                    if table_title_df.empty:
                        self.concat_table(table_pro, page_number=page_number)
                    else:
                        table_title_name = table_title_df.iloc[0]['text']
                        self.concat_table(table_pro, page_number=page_number, table_name=table_title_name)
                # Multiple tables on the page: only the first can continue the previous table;
                # the remaining ones are always independent.
                elif len(tables_pro) > 1:
                    print(f"Parsing the tables on page {page_number} of the PDF")
                    first_table = tables_pro[0]
                    self.concat_table(first_table.df.to_dict(orient='split')['data'], page_number=page_number)
                    for table_index in range(1, len(tables_pro)):
                        self.concat_table(tables_pro[table_index].df.to_dict(orient='split')['data'],
                                          page_number=page_number, new=True)
        with open(table_path, 'w', encoding='utf-8') as fp:
            json.dump(self.tables, fp, indent=4, ensure_ascii=False)
        return table_path


if __name__ == '__main__':
    import glob

    dir_path = 'data/财报素材'
    for pdf_path in glob.glob(f'{dir_path}/*.pdf'):
        print(pdf_path)
        if '600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告' not in pdf_path:
            continue
        agent = PdfExtractAttr_(file_path=pdf_path)
        base_name = pdf_path.split("/")[-1].split(".")[0]
        content_path = f'{dir_path}/{base_name}_content.json'
        agent.extract_content(content_path=content_path)
        table_path = f'{dir_path}/{base_name}_table.json'
        agent.parse_table_pro(table_path=table_path)
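
# A minimal sketch of the tender-document flow that gives this module its name, left as
# comments so the financial-report loop above stays the only live entry point; 'tender.pdf'
# and the output file names are hypothetical placeholders, not paths shipped with the project:
#
#     os.makedirs('extracted_images', exist_ok=True)
#     main_parse('tender.pdf', title_path='titles.json', image_dir='extracted_images')
#     table_parse('tender.pdf', 'titles.json',
#                 start_title='第三章 评标办法(综合评估法)',
#                 end_title='第四章 合同条款及格式',
#                 table_path='bid_tables.json')
#     agent = PdfExtractAttr_(file_path='tender.pdf')
#     agent.parse_outline()  # writes outlines.json in the working directory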