@@ -2,7 +2,7 @@
 # @Author: privacy
 # @Date: 2024-06-11 13:43:14
 # @Last Modified by: privacy
-# @Last Modified time: 2024-08-08 17:07:49
+# @Last Modified time: 2024-07-25 16:36:24

 # import os
@@ -82,12 +82,14 @@ from io import BytesIO
 from pprint import pprint

 # Third-party imports
-import cv2
 import numpy as np
 import pandas as pd
+import cv2
 from pdfminer.high_level import extract_pages
 from pdfminer.layout import LTRect, LTTextBoxHorizontal, LTLine, LTFigure, LTCurve, LTImage, LTChar
 from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
+from pdfminer.pdfcolor import LITERAL_DEVICE_GRAY
+from pdfminer.pdfcolor import LITERAL_DEVICE_RGB
 from pdfminer.pdftypes import (
     LITERALS_DCT_DECODE,
     LITERALS_JBIG2_DECODE,
@@ -96,18 +98,41 @@ from pdfminer.pdftypes import (
 )
 from pdfminer.pdfparser import PDFParser, PDFSyntaxError
 from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
+from pdfminer.image import BMPWriter
+from pdfminer.pdfinterp import resolve1
 import pdfplumber
-import camelot
+from tqdm import tqdm

 # Custom imports
 from tools import RefPageNumberResolver
-HEADERS = set({'序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价', '合价', '备注', '主材名称', '规格型号', '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项', '数量', '含税单价(元)', '含税合价(元)', '条款号', '评分因素', '评分标准', '页码'})
+HEADERS = set(
+    {'序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价', '合价', '备注', '主材名称',
+     '规格型号', '不低于下列同档次品牌', '投标选用品牌及规格型号', '名称', '事项', '数量', '含税单价(元)',
+     '含税合价(元)', '条款号', '评分因素', '评分标准', '页码'})
+
+
+def load_json(data_path: str):
+    try:
+        with open(data_path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+        return data
+    except FileNotFoundError:
+        print(f"Error: The file '{data_path}' was not found.")
+        return None
+    except json.JSONDecodeError as e:
+        print(f"Error decoding JSON from '{data_path}': {e}")
+        return None
+    except Exception as e:
+        print(f"Error loading JSON from '{data_path}': {e}")
+        return None
+# is_title: decide whether the input string line looks like a heading
 def is_title(line: str) -> bool:
-    # title_word = re.findall('^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]', line.strip())
-    title_word = re.findall('^第[一二三四五六七八九十]+|^[一二三四五六七八九十\d]+、|^[\(\(][一二三四五六七八九十]+[\)\)]', line.strip())
+    title_word = re.findall(
+        '^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]|^[(\(][1-9]+[\))]',
+        line.strip())
     if title_word:
         return True
     title_word = re.findall('^附录|^参考文献|^附表', line.strip())
@@ -115,6 +140,7 @@ def is_title(line: str) -> bool:
         return True
     return False
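# Illustrative behaviour of the heading patterns above (editor's sketch, not
# part of the patch); the samples mirror headings found in these bid files:
#
#     is_title('第一章 总则')    # True  -> chapter heading
#     is_title('(3)工期要求')    # True  -> new Arabic-numeral branch
#     is_title('这是普通正文。')  # False -> ordinary body text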
+
 def export_image(image: LTImage, path: str) -> str:
     """Save an LTImage to disk"""
     (width, height) = image.srcsize
@@ -129,6 +155,20 @@ def export_image(image: LTImage, path: str) -> str:
         name = _save_jpeg2000(image, path)
         return name

+    # elif image.bits == 1:
+    #     name = _save_bmp(image, width, height, (width + 7) // 8, image.bits, path)
+
+    # elif image.bits == 8 and LITERAL_DEVICE_RGB in image.colorspace:
+    #     name = _save_bmp(image, width, height, width * 3, image.bits * 3, path)
+
+    # elif image.bits == 8 and LITERAL_DEVICE_GRAY in image.colorspace:
+    #     name = _save_bmp(image, width, height, width, image.bits, path)
+
+    # elif len(filters) == 1 and filters[0][0] in LITERALS_FLATE_DECODE:
+    #     name = _save_bytes(image)
+
+    # else:
+    #     name = _save_raw(image)
     data = image.stream.get_data()
     raw_data = image.stream.get_rawdata()
@@ -200,6 +240,7 @@ def export_image(image: LTImage, path: str) -> str:
     else:
         return None

+
 def _save_j2k(image: LTImage, path: str) -> str:
     try:
         from PIL import Image
@@ -215,6 +256,7 @@ def _save_j2k(image: LTImage, path: str) -> str:

     return path

+
 def _save_jpeg(image: LTImage, path: str) -> str:
     """Save a JPEG encoded image"""
     raw_data = image.stream.get_rawdata()
@@ -239,6 +281,7 @@ def _save_jpeg(image: LTImage, path: str) -> str:

     return path

+
 def _save_jpeg2000(image: LTImage, path: str) -> str:
     """Save a JPEG 2000 encoded image"""
     raw_data = image.stream.get_rawdata()
@@ -259,6 +302,7 @@ def _save_jpeg2000(image: LTImage, path: str) -> str:
     cv2.imwrite(path, opencv_image)
     return path

+
 def _save_bmp(image: LTImage, width: int, height: int, bytes_per_line: int, bits: int, path: str) -> str:
     """Save a BMP encoded image"""
     data = image.stream.get_data()
@@ -268,7 +312,41 @@ def _save_bmp(image: LTImage, width: int, height: int, bytes_per_line: int, bits
     return path
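# The _save_bmp body is elided by the hunk above. For orientation, a minimal
# sketch of how pdfminer.six's BMPWriter is usually driven, from memory of
# its image.py (editor's illustration, not part of the patch):
#
#     def _save_bmp_sketch(image, width, height, bytes_per_line, bits, path):
#         data = image.stream.get_data()
#         with open(path + '.bmp', 'wb') as fp:
#             bmp = BMPWriter(fp, bits, width, height)
#             i = 0
#             for y in range(height):
#                 bmp.write_line(y, data[i:i + bytes_per_line])
#                 i += bytes_per_line
#         return path + '.bmp'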
-def table_parse(pdf_path: str, title_path: str, start_title: str = '六、已标价工程量清单', end_title: str = '七、施工组织设计', table_path: str = 'table.json', start_page_number: int = None, end_page_number: int = None) -> list:
+def main_parse(pdf_path: str, title_path: str, image_dir: str) -> None:
+    texts = []
+    images = []
+    # Read the PDF and walk its pages
+    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
+        title_index = 0
+        image_index = 0
+        for element in page_layout:
+            if isinstance(element, LTLine):
+                pass
+            elif isinstance(element, LTRect):
+                pass
+            elif isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
+                text = element.get_text().strip()
+                # Headings are assumed to be a single line in a larger font
+                if text and (is_title(text) or element.height > 15):
+                    texts.append({'index': title_index, 'page_number': page_number, 'bbox': element.bbox, 'text': text})
+                    title_index += 1
+            elif isinstance(element, LTFigure):
+                for e_obj in element._objs:
+                    if isinstance(e_obj, LTImage):
+                        # Extract the image data
+                        image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
+                        image_file = export_image(e_obj, image_file)
+                        images.append(image_file)
+                        pprint(f'Image saved: {image_file}')
+                        image_index += 1
+
+    with open(title_path, 'w', encoding='utf-8') as fp:
+        json.dump(texts, fp, indent=4, ensure_ascii=False)
+
+
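# Quick usage of main_parse (editor's sketch; file paths are illustrative):
#
#     os.makedirs('./extracted_images', exist_ok=True)
#     main_parse('tender.pdf', 'titles.json', './extracted_images')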
+def table_parse(pdf_path: str, title_path: str, start_title: str = '六、已标价工程量清单',
+                end_title: str = '七、施工组织设计', table_path: str = 'table.json', start_page_number: int = None,
+                end_page_number: int = None) -> list:
     """PDF table parsing
     @pdf_path
     @title_path
@@ -297,13 +375,15 @@ def table_parse(pdf_path: str, title_path: str, start_title: str = '六、已标
         # pprint(first)
         if len(HEADERS & set(first)) > 2:
             # pprint("Many header cells found: standalone header, start a new table!")
-            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1})
-        elif ((i-1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
+            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                           "confidence": 1})
+        elif ((i - 1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
             # pprint("Empty columns: not a standalone table, merge it directly")
             tables[-1]['page_numbers'].append(i)
             tables[-1]['table'].extend(table)
         else:
-            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0})
+            tables.append({"page_numbers": [i], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                           "confidence": 0})
     return tables

     with pdfplumber.open(pdf_path) as pdf:
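# How the header-overlap test above behaves on a typical first row
# (editor's illustration; the row values are made up):
#
#     first = ['序号', '项目名称', '单位', '工程量', '']
#     len(HEADERS & set(first))  # -> 4, which is > 2, so a new table starts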
@@ -327,361 +407,243 @@ class PdfExtractAttr(object):
         self.details = []
         self.tables = []
         self.content = []
-        self.chapters = []
-        self.references = []
-        self.detail_df = None
-        self.outlines = None
+        self.total_page = resolve1(PDFDocument(PDFParser(open(file_path, 'rb'))).catalog['Pages'])['Count']
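# Editorial note on self.total_page above: the page count is read straight
# from the document catalog ('Pages' -> 'Count') via resolve1, so it is
# available without a layout pass; the inline open() does leak a file handle,
# though. A tidier equivalent would be:
#
#     with open(file_path, 'rb') as fp:
#         self.total_page = resolve1(PDFDocument(PDFParser(fp)).catalog['Pages'])['Count']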
-    def can_merge_lines(self, line1, line2) -> bool:
-        """Decide whether two text lines can be merged into one paragraph
-        """
-        # line1 has already ended
-        if line1.x1 < self.right:
-            return False
-        # line2 is indented
-        if line2.x0 > self.left:
-            return False
-        return True
-
-    def main_parse(self, title_path: str = None, section_path: str = None, image_dir: str = None) -> None:
-        """Parse the PDF
-        Args:
-        - title_path: str, path for saving headings
-        - section_path: str, directory for saving body text
-        - image_dir: str, directory for saving images
-        """
-        self.outlines['text'] = ''
-        # Headings
-        texts = []
-        # Images
-        images = []
-
-        # Read the PDF and extract its pages
-        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
-
-            max_start_row = self.outlines.query(f''' page_number <= {page_number+1} ''').query(''' page_number == page_number.max() ''').query(''' level == level.max() ''')
-
-            if not max_start_row.empty:
-                idx = max_start_row.index.values[0]
-            else:
-                idx = len(self.outlines.index)
-                self.outlines.loc[idx] = {'level': 6, 'title': '', 'page_number': 0, 'text': ''}
-
-            # Left-edge coordinates
-            x0s = []
-            # Right-edge coordinates
-            x1s = []
-
-            title_index = 0
-            image_index = 0
-
-            for element in page_layout:
-                if isinstance(element, LTTextBoxHorizontal):
-                    x0s.append(element.x0)
-                    x1s.append(element.x1)
-
-            if x0s and x1s:
-                # Left margin
-                self.left = min(x0s) + 15
-                # Right margin
-                self.right = max(x1s) - 15
-
-            current = None
-
-            for element in page_layout:
-
-                if isinstance(element, LTLine):
-                    pass
-
-                elif isinstance(element, LTRect):
-                    pass
-
-                elif isinstance(element, LTTextBoxHorizontal):
-                    # Text
-                    text = element.get_text().strip()
-
-                    # Headings are assumed to be a single line in a larger font
-                    if len(element._objs) == 1 and text and (is_title(text) or element.height > 15):
-                        texts.append({'index': title_index, 'page_number': page_number, 'bbox': element.bbox, 'text': text})
-                        title_index += 1
-                        self.outlines.at[idx, 'text'] += '\n'
-                        self.outlines.at[idx, 'text'] += text
-
-                    # Body text
-                    elif not current or self.can_merge_lines(current, element):  # can merge
-                        current = element
-                        for line in element:
-                            self.outlines.at[idx, 'text'] += line.get_text().strip()
-
-                    else:  # cannot merge
-                        for line in element:
-                            self.outlines.at[idx, 'text'] += '\n'
-                            self.outlines.at[idx, 'text'] += line.get_text().strip()
-
-                elif image_dir and isinstance(element, LTFigure):
-                    for e_obj in element._objs:
-                        if isinstance(e_obj, LTImage):
-                            # Extract the image data
-                            image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
-                            image_file = export_image(e_obj, image_file)
-                            images.append(image_file)
-                            pprint(f'Image saved: {image_file}')
-                            image_index += 1
-
-        if title_path:
-            with open(title_path, 'w', encoding='utf-8') as fp:
-                json.dump(texts, fp, indent=4, ensure_ascii=False)
-
-        if section_path:
-            self.outlines.to_json(section_path, orient='records', lines=True, force_ascii=False)
-
-    def extract_toc(self) -> list:
-        """Parse the PDF outline from page content
+    # parse_outline parses the PDF outline, saves the result as JSON and prints it
+    def parse_outline(self, out_path: str = ''):
+        """Parse the PDF outline
         """
-        results = []
-
-        for page_number, page in enumerate(extract_pages(self.file_path)):
-
-            is_outline = False
-
-            if page_number < 1:
-                continue
-
-            if page_number > 20:
-                break
-
-            lines = []
-            for element in page:
-                if isinstance(element, LTTextBoxHorizontal):
-                    for line in element:
-                        lines.append(line.get_text().strip())
-
-            for line in lines:
-                # Check whether the line matches the table-of-contents format
-                if line and '......' in line and (line[0].isdigit() or '\u4e00' <= line[0] <= '\u9fff') and line[-1].isdigit():
-                    is_outline = True
-                    # Compute the indent level
-                    indent_level = 1
-                    # Extract the entry text
-                    title = re.findall('^[\d\.、]{0,}[\u4e00-\u9fff、()\s]+', line).pop()
-                    # Extract the page number
-                    page_n = int(re.findall('\d+$', line).pop())
-                    # Append to the outline structure
-                    # directory_structure.append({
-                    results.append({
-                        "level": indent_level,
-                        "title": title,
-                        "page_number": page_n
-                    })
-
-            if not is_outline:
-                break
+        if os.path.exists(out_path):
+            results = load_json(out_path)
+        else:
+            results = []
+            with open(self.file_path, "rb") as fp:
+                try:
+                    parser = PDFParser(fp)
+                    document = PDFDocument(parser)
+                    ref_pagenum_resolver = RefPageNumberResolver(document)
+                    outlines = document.get_outlines()
+                    for (level, title, dest, a, se) in outlines:
+                        if dest:
+                            page_num = ref_pagenum_resolver.resolve(dest)
+                        elif a:
+                            page_num = ref_pagenum_resolver.resolve(a)
+                        elif se:
+                            page_num = ref_pagenum_resolver.resolve(se)
+                        else:
+                            page_num = None
+                        results.append({'level': level, 'title': title, 'page_number': page_num})
+                except PDFNoOutlines:
+                    print("No outlines found.")
+                except PDFSyntaxError:
+                    print("Corrupted PDF or non-PDF file.")
+                finally:
+                    parser.close()
+
+            # Save results as a JSON file (outlines.json)
+            if out_path:
+                with open(out_path, 'w', encoding='utf-8') as op:
+                    json.dump(results, op, indent=4, ensure_ascii=False)
+        # Print the results
+        print(results)
         return results
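# Typical call (editor's sketch; file names are illustrative): out_path acts
# as a cache -- when the JSON already exists it is reloaded instead of
# re-walking the PDF outline:
#
#     agent = PdfExtractAttr(file_path='tender.pdf')
#     outline = agent.parse_outline(out_path='outlines.json')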
-    def extract_content(self, content_path: str = None) -> list:
+    def extract_content(self) -> list:
         with pdfplumber.open(self.file_path) as pdf:
             for page in pdf.pages:
                 self.content.append({
                     'page_number': page.page_number - 1,
                     'text': page.extract_text()
                 })
-
-        if content_path:
-            with open(content_path, 'w', encoding='utf-8') as fp:
-                json.dump(self.content, fp, indent=4, ensure_ascii=False)
-
         return self.content
-    def parse_outline(self, outline_path: str = None) -> list:
-        """Parse the PDF outline from metadata; fall back to content-based parsing on failure
-        """
-        results = []
-
-        with open(self.file_path, "rb") as fp:
-            try:
-                parser = PDFParser(fp)
-                document = PDFDocument(parser)
-                ref_pagenum_resolver = RefPageNumberResolver(document)
-                outlines = document.get_outlines()
-
-                for (level, title, dest, a, se) in outlines:
-                    if dest:
-                        page_num = ref_pagenum_resolver.resolve(dest)
-                    elif a:
-                        page_num = ref_pagenum_resolver.resolve(a)
-                    elif se:
-                        page_num = ref_pagenum_resolver.resolve(se)
-                    else:
-                        page_num = None
-                    results.append({'level': level, 'title': title, 'page_number': page_num})
-
-            except PDFNoOutlines:
-                print("No outlines found.")
-            except PDFSyntaxError:
-                print("Corrupted PDF or non-PDF file.")
-            finally:
-                parser.close()
-
-        if not results:
-            results = self.extract_toc()
-
-        if outline_path:
-            with open(outline_path, 'w', encoding='utf-8') as op:
-                json.dump(results, op, indent=4, ensure_ascii=False)
-
-        self.outlines = pd.DataFrame(results)
-
-        return results
-
-    def parse_text(self) -> None:
+    def parse_text(self, out_path: str = ''):
         """Parse the text
         """
-        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
-            for element in page_layout:
-                if isinstance(element, LTTextBoxHorizontal):
-                    # Distance from the left edge
-                    left = element.x0
-                    # Distance from the right edge
-                    right = (page_layout.width - element.x1)
-                    # Distance from the top edge
-                    top = (page_layout.height - element.y1)
-                    # Distance from the bottom edge
-                    button = element.y0
-                    # Text width
-                    width = element.width
-                    if (left > right) and (abs(left - right) > 100):
-                        alignment = 'right'
-                    elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
-                        alignment = 'center'
-                    else:
-                        alignment = 'left'
-                    self.details.append({
-                        'page_number': page_number,
-                        'index': element.index,
-                        'x0': element.bbox[0],
-                        'y0': element.bbox[1],
-                        'x1': element.bbox[2],
-                        'y1': element.bbox[3],
-                        'alignment': alignment,
-                        'lines': len(element._objs),
-                        'text': element.get_text().strip(),
-                        'is_table_name': element.get_text().strip().endswith('表')
-                    })
+        if os.path.exists(out_path):
+            self.details = load_json(out_path)
+        else:
+            # Loop over the layout of every page
+            for page_number, page_layout in tqdm(enumerate(extract_pages(self.file_path)), total=self.total_page):
+                # Walk the elements of the current page
+                for element in page_layout:
+                    # For every LTTextBoxHorizontal element, measure the distance
+                    # from the box to the left/right/top/bottom page edges and its width
+                    if isinstance(element, LTTextBoxHorizontal):
+                        # Distance from the left edge
+                        left = element.x0
+                        # Distance from the right edge
+                        right = (page_layout.width - element.x1)
+                        # Distance from the top edge
+                        top = (page_layout.height - element.y1)
+                        # Distance from the bottom edge
+                        button = element.y0
+                        # Text width
+                        width = element.width
+                        # Determine the text box alignment
+                        if (left > right) and (abs(left - right) > 100):
+                            alignment = 'right'
+                        elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
+                            alignment = 'center'
+                        else:
+                            alignment = 'left'
+                        # Store the parsed element in the list
+                        self.details.append({
+                            'page_number': page_number,
+                            'index': element.index,
+                            'x0': element.bbox[0],
+                            'y0': element.bbox[1],
+                            'x1': element.bbox[2],
+                            'y1': element.bbox[3],
+                            'alignment': alignment,
+                            'lines': len(element._objs),
+                            'text': element.get_text().strip(),
+                            'is_table_name': element.get_text().strip().endswith('表')
+                        })
+            if out_path:
+                with open(out_path, 'w', encoding='utf-8') as fp:
+                    json.dump(self.details, fp, indent=4, ensure_ascii=False)
+
+        # Convert to a pandas DataFrame and keep it on self.detail_df
         self.detail_df = pd.DataFrame(self.details)
+        return self.details
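# The alignment heuristic above on toy numbers (editor's illustration):
# a box with left = 200 and right = 40 gives left > right and
# abs(left - right) = 160 > 100, so it is tagged 'right'; left = 120 with
# right = 110 gives a small, balanced gap far from the left edge, so it is
# tagged 'center'; anything else falls through to 'left'.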
+    # Mirrors the module-level helper above, but focused on one page's table data
     def concat_table(self, table: list, page_number: int, table_name: str = None, new: bool = False) -> None:
         """Try to add a table to the results; two cases: append a brand-new table, or splice onto the last one
        @table
        """
         first = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[0]]
+        # If the caller marks this table as new (new=True), append it directly
         if new:
-            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
+            self.tables.append(
+                {"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                 "confidence": 1, "table_name": table_name if table_name else ""})
             return

         tail = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[-1]]
+        # Table has more than one row
         if len(table) > 1:
             second = [''.join([i for i in cell.split() if i]) if cell else cell for cell in table[1]]
         else:
             second = None
         # pprint(first)
-        if not self.tables or len(HEADERS & set(first)) > 2:
+        if len(HEADERS & set(first)) > 2:
             # pprint("Many header cells found: standalone header, start a new table!")
-            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
+            self.tables.append(
+                {"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                 "confidence": 1, "table_name": table_name if table_name else ""})
         elif second and (len(HEADERS & set(second)) > 2):
             # pprint("Many header cells found: standalone header, start a new table!")
             if not table_name:
                 first = [i for i in first if i]
                 if len(first) == 1:
                     table_name = "".join(first)
-            self.tables.append({"page_numbers": [page_number], "title_len": len(second), "col_len": len(table[-1]), "table": table[1:], "confidence": 1, "table_name": table_name if table_name else ""})
-        elif ((page_number-1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
+            self.tables.append(
+                {"page_numbers": [page_number], "title_len": len(second), "col_len": len(table[-1]), "table": table[1:],
+                 "confidence": 1, "table_name": table_name if table_name else ""})
+        # TODO When the opening page holds several tables, self.tables is still [] at this point,
+        # TODO so indexing self.tables[-1] raises "list index out of range"; handle that case
+
+        # Added guard: when self.tables holds no tables yet, append as a new table automatically
+        elif len(self.tables) == 0:
+            # pprint("Judged to be the starting table, creating a new one!")
+            self.tables.append(
+                {"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                 "confidence": 1, "table_name": table_name if table_name else ""})
+
+        elif ((page_number - 1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
             # pprint("Empty columns: not a standalone table, merge it directly")
             self.tables[-1]['page_numbers'].append(page_number)
             self.tables[-1]['table'].extend(table)
         else:
-            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0, "table_name": table_name if table_name else ""})
+            self.tables.append(
+                {"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table,
+                 "confidence": 0, "table_name": table_name if table_name else ""})
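# Driving concat_table by hand (editor's sketch; rows_page4 / rows_page5 are
# made-up row lists): a chunk whose first row overlaps HEADERS opens a new
# table, and the following page's chunk with the same column count is merged
# into it rather than opening another:
#
#     agent.concat_table(rows_page4, page_number=4)  # header row -> new table
#     agent.concat_table(rows_page5, page_number=5)  # same col_len -> merged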
-    def parse_table_pro(self, table_path: str = 'all_tables.json') -> None:
+    # Main entry point for table parsing
+    ### NOTE!! self.detail_df stores the details of every LTTextBoxHorizontal (text box) element
+    def parse_table(self, out_path: str = '', start: int = None, end: int = None):
         """Parse tables
         """
-        if self.detail_df == None:
-            self.parse_text()

+        assert ((start is None and end is None) or (start is not None and end is not None))
+        if start is None:
+            start = 0
+            end = float('inf')
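# Editorial note: the assert above requires start and end to be supplied
# together. parse_table() with no arguments scans every page; for example,
# parse_table(out_path='tables.json', start=10, end=20) restricts the scan
# to that page window (inclusive on both ends, per the range check below).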
         with pdfplumber.open(self.file_path) as pdf:
-            for page_number, page_layout in enumerate(pdf.pages):
+            # Iterate over every page of the PDF
+            for page_number, page_layout in tqdm(enumerate(pdf.pages), total=self.total_page):
                 # Check whether the page contains tables
-                tables = page_layout.find_tables()
-
-                if not tables:
+                if not (start <= page_number <= end):
                     continue
-
-
-                tables_pro = camelot.read_pdf(
-                    self.file_path,
-                    # flavor='stream',
-                    pages=str(page_number+1),
-                    # edge_tol=200,
-                )
-
-                if not tables_pro:
-                    continue
-
-                print(len(tables), len(tables_pro))
-
+                tables = page_layout.find_tables()
                 # One table found on the page: run the merge check on it
-                if (len(tables) != 0) and (len(tables_pro) == 1):
-                    print(f"Parsing the table on PDF page {page_number}")
-                    # print(f"Parsing the table on PDF page {page_number}")
+                if len(tables) == 1:
                     table = tables[0]
-                    table_pro = tables_pro[0].df.to_dict(orient='split')['data']
+                    # Bounding-box coordinates of the current table
                     x0, y0, x1, y1 = table.bbox
-                    table_title_df = self.detail_df.query(f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
+                    # Look in detail_df for a matching table title
+                    table_title_df = self.detail_df.query(
+                        f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
+                    # If no matching table title is found, hand the extracted
+                    # rows to concat_table() as they are; if one is found,
+                    # fetch the title first and pass it to concat_table()
+                    # together with the rows
                     if table_title_df.empty:
-                        self.concat_table(table_pro, page_number=page_number)
+                        print(f'processing page_number: {page_number}')
+                        self.concat_table(table.extract(), page_number=page_number)
                     else:
                         table_title_name = table_title_df.iloc[0]['text']
-                        self.concat_table(table_pro, page_number=page_number, table_name=table_title_name)
+                        print(f'processing page_number with table_name: {table_title_name}')
+                        self.concat_table(table.extract(), page_number=page_number, table_name=table_title_name)
                     table = tables[0]
+                    # self.concat_table(table.extract(), table_title_name)
                 # Several tables detected: merge-check the first one; the tables after it are necessarily unrelated
-                elif len(tables_pro) > 1:
-                    print(f"Parsing the tables on PDF page {page_number}")
-                    first_table = tables_pro[0]
-                    self.concat_table(first_table.df.to_dict(orient='split')['data'], page_number=page_number)
-                    for table_index in range(1, len(tables_pro)):
-                        self.concat_table(tables_pro[table_index].df.to_dict(orient='split')['data'], page_number=page_number, new=True)
-
-            if table_path:
-                with open(table_path, 'w', encoding='utf-8') as fp:
+                ### The case of one page holding several tables is not fully handled yet
+                elif len(tables) > 1:
+                    print(f'current page {page_number} has multiple tables')
+                    # import pdb; pdb.set_trace()
+                    # TODO table_name matching for multiple tables on one page is not implemented yet
+                    # Run the merge check on the first table
+                    first_table = tables[0]
+                    self.concat_table(first_table.extract(), page_number=page_number)
+                    # Append the remaining tables with new=True
+                    for table_index in range(1, len(tables)):
+                        self.concat_table(tables[table_index].extract(), page_number=page_number, new=True)
+
+            if out_path:
+                with open(out_path, 'w', encoding='utf-8') as fp:
                     json.dump(self.tables, fp, indent=4, ensure_ascii=False)
+        return self.tables
+
+    def output(self, table_path: str = 'all_tables.json'):
+        """Output the results
+        """
+        with open(table_path, 'w', encoding='utf-8') as fp:
+            json.dump(self.tables, fp, indent=4, ensure_ascii=False)

         return self.tables
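# Editorial note: parse_table() depends on parse_text() having populated
# self.detail_df -- it queries it for centred captions ending in '表' -- so
# the call order used in __main__ below (parse_text before parse_table)
# is required.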
 if __name__ == '__main__':
-    pdf_path = './投标文件-修改版9-5-1-1.pdf'
-    # pdf_path = './南方电网数字研究院有限公司.pdf'
-    # pdf_path = './2022年度工程类-公招采购资料/2022-2025年度三峡电站9台机组检修密封加工制作重新招标/2022-2025年度三峡电站9台机组检修密封加工制作重新招标招标文件印刷版.pdf'
+    # pdf_path = './投标文件-修改版9-5-1-1.pdf'
+    pdf_path = './南方电网数字研究院有限公司.pdf'
     # title_path = './投标文件-修改版9-5-1-1.json'
-    # title_path = './投标文件-修改版9-5-1-1-title.json'
-    # title_path = './南方电网数字研究院有限公司.json'
-    # section_path = './投标文件-修改版9-5-1-1-section.json'
-    # section_path = './南方电网数字研究院有限公司-section.json'
-    # image_dir = './extracted_images'
-    # os.makedirs(image_dir, exist_ok=True)
-
+    title_path = './南方电网数字研究院有限公司.json'
+    image_dir = './extracted_images'
+    os.makedirs(image_dir, exist_ok=True)
+    main_parse(pdf_path=pdf_path, title_path=title_path, image_dir=image_dir)
     # tables = table_parse(pdf_path=pdf_path, title_path=title_path, start_title='六、已标价工程量清单', end_title = '七、施工组织设计')
     # tables = table_parse(pdf_path=pdf_path, title_path=title_path, start_page_number=0, end_page_number=725)

-    # pdf_path = './2022年度工程类-公招采购资料/三峡右岸电站35kV及10kV厂用电系统保护装置换型/三峡右岸电站35kV和10kV厂用电系统保护装置换型招标文件审批稿 (3).pdf'
-    # table_path = './2022年度工程类-公招采购资料/三峡右岸电站35kV及10kV厂用电系统保护装置换型/三峡右岸电站35kV和10kV厂用电系统保护装置换型招标文件审批稿 (3)-table.json'
-
-    pdf_path = './2022年度工程类-公招采购资料/三峡左岸及地下电站地坪整治/三峡左岸及地下电站地坪整治招标文件(发售版).pdf'
-    table_path = './2022年度工程类-公招采购资料/三峡左岸及地下电站地坪整治/三峡左岸及地下电站地坪整治招标文件(发售版)-table.json'
-
     agent = PdfExtractAttr(file_path=pdf_path)
-    # agent.parse_outline()
-    # agent.main_parse(title_path=title_path, section_path=section_path)
-    agent.parse_table_pro(table_path=table_path)
+    print(agent.extract_content())
+    agent.parse_outline()
+    agent.parse_text()
+    agent.parse_table()
+    agent.output()
|