# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2024-06-11 13:43:14
# @Last Modified by: privacy
# @Last Modified time: 2024-07-25 16:36:24
"""PDF parsing helpers: title detection, embedded-image export, table extraction.

NOTE(review): two commented-out legacy image-extraction prototypes (PyPDF2 and
fitz/PyMuPDF based) were removed from this header; recover them from version
control if ever needed.
"""

# Standard library imports
import os
import re
import json
from io import BytesIO
from pprint import pprint

# Third-party imports
import numpy as np
import pandas as pd
import cv2
from pdfminer.high_level import extract_pages
from pdfminer.layout import (
    LTRect,
    LTTextBoxHorizontal,
    LTLine,
    LTFigure,
    LTCurve,
    LTImage,
    LTChar,
)
from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
from pdfminer.pdfcolor import LITERAL_DEVICE_GRAY
from pdfminer.pdfcolor import LITERAL_DEVICE_RGB
from pdfminer.pdftypes import (
    LITERALS_DCT_DECODE,
    LITERALS_JBIG2_DECODE,
    LITERALS_JPX_DECODE,
    LITERALS_FLATE_DECODE,
)
from pdfminer.pdfparser import PDFParser, PDFSyntaxError
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
from pdfminer.image import BMPWriter
import pdfplumber

# Local imports
from tools import RefPageNumberResolver

# Message used when Pillow is required but not installed.
# (Bug fix: this constant was referenced by the image savers but never
# defined, so a missing Pillow install used to surface as a NameError
# instead of a useful ImportError.)
PIL_ERROR_MESSAGE = "Could not import Pillow. Install it with `pip install Pillow`."

# Cell texts known to occur in table header rows; used to decide whether an
# extracted table row is a header row.
HEADERS = {
    '序号', '项目编码', '项目名称', '项目特征', '单位', '工程量',
    '全费用综合单价', '合价', '备注', '主材名称', '规格型号',
    '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项',
    '数量', '含税单价(元)', '含税合价(元)', '条款号', '评分因素',
    '评分标准', '页码',
}


def is_title(line: str) -> bool:
    """Return True if *line* looks like a section heading.

    Matches numbered headings ("(一)", "1.", "第三章", ...) as well as
    appendix / bibliography markers ("附录", "参考文献", "附表").
    """
    stripped = line.strip()
    if re.findall(r'^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]', stripped):
        return True
    if re.findall(r'^附录|^参考文献|^附表', stripped):
        return True
    return False


# (magic-prefix, required-trailer-or-None, extension) triplets used to sniff
# the on-disk format of a decoded image payload.
_IMAGE_SIGNATURES = (
    (b'\xff\xd8', b'\xff\xd9', '.jpg'),
    (b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a', None, '.png'),
    (b'\x42\x4d', None, '.bmp'),
    (b'\x47\x49\x46\x38\x37\x61', None, '.gif'),
    (b'\x47\x49\x46\x38\x39\x61', None, '.gif'),
    (b'\x4d\x4d', None, '.tiff'),
    (b'\x49\x49', None, '.tiff'),
)

# Magic prefix of a bare JPEG 2000 codestream (no container).
_J2K_MAGIC = b'\xffO\xffQ\x00/\x00\x00'


def _sniff_extension(payload):
    """Return a file extension for *payload* based on magic bytes, or None."""
    for magic, trailer, ext in _IMAGE_SIGNATURES:
        if payload[:len(magic)] == magic and (
                trailer is None or payload[-len(trailer):] == trailer):
            return ext
    return None


def _write_payload(payload, path, ext):
    """Write *payload* to *path* + *ext* and return the resulting path."""
    path += ext
    with open(path, 'wb') as file:
        file.write(payload)
    return path


def export_image(image: LTImage, path: str):
    """Save an LTImage to disk and return the path written, or None.

    DCT (JPEG) and JPX (JPEG 2000) filtered streams are delegated to the
    dedicated savers; any other stream is sniffed by magic bytes, falling
    back to a '.unk' dump so the payload is never silently lost.
    """
    filters = image.stream.get_filters()
    if len(filters) == 1 and filters[0][0] in LITERALS_DCT_DECODE:
        return _save_jpeg(image, path)
    if len(filters) == 1 and filters[0][0] in LITERALS_JPX_DECODE:
        return _save_jpeg2000(image, path)
    data = image.stream.get_data()
    raw_data = image.stream.get_rawdata()
    if data:
        ext = _sniff_extension(data)
        if ext is not None:
            return _write_payload(data, path, ext)
        if data[:8] == _J2K_MAGIC:
            # Bare JPEG 2000 codestream: needs decoding before saving.
            return _save_j2k(image, path)
        return _write_payload(data, path, '.unk')
    if raw_data:
        ext = _sniff_extension(raw_data)
        if ext is not None:
            return _write_payload(raw_data, path, ext)
        return _write_payload(raw_data, path, '.unk')
    return None


def _save_j2k(image: LTImage, path: str) -> str:
    """Decode a bare JPEG 2000 codestream with Pillow and save it as PNG."""
    try:
        from PIL import Image
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    path = path + ".png"
    data = image.stream.get_data()
    assert data is not None
    Image.open(BytesIO(data)).save(path)
    return path
def _save_jpeg(image: LTImage, path: str) -> str:
    """Save a DCT (JPEG) encoded image.

    CMYK JPEGs are inverted and converted to RGB via Pillow; any other
    colorspace is written out verbatim.
    """
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".jpg"
    with open(path, "wb") as fp:
        if LITERAL_DEVICE_CMYK in image.colorspace:
            try:
                from PIL import Image, ImageChops  # type: ignore[import]
            except ImportError:
                raise ImportError(PIL_ERROR_MESSAGE)
            i = Image.open(BytesIO(raw_data))
            i = ImageChops.invert(i)
            i = i.convert("RGB")
            i.save(fp, "JPEG")
        else:
            fp.write(raw_data)
    return path


def _save_jpeg2000(image: LTImage, path: str) -> str:
    """Save a JPEG 2000 encoded image as PNG.

    Writing the raw codestream directly produces files most viewers cannot
    open; decoding with Pillow and re-encoding through OpenCV yields a
    portable PNG.
    """
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".png"
    try:
        from PIL import Image  # type: ignore[import]
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    i = Image.open(BytesIO(raw_data))
    cv2.imwrite(path, cv2.cvtColor(np.array(i), cv2.COLOR_RGB2BGR))
    return path


def _save_bmp(image: LTImage, width: int, height: int,
              bytes_per_line: int, bits: int, path: str) -> str:
    """Dump the decoded stream of a bitmap image to *path* + '.bmp'.

    NOTE(review): despite the name, no BMP header is written -- the decoded
    bytes are stored as-is, and width/height/bytes_per_line/bits are unused
    (parameters kept for interface compatibility). Confirm callers expect
    a headerless dump before relying on the output.
    """
    data = image.stream.get_data()
    path = path + ".bmp"
    with open(path, "wb") as fp:
        fp.write(data)
    return path


def main_parse(pdf_path: str, title_path: str, image_dir: str) -> None:
    """Extract candidate titles and embedded images from a PDF.

    Titles (single-line text boxes that match the heading regexes or use a
    tall bounding box) are dumped to *title_path* as JSON; images found in
    figures are saved under *image_dir*.

    @pdf_path: input PDF file
    @title_path: output JSON path for detected titles
    @image_dir: output directory for extracted images
    """
    texts = []
    images = []
    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
        title_index = 0
        image_index = 0
        for element in page_layout:
            if isinstance(element, (LTLine, LTRect)):
                continue
            if isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                text = element.get_text().strip()
                # Heuristic: a title is a single line, either matching the
                # heading patterns or rendered in a larger (taller) font.
                if text and (is_title(text) or element.height > 15):
                    texts.append({'index': title_index,
                                  'page_number': page_number,
                                  'bbox': element.bbox,
                                  'text': text})
                    title_index += 1
            elif isinstance(element, LTFigure):
                for e_obj in element._objs:
                    if isinstance(e_obj, LTImage):
                        # Export the image payload; export_image appends the
                        # proper file extension itself.
                        image_file = os.path.join(
                            image_dir, f'image_page_{page_number}_{image_index}')
                        image_file = export_image(e_obj, image_file)
                        images.append(image_file)
                        pprint(f'Image saved: {image_file}')
                        image_index += 1
    with open(title_path, 'w', encoding='utf-8') as fp:
        json.dump(texts, fp, indent=4, ensure_ascii=False)


def table_parse(pdf_path: str, title_path: str,
                start_title: str = '六、已标价工程量清单',
                end_title: str = '七、施工组织设计',
                table_path: str = 'table.json',
                start_page_number: int = None,
                end_page_number: int = None) -> list:
    """Parse tables from the page range delimited by two titles.

    @pdf_path: input PDF file
    @title_path: title JSON produced by main_parse (used to locate pages)
    @start_title: title marking the first page of the range
    @end_title: title marking the page after the range
    @table_path: output JSON path for the collected tables
    @start_page_number / @end_page_number: explicit page range; when either
        is None the range is resolved from the title JSON
    @return: list of table dicts
    """
    tables = []
    if (start_page_number is None) or (end_page_number is None):
        df = pd.read_json(title_path)
        start_page_number = df[df['text'] == start_title].page_number.max()
        end_page_number = df[df['text'] == end_title].page_number.max()
        # Bug fix: .max() on an empty selection yields NaN, which previously
        # crashed range() with an opaque TypeError.
        if pd.isna(start_page_number) or pd.isna(end_page_number):
            raise ValueError(
                'start_title/end_title not found in title JSON; '
                'pass explicit start_page_number/end_page_number instead')

    def concat_table(tables, table):
        """Append *table* as a new table, or merge it into the last one.

        @tables: accumulated result list
        @table: rows (lists of cell strings) of the current page's table
        """
        # Normalise whitespace inside the cells of the first row.
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if len(HEADERS & set(first)) > 2:
            # Many known header cells: independent header row, start a new table.
            tables.append({"page_numbers": [i], "title_len": len(first),
                           "col_len": len(table[-1]), "table": table,
                           "confidence": 1})
        elif tables and ((i - 1) in tables[-1]['page_numbers']) \
                and (len(first) == tables[-1]['col_len']):
            # Bug fix: guard `tables` non-empty before peeking at tables[-1]
            # (used to raise IndexError on the first header-less table).
            # Same column count as the previous page's table: continuation.
            tables[-1]['page_numbers'].append(i)
            tables[-1]['table'].extend(table)
        else:
            tables.append({"page_numbers": [i], "title_len": len(first),
                           "col_len": len(table[-1]), "table": table,
                           "confidence": 0})
        return tables

    with pdfplumber.open(pdf_path) as pdf:
        for i in range(start_page_number, end_page_number):
            for table in pdf.pages[i].extract_tables():
                tables = concat_table(tables, table)
    with open(table_path, 'w', encoding='utf-8') as fp:
        json.dump(tables, fp, indent=4, ensure_ascii=False)
    return tables


class PdfExtractAttr(object):
    """Extract outline, per-page text, text-box layout and tables from a PDF."""

    def __init__(self, file_path: str):
        """
        @file_path: path of the PDF file to parse
        """
        super(PdfExtractAttr, self).__init__()
        self.file_path = file_path
        self.details = []   # text-box layout records built by parse_text()
        self.tables = []    # table dicts accumulated by concat_table()
        self.content = []   # per-page plain text built by extract_content()

    def parse_outline(self):
        """Parse the PDF outline (bookmarks) and dump it to outlines.json."""
        results = []
        with open(self.file_path, "rb") as fp:
            parser = None
            try:
                parser = PDFParser(fp)
                document = PDFDocument(parser)
                ref_pagenum_resolver = RefPageNumberResolver(document)
                for (level, title, dest, a, se) in document.get_outlines():
                    # Resolve the outline target to a page number from
                    # whichever destination field is present.
                    if dest:
                        page_num = ref_pagenum_resolver.resolve(dest)
                    elif a:
                        page_num = ref_pagenum_resolver.resolve(a)
                    elif se:
                        page_num = ref_pagenum_resolver.resolve(se)
                    else:
                        page_num = None
                    results.append({'level': level, 'title': title,
                                    'page_number': page_num})
            except PDFNoOutlines:
                print("No outlines found.")
            except PDFSyntaxError:
                print("Corrupted PDF or non-PDF file.")
            finally:
                # Bug fix: PDFParser(fp) itself may have raised, in which
                # case `parser` was unbound and close() raised NameError.
                if parser is not None:
                    parser.close()
        with open('outlines.json', 'w', encoding='utf-8') as op:
            json.dump(results, op, indent=4, ensure_ascii=False)
        print(results)

    def extract_content(self) -> list:
        """Extract the plain text of every page (0-based page numbers)."""
        with pdfplumber.open(self.file_path) as pdf:
            for page in pdf.pages:
                self.content.append({
                    'page_number': page.page_number - 1,
                    'text': page.extract_text()
                })
        return self.content

    def parse_text(self) -> None:
        """Record position/alignment details for every horizontal text box.

        Also builds self.detail_df, which parse_table() queries for table
        captions.
        """
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            for element in page_layout:
                if not isinstance(element, LTTextBoxHorizontal):
                    continue
                # Distances from the page edges drive the alignment heuristic.
                left = element.x0
                right = page_layout.width - element.x1
                width = element.width
                if (left > right) and (abs(left - right) > 100):
                    alignment = 'right'
                elif (left > 100) and (abs(left - right) < 50) \
                        and ((abs(left - right) / width) < 0.5):
                    alignment = 'center'
                else:
                    alignment = 'left'
                text = element.get_text().strip()
                self.details.append({
                    'page_number': page_number,
                    'index': element.index,
                    'x0': element.bbox[0],
                    'y0': element.bbox[1],
                    'x1': element.bbox[2],
                    'y1': element.bbox[3],
                    'alignment': alignment,
                    'lines': len(element._objs),
                    'text': text,
                    # Heuristic: text ending in '表' is a table caption.
                    'is_table_name': text.endswith('表')
                })
        self.detail_df = pd.DataFrame(self.details)

    def concat_table(self, table: list, page_number: int,
                     table_name: str = None) -> None:
        """Append *table* as a new table, or merge it into the previous one.

        @table: rows (lists of cell strings) from pdfplumber
        @page_number: zero-based page index the table came from
        @table_name: optional caption detected near the table
        """
        # Normalise whitespace inside the cells of the first two rows.
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if len(table) > 1:
            second = [''.join(cell.split()) if cell else cell for cell in table[1]]
        else:
            second = None
        if len(HEADERS & set(first)) > 2:
            # First row is a header row: start a new table.
            self.tables.append({"page_numbers": [page_number],
                                "title_len": len(first),
                                "col_len": len(table[-1]),
                                "table": table, "confidence": 1,
                                "table_name": table_name if table_name else ""})
        elif second and (len(HEADERS & set(second)) > 2):
            # Second row is the header; a lone non-empty cell in the first
            # row is then taken as the table caption.
            if not table_name:
                non_empty = [cell for cell in first if cell]
                if len(non_empty) == 1:
                    table_name = "".join(non_empty)
            self.tables.append({"page_numbers": [page_number],
                                "title_len": len(second),
                                "col_len": len(table[-1]),
                                "table": table[1:], "confidence": 1,
                                "table_name": table_name if table_name else ""})
        elif self.tables and ((page_number - 1) in self.tables[-1]['page_numbers']) \
                and (len(first) == self.tables[-1]['col_len']):
            # Bug fix: guard self.tables non-empty before peeking at [-1]
            # (used to raise IndexError on the first header-less table).
            # Same column count as the previous page's table: continuation.
            self.tables[-1]['page_numbers'].append(page_number)
            self.tables[-1]['table'].extend(table)
        else:
            self.tables.append({"page_numbers": [page_number],
                                "title_len": len(first),
                                "col_len": len(table[-1]),
                                "table": table, "confidence": 0,
                                "table_name": table_name if table_name else ""})

    def parse_table(self) -> None:
        """Detect tables page by page and merge cross-page continuations.

        Requires parse_text() to have run first: self.detail_df is queried
        to find a centred caption for each table.
        """
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                tables = page_layout.find_tables()
                if len(tables) == 1:
                    # A single table may continue the previous page's table.
                    table = tables[0]
                    table_title_df = self.detail_df.query(
                        f'page_number == {page_number} and '
                        f'is_table_name == True and alignment == "center"')
                    if table_title_df.empty:
                        self.concat_table(table.extract(),
                                          page_number=page_number)
                    else:
                        self.concat_table(table.extract(),
                                          page_number=page_number,
                                          table_name=table_title_df.iloc[0]['text'])
                elif len(tables) > 1:
                    # TODO: multiple tables on one page are not handled yet;
                    # only the single-table case participates in merging.
                    pass

    def output(self, table_path: str = 'all_tables.json'):
        """Dump all collected tables to *table_path* and return them."""
        with open(table_path, 'w', encoding='utf-8') as fp:
            json.dump(self.tables, fp, indent=4, ensure_ascii=False)
        return self.tables


if __name__ == '__main__':
    # pdf_path = './投标文件-修改版9-5-1-1.pdf'
    pdf_path = './南方电网数字研究院有限公司.pdf'
    # title_path = './投标文件-修改版9-5-1-1.json'
    title_path = './南方电网数字研究院有限公司.json'
    image_dir = './extracted_images'
    os.makedirs(image_dir, exist_ok=True)
    main_parse(pdf_path=pdf_path, title_path=title_path, image_dir=image_dir)
    # tables = table_parse(pdf_path=pdf_path, title_path=title_path, start_title='六、已标价工程量清单', end_title='七、施工组织设计')
    # tables = table_parse(pdf_path=pdf_path, title_path=title_path, start_page_number=0, end_page_number=725)
    agent = PdfExtractAttr(file_path=pdf_path)
    print(agent.extract_content())
    agent.parse_outline()
    agent.parse_text()
    agent.parse_table()
    agent.output()