@@ -0,0 +1,573 @@
+'''Tender document content extraction.'''
+import pandas as pd
+import numpy as np
+import pdfplumber
+import json
+import os
+import re
+import cv2
+from io import BytesIO
+from typing import Optional, List
+
+from pdfminer.layout import LTRect, LTTextBoxHorizontal, LTLine, LTFigure, LTCurve, LTImage, LTChar
+from pdfminer.high_level import extract_pages
+from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
+from pdfminer.pdftypes import (
+    LITERALS_DCT_DECODE,
+    LITERALS_JBIG2_DECODE,
+    LITERALS_JPX_DECODE,
+    LITERALS_FLATE_DECODE,
+)
+from pprint import pprint
+from pdfminer.pdfparser import PDFParser, PDFSyntaxError
+from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
+import camelot
+
+from .tools import RefPageNumberResolver
+
+# Header cells commonly seen in tender tables; kept in Chinese because they must match the PDF text verbatim.
+HEADERS = {'序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价', '合价', '备注', '主材名称', '规格型号', '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项', '数量', '含税单价(元)', '含税合价(元)', '条款号', '评分因素', '评分标准', '页码'}
+HEADERS |= {'条款号', '评审因素', '评审标准', ''}
+
+
+def is_title(line: str) -> bool:
+    """Heuristically decide whether a line of text is a chapter/section title."""
+    title_word = re.findall(r'^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]', line.strip())
+    if title_word:
+        return True
+    title_word = re.findall(r'^附录|^参考文献|^附表', line.strip())
+    if title_word:
+        return True
+    return False
+
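+# A minimal, illustrative check of the title heuristic; the sample strings below are
+# made up for demonstration, not taken from a real tender document, and the function
+# is only defined here, never called on import.
+def _demo_is_title() -> None:
+    for sample in ['第一章 总则', '(一)投标人资格要求', '3.项目概况', '附录A', '这是一段普通正文。']:
+        print(sample, '->', is_title(sample))
+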
+PIL_ERROR_MESSAGE = "Could not import PIL: Pillow is required to export CMYK JPEG and JPEG 2000 images"
+def _save_jpeg(image: LTImage, path: str) -> str:
+    """Save a JPEG encoded image"""
+    raw_data = image.stream.get_rawdata()
+    assert raw_data is not None
+
+    path = path + ".jpg"
+
+    with open(path, "wb") as fp:
+        if LITERAL_DEVICE_CMYK in image.colorspace:
+            try:
+                from PIL import Image, ImageChops  # type: ignore[import]
+            except ImportError:
+                raise ImportError(PIL_ERROR_MESSAGE)
+
+            # CMYK JPEG data is inverted and converted to RGB so the saved file
+            # opens correctly in common viewers.
+            ifp = BytesIO(raw_data)
+            i = Image.open(ifp)
+            i = ImageChops.invert(i)
+            i = i.convert("RGB")
+            i.save(fp, "JPEG")
+        else:
+            fp.write(raw_data)
+
+    return path
+
+def _save_jpeg2000(image: LTImage, path: str) -> str:
+    """Save a JPEG 2000 encoded image"""
+    raw_data = image.stream.get_rawdata()
+    assert raw_data is not None
+
+    path = path + ".png"
+
+    try:
+        from PIL import Image  # type: ignore[import]
+    except ImportError:
+        raise ImportError(PIL_ERROR_MESSAGE)
+
+    # If we only write the raw data, most image viewers cannot open the file.
+    # Re-encoding it through OpenCV produces a file that other programs open without problems.
+    ifp = BytesIO(raw_data)
+    i = Image.open(ifp)
+    opencv_image = cv2.cvtColor(np.array(i), cv2.COLOR_RGB2BGR)
+    cv2.imwrite(path, opencv_image)
+    return path
+
+def export_image(image: LTImage, path: str) -> Optional[str]:
+    """Save an LTImage to disk and return the saved path, or None if the image has no data."""
+    filters = image.stream.get_filters()
+
+    # DCT (JPEG) and JPX (JPEG 2000) streams need dedicated handling.
+    if len(filters) == 1 and filters[0][0] in LITERALS_DCT_DECODE:
+        return _save_jpeg(image, path)
+
+    if len(filters) == 1 and filters[0][0] in LITERALS_JPX_DECODE:
+        return _save_jpeg2000(image, path)
+
+    # Prefer the decoded stream; fall back to the raw stream if decoding yields nothing.
+    data = image.stream.get_data() or image.stream.get_rawdata()
+    if not data:
+        return None
+
+    # Guess the file type from its magic bytes and pick a matching extension;
+    # anything unrecognised is written with an ".unk" suffix.
+    if data[:2] == b'\xff\xd8' and data[-2:] == b'\xff\xd9':
+        path += '.jpg'
+    elif data[:8] == b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a':
+        path += '.png'
+    elif data[:2] == b'\x42\x4d':
+        path += '.bmp'
+    elif data[:6] in (b'\x47\x49\x46\x38\x37\x61', b'\x47\x49\x46\x38\x39\x61'):
+        path += '.gif'
+    elif data[:2] in (b'\x4d\x4d', b'\x49\x49'):
+        path += '.tiff'
+    else:
+        path += '.unk'
+
+    with open(path, 'wb') as file:
+        file.write(data)
+    return path
+
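+# A minimal sketch of how export_image pairs with pdfminer's layout objects; the
+# 'sample.pdf' path and 'extracted_images' directory are illustrative assumptions.
+def _demo_export_image(pdf_path: str = 'sample.pdf', out_dir: str = 'extracted_images') -> None:
+    os.makedirs(out_dir, exist_ok=True)
+    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
+        for element in page_layout:
+            if isinstance(element, LTFigure):
+                for obj in element:
+                    if isinstance(obj, LTImage):
+                        saved = export_image(obj, os.path.join(out_dir, f'p{page_number}_{obj.name}'))
+                        print('saved:', saved)
+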
+def main_parse(pdf_path: str, title_path: str, image_dir: str) -> tuple:
+    texts = []
+    images = []
+    # Read the PDF and walk through its pages.
+    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
+        title_index = 0
+        image_index = 0
+        for element in page_layout:
+            if isinstance(element, LTLine):
+                pass
+            elif isinstance(element, LTRect):
+                pass
+            elif isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
+                text = element.get_text().strip()
+                # Assume a title is usually a single line with a larger font size.
+                if text and (is_title(text) or element.height > 15):
+                    texts.append({'index': title_index, 'page_number': page_number, 'bbox': element.bbox, 'text': text})
+                    title_index += 1
+            # elif isinstance(element, LTFigure):
+            #     for e_obj in element._objs:
+            #         if isinstance(e_obj, LTImage):
+            #             # Extract the image data.
+            #             image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
+            #             image_file = export_image(e_obj, image_file)
+            #             images.append(image_file)
+            #             pprint(f'Image saved: {image_file}')
+            #             image_index += 1
+
+    with open(title_path, 'w', encoding='utf-8') as fp:
+        json.dump(texts, fp, indent=4, ensure_ascii=False)
+    return title_path, image_dir
+
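+# A minimal usage sketch for main_parse; the paths below are illustrative assumptions.
+def _demo_main_parse() -> None:
+    image_dir = 'extracted_images'
+    os.makedirs(image_dir, exist_ok=True)
+    title_path, _ = main_parse(pdf_path='sample.pdf', title_path='sample_titles.json', image_dir=image_dir)
+    print('titles written to', title_path)
+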
+def parse_title(file_path: str, title_path: Optional[str] = None) -> Optional[str]:
+    """
+    Parse document titles (used to uniquely identify quotation entries).
+
+    Args:
+        file_path: path of the PDF to parse
+        title_path: where to save the extracted title list (JSON)
+
+    Returns:
+        title_path: the save path that was written to, or None if none was given
+    """
+    results = []
+
+    seq_num = 0
+
+    for page_number, page_layout in enumerate(extract_pages(file_path)):
+        title_index = 0
+        for element in page_layout:
+            if isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
+                text = element.get_text().strip()
+                if text and (is_title(text) or element.height > 15):
+                    results.append({
+                        'index': title_index,
+                        'page_number': page_number,
+                        'bbox': element.bbox,
+                        'text': text,
+                        'title': text,
+                        'seq_num': seq_num
+                    })
+                    seq_num += 1
+                    title_index += 1
+
+    if title_path:
+        with open(title_path, 'w', encoding='utf-8') as fp:
+            json.dump(results, fp, indent=4, ensure_ascii=False)
+
+    return title_path
+
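+# A short sketch of parse_title; 'sample.pdf' and the output path are assumptions.
+def _demo_parse_title() -> None:
+    saved = parse_title('sample.pdf', title_path='sample_titles.json')
+    if saved:
+        with open(saved, encoding='utf-8') as fp:
+            for item in json.load(fp)[:5]:
+                print(item['page_number'], item['title'])
+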
+def parse_image(file_path: str, image_dir: str, image_meta_path: str) -> str:
+    """
+    Extract the images embedded in a PDF.
+
+    Args:
+        file_path: path of the PDF to parse
+        image_dir: directory the extracted images are written to
+        image_meta_path: where to save the image metadata list (JSON)
+
+    Returns:
+        image_meta_path: the metadata path that was written to
+    """
+    image_list = []
+
+    for page_number, page_layout in enumerate(extract_pages(file_path)):
+        image_index = 0
+        for element in page_layout:
+            if isinstance(element, LTFigure):
+                for e_obj in element._objs:
+                    if isinstance(e_obj, LTImage):
+                        # Extract and save the image data.
+                        image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
+                        image_file = export_image(e_obj, image_file)
+                        image_list.append({
+                            "image_index": image_index,
+                            "page_number": page_number,
+                            "image_name": image_file
+                        })
+                        image_index += 1
+
+    if image_meta_path:
+        with open(image_meta_path, 'w', encoding='utf-8') as fp:
+            json.dump(image_list, fp, indent=4, ensure_ascii=False)
+
+    return image_meta_path
+
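+# A short sketch of parse_image; all three paths are illustrative assumptions.
+def _demo_parse_image() -> None:
+    os.makedirs('extracted_images', exist_ok=True)
+    meta = parse_image('sample.pdf', image_dir='extracted_images', image_meta_path='sample_images.json')
+    print('image metadata written to', meta)
+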
+
+def table_parse(pdf_path: str,
+                title_path: str,
+                start_title: str = '第三章 评标办法(综合评估法)',
+                end_title: str = '第四章 合同条款及格式',
+                table_path: str = None,
+                start_page_number: int = None,
+                end_page_number: int = None
+                ) -> str:
+    """Parse the tables of a PDF between two chapter titles.
+    @pdf_path: PDF file to parse
+    @title_path: JSON title list produced by main_parse/parse_title
+    @start_title: title marking the first page to scan
+    @end_title: title marking the page to stop at
+    @table_path: where to save the extracted tables (JSON)
+    @start_page_number: explicit start page (overrides start_title)
+    @end_page_number: explicit end page (overrides end_title)
+    """
+    tables = []
+
+    if (start_page_number is None) or (end_page_number is None):
+        df = pd.read_json(title_path)
+        start_page_number = int(df[df['text'] == start_title].page_number.max())
+        end_page_number = int(df[df['text'] == end_title].page_number.max())
+
+    def concat_table(tables, table, page_number):
+        """Add a table to the result list: either append it as a new table or merge it
+        into the last table when it continues across pages.
+        @tables: accumulated table list
+        @table: the table extracted from the current page
+        @page_number: page the table was found on
+        """
+        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
+        if len(HEADERS & set(first)) > 2:
+            # The first row contains many known header cells: independent header, start a new table.
+            tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1})
+        elif tables and ((page_number - 1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
+            # Same column count as the table on the previous page: continuation, merge it.
+            tables[-1]['page_numbers'].append(page_number)
+            tables[-1]['table'].extend(table)
+        else:
+            tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0})
+        return tables
+
+    with pdfplumber.open(pdf_path) as pdf:
+        print(start_page_number, end_page_number)
+        for i in range(start_page_number, end_page_number):
+            for table in pdf.pages[i].extract_tables():
+                tables = concat_table(tables, table, page_number=i)
+
+    with open(table_path, 'w', encoding='utf-8') as fp:
+        json.dump(tables, fp, indent=4, ensure_ascii=False)
+
+    return table_path
+
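+# A brief usage sketch for table_parse; the paths are illustrative assumptions, and the
+# default chapter titles are kept in Chinese because they must match the tender document verbatim.
+def _demo_table_parse() -> None:
+    saved = table_parse('sample.pdf', title_path='sample_titles.json', table_path='sample_tables.json')
+    with open(saved, encoding='utf-8') as fp:
+        print('extracted', len(json.load(fp)), 'tables')
+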
+
+class PdfExtractAttr_(object):
+    def __init__(self, file_path: str):
+        """PDF file parser.
+        @file_path: path of the PDF to parse
+        """
+        super(PdfExtractAttr_, self).__init__()
+        self.file_path = file_path
+        self.details = []
+        self.tables = []
+        self.content = []
+        self.chapters = []
+        self.references = []
+        self.detail_df = None
+        self.outlines = None
+
+    def parse_outline(self):
+        """Parse the PDF outline (bookmarks).
+        """
+        results = []
+        with open(self.file_path, "rb") as fp:
+            try:
+                parser = PDFParser(fp)
+                document = PDFDocument(parser)
+                ref_pagenum_resolver = RefPageNumberResolver(document)
+                outlines = document.get_outlines()
+                for (level, title, dest, a, se) in outlines:
+                    if dest:
+                        page_num = ref_pagenum_resolver.resolve(dest)
+                    elif a:
+                        page_num = ref_pagenum_resolver.resolve(a)
+                    elif se:
+                        page_num = ref_pagenum_resolver.resolve(se)
+                    else:
+                        page_num = None
+                    results.append({'level': level, 'title': title, 'page_number': page_num})
+            except PDFNoOutlines:
+                print("No outlines found.")
+            except PDFSyntaxError:
+                print("Corrupted PDF or non-PDF file.")
+            finally:
+                parser.close()
+
+        self.outlines = results
+        with open('outlines.json', 'w', encoding='utf-8') as op:
+            json.dump(results, op, indent=4, ensure_ascii=False)
+
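+    # A short sketch of parse_outline; 'sample.pdf' is an assumed path and the result is
+    # written to 'outlines.json' in the working directory, as the method does today.
+    @staticmethod
+    def _demo_parse_outline() -> None:
+        agent = PdfExtractAttr_(file_path='sample.pdf')
+        agent.parse_outline()
+        print(agent.outlines)
+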
+    def extract_content(self, content_path: str = None) -> str:
+        """Extract the plain text of every page with pdfplumber."""
+        with pdfplumber.open(self.file_path) as pdf:
+            for page in pdf.pages:
+                self.content.append({
+                    'page_number': page.page_number - 1,
+                    'text': page.extract_text()
+                })
+
+        if content_path:
+            with open(content_path, 'w', encoding='utf-8') as fp:
+                json.dump(self.content, fp, indent=4, ensure_ascii=False)
+
+        return content_path
+
+    def parse_text(self) -> None:
+        """Parse text boxes and record their position and alignment.
+        """
+        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
+            for element in page_layout:
+                if isinstance(element, LTTextBoxHorizontal):
+                    # Distance from the left edge of the page.
+                    left = element.x0
+                    # Distance from the right edge.
+                    right = (page_layout.width - element.x1)
+                    # Distance from the top edge.
+                    top = (page_layout.height - element.y1)
+                    # Distance from the bottom edge.
+                    bottom = element.y0
+                    # Width of the text box.
+                    width = element.width
+                    if (left > right) and (abs(left - right) > 100):
+                        alignment = 'right'
+                    elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
+                        alignment = 'center'
+                    else:
+                        alignment = 'left'
+                    self.details.append({
+                        'page_number': page_number,
+                        'index': element.index,
+                        'x0': element.bbox[0],
+                        'y0': element.bbox[1],
+                        'x1': element.bbox[2],
+                        'y1': element.bbox[3],
+                        'alignment': alignment,
+                        'lines': len(element._objs),
+                        'text': element.get_text().strip(),
+                        'is_table_name': element.get_text().strip().endswith('表')
+                    })
+        self.detail_df = pd.DataFrame(self.details)
+
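+    # A worked example of the alignment heuristic above with made-up margins (in PDF
+    # points): a box with left margin 120, right margin 130 and width 500 is "center".
+    @staticmethod
+    def _demo_alignment(left: float = 120.0, right: float = 130.0, width: float = 500.0) -> str:
+        if (left > right) and (abs(left - right) > 100):
+            return 'right'
+        elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
+            return 'center'
+        return 'left'
+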
+    def concat_table(self, table: list, page_number: int, table_name: str = None, new: bool = False) -> None:
+        """Add a table to the result list: either append it as a new table or merge it
+        into the last table when it continues across pages.
+        @table: the table extracted from the current page
+        @page_number: page the table was found on
+        @table_name: optional table title found near the table
+        @new: force the table to be appended as a new, independent table
+        """
+        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
+
+        if new:
+            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
+            return
+
+        if len(table) > 1:
+            second = [''.join(cell.split()) if cell else cell for cell in table[1]]
+        else:
+            second = None
+        if not self.tables or len(HEADERS & set(first)) > 2:
+            # The first row contains many known header cells: independent header, start a new table.
+            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
+        elif second and (len(HEADERS & set(second)) > 2):
+            # The second row is the header, so the first row is probably the table title.
+            if not table_name:
+                first = [i for i in first if i]
+                if len(first) == 1:
+                    table_name = "".join(first)
+            self.tables.append({"page_numbers": [page_number], "title_len": len(second), "col_len": len(table[-1]), "table": table[1:], "confidence": 1, "table_name": table_name if table_name else ""})
+        elif ((page_number-1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
+            # Same column count as the table on the previous page: continuation, merge it.
+            self.tables[-1]['page_numbers'].append(page_number)
+            self.tables[-1]['table'].extend(table)
+        else:
+            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0, "table_name": table_name if table_name else ""})
+
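+    # An illustrative sketch of the merge logic with two synthetic page fragments: the
+    # first fragment carries a recognised header row, the second has the same column
+    # count and is merged into it. The dummy file path is never opened.
+    @staticmethod
+    def _demo_concat_table() -> None:
+        agent = PdfExtractAttr_(file_path='dummy.pdf')
+        agent.concat_table([['序号', '项目名称', '单位', '工程量'], ['1', '土方', 'm3', '100']], page_number=3)
+        agent.concat_table([['2', '回填', 'm3', '80']], page_number=4)
+        print(len(agent.tables), agent.tables[0]['page_numbers'])  # one merged table spanning pages 3 and 4
+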
+
+    def parse_table(self) -> None:
+        """Parse tables with pdfplumber only.
+        """
+        if self.detail_df is None:
+            self.parse_text()
+
+        with pdfplumber.open(self.file_path) as pdf:
+            for page_number, page_layout in enumerate(pdf.pages):
+                # Check whether the page contains any table.
+                tables = page_layout.find_tables()
+                # Exactly one table on the page: decide whether to merge it with the previous one.
+                if len(tables) == 1:
+                    table = tables[0]
+                    x0, y0, x1, y1 = table.bbox
+                    table_title_df = self.detail_df.query(f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
+                    if table_title_df.empty:
+                        self.concat_table(table.extract(), page_number=page_number)
+                    else:
+                        table_title_name = table_title_df.iloc[0]['text']
+                        self.concat_table(table.extract(), page_number=page_number, table_name=table_title_name)
+                # Several tables on the page: only the first could continue a previous table;
+                # the rest are unrelated (not handled here).
+                elif len(tables) > 1:
+                    pass
+
+    def parse_table_pro(self, table_path: str = 'all_tables.json') -> str:
+        """Parse tables with pdfplumber (to locate them) and camelot (to extract them).
+        """
+        if self.detail_df is None:
+            self.parse_text()
+
+        with pdfplumber.open(self.file_path) as pdf:
+            for page_number, page_layout in enumerate(pdf.pages):
+                # Check whether the page contains any table.
+                tables = page_layout.find_tables()
+
+                if not tables:
+                    continue
+
+                tables_pro = camelot.read_pdf(
+                    self.file_path,
+                    # flavor='stream',
+                    pages=str(page_number+1),
+                    # edge_tol=200,
+                )
+
+                if not tables_pro:
+                    continue
+
+                print(len(tables), len(tables_pro))
+
+                # Exactly one table on the page: decide whether to merge it with the previous one.
+                if (len(tables) != 0) and (len(tables_pro) == 1):
+                    print(f"Parsing the table on page {page_number} of the PDF")
+                    table = tables[0]
+                    table_pro = tables_pro[0].df.to_dict(orient='split')['data']
+                    x0, y0, x1, y1 = table.bbox
+                    table_title_df = self.detail_df.query(f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
+                    if table_title_df.empty:
+                        self.concat_table(table_pro, page_number=page_number)
+                    else:
+                        table_title_name = table_title_df.iloc[0]['text']
+                        self.concat_table(table_pro, page_number=page_number, table_name=table_title_name)
+                # Several tables on the page: only the first could continue a previous table;
+                # the following ones are treated as new, independent tables.
+                elif len(tables_pro) > 1:
+                    print(f"Parsing the tables on page {page_number} of the PDF")
+                    first_table = tables_pro[0]
+                    self.concat_table(first_table.df.to_dict(orient='split')['data'], page_number=page_number)
+                    for table_index in range(1, len(tables_pro)):
+                        self.concat_table(tables_pro[table_index].df.to_dict(orient='split')['data'], page_number=page_number, new=True)
+
+        with open(table_path, 'w', encoding='utf-8') as fp:
+            json.dump(self.tables, fp, indent=4, ensure_ascii=False)
+        return table_path
+
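+# An end-to-end sketch of the class-based pipeline; the PDF path and output paths are
+# illustrative assumptions, not files that ship with this module.
+def _demo_pdf_extract_attr() -> None:
+    agent = PdfExtractAttr_(file_path='sample.pdf')
+    agent.extract_content(content_path='sample_content.json')
+    agent.parse_text()
+    saved = agent.parse_table_pro(table_path='sample_tables.json')
+    print('tables written to', saved)
+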
+
+if __name__ == '__main__':
+    # pdf_path = 'data/预审查数据/基于物联网技术的三峡坝区智慧仓储研究与建设招标文件-发出.pdf'
+    # image_dir = 'data/预审查数据/extracted_images'
+    # title_path = 'data/预审查数据/基于物联网技术的三峡坝区智慧仓储研究与建设招标文件-发出.json'
+
+    # pdf_path = '/mnt/d/Work_PWS/财报素材/财报素材/财报素材/600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告.pdf'
+    # image_dir = 'data/预审查数据/extracted_images'
+    # title_path = '/mnt/d/Work_PWS/财报素材/财报素材/财报素材/600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告.json'
+
+    # os.makedirs(image_dir, exist_ok=True)
+    # main_parse(pdf_path=pdf_path, title_path=title_path, image_dir=image_dir)
+
+    # table_path = '/mnt/d/Work_PWS/财报素材/财报素材/财报素材/600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告.json'
+    # content_path = '/mnt/d/Work_PWS/财报素材/财报素材/财报素材/600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告.json'
+    # agent = PdfExtractAttr_(file_path=pdf_path)
+
+    ## agent.extract_content(content_path=content_path)
+    # contents = agent.output_()
+
+    # agent.parse_text()
+    # agent.parse_table()
+    ## agent.parse_table_pro(table_path=table_path)
+    # all_tables = agent.output()
+
+    import glob
+    dir_path = 'data/财报素材'
+    for pdf_path in glob.glob(f'{dir_path}/*.pdf'):
+        print(pdf_path)
+        if '600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告' not in pdf_path: continue
+        agent = PdfExtractAttr_(file_path=pdf_path)
+
+        content_path = f'{dir_path}/{pdf_path.split("/")[-1].split(".")[0]}_content.json'
+        agent.extract_content(content_path=content_path)
+
+        table_path = f'{dir_path}/{pdf_path.split("/")[-1].split(".")[0]}_table.json'
+        agent.parse_table_pro(table_path=table_path)
+