|
- ### 解析所有pdf文件并提取信息进行测试的框架
- ### PdfExtractAttr作为提取pdf信息的基类
- # 子类在其基础上实现匹配功能
- # 标准包导入
- import os
- import re
- import json
- import re
- import shutil
- import pandas as pd
- import pdb
- import base64
- from io import BytesIO
- from pprint import pprint
- # 第三方包导入
- import numpy as np
- import pandas as pd
- import cv2
- import torch
- import glob
- import logging
- import requests
- import time
- import datetime
- from tqdm import tqdm
- from tools import RefPageNumberResolver
- from get_info import PdfExtractAttr
- from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json
- from PIL import Image
- from pdfminer.image import ImageWriter
- # tools function
def create_logger(log_path):
    """Configure the root logger to write to a log file and to the console.

    Calling this function more than once does not stack duplicate handlers:
    handlers installed by a previous call are removed and closed first.
    Handlers installed by anyone else are left untouched.

    Args:
        log_path: Path of the log file. It is opened in ``'w'`` mode, so an
            existing file is truncated. Missing parent directories are
            created.

    Returns:
        The root ``logging.Logger`` (level INFO) with a file handler and a
        console handler attached.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Remove only the handlers this function installed earlier, otherwise
    # every call would make each record appear one more time.
    for stale in [h for h in logger.handlers
                  if getattr(h, "_from_create_logger", False)]:
        logger.removeHandler(stale)
        stale.close()

    log_dir = os.path.dirname(log_path)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')

    # Handler that writes records to the log file. The encoding is explicit
    # because the messages logged by this module contain non-ASCII text.
    file_handler = logging.FileHandler(
        filename=log_path, mode='w', encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)
    file_handler._from_create_logger = True
    logger.addHandler(file_handler)

    # Handler that echoes records to the console. Its own level is DEBUG,
    # but the logger level above (INFO) is what actually filters records.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)
    console._from_create_logger = True
    logger.addHandler(console)
    return logger
- # ocr外部接口
class OcrAgent():
    """Wrapper around an external OCR HTTP endpoint plus document heuristics.

    The OCR response is expected to be JSON whose ``["rawjson"]["ret"]`` entry
    is a list of text items; each item has a ``word`` string, a ``rect`` dict
    (``left``/``top``/``width``/``height``) and, for some methods, a
    ``charset`` list of per-character items with the same layout.

    Log messages are sent to the root logger (the one ``create_logger``
    configures), so the class also works when the file is imported as a
    module and no global ``logger`` exists.
    """

    def __init__(self, url):
        """Store the OCR endpoint and the matching rules.

        Args:
            url: URL of the OCR service that accepts an image upload.
        """
        self.url = url
        # Validity period as printed on a business licence:
        # "<date>至<date>" or "<date>至长期".
        self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)'
        # Regexes for the different kinds of licence / certificate titles.
        # Iteration order matters: ``judge`` tries them in this order.
        self.re_dict = {
            "business_license" : r'营业执照',
            "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
            "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
            "qualtifications" : r'\b[\u4e00-\u9fff]*证书',
            "proof": r'\b[\u4e00-\u9fff]*证明',
        }
        # Minimum ratio of dark pixels for a region to count as signed.
        self.sign_threshold = 0.05

    def get_content(self, image_path):
        """Upload an image to the OCR service and return the parsed JSON.

        Args:
            image_path: Path of the image file to send.

        Returns:
            The decoded JSON body of the OCR response.

        Raises:
            ValueError: If the file cannot be read, the request fails, or the
                response is not valid JSON. The original error is chained.
        """
        try:
            with open(image_path, 'rb') as image_file:
                files = {"file": ("image.jpg", image_file, "image/jpeg")}
                response = requests.post(self.url, files=files)
                return response.json()
        except Exception as err:
            # ``Exception`` rather than a bare except, so Ctrl-C and
            # SystemExit are not converted into a "corrupted image" error.
            raise ValueError(f"传入图像{image_path}已损坏") from err

    def remove_red_seal(self, image_path):
        """Return a binarised copy of the image with red seals removed.

        The red channel is thresholded with Otsu's method; red seal pixels
        are bright in that channel and therefore end up white.

        Args:
            image_path: Path of the image to process.

        Returns:
            A 3-channel ``numpy`` array in which every channel holds the
            thresholded red channel.

        Raises:
            ValueError: If OpenCV cannot read the image.
        """
        input_img = cv2.imread(image_path)
        if input_img is None:
            raise ValueError(f"传入图像{image_path}已损坏")
        # OpenCV stores channels as B, G, R; only the red one is needed.
        red_c = cv2.split(input_img)[2]
        # Let Otsu's method pick the threshold automatically.
        thresh, _ = cv2.threshold(red_c, 0, 255, cv2.THRESH_OTSU)
        # Scale factor kept at 1.0; adjust here to tune the cut-off.
        filter_condition = int(thresh * 1.0)
        _, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY)
        # Expand back to three identical channels.
        return np.repeat(red_thresh[:, :, np.newaxis], 3, axis=2)

    def judge(self, image_path: str, firm_name: str):
        """Decide whether an image is a licence/certificate of ``firm_name``.

        ``image_path`` must end in a file name like ``image_page_12_0.jpg``,
        where ``12`` is the PDF page and ``0`` the index of the image on that
        page; the page is taken from the second-to-last ``_`` separated part.

        Args:
            image_path: Path of the extracted page image.
            firm_name: Company name that must appear in the OCR text.

        Returns:
            ``None`` when the company name is not found. Otherwise a dict with
            the keys ``qualtified``, ``matched``, ``license_name``,
            ``license_page``, ``start_datetime`` and ``end_datetime``
            (``qualtified`` stays ``None`` when no title pattern matched).

        Raises:
            ValueError: Propagated from ``get_content``.
        """
        image_prefix = os.path.basename(image_path)
        logging.getLogger().info(f'processing img: {image_prefix}')
        page_number = image_prefix.split('_')[-2]

        response_item = {
            "qualtified": None,  # whether the image is a licence/certificate
            "matched": None,  # whether the company name was found
            "license_name": None,  # matched document title
            "license_page": page_number,  # page the document is on
            "start_datetime": None,  # start of validity
            "end_datetime": None  # end of validity
        }
        content = self.get_content(image_path=image_path)
        image_info = content["rawjson"]["ret"]

        # The company name is mandatory.
        if not self.search(image_info=image_info, key=firm_name):
            return None
        response_item['matched'] = True

        for key, pattern in self.re_dict.items():
            if key == 'business_license':
                # The business licence title must start the OCR word.
                match_name = self.re_match(image_info=image_info, format=pattern)
                locate_dates = self.find_license_datetime
            else:
                match_name = self.re_search(image_info=image_info, format=pattern)
                locate_dates = self.find_certificate_datetime
            if match_name:
                response_item["qualtified"] = True
                response_item["license_name"] = match_name
                return locate_dates(image_info=image_info, response_item=response_item)
        return response_item

    def find_certificate_datetime(self, image_info, response_item):
        """Fill in the validity dates of a qualification certificate.

        Dates are only searched for when at least one date keyword occurs in
        the OCR text. A single "A至B" range ends the search immediately;
        standalone dates are assigned by their position relative to the
        keywords found.

        Args:
            image_info: List of OCR items (``word``, ``rect``, ``charset``).
            response_item: Result dict; ``start_datetime`` / ``end_datetime``
                are updated in place.

        Returns:
            The same ``response_item``.
        """
        start_keywords = ['颁发日期', '发证日期', '生效日期']
        end_keywords = ['终止日期']
        priority_keywords = ['有效期', '使用期限', '有效日期']
        keywords_list = priority_keywords + end_keywords + start_keywords
        date_pattern = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?'
        dashed_pattern = r'\d{4}-\d{1,2}-\d{1,2}'
        log = logging.getLogger()

        def assign(keyword, date_text):
            # NOTE(review): as in the original expression, a priority keyword
            # overwrites end_datetime even when it is already set ("and"
            # binds tighter than "or"); confirm this is intended.
            if keyword in priority_keywords or (
                    keyword in end_keywords and response_item['end_datetime'] is None):
                response_item['end_datetime'] = date_text
            elif keyword in start_keywords and response_item['start_datetime'] is None:
                response_item['start_datetime'] = date_text

        # Pass 1: record where each date keyword sits on the page.
        has_keyword = False
        keyword_dict = {}
        for info in image_info:
            word = info['word']
            for keyword in keywords_list:
                if keyword not in word:
                    continue
                has_keyword = True
                for char_dc in info['charset']:
                    # The keyword's right edge is the right edge of its last
                    # character (the last matching character wins).
                    if char_dc['word'] == keyword[-1]:
                        keyword_dict[keyword] = {
                            "left": info['rect']['left'],
                            "top": info['rect']['top'],
                            "right": char_dc['rect']['left'] + char_dc['rect']['width']
                        }

        if not has_keyword:
            return response_item

        # Pass 2: look at every word that could contain a date.
        for info in image_info:
            word = info['word']
            if '年' in word:
                find_list = re.findall(pattern=date_pattern, string=word)
            elif re.search(r'\d', word):
                find_list = re.findall(pattern=dashed_pattern, string=word)
            else:
                continue
            left = info['rect']['left']
            top = info['rect']['top']

            if len(find_list) == 1:
                find_string = find_list[0]
                if '至' in find_string:
                    parts = find_string.split('至')
                    start_prefix = parts[0].replace('自', '')
                    end_prefix = parts[-1]
                    if '年' in start_prefix:
                        response_item['start_datetime'] = start_prefix
                    if end_prefix != '':
                        response_item['end_datetime'] = end_prefix
                    return response_item
                # No "至": use position relative to the keywords.
                for k, k_info in keyword_dict.items():
                    if left == k_info['left']:
                        # Date directly below/at the keyword: take it and
                        # stop looking at other keywords.
                        assign(k, find_string)
                        break
                    if left >= k_info['right'] and top >= k_info['top']:
                        assign(k, find_string)
            elif len(find_list) == 2:
                start_prefix = find_list[0].replace('自', '')
                end_prefix = find_list[-1].replace('至', '')
                if response_item['start_datetime'] is None:
                    response_item['start_datetime'] = start_prefix
                if response_item['end_datetime'] is None:
                    response_item['end_datetime'] = end_prefix
            else:
                log.info(f'wrong word: {word} ...')

        return response_item

    def find_license_datetime(self, image_info, response_item):
        """Fill in the licence id and validity dates of a business licence.

        Args:
            image_info: List of OCR items; only ``word`` is read.
            response_item: Result dict, updated in place (``id``,
                ``start_datetime``, ``end_datetime``).

        Returns:
            The same ``response_item``.
        """
        for info in image_info:
            word = info['word']
            is_plain_id = word.isdigit() and len(word) == 20
            is_labelled_id = word.startswith('证照编号:') and len(word) == 25
            if is_labelled_id or is_plain_id:
                # Strip the 5-character label when present.
                response_item['id'] = word if word.isdigit() else word[5:]
            elif re.match(self.datetime_re, word):
                pieces = word.split('至')
                response_item['start_datetime'] = pieces[0]
                response_item['end_datetime'] = pieces[-1]
            elif word == '长期':
                response_item['start_datetime'] = response_item['end_datetime'] = '长期'
        return response_item

    def search(self, image_info, key):
        """Return True if any OCR word contains ``key`` as a substring."""
        return any(key in info['word'] for info in image_info)

    def re_search(self, image_info, format):
        """Return the first ``re.search`` hit for ``format`` in any OCR word.

        Returns:
            The matched substring, or ``False`` when nothing matches.
        """
        for info in image_info:
            found = re.search(format, info['word'])
            if found:
                return found.group(0)
        return False

    def re_match(self, image_info, format):
        """Return the first OCR word that ``format`` matches at its start.

        Returns:
            The whole word, or ``False`` when nothing matches.
        """
        for info in image_info:
            if re.match(format, info['word']):
                return info['word']
        return False

    def signature_recognition(self, image_path: str):
        """Check whether the legal representative's signature is present.

        The red seal is removed, the cleaned image is OCR'd, and the area
        next to the "legal representative" label is inspected for ink (or
        for extra OCR text attached to the label).

        Two helper images are written next to ``image_path``:
        ``<name>_roi<ext>`` (seal removed) and ``<name>_ink<ext>`` (the
        inspected crop).

        Args:
            image_path: Path of the page image.

        Returns:
            ``True`` if a signature is detected, otherwise ``False``.

        Raises:
            ValueError: If the image cannot be read or the OCR call fails.
        """
        keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:']
        # Markers printed to the right of the signature, in lookup priority.
        right_markers = ['(法定代表人CA电子印章)', '(法定代表人CA电子印章或签字)', '(签字)']
        # Labels printed to the left of the signature.
        long_label = '法定代表人或其委托代理人:'
        short_label = '法定代表人:'

        image_dir = os.path.dirname(image_path)
        image_name, image_ext = os.path.splitext(os.path.basename(image_path))
        removed_image_path = os.path.join(image_dir, image_name + '_roi' + image_ext)
        ink_image_path = os.path.join(image_dir, image_name + '_ink' + image_ext)

        # The cleaned image is always needed in memory for the ink check,
        # even when the cached file already exists on disk.
        removed_seal_img = self.remove_red_seal(image_path=image_path)
        if not os.path.exists(removed_image_path):
            cv2.imwrite(removed_image_path, removed_seal_img)
        content = self.get_content(image_path=removed_image_path)
        image_info = content["rawjson"]["ret"]

        key_pos = {}
        for info in image_info:
            word = info['word']
            rect = info['rect']
            for keyword in keywords:
                if keyword in word:
                    # Only the first keyword (in list order) is recorded
                    # for a given OCR word.
                    key_pos[keyword] = {
                        "word": word,
                        "left": rect['left'],
                        "right": rect['left'] + rect['width'],
                        "top": rect['top'],
                        "bottom": rect['top'] + rect['height']
                    }
                    break

        # None of the keywords appear on this page.
        if not key_pos:
            return False

        right_marker = next((m for m in right_markers if m in key_pos), None)
        has_left_label = long_label in key_pos or short_label in key_pos

        if has_left_label and right_marker is not None:
            # The signature should sit between the label and the marker.
            left_label = long_label if long_label in key_pos else short_label
            l_info = key_pos[left_label]
            r_info = key_pos[right_marker]
            if not l_info['word'].endswith(left_label) or r_info['word'] != right_marker:
                # Extra OCR text is attached to the label/marker.
                return True
            return self._has_ink(
                removed_seal_img, ink_image_path,
                left=l_info['right'],
                right=r_info['left'],
                top=max(l_info['top'], r_info['top']),
                bottom=min(l_info['bottom'], r_info['bottom']) - 5,
            )

        if right_marker is not None:
            # Only the marker was found: whatever text remains after
            # removing all known labels is taken to be the signature.
            leftover = key_pos[right_marker]['word']
            for token in right_markers + [long_label, short_label]:
                leftover = leftover.replace(token, '')
            return leftover != ''

        # Only a label was found: the signature is either attached to the
        # label text or written to its right.
        for label in (short_label, long_label):
            if label not in key_pos:
                continue
            label_info = key_pos[label]
            if not label_info['word'].endswith(label):
                return True
            # Start at the label's right edge so the label's own ink is not
            # mistaken for a signature.
            region_left = label_info['right']
            return self._has_ink(
                removed_seal_img, ink_image_path,
                left=region_left,
                right=region_left + 100,
                top=label_info['top'],
                bottom=label_info['bottom'] - 5,
            )
        return False

    def _has_ink(self, input_img, out_path, left, right, top, bottom):
        """Return True if the given rectangle holds enough dark pixels."""
        black_ratio = self.ink_recognition(
            input_img=input_img,
            out_path=out_path,
            meta={
                "left": left,
                "right": right,
                "top": top,
                "bottom": bottom
            }
        )
        return black_ratio >= self.sign_threshold

    def ink_recognition(self, input_img, out_path, meta: dict):
        """Measure the share of dark pixels in a rectangle and save the crop.

        Args:
            input_img: 3-channel image array to crop from.
            out_path: Where the cropped region is written.
            meta: Dict with the integer pixel bounds ``left``, ``right``,
                ``top`` and ``bottom``.

        Returns:
            Ratio (0.0-1.0) of pixels darker than 90% of the Otsu threshold.
            ``0.0`` when the rectangle is empty.
        """
        crop_img = input_img[meta["top"]:meta["bottom"], meta["left"]:meta["right"], :]
        if crop_img.size == 0:
            # Degenerate rectangle (e.g. marker left of the label): nothing
            # to inspect, and OpenCV would raise on an empty image.
            return 0.0
        cv2.imwrite(out_path, crop_img)

        gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY)
        thresh, _ = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU)
        filter_condition = int(thresh * 0.90)
        # Inverted binary threshold: dark pixels become non-zero.
        _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV)

        return np.count_nonzero(black_thresh) / black_thresh.size
- # 提供pdf解析,并基于提取文本信息进行位置匹配
class PdfMatcher(PdfExtractAttr):
    """Parse one bid PDF and locate pages likely to hold licences.

    Parsing results are cached in a directory named after the PDF, next to
    the PDF itself. ``main_parse``, ``parse_outline``, ``parse_text`` and
    ``parse_table`` are called on ``self`` and are presumably inherited from
    ``PdfExtractAttr`` (not visible here).
    """

    def __init__(self, file_path: str):
        """Load (or create) the cached parse results for ``file_path``.

        Args:
            file_path: Path of the bid PDF. The parent directory name is
                used as the company name.
        """
        super(PdfMatcher, self).__init__(
            file_path=file_path
        )
        # Bid name: the PDF file name without its extension.
        self.bid_name = os.path.splitext(os.path.basename(file_path))[0]
        # Directory holding all data extracted from this bid.
        self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name)
        # Company name: the directory the PDF lives in.
        self.firm_name = os.path.basename(os.path.dirname(file_path))
        # Title list and extracted images; parsed once, then reused.
        title_path = os.path.join(self.bid_dir, "title.json")
        self.image_dir = os.path.join(self.bid_dir, "extracted_images")
        if (not os.path.exists(title_path)) or (not os.path.exists(self.image_dir)):
            os.makedirs(self.image_dir, exist_ok=True)
            self.main_parse(pdf_path=file_path, title_path=title_path, image_dir=self.image_dir)
        self.title = load_json(title_path)
        # Outline list.
        outline_path = os.path.join(self.bid_dir, "outlines.json")
        self.outline = self.parse_outline(out_path=outline_path)
        # Text list.
        text_path = os.path.join(self.bid_dir, "all_texts.json")
        self.details = self.parse_text(out_path=text_path)
        # Table list. The original stored the cached result in ``table`` and
        # the freshly parsed one in ``tables``; both names are now always
        # set so either spelling works regardless of the cache state.
        table_path = os.path.join(self.bid_dir, "all_tables.json")
        if os.path.exists(table_path):
            tables = load_json(table_path)
        else:
            tables = self.parse_table(out_path=table_path)
        self.table = self.tables = tables
        # Glob pattern for the images extracted from one page.
        self.image_format = "image_page_{}*"
        # Intervals starting on or before this page are ignored.
        self.start_threshold = 10
        self.distance_threshold = 6
        # Number of pages added on both sides of an interval when searching.
        self.search_threshold = 20

    def search_interval(self):
        """Locate the page ranges that may hold licences and certificates.

        Section headings containing one of a few keywords open an interval;
        the next heading closes it. Both the extracted titles and the PDF
        outline are scanned, and the resulting intervals are merged.

        Returns:
            A sorted list of non-overlapping ``(start_page, end_page)``
            tuples.
        """
        keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件']
        search_interval = []

        # --- titles ---
        left_pos = -1  # start page of the currently open interval
        for title_block in self.title:
            block_text = title_block['text'].replace(' ', '').strip()

            # Close the open interval, unless this title is still part of
            # the certificate section.
            if left_pos != -1 and '证书' not in block_text:
                search_interval.append((left_pos, title_block['page_number']))
                left_pos = -1

            if any(keyword in block_text for keyword in keywords):
                if '.' in block_text:
                    # Looks like a table-of-contents line ending in a page
                    # number ("....12").
                    # NOTE(review): indentation was lost in this file; it is
                    # assumed that a dotted title without a numeric tail
                    # leaves left_pos unchanged.
                    tail = block_text.split('.')[-1]
                    if tail.isdecimal():
                        left_pos = min(title_block['page_number'], int(tail))
                else:
                    left_pos = title_block['page_number']

        # Interval still open after the last title.
        if left_pos != -1:
            search_interval.append((left_pos, -1))
        left_pos = -1

        # --- outlines ---
        for outline_block in self.outline or []:
            if left_pos != -1:
                right_pos = outline_block["page_number"]
                right_pos = right_pos if right_pos is not None else -1
                search_interval.append((left_pos, right_pos))
                left_pos = -1
            outline_text = outline_block['title'].strip()
            if any(keyword in outline_text for keyword in keywords):
                if outline_block["page_number"] is not None:
                    left_pos = outline_block["page_number"]

        # Interval still open after the last outline entry.
        if left_pos != -1:
            search_interval.append((left_pos, -1))

        return self._merge_intervals(search_interval)

    @staticmethod
    def _merge_intervals(intervals):
        """Merge overlapping or touching ``(start, end)`` page intervals.

        Intervals whose end is smaller than their start are discarded.
        NOTE(review): this also discards open-ended ``(start, -1)``
        intervals, so the ``end_page == -1`` handling in
        ``find_candidate_images`` never triggers; confirm whether open
        intervals were meant to survive.
        """
        merged = []
        for start, end in sorted(intervals):
            if end < start:
                continue
            if merged and start <= merged[-1][1]:
                # max() so a shorter interval cannot shrink the merged one.
                merged[-1] = (merged[-1][0], max(merged[-1][1], end))
            else:
                merged.append((start, end))
        return merged

    def find_candidate_images(self):
        """Collect images that may show a business licence or certificate.

        Returns:
            A sorted list of image paths taken from the pages inside the
            merged search intervals, widened by ``search_threshold`` pages
            on each side.
        """
        candidate_images = set()

        for start_page, end_page in self.search_interval():
            # Very early pages are skipped.
            if start_page <= self.start_threshold:
                continue

            if end_page == -1:
                end_page = start_page + 20
            candidate_images = self.image_regularization(
                start_page=max(0, start_page - self.search_threshold),
                end_page=end_page + self.search_threshold,
                candidate_images=candidate_images,
            )

        # Sorted so that repeated runs process images in the same order.
        return sorted(candidate_images)

    def image_regularization(self, start_page: int, end_page:int, candidate_images: set):
        """Add the images of pages ``start_page..end_page`` to a set.

        Args:
            start_page: First page (inclusive).
            end_page: Last page (inclusive).
            candidate_images: Set that is updated in place.

        Returns:
            The same ``candidate_images`` set.
        """
        for index in range(start_page, end_page + 1):
            current_format = self.image_format.format(index)
            prefix = current_format.rstrip('*')
            for file in glob.glob(os.path.join(self.image_dir, current_format)):
                if file.endswith('.unk'):
                    continue
                # "image_page_1*" also globs page 10, 11, 100, ...; reject
                # names where the page number continues with another digit.
                remainder = os.path.basename(file)[len(prefix):]
                if remainder[:1].isdigit():
                    continue
                candidate_images.add(file)
        return candidate_images
class PdfParse_pipeline():
    """Run licence/certificate detection over every company's bid PDFs."""

    def __init__(self,
                 ocr,  # OCR agent exposing ``judge(image_path, firm_name)``
                 firm_dir,  # directory with one sub-directory per company
                 out_path  # path of the JSON file to write
                 ):
        self.ocr = ocr
        self.firm_dir = firm_dir
        self.out_path = out_path

    def parse_pipeline(self):
        """Process all PDFs below ``firm_dir`` and write the result as JSON.

        Returns:
            Dict mapping company name to the result of
            ``parse_single_document``.
        """
        log = logging.getLogger()
        data = {}
        for firm_name in tqdm(os.listdir(self.firm_dir)):
            firm_path = os.path.join(self.firm_dir, firm_name)
            if not os.path.isdir(firm_path):
                # Stray files next to the company directories are ignored.
                continue
            log.info(f'processing firm {firm_name} ...')
            for bid_name in tqdm(os.listdir(firm_path)):
                if not bid_name.endswith('.pdf'):
                    continue
                document = os.path.join(firm_path, bid_name)
                bid_dir = os.path.join(firm_path, bid_name[:-4])
                os.makedirs(bid_dir, exist_ok=True)
                # NOTE(review): results are keyed by company, so when a
                # company has several PDFs only the last one is kept.
                data[firm_name] = self.parse_single_document(pdf_path=document)

        # Persist the collected data.
        out_dir = os.path.dirname(self.out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(self.out_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        return data

    def parse_single_document(self, pdf_path: str):
        """Detect licences and certificates in a single bid PDF.

        Args:
            pdf_path: Path of the PDF to analyse.

        Returns:
            Dict with a ``license_list`` entry; each element describes one
            recognised document (name, image path, page, validity dates).
        """
        agent = PdfMatcher(file_path=pdf_path)
        firm_name = agent.firm_name
        data = {
            "license_list":[]
        }
        for img in agent.find_candidate_images():
            try:
                # Use the injected agent, not a module-level global.
                response = self.ocr.judge(image_path=img, firm_name=firm_name)
            except ValueError as e:
                # Unreadable image: report it and move on.
                print(e)
                continue
            if response is None or response['qualtified'] is None:
                continue
            data["license_list"].append({
                "license_name": response["license_name"],
                "license_path": img,
                "license_page": response["license_page"],
                "start_datetime": response["start_datetime"],
                "end_datetime": response["end_datetime"]
            })
        return data
-
if __name__ == "__main__":
    # [test demo]
    start_time = time.time()
    # Adjust log_path to your own environment.
    log_path = "/home/stf/miner_pdf/interface/test_logs/info.log"
    logger = create_logger(log_path=log_path)
    # [environment parameters]
    # ocr url
    url = "http://120.48.103.13:18000/ctr_ocr"
    # seal_ocr url
    base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token="
    # seal_ocr access_token: read from the environment so that the
    # credential is not committed with the source code.
    access_token = os.environ.get("SEAL_OCR_ACCESS_TOKEN", "")
    # seal_ocr headers
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    # data_path: root directory holding all bidding companies.
    data_path = "/home/stf/miner_pdf/data/投标公司pdf"
    # test_data_path: root directory holding the companies used for testing.
    test_data_path = "/home/stf/miner_pdf/interface/test_files"
    # pipeline_out_path: JSON output (licence / certificate locations) for a
    # run over all companies.
    pipeline_out_path = "/home/stf/miner_pdf/interface/outdir/test_pipeline.json"
    # single_out_path: JSON output (licence / certificate locations) for a
    # run over a single company.
    single_out_path = "/home/stf/miner_pdf/interface/outdir/test_single.json"
    # ground_truth: JSON file with the licences and certificates of all
    # non-scanned company PDFs.
    ground_truth = "/home/stf/miner_pdf/ground_truth.json"
    # Spreadsheet telling whether a company's PDF is a scan or not.
    firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx"
    df = pd.read_excel(firm_excel_file)
    # OCR agent.
    ocr = OcrAgent(url=url)
    # Pipeline.
    pipeline = PdfParse_pipeline(
        ocr=ocr,
        firm_dir=test_data_path,
        out_path=single_out_path
    )
    # start
    data = pipeline.parse_pipeline()
    # calculate time cost
    cost_time = time.time() - start_time
    logger.info(f"total cost {cost_time // 60} min {cost_time % 60} sec ...")
|