'''Tender document (招标文件) content extraction.'''
import json
import os
import re
from io import BytesIO
from typing import List, Optional

import camelot
import cv2
import numpy as np
import pandas as pd
import pdfplumber
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTRect, LTTextBoxHorizontal, LTLine, LTFigure, LTImage
from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines
from pdfminer.pdfparser import PDFParser, PDFSyntaxError
from pdfminer.pdftypes import (
    LITERALS_DCT_DECODE,
    LITERALS_JPX_DECODE,
)

from .tools import RefPageNumberResolver
# Known table-header cell texts; a row sharing enough cells with this set is
# treated as a stand-alone header row (i.e. the start of a new table).
HEADERS = {
    '序号', '项目编码', '项目名称', '项目特征', '单位', '工程量', '全费用综合单价',
    '合价', '备注', '主材名称', '规格型号', '不低于下列同档次品牌',
    '投标选用品牌及规格型号', '名称', '事项', '数量', '含税单价(元)', '含税合价(元)',
    '条款号', '评分因素', '评分标准', '页码',
}
HEADERS |= {'评审因素', '评审标准', ''}
def is_title(line: str) -> bool:
    """Heuristically decide whether a text line looks like a section title."""
    line = line.strip()
    # Chinese section numbering: "(一)", "1.", "12.", "第三章", "一、", etc.
    if re.findall(r'^[(\(][一二三四五六七八九十]+[\))]|^\d\.|^1\d\.|^2\d\.|^[第][一二三四五六七八九十\d]+[章节条]|[一二三四五六七八九十]+[、要是]', line):
        return True
    # Back-matter headings: appendix, references, attached tables.
    if re.findall(r'^附录|^参考文献|^附表', line):
        return True
    return False
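
# Illustration of the heuristic above (hypothetical inputs):
#   is_title('第三章 评标办法')    -> True   (chapter heading)
#   is_title('(一)总则')          -> True   (numbered sub-heading)
#   is_title('附录A 技术规范')     -> True   (back matter)
#   is_title('本项目位于某市某区') -> False  (ordinary body text)
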
PIL_ERROR_MESSAGE = "PIL import error"
def _save_jpeg(image: LTImage, path: str) -> str:
    """Save a JPEG encoded image."""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".jpg"
    with open(path, "wb") as fp:
        if LITERAL_DEVICE_CMYK in image.colorspace:
            try:
                from PIL import Image, ImageChops  # type: ignore[import]
            except ImportError:
                raise ImportError(PIL_ERROR_MESSAGE)
            # CMYK JPEGs come out inverted; invert and convert to RGB first.
            ifp = BytesIO(raw_data)
            i = Image.open(ifp)
            i = ImageChops.invert(i)
            i = i.convert("RGB")
            i.save(fp, "JPEG")
        else:
            fp.write(raw_data)
    return path
def _save_jpeg2000(image: LTImage, path: str) -> str:
    """Save a JPEG 2000 encoded image as PNG."""
    raw_data = image.stream.get_rawdata()
    assert raw_data is not None
    path = path + ".png"
    try:
        from PIL import Image  # type: ignore[import]
    except ImportError:
        raise ImportError(PIL_ERROR_MESSAGE)
    # Writing the raw JPX bytes directly produces files most image viewers
    # cannot open; round-tripping through PIL and OpenCV yields a portable PNG.
    ifp = BytesIO(raw_data)
    i = Image.open(ifp)
    opencv_image = cv2.cvtColor(np.array(i), cv2.COLOR_RGB2BGR)
    cv2.imwrite(path, opencv_image)
    return path
def export_image(image: LTImage, path: str) -> Optional[str]:
    """Save an LTImage to disk; return the written path, or None if no bytes."""
    filters = image.stream.get_filters()
    if len(filters) == 1 and filters[0][0] in LITERALS_DCT_DECODE:
        return _save_jpeg(image, path)
    elif len(filters) == 1 and filters[0][0] in LITERALS_JPX_DECODE:
        return _save_jpeg2000(image, path)

    def _sniff_ext(data: bytes) -> str:
        """Guess a file extension from the leading magic bytes."""
        if data[:2] == b'\xff\xd8' and data[-2:] == b'\xff\xd9':
            return '.jpg'
        elif data[:8] == b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a':
            return '.png'
        elif data[:2] == b'\x42\x4d':
            return '.bmp'
        elif data[:6] in (b'\x47\x49\x46\x38\x37\x61', b'\x47\x49\x46\x38\x39\x61'):
            return '.gif'
        elif data[:2] in (b'\x4d\x4d', b'\x49\x49'):
            return '.tiff'
        return '.unk'

    # Prefer the decoded stream; fall back to the raw stream bytes.
    data = image.stream.get_data() or image.stream.get_rawdata()
    if not data:
        return None
    path += _sniff_ext(data)
    with open(path, 'wb') as file:
        file.write(data)
    return path
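
# Usage sketch (hypothetical objects/paths): given an LTImage `lt_image` found
# inside an LTFigure, export_image(lt_image, 'out/image_page_3_0') picks an
# extension from the stream's filters or magic bytes and returns e.g.
# 'out/image_page_3_0.png', or None when the stream carries no bytes.
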
def main_parse(pdf_path: str, title_path: str, image_dir: str) -> tuple:
    """Extract candidate titles from a PDF and dump them to `title_path`.

    Image extraction lives in parse_image(); only titles are collected here.
    """
    texts = []
    # Read the PDF and walk the layout of every page.
    for page_number, page_layout in enumerate(extract_pages(pdf_path)):
        title_index = 0
        for element in page_layout:
            if isinstance(element, (LTLine, LTRect)):
                pass
            elif isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                text = element.get_text().strip()
                # A title is assumed to be a single-line box that either looks
                # like a heading or uses a noticeably larger font (height > 15).
                if text and (is_title(text) or element.height > 15):
                    texts.append({'index': title_index, 'page_number': page_number, 'bbox': element.bbox, 'text': text})
                    title_index += 1
    with open(title_path, 'w', encoding='utf-8') as fp:
        json.dump(texts, fp, indent=4, ensure_ascii=False)
    return title_path, image_dir
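
# Usage sketch (hypothetical paths):
#   main_parse('tender.pdf', 'titles.json', 'extracted_images')
# writes titles.json records such as
#   {"index": 0, "page_number": 3, "bbox": [90.0, 700.1, 505.3, 728.9], "text": "第一章 招标公告"}
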
def parse_title(file_path: str, title_path: Optional[str] = None) -> list:
    """Parse titles, used to uniquely key quotation items.

    Args:
        file_path: PDF to parse.
        title_path: optional JSON path the results are also dumped to.
    Returns:
        results: list of title records.
    """
    results = []
    seq_num = 0
    for page_number, page_layout in enumerate(extract_pages(file_path)):
        title_index = 0
        for element in page_layout:
            if isinstance(element, LTTextBoxHorizontal) and len(element._objs) == 1:
                text = element.get_text().strip()
                # Same heuristic as main_parse: one-line box, title-like or tall.
                if text and (is_title(text) or element.height > 15):
                    results.append({
                        'index': title_index,
                        'page_number': page_number,
                        'bbox': element.bbox,
                        'text': text,
                        'title': text,
                        'seq_num': seq_num
                    })
                    seq_num += 1
                    title_index += 1
    if title_path:
        with open(title_path, 'w', encoding='utf-8') as fp:
            json.dump(results, fp, indent=4, ensure_ascii=False)
    return results
def parse_image(file_path: str, image_dir: str, image_meta_path: str) -> List[dict]:
    """Extract embedded images from a PDF.

    Args:
        file_path: PDF to parse.
        image_dir: directory the image files are written to.
        image_meta_path: optional JSON path for the image metadata.
    Returns:
        image_list: one record per extracted image.
    """
    image_list = []
    for page_number, page_layout in enumerate(extract_pages(file_path)):
        image_index = 0
        for element in page_layout:
            if isinstance(element, LTFigure):
                for e_obj in element._objs:
                    if isinstance(e_obj, LTImage):
                        # Export the embedded image stream to disk.
                        image_file = os.path.join(image_dir, f'image_page_{page_number}_{image_index}')
                        image_file = export_image(e_obj, image_file)
                        image_list.append({
                            "image_index": image_index,
                            "page_number": page_number,
                            "image_name": image_file
                        })
                        image_index += 1
    if image_meta_path:
        with open(image_meta_path, 'w', encoding='utf-8') as fp:
            json.dump(image_list, fp, indent=4, ensure_ascii=False)
    return image_list
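
# Usage sketch (hypothetical paths):
#   parse_image('tender.pdf', 'extracted_images', 'images.json')
# returns records like
#   {"image_index": 0, "page_number": 5, "image_name": "extracted_images/image_page_5_0.png"}
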
def table_parse(pdf_path: str,
                title_path: str,
                start_title: str = '第三章 评标办法(综合评估法)',
                end_title: str = '第四章 合同条款及格式',
                table_path: Optional[str] = None,
                start_page_number: Optional[int] = None,
                end_page_number: Optional[int] = None
                ) -> str:
    """Parse the tables that sit between two chapter titles of a PDF.

    Args:
        pdf_path: PDF to parse.
        title_path: title JSON produced by main_parse/parse_title.
        start_title: title that opens the page range.
        end_title: title that closes the page range.
        table_path: JSON path the parsed tables are dumped to.
        start_page_number: explicit start page (overrides start_title).
        end_page_number: explicit end page (overrides end_title).
    """
    tables = []
    if (start_page_number is None) or (end_page_number is None):
        # Resolve the page range from the previously extracted titles.
        df = pd.read_json(title_path)
        start_page_number = df[df['text'] == start_title].page_number.max()
        end_page_number = df[df['text'] == end_title].page_number.max()

    def concat_table(tables: list, table: list, page_number: int) -> list:
        """Either append `table` as a new table or merge it into the last one.

        A first row sharing enough cells with HEADERS is a stand-alone header,
        i.e. the start of a new table; otherwise a table whose column count
        matches the previous page's table is treated as its continuation.
        """
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if len(HEADERS & set(first)) > 2:
            # Many known header cells found: independent table.
            tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1})
        elif tables and ((page_number - 1) in tables[-1]['page_numbers']) and (len(first) == tables[-1]['col_len']):
            # Same column count as the previous page's table: merge.
            tables[-1]['page_numbers'].append(page_number)
            tables[-1]['table'].extend(table)
        else:
            tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0})
        return tables

    with pdfplumber.open(pdf_path) as pdf:
        for i in range(start_page_number, end_page_number):
            for table in pdf.pages[i].extract_tables():
                tables = concat_table(tables, table, page_number=i)
    with open(table_path, 'w', encoding='utf-8') as fp:
        json.dump(tables, fp, indent=4, ensure_ascii=False)
    return table_path
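
# Usage sketch (hypothetical paths):
#   table_parse('tender.pdf', 'titles.json', table_path='tables.json')
# The page range is looked up in titles.json via start_title/end_title unless
# start_page_number/end_page_number are passed explicitly.
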
class PdfExtractAttr_(object):
    def __init__(self, file_path: str):
        """PDF attribute extractor.

        Args:
            file_path: path of the PDF to parse.
        """
        super(PdfExtractAttr_, self).__init__()
        self.file_path = file_path
        self.details = []
        self.tables = []
        self.content = []
        self.chapters = []
        self.references = []
        self.detail_df = None
        self.outlines = None
    def parse_outline(self, outline_path: str = 'outlines.json'):
        """Parse the PDF outline (bookmarks) into page-number-resolved entries."""
        results = []
        with open(self.file_path, "rb") as fp:
            try:
                parser = PDFParser(fp)
                document = PDFDocument(parser)
                ref_pagenum_resolver = RefPageNumberResolver(document)
                outlines = document.get_outlines()
                for (level, title, dest, a, se) in outlines:
                    # An entry may point at its page via a destination, an
                    # action, or a structure element; resolve whichever is set.
                    if dest:
                        page_num = ref_pagenum_resolver.resolve(dest)
                    elif a:
                        page_num = ref_pagenum_resolver.resolve(a)
                    elif se:
                        page_num = ref_pagenum_resolver.resolve(se)
                    else:
                        page_num = None
                    results.append({'level': level, 'title': title, 'page_number': page_num})
            except PDFNoOutlines:
                print("No outlines found.")
            except PDFSyntaxError:
                print("Corrupted PDF or non-PDF file.")
            finally:
                parser.close()
        self.outlines = results
        with open(outline_path, 'w', encoding='utf-8') as op:
            json.dump(results, op, indent=4, ensure_ascii=False)
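
    # Usage sketch (hypothetical file): PdfExtractAttr_('tender.pdf').parse_outline()
    # writes outlines.json entries such as
    #   {"level": 1, "title": "第三章 评标办法", "page_number": 12}
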
    def extract_content(self, content_path: Optional[str] = None) -> list:
        """Extract the plain text of every page via pdfplumber."""
        with pdfplumber.open(self.file_path) as pdf:
            for page in pdf.pages:
                self.content.append({
                    'page_number': page.page_number - 1,  # pdfplumber pages are 1-based
                    'text': page.extract_text()
                })
        if content_path:
            with open(content_path, 'w', encoding='utf-8') as fp:
                json.dump(self.content, fp, indent=4, ensure_ascii=False)
        return self.content
    def parse_text(self) -> None:
        """Parse text boxes and classify their horizontal alignment."""
        for page_number, page_layout in enumerate(extract_pages(self.file_path)):
            for element in page_layout:
                if isinstance(element, LTTextBoxHorizontal):
                    left = element.x0                        # distance to the left edge
                    right = page_layout.width - element.x1   # distance to the right edge
                    top = page_layout.height - element.y1    # distance to the top edge
                    bottom = element.y0                      # distance to the bottom edge
                    width = element.width                    # text box width
                    # Rough alignment heuristic from the left/right margins.
                    if (left > right) and (abs(left - right) > 100):
                        alignment = 'right'
                    elif (left > 100) and (abs(left - right) < 50) and ((abs(left - right) / width) < 0.5):
                        alignment = 'center'
                    else:
                        alignment = 'left'
                    self.details.append({
                        'page_number': page_number,
                        'index': element.index,
                        'x0': element.bbox[0],
                        'y0': element.bbox[1],
                        'x1': element.bbox[2],
                        'y1': element.bbox[3],
                        'alignment': alignment,
                        'lines': len(element._objs),
                        'text': element.get_text().strip(),
                        'is_table_name': element.get_text().strip().endswith('表')
                    })
        self.detail_df = pd.DataFrame(self.details)
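
    # Alignment illustration (hypothetical numbers): on a 595pt-wide page, a
    # box with left margin 120pt and right margin 110pt is 'center'; a box
    # hugging the left edge (left 70pt, right 300pt) falls through to 'left'.
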
    def concat_table(self, table: list, page_number: int, table_name: str = None, new: bool = False) -> None:
        """Either append `table` as a new table or merge it into the last one.

        Args:
            table: extracted rows of the current page's table.
            page_number: page the rows came from.
            table_name: optional caption found above the table.
            new: force the table to be recorded as a new, independent table.
        """
        first = [''.join(cell.split()) if cell else cell for cell in table[0]]
        if new:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
            return
        second = [''.join(cell.split()) if cell else cell for cell in table[1]] if len(table) > 1 else None
        if not self.tables or len(HEADERS & set(first)) > 2:
            # The first row matches many known header cells: independent table.
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 1, "table_name": table_name if table_name else ""})
        elif second and (len(HEADERS & set(second)) > 2):
            # The second row is the header: the first row is likely the caption.
            if not table_name:
                first = [i for i in first if i]
                if len(first) == 1:
                    table_name = "".join(first)
            self.tables.append({"page_numbers": [page_number], "title_len": len(second), "col_len": len(table[-1]), "table": table[1:], "confidence": 1, "table_name": table_name if table_name else ""})
        elif ((page_number - 1) in self.tables[-1]['page_numbers']) and (len(first) == self.tables[-1]['col_len']):
            # Same column count as the previous page's table: continuation, merge.
            self.tables[-1]['page_numbers'].append(page_number)
            self.tables[-1]['table'].extend(table)
        else:
            self.tables.append({"page_numbers": [page_number], "title_len": len(first), "col_len": len(table[-1]), "table": table, "confidence": 0, "table_name": table_name if table_name else ""})
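    # Merge illustration (hypothetical rows): a page-7 table whose first row is
    # ['序号', '名称', '数量', ...] starts a new record (header hit), while a
    # page-8 table with the same column count but no header row is appended to
    # the page-7 record as a continuation.
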
    def parse_table(self) -> None:
        """Parse tables with pdfplumber only."""
        if self.detail_df is None:
            self.parse_text()
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                # Look for tables on this page.
                tables = page_layout.find_tables()
                # Exactly one table on the page: try to merge it with the table
                # carried over from the previous page.
                if len(tables) == 1:
                    table = tables[0]
                    table_title_df = self.detail_df.query(f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
                    if table_title_df.empty:
                        self.concat_table(table.extract(), page_number=page_number)
                    else:
                        table_title_name = table_title_df.iloc[0]['text']
                        self.concat_table(table.extract(), page_number=page_number, table_name=table_title_name)
                # Multiple tables: only the first could merge with the previous
                # page; not handled here (see parse_table_pro).
                elif len(tables) > 1:
                    pass
    def parse_table_pro(self, table_path: str = 'all_tables.json') -> str:
        """Parse tables, cross-checking pdfplumber detection with camelot extraction."""
        if self.detail_df is None:
            self.parse_text()
        with pdfplumber.open(self.file_path) as pdf:
            for page_number, page_layout in enumerate(pdf.pages):
                # Use pdfplumber only to detect whether the page has tables.
                tables = page_layout.find_tables()
                if not tables:
                    continue
                # camelot page numbers are 1-based.
                tables_pro = camelot.read_pdf(
                    self.file_path,
                    # flavor='stream',
                    pages=str(page_number + 1),
                    # edge_tol=200,
                )
                if not tables_pro:
                    continue
                # Exactly one table on the page: try to merge it with the table
                # carried over from the previous page.
                if len(tables_pro) == 1:
                    print(f"Parsing the table on page {page_number}")
                    table_pro = tables_pro[0].df.to_dict(orient='split')['data']
                    table_title_df = self.detail_df.query(f''' page_number == {page_number} and is_table_name == True and alignment == "center" ''')
                    if table_title_df.empty:
                        self.concat_table(table_pro, page_number=page_number)
                    else:
                        table_title_name = table_title_df.iloc[0]['text']
                        self.concat_table(table_pro, page_number=page_number, table_name=table_title_name)
                # Multiple tables: the first may continue the previous page's
                # table; the following ones are necessarily independent.
                elif len(tables_pro) > 1:
                    print(f"Parsing the tables on page {page_number}")
                    first_table = tables_pro[0]
                    self.concat_table(first_table.df.to_dict(orient='split')['data'], page_number=page_number)
                    for table_index in range(1, len(tables_pro)):
                        self.concat_table(tables_pro[table_index].df.to_dict(orient='split')['data'], page_number=page_number, new=True)
        with open(table_path, 'w', encoding='utf-8') as fp:
            json.dump(self.tables, fp, indent=4, ensure_ascii=False)
        return table_path
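
    # Design note: pdfplumber's find_tables() serves as a cheap per-page
    # detector so that camelot, which generally produces cleaner cell text,
    # only has to run on pages that actually contain tables.
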
if __name__ == '__main__':
    # Earlier experiments on tender documents, kept for reference:
    # pdf_path = 'data/预审查数据/基于物联网技术的三峡坝区智慧仓储研究与建设招标文件-发出.pdf'
    # image_dir = 'data/预审查数据/extracted_images'
    # title_path = 'data/预审查数据/基于物联网技术的三峡坝区智慧仓储研究与建设招标文件-发出.json'
    # os.makedirs(image_dir, exist_ok=True)
    # main_parse(pdf_path=pdf_path, title_path=title_path, image_dir=image_dir)
    import glob
    dir_path = 'data/财报素材'
    for pdf_path in glob.glob(f'{dir_path}/*.pdf'):
        print(pdf_path)
        # Only process the report currently being debugged.
        if '600000_20241031_上海浦东发展银行股份有限公司2024年第三季度报告' not in pdf_path:
            continue
        agent = PdfExtractAttr_(file_path=pdf_path)
        stem = os.path.splitext(os.path.basename(pdf_path))[0]
        content_path = f'{dir_path}/{stem}_content.json'
        agent.extract_content(content_path=content_path)
        table_path = f'{dir_path}/{stem}_table.json'
        agent.parse_table_pro(table_path=table_path)