|
|
@@ -0,0 +1,1931 @@
|
|
|
+# 在pdf_miner的基础上进行优化
|
|
|
+
|
|
|
+# 标准包导入
|
|
|
+import os
|
|
|
+import re
|
|
|
+import json
|
|
|
+import re
|
|
|
+import shutil
|
|
|
+import pandas as pd
|
|
|
+import pdb
|
|
|
+import base64
|
|
|
+from io import BytesIO
|
|
|
+from pprint import pprint
|
|
|
+from paddleocr import PPStructure, draw_structure_result, save_structure_res
|
|
|
+from pypdf import PdfReader
|
|
|
+from pdf2image import convert_from_path
|
|
|
+
|
|
|
+# 第三方包导入
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+import cv2
|
|
|
+import torch
|
|
|
+import glob
|
|
|
+import logging
|
|
|
+import requests
|
|
|
+import time
|
|
|
+import datetime
|
|
|
+import subprocess
|
|
|
+from tqdm import tqdm
|
|
|
+from tooklit import RefPageNumberResolver
|
|
|
+from get_info import PdfExtractAttr
|
|
|
+from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json
|
|
|
+from PIL import Image
|
|
|
+from pdfminer.image import ImageWriter
|
|
|
+from tooklit import remove_red_seal, remove_blue_seal
|
|
|
+
|
|
|
+
|
|
|
# tools function
def create_logger(log_path):
    """Configure the root logger to log to *log_path* and to the console.

    Returns the process-wide root logger, configured at INFO level with a
    truncating file handler and a console handler.

    BUG FIX: the root logger is a singleton, so every previous call's
    handlers are removed first — calling this more than once no longer
    duplicates every log record.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Drop handlers installed by earlier calls so records are not emitted
    # once per call.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')

    # File handler: truncates the log file on each run.
    file_handler = logging.FileHandler(
        filename=log_path, mode='w')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    # Console handler. Note the logger itself filters at INFO, so the DEBUG
    # level here only means "pass through whatever the logger lets in".
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)
    logger.addHandler(console)

    return logger
|
|
|
+
|
|
|
+
|
|
|
# Per-page information cache.
class PageBuffer():
    """Tiny dict-backed cache mapping a page key to its parsed attributes."""

    def __init__(self):
        # page -> arbitrary page-info object
        self.page_cache = {}

    # Look up the cached attributes of one page.
    def query(self, page):
        """Return the cached info for *page*, or None when absent.

        BUG FIX: the old implementation used -1 as a "missing" sentinel
        (`.get(page, -1) == -1`), which misreported a legitimately cached
        value of -1 as missing. Membership is now tested directly.
        """
        if page not in self.page_cache:
            return None
        return self.page_cache[page]
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
# Client for the external seal-recognition OCR service.
class SealAgent():
    """Posts images to a seal-detection endpoint and normalizes its output."""

    def __init__(self, url, headers):
        # Seal-OCR endpoint and the HTTP headers (auth, content type, ...).
        self.url = url
        self.headers = headers

    # POST one image (base64-encoded) and return the decoded JSON response.
    def get_content(self, image_path):
        """Return the service's JSON payload, or None when the request fails."""
        # BUG FIX: the file handle used to leak; read inside a context manager.
        with open(image_path, 'rb') as f:
            img = base64.b64encode(f.read())
        params = {"image": img}
        try:
            response = requests.post(url=self.url, data=params, headers=self.headers)
            return response.json()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; the failure itself stays best-effort.
            logger.info(f"当前图像:{image_path}在印章识别ocr接口中网络不稳定 ...")
            return None

    # Classify the seals found on one image into firm seals vs personal seals.
    def seal_parse(self, image_path):
        """Return {"firm_seals": [...], "indiv_seals": [...]} for *image_path*.

        A seal whose recognized text contains '公司' is treated as a company
        seal, anything else as an individual's seal.
        """
        meta = {
            "firm_seals": [],
            "indiv_seals": []
        }
        content = self.get_content(image_path=image_path)
        # BUG FIX: get_content returns None on a network failure; previously
        # that crashed here with a TypeError on content["result_num"].
        if content is None:
            return meta
        seal_num = content["result_num"]
        seal_result = content["result"]
        if seal_num == 0:
            return meta
        for seal_info in seal_result:
            seal_type = seal_info["type"]
            seal_content = seal_info["major"]["words"].strip().replace(' ', '')
            if '公司' in seal_content:
                meta['firm_seals'].append(
                    {
                        "seal_type": seal_type,
                        "firm_name": seal_content
                    }
                )
            else:
                meta['indiv_seals'].append({
                    "seal_type": seal_type,
                    "indiv_name": seal_content
                })
        return meta
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
# External OCR service client: page classification, keyword location,
# signature / seal detection on bid-document images.
class OcrAgent():

    def __init__(self, url):
        # Endpoint of the general-purpose OCR HTTP service.
        self.url = url
        # Matches validity spans like "2020年1月1日至2030年1月1日" or "...至长期".
        self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)'
        # Regexes for the different certificate / qualification categories.
        # NOTE: insertion order matters — judge()/judge_pro() iterate this
        # dict and treat 'business_license' (first entry) specially.
        self.re_dict = {
            "business_license" : r'营业执照',
            "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
            "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
            "qualtifications" : r'\b[\u4e00-\u9fff]*证书',
            "proof": r'\b[\u4e00-\u9fff]*证明',
        }
        # Handwriting thresholds:
        # minimum dark-pixel ratio for a region to count as a signature,
        self.sign_threshold = 0.05
        # and minimum OCR box height (px) for a keyword to count as headline text.
        self.font_threshold = 39
|
|
|
+
|
|
|
    # Attach a seal-OCR client; required before signature_recognition() runs.
    def integrate_sealagent(self, url, headers):
        self.sealagent = SealAgent(url=url, headers=headers)
|
|
|
+
|
|
|
+ # 获取图像的ocr信息
|
|
|
+ def get_content(self, image_path):
|
|
|
+ try:
|
|
|
+ with open(image_path, 'rb') as image_file:
|
|
|
+ files = {"file": ("image.jpg", image_file, "image/jpeg")}
|
|
|
+ response = requests.post(self.url, files=files)
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+ except:
|
|
|
+ raise ValueError(f"传入图像{image_path}已损坏")
|
|
|
+
|
|
|
    def judge_pro(self, image_path: str, firm_name: str):
        """Classify one full-page image for *firm_name*.

        Expects ``image_path`` like ``./test/page-0.jpg`` — the page number
        sits after the final '-'. Returns None when the firm name does not
        appear on the page; otherwise a dict describing the licence or
        certificate found (or a "matched but unqualified" record).
        """
        # Page number is encoded in the file name: page-<n>.jpg
        image_name = image_path.split('/')[-1]
        logger.info(f'processing img: {image_name}')
        page_number = image_name.split('-')[-1].split('.')[0]
        response_item = {
            "qualtified": None,           # is this a licence/certificate page
            "matched": None,              # did the firm name appear
            "license_name": None,         # matched licence/certificate title
            "license_page": page_number,  # page the evidence sits on
            "start_datetime": None,       # validity start
            "end_datetime": None          # validity end
        }

        content = self.get_content(image_path=image_path)
        image_info = content["rawjson"]["ret"]

        # The firm name must appear somewhere on the page.
        if not self.search(image_info=image_info, key_list=[firm_name]):
            return None
        else:
            response_item['matched'] = True

        # Try each category pattern; business licences use the anchored
        # re_match and their own date extractor, everything else uses
        # re_search + the generic certificate date extractor.
        for key, format in self.re_dict.items():
            if key == 'business_license':
                match_name = self.re_match(image_info=image_info, format=format)
            else:
                match_name = self.re_search(image_info=image_info, format=format)
            if match_name and key == 'business_license':
                response_item["qualtified"] = True
                response_item["license_name"] = match_name
                response_item = self.find_license_datetime(image_info=image_info, response_item=response_item)
                return response_item
            elif match_name:
                response_item["qualtified"] = True
                response_item["license_name"] = match_name
                response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item)
                return response_item
        return response_item
|
|
|
+
|
|
|
+ # 判断图像是否为某公司的营业执照或资质证书信息,并返回提取到的信息
|
|
|
+ def judge(self, image_path: str, firm_name: str):
|
|
|
+ # 以下实现要求image_path的路径如下例所示:
|
|
|
+ # ./test/image_page_12_0.jpg
|
|
|
+ # 12代表当前图像在pdf中的第12页
|
|
|
+ # 0代表当前图像为该页提取的第1张图像
|
|
|
+ image_prefix = image_path.split('/')[-1]
|
|
|
+ logger.info(f'processing img: {image_prefix}')
|
|
|
+ page_number = image_prefix.split('_')[-2]
|
|
|
+ response_item = {
|
|
|
+ "qualtified": None, # 是否为证书
|
|
|
+ "matched": None, # 是否出现匹配的公司名称
|
|
|
+ "license_name": None, # 证书名
|
|
|
+ "license_page": page_number, # 证书所在页
|
|
|
+ "start_datetime": None, # 有效起始时间
|
|
|
+ "end_datetime": None # 有效终止时间
|
|
|
+ }
|
|
|
+
|
|
|
+ content = self.get_content(image_path=image_path)
|
|
|
+ image_info = content["rawjson"]["ret"]
|
|
|
+
|
|
|
+ # 必须包含公司名称信息
|
|
|
+ if not self.search(image_info=image_info, key=firm_name):
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ response_item['matched'] = True
|
|
|
+
|
|
|
+ # 是否匹配营业执照或资质证书
|
|
|
+ for key, format in self.re_dict.items():
|
|
|
+ if key == 'business_license':
|
|
|
+ match_name = self.re_match(image_info=image_info, format=format)
|
|
|
+ else:
|
|
|
+ match_name = self.re_search(image_info=image_info, format=format)
|
|
|
+ if match_name and key == 'business_license':
|
|
|
+ response_item["qualtified"] = True
|
|
|
+ response_item["license_name"] = match_name
|
|
|
+ response_item = self.find_license_datetime(image_info=image_info, response_item=response_item)
|
|
|
+ return response_item
|
|
|
+ elif match_name:
|
|
|
+ response_item["qualtified"] = True
|
|
|
+ response_item["license_name"] = match_name
|
|
|
+ response_item = self.find_certificate_datetime(image_info=image_info, response_item=response_item)
|
|
|
+ return response_item
|
|
|
+ return response_item
|
|
|
+
|
|
|
    # Locate the validity period on a qualification certificate.
    def find_certificate_datetime(self, image_info, response_item):
        """Fill start/end validity dates into *response_item* and return it.

        Pass 1 records the positions of every date-related keyword; pass 2
        extracts date strings and assigns them to start/end either directly
        (a "自X至Y" span) or by spatial proximity to a recorded keyword.
        Requires each OCR entry to carry 'word', 'rect' and per-character
        'charset' boxes — assumed from the OCR service schema, TODO confirm.
        """
        # Keyword groups: where a date to the right/below should be assigned.
        start_keywords = ['颁发日期', '发证日期', '生效日期']
        end_keywords = ['终止日期']
        priority_keywords = ['有效期', '使用期限', '有效日期']
        keywords_list = ['有效期', '使用期限', '有效日期', '终止日期', '颁发日期', '发证日期', '生效日期']
        # Date patterns: Chinese "YYYY年M月D日[至...]" and dashed "YYYY-M-D".
        format = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?'
        special_format = r'\d{4}-\d{1,2}-\d{1,2}'

        # Pass 1: does any date keyword exist, and where does each one end?
        flag = False
        keyword_dict = {}

        for info in image_info:
            word = info['word']
            left = info['rect']['left']
            top = info['rect']['top']
            width = info['rect']['width']
            height = info['rect']['height']
            for keyword in keywords_list:
                # This certificate carries a date keyword.
                if keyword in word:
                    flag = True
                    charset_list = info['charset']
                    # The right edge of the keyword's last character marks
                    # where the date text should start.
                    for char_dc in charset_list:
                        if char_dc['word'] == keyword[-1]:
                            right = char_dc['rect']['left'] + char_dc['rect']['width']
                            keyword_dict[keyword] = {
                                "left": left,
                                "top": top,
                                "right": right
                            }

        # Pass 2: extract dates and assign them to start/end.
        if flag:
            for info in image_info:
                word = info['word']
                if '年' in word or re.search(r'\d', word):
                    left = info['rect']['left']
                    top = info['rect']['top']
                    width = info['rect']['width']
                    if '年' in word:
                        find_list = re.findall(pattern=format, string=word)
                    else:
                        find_list = re.findall(pattern=special_format, string=word)
                    # logger.info(f'word {word} has find_list{find_list}')
                    # if self.check:
                    #     pdb.set_trace()
                    if len(find_list) == 1:
                        find_string = find_list[0]
                        # "自X至Y" in a single hit: split directly.
                        if '至' in find_string:
                            start_prefix = find_string.split('至')[0].replace('自', '')
                            end_prefix = find_string.split('至')[-1]
                            if '年' in start_prefix:
                                response_item['start_datetime'] = start_prefix
                            if end_prefix != '':
                                response_item['end_datetime'] = end_prefix
                            return response_item
                        # No "至": assign by position relative to the
                        # recorded keywords.
                        else:
                            for k, k_info in keyword_dict.items():
                                k_left = k_info['left']
                                k_right = k_info['right']
                                k_top = k_info['top']
                                # Date inside the same OCR box as the keyword.
                                # NOTE(review): by precedence this reads
                                # `A or (B and C)` — an "有效期" keyword is
                                # assigned even when end_datetime is already
                                # set; confirm `(A or B) and C` wasn't meant.
                                if left == k_left:
                                    if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None:
                                        response_item['end_datetime'] = find_string
                                    elif k in start_keywords and response_item['start_datetime'] is None:
                                        response_item['start_datetime'] = find_string
                                    break
                                # Date to the right of / below the keyword.
                                elif left >= k_right and top >= k_top:
                                    if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None:
                                        response_item['end_datetime'] = find_string
                                    elif k in start_keywords and response_item['start_datetime'] is None:
                                        response_item['start_datetime'] = find_string

                    # Two hits in one word: treat as "start ... end".
                    elif len(find_list) == 2:
                        start_prefix = find_list[0].replace('自', '')
                        end_prefix = find_list[-1].replace('至', '')
                        if response_item['start_datetime'] is None:
                            response_item['start_datetime'] = start_prefix
                        if response_item['end_datetime'] is None:
                            response_item['end_datetime'] = end_prefix

                    else:
                        logger.info(f'wrong word: {word} ...')

                else:
                    continue

        return response_item
|
|
|
+
|
|
|
+ # 营业执照有效期定位
|
|
|
+ def find_license_datetime(self, image_info, response_item):
|
|
|
+
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word']
|
|
|
+ # id
|
|
|
+ if (word.startswith('证照编号:') and len(word) == 25) or (word.isdigit() and len(word) == 20):
|
|
|
+ response_item['id'] = word if word.isdigit() else word[5:]
|
|
|
+ elif bool(re.match(self.datetime_re, word)):
|
|
|
+ split = word.split('至')
|
|
|
+ start_datetime = split[0]
|
|
|
+ end_datetime = split[-1]
|
|
|
+ response_item['start_datetime'] = start_datetime
|
|
|
+ response_item['end_datetime'] = end_datetime
|
|
|
+ elif word == '长期':
|
|
|
+ response_item['start_datetime'] = response_item['end_datetime'] = '长期'
|
|
|
+
|
|
|
+ return response_item
|
|
|
+
|
|
|
+ # 在目录中找到正文pos右侧对应的数字标签
|
|
|
+ def digit_label(self, image_info, pos: dict):
|
|
|
+
|
|
|
+ gold_left = pos['left']
|
|
|
+ gold_right = pos['right']
|
|
|
+ gold_top = pos['top']
|
|
|
+ gold_bottom = pos['bottom']
|
|
|
+
|
|
|
+ # 判断字符串中是否包含数字
|
|
|
+ def contain_digit(word):
|
|
|
+ for c in word:
|
|
|
+ if c.isdigit():
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ mini_distance = 10000
|
|
|
+ mini_word = ""
|
|
|
+
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word']
|
|
|
+ left = info['rect']['left']
|
|
|
+ top = info['rect']['top']
|
|
|
+ width = info['rect']['width']
|
|
|
+ height = info['rect']['height']
|
|
|
+ right = left + width
|
|
|
+ bottom = top + height
|
|
|
+ if contain_digit(word=word) and left >= gold_left:
|
|
|
+ distance = abs(top - gold_top)
|
|
|
+ if distance < mini_distance:
|
|
|
+ mini_distance = distance
|
|
|
+ mini_word = word
|
|
|
+
|
|
|
+ # 提取最终的mini_word
|
|
|
+ label_page = None
|
|
|
+ if '.' in mini_word:
|
|
|
+ label_page = mini_word.split('.')[-1]
|
|
|
+ elif mini_word.isdigit():
|
|
|
+ label_page = mini_word
|
|
|
+
|
|
|
+ return label_page
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # 在image_info中搜寻word中包含key_list的内容,并打包信息返回
|
|
|
+ def pack_search(self, image_info, key_list):
|
|
|
+ meta = []
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word'].strip().replace(' ', '')
|
|
|
+ left = info['rect']['left']
|
|
|
+ top = info['rect']['top']
|
|
|
+ width = info['rect']['width']
|
|
|
+ height = info['rect']['height']
|
|
|
+ right = left + width
|
|
|
+ bottom = top + height
|
|
|
+ for key in key_list:
|
|
|
+ if key in word:
|
|
|
+ meta.append({
|
|
|
+ "word": word,
|
|
|
+ "contain_key": key,
|
|
|
+ "bbox": {
|
|
|
+ "left": left,
|
|
|
+ "right": right,
|
|
|
+ "top": top,
|
|
|
+ "bottom": bottom,
|
|
|
+ "width": width,
|
|
|
+ "height": height
|
|
|
+ }
|
|
|
+ })
|
|
|
+ return meta
|
|
|
+
|
|
|
+ # 在image_info中搜寻word中包含key_list的内容
|
|
|
+ def search(self, image_info, key_list):
|
|
|
+
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word'].strip().replace(' ', '')
|
|
|
+ for key in key_list:
|
|
|
+ if key in word:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 精确匹配key_list中的内容
|
|
|
+ def exact_search(self, image_info, key_list):
|
|
|
+
|
|
|
+ meta = []
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word'].strip().replace(' ', '')
|
|
|
+ for key in key_list:
|
|
|
+ if key == word:
|
|
|
+ height = info['rect']['height']
|
|
|
+ meta.append({
|
|
|
+ "keyword": word,
|
|
|
+ "font_size": height
|
|
|
+ })
|
|
|
+ return meta
|
|
|
+
|
|
|
+ # 在image_info中使用re.search搜寻满足{format}正则的信息
|
|
|
+ def re_search(self, image_info, format):
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word']
|
|
|
+ match = re.search(format, word)
|
|
|
+ if match:
|
|
|
+ return match.group(0)
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 在image_info中使用re.match搜寻满足{format}正则的信息
|
|
|
+ def re_match(self, image_info, format):
|
|
|
+ for info in image_info:
|
|
|
+ word = info['word']
|
|
|
+ match = re.match(format, word)
|
|
|
+ if match:
|
|
|
+ return word
|
|
|
+ return False
|
|
|
+
|
|
|
    # Detect whether the fixed signature area of a page carries a legal
    # representative's signature or a company seal.
    def signature_recognition(self, image_path: str):
        """Return True when the page at *image_path* appears stamped/signed.

        Pipeline: (1) ask the seal-OCR service — any detected seal is an
        immediate True; (2) otherwise remove red and blue seal ink, OCR both
        cleaned images, anchor on the legal-representative labels, and measure
        black-ink density where the handwriting should sit.
        Requires integrate_sealagent() to have been called beforehand.
        """

        # Seal check via the external service wins outright.
        meta = self.sealagent.seal_parse(image_path=image_path)
        if len(meta["firm_seals"]) > 0 or len(meta["indiv_seals"]) > 0:
            logger.info("检测到当前页面具备印章 ...")
            return True
        # Anchor labels (full-width punctuation, as printed on the forms).
        keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:']
        key_pos = {}
        # Derived cache paths next to the page image: seal-removed copies and
        # the cropped ink probes.
        image_prefix = os.path.dirname(image_path)
        image_name = image_path.split('/')[-1][:-4]
        removed_red_image_name = image_name + '_red_roi' + image_path.split('/')[-1][-4:]
        removed_blue_image_name = image_name + '_blue_roi' + image_path.split('/')[-1][-4:]
        red_ink_image_name = image_name + '_red_ink' + image_path.split('/')[-1][-4:]
        blue_ink_image_name = image_name + '_blue_ink' + image_path.split('/')[-1][-4:]
        removed_red_image_path = os.path.join(image_prefix, removed_red_image_name)
        removed_blue_image_path = os.path.join(image_prefix, removed_blue_image_name)
        red_ink_image_path = os.path.join(image_prefix, red_ink_image_name)
        blue_ink_image_path = os.path.join(image_prefix, blue_ink_image_name)
        # Build (or reuse a cached) red-seal-removed image.
        if not os.path.exists(removed_red_image_path):
            removed_red_seal_img = remove_red_seal(image_path=image_path)
            cv2.imwrite(removed_red_image_path, removed_red_seal_img)
        else:
            removed_red_seal_img = cv2.imread(removed_red_image_path)

        # Build (or reuse a cached) blue-seal-removed image.
        if not os.path.exists(removed_blue_image_path):
            removed_blue_seal_img = remove_blue_seal(image_path=image_path)
            cv2.imwrite(removed_blue_image_path, removed_blue_seal_img)
        else:
            removed_blue_seal_img = cv2.imread(removed_blue_image_path)

        # OCR both cleaned images.
        red_content = self.get_content(image_path=removed_red_image_path)
        red_image_info = red_content["rawjson"]["ret"]
        blue_content = self.get_content(image_path=removed_blue_image_path)
        blue_image_info = blue_content["rawjson"]["ret"]

        # Inspect one cleaned image: locate the anchor labels, then decide
        # whether signature text/ink is present around them.
        # NOTE(review): key_pos is shared by the red and blue passes, so the
        # second identify() call also sees the first call's hits — confirm
        # this accumulation is intended.
        def identify(image_info, input_img, out_path):
            for info in image_info:
                word = info['word'].replace(' ', '')
                left = info['rect']['left']
                top = info['rect']['top']
                width = info['rect']['width']
                height = info['rect']['height']
                right = left + width
                bottom = top + height
                for keyword in keywords:
                    if keyword in word:
                        key_pos[keyword] = {
                            "word": word,
                            "left": left,
                            "right": right,
                            "top": top,
                            "bottom": bottom
                        }

                        break

            # No anchor ("投标函" / "法定代表人" ...) found at all: not signed.
            if len(key_pos) == 0:
                return False

            # Case 1: both a left label ("法定代表人[或其委托代理人]:") and a
            # right label ("(…CA电子印章[或签字])"/"(签字)") exist — any
            # signature must sit between them.
            # import pdb; pdb.set_trace()
            if ((key_pos.get('法定代表人:') is not None) or (key_pos.get('法定代表人或其委托代理人:') is not None)) and \
                ((key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None)):
                if key_pos.get('法定代表人或其委托代理人:') is not None:
                    l_info = key_pos['法定代表人或其委托代理人:']
                    l_cnt = 13
                    l_string = '法定代表人或其委托代理人:'
                else:
                    l_info = key_pos['法定代表人:']
                    l_cnt = 6
                    l_string = '法定代表人:'

                if key_pos.get('(法定代表人CA电子印章)') is not None:
                    r_info = key_pos['(法定代表人CA电子印章)']
                    r_string = '(法定代表人CA电子印章)'
                elif key_pos.get('(法定代表人CA电子印章或签字)') is not None:
                    r_info = key_pos['(法定代表人CA电子印章或签字)']
                    r_string = '(法定代表人CA电子印章或签字)'
                else:
                    r_info = key_pos['(签字)']
                    r_string = '(签字)'

                # Probe box between the two labels (shrunk 5px at the bottom).
                l = l_info['right']
                l_word = l_info['word']
                r = r_info['left']
                r_word = r_info['word']
                t = max(l_info['top'], r_info['top'])
                b = min(l_info['bottom'], r_info['bottom']) - 5
                # Extra characters beyond the bare labels are taken to be the
                # signature text itself.
                if l_word[-l_cnt:] != l_string or r_word != r_string:
                    return True
                else:

                    # Bare labels only: measure ink density inside the box.
                    black_ratio = self.ink_recognition(
                        input_img=input_img,
                        out_path=out_path,
                        meta={
                            "left": l,
                            "right": r,
                            "top": t,
                            "bottom": b
                        }
                    )
                    if black_ratio >= self.sign_threshold:
                        return True
                    return False

            # Case 2: only a right label — a signature, if present, is
            # embedded in the same OCR word as the label.
            elif (key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None):
                if key_pos.get('(法定代表人CA电子印章)') is not None:
                    key = key_pos['(法定代表人CA电子印章)']
                elif key_pos.get('(法定代表人CA电子印章或签字)') is not None:
                    key = key_pos['(法定代表人CA电子印章或签字)']
                elif key_pos.get('(签字)') is not None:
                    key = key_pos['(签字)']

                # Strip every known label; any remainder counts as a signature.
                key_word = key['word']
                key_word = key_word.replace('(法定代表人CA电子印章)','').replace('(法定代表人CA电子印章或签字)', '').replace('(签字)','').replace('法定代表人或其委托代理人:', '').replace('法定代表人:', '')
                if key_word != '':
                    return True
                return False

            # Case 3: only "法定代表人:" — signature is embedded or just to
            # the right of the label.
            elif key_pos.get('法定代表人:') is not None:
                word = key_pos['法定代表人:']['word']
                l = key_pos['法定代表人:']['left']
                # Fixed 100px probe window.
                # NOTE(review): the window starts at the label's LEFT edge and
                # overlaps the label glyphs (case 1 starts at its right) —
                # confirm intended.
                r = l + 100
                t = key_pos['法定代表人:']['top']
                b = key_pos['法定代表人:']['bottom'] - 5

                if word[-6:] != '法定代表人:':
                    return True
                else:
                    black_ratio = self.ink_recognition(
                        input_img=input_img,
                        out_path=out_path,
                        meta={
                            "left": l,
                            "right": r,
                            "top": t,
                            "bottom": b
                        }
                    )
                    if black_ratio >= self.sign_threshold:
                        return True
                    return False

            # Case 4: only "法定代表人或其委托代理人:" — same rule as case 3.
            elif key_pos.get('法定代表人或其委托代理人:') is not None:
                word = key_pos['法定代表人或其委托代理人:']['word']
                l = key_pos['法定代表人或其委托代理人:']['left']
                r = l + 100
                t = key_pos['法定代表人或其委托代理人:']['top']
                b = key_pos['法定代表人或其委托代理人:']['bottom'] - 5

                if word[-13:] != '法定代表人或其委托代理人:':
                    return True
                else:
                    black_ratio = self.ink_recognition(
                        input_img=input_img,
                        out_path=out_path,
                        meta={
                            "left": l,
                            "right": r,
                            "top": t,
                            "bottom": b
                        }

                    )
                    if black_ratio >= self.sign_threshold:
                        return True
                    return False

            # Only "投标函" was found: no signature evidence.
            else:
                return False

        # Signed if either the red-cleaned or the blue-cleaned image shows it.
        return identify(red_image_info, removed_red_seal_img, red_ink_image_path) \
            or identify(blue_image_info, removed_blue_seal_img, blue_ink_image_path)
|
|
|
+ # 用于判断固定位置的长方形框内是否存在签名字迹
|
|
|
+
|
|
|
    # Measure the proportion of dark ("ink") pixels inside a rectangle and
    # save the cropped region (plus a debug image with the box drawn) to disk.
    def ink_recognition(self, input_img, out_path, meta: dict):
        """Return the dark-pixel ratio (0.0-1.0) of the region described by
        *meta* ({"left","right","top","bottom"} pixel coordinates).

        Side effects: draws the probe rectangle onto *input_img* IN PLACE and
        writes both the crop (out_path) and the annotated full image.
        # assumes input_img is an HxWx3 BGR ndarray — TODO confirm
        """
        left = meta["left"]
        right = meta["right"]
        top = meta["top"]
        bottom = meta["bottom"]
        crop_img = input_img[top:bottom, left:right, :]
        # Debug rectangle; BGR (255, 255, 0) renders cyan (the original
        # comment said green), line width 2.
        cv2.rectangle(input_img, (left, top), (right, bottom), (255, 255, 0), 2)
        # Annotated-image path, e.g. "page*.jpg".
        # NOTE(review): '*' is illegal in Windows file names — confirm the
        # deployment target is POSIX.
        test_path = out_path[:-4] + '*' + out_path[-4:]
        if crop_img is None or crop_img.size == 0:
            # Degenerate/empty probe box: report zero ink.
            logger.info("Error: crop_img is empty")
            return 0.0
        else:
            cv2.imwrite(out_path, crop_img)
            cv2.imwrite(test_path, input_img)

        # Otsu picks a global threshold for the crop; anything darker than
        # 90% of it is counted as ink.
        gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY)
        thresh, ret = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU)
        filter_condition = int(thresh * 0.90)
        _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV)

        total_pixels = black_thresh.size
        black_pixels = np.count_nonzero(black_thresh)
        black_ratio = black_pixels / total_pixels
        return black_ratio
|
|
|
+
|
|
|
+ # 用于判别字体大小
|
|
|
+ def font_judge(self, kw_search_meta):
|
|
|
+ if len(kw_search_meta) == 0:
|
|
|
+ # 即未搜寻到关键字,非相关页
|
|
|
+ return False
|
|
|
+ for meta in kw_search_meta:
|
|
|
+ keyword = meta["keyword"]
|
|
|
+ font_size = meta["font_size"]
|
|
|
+ logger.info(f"keyword:{keyword} has font_size: {font_size}")
|
|
|
+ if font_size >= self.font_threshold:
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
    # Table OCR via PaddleOCR's PP-Structure layout engine.
    def table_parse(self, image_path: str, save_folder: str = ''):
        """Run PP-Structure on *image_path* and bucket the detected regions.

        Returns a dict with table titles/bodies (plus confidences), figure
        contents/captions, page numbers and an "others" catch-all. When
        *save_folder* is non-empty, the raw engine output is also dumped via
        save_structure_res.
        NOTE(review): "page_numbers" is never populated here — confirm
        whether callers rely on it.
        """
        # NOTE(review): constructing a PPStructure engine per call is
        # expensive; consider caching it on the instance.
        table_engine = PPStructure(show_log=True)
        img = cv2.imread(image_path)
        result = table_engine(img)
        expectation = {
            "table": {
                "title": [],
                "title_confidence": [],
                "content": [],
                "content_confidence": [],
            },
            "figure": {
                "content": [],
                "content_confidence": [],
                "caption": [],
                "caption_confidence": [],
            },
            "page_numbers": [],
            "others": []
        }
        for res in result:
            # Titles / table captions carry recognized text lines in res['res'].
            if res['type'] == 'title' or res['type'] == 'table_caption':
                if len(res['res']) > 0:
                    expectation['table']['title_confidence'].append(res['res'][0]['confidence'])
                    expectation['table']['title'].append(res['res'][0]['text'])
            elif res['type'] == 'table':
                # Table bodies come back as HTML; flatten to a list of rows.
                expectation['table']['content_confidence'].append(res['score'])
                expectation['table']['content'].append(pd.read_html(res['res']['html'])[0].values.tolist())
            elif res['type'] == 'figure':
                expectation['figure']['content_confidence'].append(res['score'])
                expectation['figure']['content'].append(res['res'])
            elif res['type'] == 'figure_caption':
                expectation['figure']['caption_confidence'].append(res['score'])
                expectation['figure']['caption'].append(res['res'])
            else:
                expectation['others'].append(res)

        if save_folder:
            # Stored under save_folder/<image basename>/
            save_structure_res(result, save_folder, os.path.basename(image_path).split('.')[0])

        return expectation
|
|
|
+
|
|
|
+
|
|
|
# Parses a bid pdf and matches positions based on the extracted text.
class PdfMatcher(PdfExtractAttr):
    # file_path is the path of the bid pdf to analyse.
    def __init__(self, file_path: str):
        """Load (or lazily build) every cached parse artifact for one bid pdf.

        Path layout assumption: .../<firm_name>/<bid_name>.pdf — the firm
        name is the parent directory and the cached JSON files live in a
        sibling folder named after the bid. TODO confirm against callers.
        """
        super(PdfMatcher, self).__init__(
            file_path=file_path
        )
        # Path of the bid document.
        self.document = file_path
        # Bid name: file name minus a 4-char extension (assumes ".pdf").
        self.bid_name = file_path.split('/')[-1][:-4]
        # Per-bid data folder next to the pdf.
        self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name)
        # Firm name taken from the parent directory name.
        self.firm_name = file_path.split('/')[-2]
        # Title list (built on first use, cached as title.json).
        title_path = os.path.join(self.bid_dir, "title.json")
        # image list
        # self.image_dir = os.path.join(self.bid_dir, "extracted_images")
        # if (not os.path.exists(title_path)) or (not os.path.exists(self.image_dir)):
        #     os.makedirs(self.image_dir, exist_ok=True)
        if not os.path.exists(title_path):
            self.main_parse(pdf_path=file_path, title_path=title_path)
        # self.main_parse(pdf_path=file_path, title_path=title_path, image_dir=self.image_dir)
        self.title = load_json(title_path)
        # Outline (bookmark) list.
        outline_path = os.path.join(self.bid_dir, "outlines.json")
        self.outline = self.parse_outline(out_path=outline_path)
        # Text blocks.
        text_path = os.path.join(self.bid_dir, "all_texts.json")
        self.details = self.parse_text(out_path=text_path)
        # Table list (reused from cache once parsed).
        table_path = os.path.join(self.bid_dir, "all_tables.json")
        if os.path.exists(table_path):
            self.table = load_json(table_path)
        else:
            self.table = self.parse_table_pro(table_path=table_path)
        # image format
        # self.image_format = "image_page_{}*"
        # Page-distance heuristics used when filtering candidate images.
        self.start_threshold = 10
        self.distance_threshold = 6
        self.search_threshold = 20
        # Total page count of the pdf.
        self.total_pages = self.count_pages()
        # Certificate-category regexes (same set as OcrAgent.re_dict).
        self.license_dict = {
            "business_license" : r'营业执照',
            "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
            "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
            "qualtifications" : r'\b[\u4e00-\u9fff]*证书',
            "proof": r'\b[\u4e00-\u9fff]*证明',
        }
|
|
|
+
|
|
|
+ # 在title中找寻包含keyword的信息
|
|
|
+ # digit_limit表明是否使用数字限制
|
|
|
+ def search_in_title(self, keyword, digit_limit=False):
|
|
|
+ meta = []
|
|
|
+ digits = "一二三四五六七八九十"
|
|
|
+ for title_block in self.title:
|
|
|
+ block_text = title_block['text'].replace(' ', '').strip()
|
|
|
+ if digit_limit:
|
|
|
+ if keyword in block_text:
|
|
|
+ # 确保keyword左右不包含digit中的内容
|
|
|
+ cnt = block_text.find(keyword)
|
|
|
+ length = len(keyword)
|
|
|
+ check_left = cnt - 1
|
|
|
+ check_right = cnt + length
|
|
|
+ if (check_left >= 0 and block_text[check_left] in digits) or (check_right < len(block_text) and block_text[check_right] in digits):
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ if keyword in block_text:
|
|
|
+ meta.append({
|
|
|
+ "page_number": title_block["page_number"],
|
|
|
+ "text": block_text
|
|
|
+ })
|
|
|
+ return meta
|
|
|
+
|
|
|
+
|
|
|
+ # 在outline中找寻包含keywords的信息
|
|
|
+ def search_in_outline(self, keyword):
|
|
|
+ meta = []
|
|
|
+ for outline_block in self.outline:
|
|
|
+ block_text = outline_block['text'].replace(' ', '').strip()
|
|
|
+ if keyword in block_text:
|
|
|
+ meta.append({
|
|
|
+ "page_number": outline_block["page_number"],
|
|
|
+ "text": block_text
|
|
|
+ })
|
|
|
+ return meta
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # 用于定位营业执照、资质证书的页面范围
|
|
|
+ def search_license_interval(self, necessity_interval=None):
|
|
|
+ '''定位营业执照、资质证书的区间范围'''
|
|
|
+ # 通过关键字模糊定位
|
|
|
+ keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件']
|
|
|
+
|
|
|
+ search_interval = []
|
|
|
+ license_pages = []
|
|
|
+
|
|
|
+ # locate in title.json
|
|
|
+ left_pos = -1 # 左指针
|
|
|
+ right_pos = -1 # 右指针
|
|
|
+ for title_block in self.title:
|
|
|
+ block_text = title_block['text'].replace(' ', '').strip()
|
|
|
+
|
|
|
+ # TODO 先进行证书正则判断
|
|
|
+ '''
|
|
|
+ for key, format in self.license_dict.items():
|
|
|
+ match = re.search(format, block_text)
|
|
|
+ if match:
|
|
|
+ license_pages.append(title_block['page_number'])
|
|
|
+ '''
|
|
|
+
|
|
|
+ # 先进行左区间判定
|
|
|
+ if left_pos != -1 and '证书' not in block_text:
|
|
|
+ right_pos = title_block['page_number']
|
|
|
+ search_interval.append((left_pos, right_pos))
|
|
|
+ # 重置
|
|
|
+ left_pos = -1
|
|
|
+
|
|
|
+ for keyword in keywords:
|
|
|
+ if keyword in block_text:
|
|
|
+ # 先进行模糊的outline定位
|
|
|
+ center_page = None
|
|
|
+ if '.' in block_text:
|
|
|
+ center_page = block_text.split('.')[-1]
|
|
|
+ if center_page.isdigit():
|
|
|
+ center_page = eval(center_page)
|
|
|
+ left_pos = min(title_block['page_number'], center_page)
|
|
|
+ else:
|
|
|
+ left_pos = title_block['page_number']
|
|
|
+
|
|
|
+
|
|
|
+ # 最终判定
|
|
|
+ if left_pos != -1:
|
|
|
+ search_interval.append((left_pos, right_pos))
|
|
|
+
|
|
|
+
|
|
|
+ # 重置
|
|
|
+ left_pos = -1
|
|
|
+ right_pos = -1
|
|
|
+
|
|
|
+ # locate in outlines.json
|
|
|
+ if len(self.outline) > 0:
|
|
|
+ for outline_block in self.outline:
|
|
|
+
|
|
|
+ if left_pos != -1:
|
|
|
+ right_pos = outline_block["page_number"]
|
|
|
+ right_pos = right_pos if right_pos is not None else -1
|
|
|
+ search_interval.append((left_pos, right_pos))
|
|
|
+ left_pos = -1
|
|
|
+
|
|
|
+ outline_text = outline_block['title'].strip()
|
|
|
+ for keyword in keywords:
|
|
|
+ if keyword in outline_text:
|
|
|
+ if outline_block["page_number"] is not None:
|
|
|
+ left_pos = outline_block["page_number"]
|
|
|
+
|
|
|
+ # 最终判定
|
|
|
+ if left_pos != -1:
|
|
|
+ search_interval.append((left_pos, right_pos))
|
|
|
+
|
|
|
+ if necessity_interval is not None:
|
|
|
+ search_interval += necessity_interval
|
|
|
+
|
|
|
+ # 搜寻区间合并
|
|
|
+ search_interval.sort()
|
|
|
+
|
|
|
+ logger.info(f"search_interval: {search_interval} ...")
|
|
|
+
|
|
|
+ merge_interval = []
|
|
|
+ if len(search_interval) > 0:
|
|
|
+ left = -1
|
|
|
+ right = -1
|
|
|
+ for interval in search_interval:
|
|
|
+ l, r = interval
|
|
|
+ if r < l:
|
|
|
+ continue
|
|
|
+ # 初始化
|
|
|
+ if left == -1 and right == -1:
|
|
|
+ left = l
|
|
|
+ right = r
|
|
|
+
|
|
|
+ elif l <= right and r > right:
|
|
|
+ right = r
|
|
|
+
|
|
|
+ elif l <= right:
|
|
|
+ continue
|
|
|
+
|
|
|
+ else:
|
|
|
+ merge_interval.append((left, right))
|
|
|
+ left = l
|
|
|
+ right = r
|
|
|
+ merge_interval.append((left, right))
|
|
|
+
|
|
|
+ return merge_interval
|
|
|
+
|
|
|
    # Locate the page range that holds the "related performance" (相关业绩) info.
    def search_perf_info(self, ):
        """Search extracted tables, outlines and titles for the related-performance section.

        Returns a dict with:
            perf_page_number: page of the performance table/heading, or -1 if not found.
            qual_page_number: set of pages whose outline/title mentions the
                qualification-review keywords (candidate anchors for a later scan).
            table: the matched table payload from ``self.table``, or None.
        """
        flag = False
        # Keywords marking the qualification-review chapter (两种常见写法).
        keywords = ['资格审查资料','资格审查材料']
        meta = {
            "perf_page_number": -1,
            "qual_page_number": set(),
            "table": None
        }
        # Pass 1: check whether a performance table was extracted directly.
        for table_block in self.table:
            page_number = table_block["page_numbers"]
            table_name = table_block["table_name"]
            # Normalize the table caption before keyword matching.
            table_name = table_name.strip().replace("\n", "").replace(" ", "")
            # "类似" + ("项目" or "业绩") identifies a "similar projects/performance" table.
            if ('类似' in table_name) and (('项目' in table_name) or ('业绩' in table_name)):
                flag = True
                meta["perf_page_number"] = page_number
                meta["table"] = table_block["table"]
                break
        if flag:
            return meta
        # Pass 2: fuzzy match against the PDF outline (bookmarks).
        for outline_block in self.outline:
            page_number = outline_block["page_number"]
            text = outline_block["title"]
            text = text.strip().replace("\n", "").replace(" ", "")
            for keyword in keywords:
                if keyword in text:
                    qual_page = page_number
                    meta["qual_page_number"].add(qual_page)
            if ('类似' in text) and (('项目' in text) or ('业绩' in text)):
                flag = True
                meta["perf_page_number"] = page_number
                break
        if flag:
            return meta
        # Pass 3: fuzzy match against detected in-page titles.
        for title_block in self.title:
            page_number = title_block["page_number"]
            text = title_block["text"]
            text = text.strip().replace("\n", "").replace(" ", "")
            for keyword in keywords:
                if keyword in text:
                    qual_page = page_number
                    meta["qual_page_number"].add(qual_page)
            if ('类似' in text) and (('项目' in text) or ('业绩' in text)):
                flag = True
                meta["perf_page_number"] = page_number
                break
        # May still hold perf_page_number == -1 if nothing matched.
        return meta
|
|
|
+
|
|
|
+ # 返回可能为营业执照或资质证书的图像集
|
|
|
+ def find_candidate_images(self):
|
|
|
+
|
|
|
+ candidate_images = set()
|
|
|
+
|
|
|
+ merge_intervals = self.search_license_interval()
|
|
|
+ logger.info(f"merge_intervals: {merge_intervals}")
|
|
|
+
|
|
|
+ for interval in merge_intervals:
|
|
|
+ start_page, end_page = interval
|
|
|
+
|
|
|
+ if start_page <= self.start_threshold:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if end_page == -1:
|
|
|
+ end_page = start_page + 20
|
|
|
+
|
|
|
+ candidate_images = self.image_regularization(start_page=max(0, start_page-self.search_threshold), end_page=end_page+self.search_threshold, candidate_images=candidate_images)
|
|
|
+
|
|
|
+ candidate_images = list(candidate_images)
|
|
|
+ return candidate_images
|
|
|
+
|
|
|
+ # 使用正则查询符合格式的图像
|
|
|
+ def image_regularization(self, start_page: int, end_page:int, candidate_images: set):
|
|
|
+ for index in range(start_page, end_page + 1):
|
|
|
+ current_format = self.image_format.format(index)
|
|
|
+ files = glob.glob(os.path.join(self.image_dir, current_format))
|
|
|
+ filter_files = [file for file in files if not file.endswith('.unk')]
|
|
|
+ candidate_images.update(filter_files)
|
|
|
+ return candidate_images
|
|
|
+
|
|
|
    # Return candidate license images from the pdf2img-rendered (scanned) pages.
    def find_candidate_images_pro(self, necessity_interval=None):
        """Like ``find_candidate_images`` but works on full-page JPEG renders.

        For every page in the merged search intervals a red-seal-removed copy
        is produced (cached on disk as ``page-<n>_red_roi.jpg``); both the raw
        and processed paths are returned.

        Args:
            necessity_interval: optional extra (left, right) page intervals that
                must be searched, forwarded to ``search_license_interval``.
        """
        scanned_dir = self.pdf2img()
        candidate_images = set()
        merge_intervals = self.search_license_interval(necessity_interval=necessity_interval)
        logger.info(f"merge_intervals: {merge_intervals}")

        for interval in merge_intervals:
            start_page, end_page = interval
            # Skip front-matter intervals.
            if start_page <= self.start_threshold:
                continue
            # Open-ended interval: cap at 20 pages past the start.
            if end_page == -1:
                end_page = start_page + 20
            for index in range(start_page, end_page + 1):
                img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
                processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
                # Remove the red company seal once and cache the result.
                if os.path.exists(img_path) and (not os.path.exists(processed_img_path)):
                    processed_img = remove_red_seal(image_path=img_path)
                    cv2.imwrite(processed_img_path, processed_img)
                # NOTE(review): both paths are added even when the raw page file
                # is missing — verify callers tolerate non-existent paths.
                candidate_images.add(img_path)
                candidate_images.add(processed_img_path)

        candidate_images = list(candidate_images)
        return candidate_images
|
|
|
+
|
|
|
    # Look for the bid quotation summary table (投标报价汇总表) among extracted tables.
    def find_bid_quotation_form(self):
        """Return ``(page_number, parsed_table)`` for the bid quotation table, or None.

        Two strategies, in order:
        1. match the table caption against known quotation-table names;
        2. scan the first rows for a header cell containing ``增值税金额``
           (VAT amount), advancing past rows that only contain "其中" sub-headers.
        """
        keywords = ["投标报价总表", "投标报价汇总表"]
        key_column = '增值税金额'
        tables = []
        flag = False
        for table_block in self.table:
            page_number = table_block["page_numbers"]
            table_name = table_block["table_name"]
            table_name = table_name.replace(' ', '')
            # Strategy 1: match by table caption.
            for keyword in keywords:
                if keyword in table_name:
                    tables = table_block["table"]
                    flag = True
                    break
            # Strategy 2: match by key column name in the header rows.
            if len(tables) == 0:
                # NOTE(review): this is the ROW count of the table, despite the name.
                column_num = len(table_block["table"])
                cnt = 0
                while cnt < column_num:
                    column_list = table_block["table"][cnt]
                    for column_name in column_list:
                        if column_name is not None:
                            column_name = column_name.replace("\n", "").replace(" ", "").strip()
                            if key_column in column_name:
                                tables = table_block["table"]
                                flag = True
                                break
                            # "其中" marks a sub-header row; look one row deeper.
                            if '其中' in column_name:
                                cnt += 1
                    # Stop if we never advanced past row 0, or if a match was found.
                    if (not cnt) or flag:
                        break
            if flag:
                break
        # A quotation table was found in the extracted table data.
        if flag:
            parsed_table = self.extract_table(table=tables)
            return page_number, parsed_table
        # No quotation table in the extracted data.
        return None
|
|
|
+
|
|
|
    # Look for the project staffing / management-team table among extracted tables.
    def find_itempeople_form(self):
        """Locate the staffing (拟投入人员/项目管理机构) table or candidate pages.

        Returns a dict with:
            table_list: list of {"page_number", "table"} entries that matched
                either by caption keyword or by a 职务/职称 header column;
            candidate_page: set of page numbers where outlines/titles suggest
                the section starts (used as a fallback search anchor).
        """
        keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表']
        flag = False  # set when a table was matched via its caption
        meta = {
            "candidate_page": set(),
            "table_list": [],
        }
        for table_block in self.table:

            if len(table_block["table"]) == 0:
                continue

            page_number = table_block["page_numbers"]
            table_name = table_block["table_name"]
            table_name = table_name.strip().replace("\n", "").replace(" ", "")
            # Match by caption first.
            for keyword in keywords:
                if keyword in table_name:
                    meta["table_list"].append({
                        "page_number":page_number,
                        "table": table_block["table"]
                    })
                    flag = True
                    break
            if flag:
                return meta

            # Fallback: match by a 职务/职称 (position/title) header column.
            column_name_list = table_block["table"][0]
            for column_name in column_name_list:
                if column_name is not None:
                    column_name = column_name.strip().replace("\n", "").replace(" ", "")
                    if '职务' in column_name or '职称' in column_name:
                        meta["table_list"].append({
                            "page_number":page_number,
                            "table": table_block["table"]
                        })
                        break

        sec_keywords = ['拟投入本项目人员配备情况表', '项目管理机构', '项目管理机构组成表']
        # Locate the section in the PDF outline (bookmarks).
        for outline_block in self.outline:
            page_number = outline_block["page_number"]
            text = outline_block["title"]
            text = text.strip().replace("\n", "").replace(" ", "")
            for sec_keyword in sec_keywords:
                if sec_keyword in text:
                    # Prefer the page printed after the dot leaders ("....123");
                    # fall back to the outline's own page number.
                    # NOTE(review): entries without a '.' contribute no page — confirm intended.
                    if '.' in text:
                        page = text.split('.')[-1]
                        if page.isdigit():
                            page = eval(page)
                        else:
                            page = page_number
                        meta["candidate_page"].add(page)

        # Locate the section among detected in-page titles.
        for title_block in self.title:
            page_number = title_block["page_number"]
            text = title_block["text"]
            text = text.strip().replace("\n", "").replace(" ", "")
            for sec_keyword in sec_keywords:
                if sec_keyword in text:
                    if '.' in text:
                        page = text.split('.')[-1]
                        if page.isdigit():
                            page = eval(page)
                        else:
                            page = page_number
                        meta["candidate_page"].add(page)

        return meta
|
|
|
+
|
|
|
+ # 用于解析提取到的表格信息
|
|
|
+ def extract_table(self, table):
|
|
|
+ row_num = len(table)
|
|
|
+ if row_num == 0:
|
|
|
+ return [], []
|
|
|
+ column_num = len(table[0])
|
|
|
+ new_table = []
|
|
|
+ # first step: 完善列名
|
|
|
+ cnt = 0 # 从第一行开始
|
|
|
+ column_list = []
|
|
|
+ while len(column_list) < column_num and cnt < row_num:
|
|
|
+ current_column_list = table[cnt]
|
|
|
+ for column_name in current_column_list:
|
|
|
+ column_name = str(column_name).strip().replace("\n", "").replace(" ", "")
|
|
|
+ if (column_name != None) and ('其中' not in column_name) and (column_name not in column_list):
|
|
|
+ column_list.append(column_name)
|
|
|
+ if len(column_list) < column_num:
|
|
|
+ cnt += 1
|
|
|
+ # second step: 填入表格
|
|
|
+ new_table.append(column_list)
|
|
|
+ for i in range(cnt + 1, row_num):
|
|
|
+ tmp = []
|
|
|
+ for j in range(column_num):
|
|
|
+ element = table[i][j]
|
|
|
+ tmp.append(element)
|
|
|
+ new_table.append(tmp)
|
|
|
+
|
|
|
+ return column_list, new_table
|
|
|
+
|
|
|
+ # 查询pdf总页数
|
|
|
+ def count_pages(self):
|
|
|
+ reader = PdfReader(self.file_path)
|
|
|
+ return len(reader.pages)
|
|
|
+
|
|
|
    # Render the PDF into per-page JPEGs under <bid_dir>/scanned (cached on disk).
    def pdf2img(self):
        """Ensure the ``scanned`` directory exists and contains page renders.

        The conversion runs only once: if the directory already exists it is
        assumed complete and reused. Returns the directory path either way.
        """
        scanned_dir = os.path.join(self.bid_dir, 'scanned')
        if os.path.exists(scanned_dir):
            logger.info(f"检测到当前投标文件{self.bid_dir}存在扫描文件夹 ...")
        else:
            os.makedirs(scanned_dir, exist_ok=True)
            logger.info(f"开始转换pdf2img页面")
            convert_start_time = time.time()
            try:
                # NOTE(review): reads self.document here but self.file_path in
                # count_pages — confirm both attributes exist on this class.
                images = convert_from_path(pdf_path=self.document)
                for i, image in enumerate(images):
                    # Pages are saved 0-indexed as page-<i>.jpg.
                    image.save(os.path.join(scanned_dir, f'page-{i}.jpg'), 'JPEG')
                logger.info("convert successfully !")
            except subprocess.CalledProcessError as e:
                # NOTE(review): pdf2image raises its own exception types
                # (e.g. PDFPageCountError), which this handler may not catch.
                logger.info(f"convert failure: {e}")
            convert_cost_time = time.time() - convert_start_time
            logger.info(f"转化pdf2img花费{convert_cost_time // 60} min {convert_cost_time % 60} sec ...")

        return scanned_dir
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class PdfParse_pipeline():
    """Pipeline that parses every bidding PDF under a firm directory tree."""

    def __init__(self, ocr, firm_dir, out_path):
        """
        Args:
            ocr: wrapped OCR client used for page recognition.
            firm_dir: root directory holding one sub-directory per firm.
            out_path: path of the JSON file the aggregated results are written to.
        """
        self.ocr = ocr
        self.firm_dir = firm_dir
        self.out_path = out_path
|
|
|
+
|
|
|
    def parse_pipeline(self):
        """Parse every firm's PDF(s) under ``self.firm_dir`` and persist the results.

        Expects the layout ``firm_dir/<firm_name>/<bid>.pdf``; a working
        directory named after each PDF (without extension) is created next to it.
        Results are keyed by firm name and dumped as UTF-8 JSON to ``self.out_path``.
        NOTE(review): a firm with several PDFs keeps only the last one's data.
        """
        data = {}

        for firm_name in tqdm(os.listdir(self.firm_dir)):
            logger.info(f'processing firm {firm_name} ...')
            firm_path = os.path.join(self.firm_dir, firm_name)
            for bid_name in tqdm(os.listdir(firm_path)):
                if bid_name.endswith('.pdf'):
                    document=os.path.join(firm_path, bid_name)
                    # Per-document working directory (PDF name minus ".pdf").
                    bid_dir = os.path.join(firm_path, bid_name[:-4])
                    os.makedirs(bid_dir, exist_ok=True)

                    document_data = self.parse_single_document(pdf_path=document)
                    data[firm_name] = document_data

        # Persist the aggregated results.
        with open(self.out_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        return data
|
|
|
+
|
|
|
+
|
|
|
    def parse_single_document(self, pdf_path: str):
        """Run all extraction stages for one bidding PDF and return the result dict.

        Several stages are currently disabled (commented out) — only the
        directory keyword scan and the manager-table stage run.
        """
        agent = PdfMatcher(file_path=pdf_path)
        firm_name = agent.firm_name
        total_pages = agent.total_pages
        data = {
            "necessity_interval": [],
            # whether the bid letter carries a signature or seal
            "has_signature_or_seal": False,
            "formatting_img": None,
            # qualification certificates & business licenses
            "license_list":[],
            # bid quotation summary table
            "bid_form": None,
            # related performance tables
            "perf_info": [],
            # project-manager related tables
            "manager": [],
            "kw_meta": {}
        }

        logger.info("start finding the kw info in directory ...")
        kw_meta = self.find_kw_from_dc(agent=agent, data=data, total_pages=total_pages)

        logger.info("start processing the nextiter information ...")
        # iter = self.parse_nextiter(agent=agent, data=data, total_pages=total_pages)

        # for signature or seal
        logger.info("start judging the signature & seal information ...")
        # self.parse_bid(agent=agent, data=data, total_pages=total_pages)

        # for license_list
        logger.info("start finding license information ...")
        # self.parse_license(agent=agent, data=data, iter=iter, firm_name=firm_name)

        # for bid_form
        logger.info("start finding bid form ...")
        # self.parse_bid_form(agent=agent, data=data)

        # for perf information
        logger.info("start finding perf information ...")
        # self.parse_perf(agent=agent, data=data)

        # for manager
        logger.info("start finding manager information ...")
        self.parse_manager(agent=agent, data=data, kw_meta=kw_meta["manager"])

        return data
|
|
|
+
|
|
|
    # Scan the table-of-contents pages for keywords and their printed page labels.
    def find_kw_from_dc(self, agent, data, total_pages):
        """OCR the first pages, find the TOC, and collect keyword → page-label hits.

        For each keyword group (currently only "manager") every matching TOC
        line is recorded with the containing keyword, the raw OCR word, and the
        page label — taken from the word's trailing digits or, failing that,
        resolved via ``ocr.digit_label``. The result is also stored in
        ``data["kw_meta"]``.
        """
        meta = {}
        keywords = {
            "manager": ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表']
        }
        # Initialize one hit list per keyword group.
        for kw in keywords:
            meta[kw] = []
        scanned_dir = agent.pdf2img()
        # The TOC usually sits within the first 20 pages.
        start = 0
        end = 20 if total_pages > 20 else total_pages
        is_enter = False
        for index in range(start, end):
            logger.info(f"find kw from index {index} ...")
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
            # Remove the red seal once and cache the processed page.
            if not os.path.exists(processed_img_path):
                processed_img = remove_red_seal(image_path=img_path)
                cv2.imwrite(processed_img_path, processed_img)
            # OCR the seal-free page.
            content = self.ocr.get_content(image_path=processed_img_path)
            image_info = content["rawjson"]["ret"]

            if not is_enter and self.ocr.search(image_info, '目录'):
                # This is the first TOC page; mark that we entered the TOC.
                is_enter = True

            # Within the TOC, search every keyword group on the page.
            if is_enter:
                for kw, elements in keywords.items():

                    pack_info = self.ocr.pack_search(image_info=image_info, key_list=elements)

                    logger.info(pack_info)
                    # Record the page label for each hit.
                    if len(pack_info) > 0:
                        for info in pack_info:
                            word = info["word"]
                            contain_key = info["contain_key"]
                            pos = info["bbox"]
                            # The TOC line itself ends with the page number.
                            if word[-1].isdigit():
                                label_page = word.split('.')[-1]
                                meta[kw].append(
                                    {
                                        "element": contain_key,
                                        "word": word,
                                        "label_page": label_page
                                    }
                                )
                            else:
                                # Resolve the nearest numeric label via OCR geometry.
                                meta[kw].append(
                                    {
                                        "element": contain_key,
                                        "word": word,
                                        "label_page": self.ocr.digit_label(image_info=image_info, pos=pos)
                                    }
                                )

        data["kw_meta"] = meta
        return meta
|
|
|
+
|
|
|
    def parse_nextiter(self, agent, data, total_pages):
        """From the TOC, pair each section keyword with the *next* section's index.

        For every keyword group in ``iter_keywords`` found on a TOC page, the
        Chinese/arabic ordinal of the FOLLOWING section is derived (e.g. the
        section after "三、" is "四、"), so callers can bound each section's
        page range. Results are appended to ``data["iter"]`` and returned.
        """

        # TOC pages normally contain the literal keyword "目录".
        keyword = '目录'
        # Section keywords whose following section must be located.
        iter_keywords = {
            '1': ['资格审查资料', '资格审查材料'],
            '2': ['其他材料', '其它材料', '其他资料', '其它资料'],
            '3': ['附件'],
            '4': ['影印件']
        }
        index_keywords = {
            '1': ['一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、'],
            '2': ['一章', '二章', '三章', '四章', '五章', '六章', '七章', '八章', '九章', '十章']

        }

        # Return the ordinal that follows current_index (arabic or Chinese numerals).
        def find_next(current_index):
            logger.info(f"processing current_index: {current_index}")
            # Successor map for single Chinese digits. "十" maps to "二" so that
            # e.g. 十九 -> cycle['十'] + '十' == 二十 in the tens-carry branch below.
            cycle = {
                "一": "二",
                "二": "三",
                "三": "四",
                "四": "五",
                "五": "六",
                "六": "七",
                "七": "八",
                "八": "九",
                "九": "十",
                "十": "二",
            }
            # Arabic numerals: simple increment.
            if current_index.isdigit():
                next_index = str(eval(current_index) + 1)
                return next_index
            next_index = ""
            # Single Chinese digit ("十" is special-cased to "十一").
            # NOTE(review): an empty current_index falls through to
            # current_index[-1] below and raises IndexError — confirm callers.
            if len(current_index) == 1:
                if current_index in cycle.keys():
                    if current_index == "十":
                        next_index = "十一"
                    else:
                        next_index = cycle[current_index]
                else:
                    raise ValueError(f"筛选current index {current_index} 有误 ...")

                return next_index

            # Multi-character numerals: handle the tens carry.
            if current_index[-1] == '九':
                if current_index[0] in cycle.keys():
                    next_index = cycle[current_index[0]] + '十'
                else:
                    return ""

            elif current_index[-1] == '十':
                next_index = current_index + '一'

            else:
                if current_index[-1] in cycle.keys():
                    next_index = current_index[:-1] + cycle[current_index[-1]]
                else:
                    return ""
            return next_index

        # Extract the ordinal from a TOC line and return the NEXT ordinal.
        def refine(string: str):

            digit_keywords = "123456789一二三四五六七八九十"
            # Strip spaces and both full- and half-width parentheses.
            string = string.strip().replace(' ', '').replace('(', '').replace(')', '').replace('(', '').replace(')', '')
            flag = False
            for digit_kw in digit_keywords:
                if digit_kw in string:
                    flag = True

            # No digit at all -> nothing to refine.
            if not flag:
                return ""

            # Form "第X章、...": scan the prefix before '、' for the ordinal.
            if '、' in string and '章' in string:
                index_string = string.split('、')[0]
                current_index = ""
                next_index = ""
                is_start = False
                for c in index_string:
                    if c == "第":
                        is_start = True
                    elif (not is_start) and c in digit_keywords:
                        is_start = True
                        current_index += c
                    elif c == "章":
                        next_index = find_next(current_index)
                    elif is_start and c in digit_keywords:
                        current_index += c
                return next_index

            # Form "X、...": the prefix is the ordinal itself.
            if '、' in string:
                index_string = string.split('、')[0]
                next_index = find_next(index_string)
                return next_index

            # Form "第X章" without '、'.
            if '章' in string and '第' in string:
                l = string.find('第')
                r = string.find('章')
                index_string = string[l+1:r]
                next_index = find_next(index_string)
                return next_index

            return ""

        # Given a keyword's bounding box, find the OCR word (containing a digit)
        # vertically closest to it — presumed to be its ordinal label.
        def find_ocr_index(image_info, bbox: dict):

            meta = {}

            candidate_distance = 10000
            candidate_word = ""

            keywords = "123456789一二三四五六七八九十"
            match_left = bbox['left']
            match_right = bbox['right']
            match_top = bbox['top']
            match_bottom = bbox['bottom']

            for info in image_info:
                word = info['word'].replace(' ', '')
                left = info['rect']['left']
                top = info['rect']['top']
                width = info['rect']['width']
                height = info['rect']['height']
                right = left + width
                bottom = top + height
                for keyword in keywords:
                    # Only consider words positioned to the left of the match.
                    if keyword in word and left < match_left and right < match_right:
                        distance = abs(top - match_top)
                        if distance < candidate_distance:
                            candidate_word = word
                            candidate_distance = distance

            meta["candidate_word"] = candidate_word
            meta["candidate_distance"] = candidate_distance
            return meta


        iter = []
        scanned_dir = agent.pdf2img()
        # The TOC usually sits within the first 20 pages.
        start = 0
        end = 20 if total_pages > 20 else total_pages
        is_enter = False
        for index in range(start, end):
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
            # Remove the red seal once and cache the processed page.
            if not os.path.exists(processed_img_path):
                processed_img = remove_red_seal(image_path=img_path)
                cv2.imwrite(processed_img_path, processed_img)
            # OCR the seal-free page.
            content = self.ocr.get_content(image_path=processed_img_path)
            image_info = content["rawjson"]["ret"]

            if not is_enter and self.ocr.search(image_info, keyword):
                # First TOC page reached.
                is_enter = True

            # Within the TOC pages:
            if is_enter:
                for id, cover_keywords in iter_keywords.items():
                    meta = self.ocr.pack_search(image_info, cover_keywords)
                    if len(meta) == 0:
                        continue
                    for meta_info in meta:
                        word = meta_info['word']
                        logger.info(f"processing iter word: {word}")
                        contain_key = meta_info['contain_key']
                        bbox = meta_info['bbox']
                        # Determine the ordinal of the section AFTER this keyword:
                        # first from the word itself, else via OCR geometry.
                        if '、' in word or ('章' in word and '第' in word):
                            next_index = refine(word)
                            if next_index != "":
                                iter.append({
                                    "current_key": contain_key,
                                    "next_index": next_index
                                })
                        else:
                            # Fall back to the closest digit-bearing OCR word.
                            meta = find_ocr_index(image_info, bbox)
                            candidate_word = meta["candidate_word"]
                            next_index = refine(candidate_word)
                            iter.append({
                                "current_key": contain_key,
                                "next_index": next_index
                            })
        data["iter"] = iter
        return iter
|
|
|
+
|
|
|
    def parse_bid(self, agent, data, total_pages):
        """Find the bid letter (投标函) page and check it for a signature or seal.

        Scans the first 30 rendered pages; a page qualifies when an exact
        bid-letter heading passes the font check and the page is not the TOC.
        On the first page where signature recognition succeeds, sets
        ``data["has_signature_or_seal"]`` and ``data["formatting_img"]``.
        """
        # TODO: the bid letter usually appears within the first 30 pages; only scan those.
        start_page = 0
        end_page = 30 if total_pages > 30 else total_pages
        scanned_dir = agent.pdf2img()
        # Known spellings of the bid-letter heading.
        key_list = ['一、投标函及投标函附录', '1投标函及投标函附录', '1、投标函及投标函附录', '投标函及投标函附录', '投标函', '一、投标函', '1.投标函', '1投标函', '一投标函', '(一)投标函', '(一)投标函', '(一)、投标函', '(一)、投标函']

        for index in range(start_page, end_page + 1):
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            # First decide whether this page IS the bid letter.
            content = self.ocr.get_content(image_path=img_path)
            image_info = content["rawjson"]["ret"]
            kw_search_meta = self.ocr.exact_search(image_info, key_list)
            # Font check filters out TOC-style occurrences of the heading.
            kw_search_res = self.ocr.font_judge(kw_search_meta)
            ol_search_res = self.ocr.search(image_info, ['目录'])
            if (not kw_search_res) or ol_search_res:
                continue

            result = self.ocr.signature_recognition(image_path=img_path)
            if result:
                data["has_signature_or_seal"] = True
                data["formatting_img"] = img_path
                return
|
|
|
+
|
|
|
    def parse_license(self, agent, iter, data, firm_name):
        """Locate and validate license/certificate images for one firm.

        Stage 1: for each (section keyword, next-section ordinal) pair in
        ``iter``, derive a mandatory page interval from title matches and record
        it in ``data["necessity_interval"]``.
        Stage 2: collect candidate images within those intervals and run
        ``ocr.judge_pro`` on each; falls back to scanning every page when no
        candidates were found. Valid licenses are appended to
        ``data["license_list"]``.
        """

        # Find the keyword's page first, then the next section's page.
        necessity_interval = []
        # Walk every (current section, next section) pair.
        for unit_iter in iter:
            contain_key = unit_iter["current_key"]
            next_index = unit_iter["next_index"]
            kw_title_meta = agent.search_in_title(contain_key)
            iter_title_meta = agent.search_in_title(next_index, digit_limit=True)

            left = 10000
            right = -1
            left_kw = ""
            right_kw = ""
            # Determine the right (end) page first.
            if len(iter_title_meta) == 0:
                right = agent.total_pages
            else:
                for iter_meta in iter_title_meta:
                    page_number = iter_meta["page_number"]
                    iter_text = iter_meta["text"]
                    # Pages < 20 are assumed to be TOC/front matter.
                    if page_number < 20:
                        continue
                    else:
                        if page_number > right:
                            right = page_number
                            right_kw = iter_text

                if right == -1:
                    right = agent.total_pages
            # Then determine the left (start) page.

            if len(kw_title_meta) == 0:
                continue
            else:
                for kw_meta in kw_title_meta:
                    page_number = kw_meta["page_number"]
                    title_text = kw_meta["text"]
                    if page_number < 20 or page_number > right:
                        continue
                    else:
                        if page_number < left:
                            left = page_number
                            left_kw = title_text

            # No usable left page -> drop this pair.
            if left == 10000:
                continue
            necessity_interval.append((left, right))
            data["necessity_interval"].append(
                {
                    "left_kw": left_kw,
                    "right_kw": right_kw,
                    "left_page": left,
                    "right_page": right
                }
            )

        candidate_images = agent.find_candidate_images_pro(necessity_interval=necessity_interval)

        # candidate_images = agent.find_candidate_images()
        logger.info(candidate_images)
        if len(candidate_images) == 0:
            # Fallback: brute-force every page of the document.
            scanned_dir = agent.pdf2img()
            for index in range(0, agent.total_pages):
                img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
                processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
                if not os.path.exists(processed_img_path):
                    processed_img = remove_red_seal(image_path=img_path)
                    cv2.imwrite(processed_img_path, processed_img)
                try:
                    response = self.ocr.judge_pro(image_path=processed_img_path, firm_name=firm_name)
                    # NOTE(review): 'qualtified' matches the OCR agent's payload
                    # key (apparent typo of "qualified") — do not "fix" here alone.
                    if response == None or response['qualtified'] == None:
                        continue
                    else:
                        data["license_list"].append({
                            "license_name": response["license_name"],
                            "license_path": img_path,
                            "license_page": response["license_page"],
                            "start_datetime": response["start_datetime"],
                            "end_datetime": response["end_datetime"]
                        })
                except ValueError as e:
                    print(e)
        else:
            for img in candidate_images:
                try:
                    response = self.ocr.judge_pro(image_path=img, firm_name=firm_name)
                    if response == None or response['qualtified'] == None:
                        continue
                    else:
                        data["license_list"].append({
                            "license_name": response["license_name"],
                            "license_path": img,
                            "license_page": response["license_page"],
                            "start_datetime": response["start_datetime"],
                            "end_datetime": response["end_datetime"]
                        })

                except ValueError as e:
                    print(e)
|
|
|
+
|
|
|
    def parse_bid_form(self, agent, data):
        """Fill ``data["bid_form"]`` with the bid quotation summary table.

        Uses the directly-extracted table when available; otherwise OCRs the
        first 50 rendered pages, matching either a 增值税金额 header column
        (table + page) or just the 投标报价汇总表 caption (page only).
        """
        result = agent.find_bid_quotation_form()
        if result is None:
            # Fall back to the scanned page renders.
            scanned_dir = agent.pdf2img()
            key_column = '增值税金额'
            img_list = glob.glob(os.path.join(scanned_dir, '*.jpg'))
            for img_prefix in img_list:
                img_name = os.path.basename(img_prefix)
                # Skip derived images (seal-removed / ink variants).
                if ('roi' in img_name) or ('ink' in img_name):
                    continue
                img_index = int(img_name.split('-')[1].split('.')[0])
                # The quotation table is expected within the first 50 pages.
                if img_index > 50:
                    continue
                img_path = os.path.join(scanned_dir, img_name)
                #TODO: add OCR assist for the literal "投标报价汇总表" caption
                expectation = self.ocr.table_parse(image_path=img_path, save_folder=scanned_dir)
                content = self.ocr.get_content(image_path=img_path)
                image_info = content["rawjson"]["ret"]
                kw_res = self.ocr.search(image_info=image_info, key_list=['投标报价汇总表'])
                table_list = expectation['table']['content']
                if len(table_list) > 0:
                    for table in table_list:
                        column_list, parsed_table = agent.extract_table(table=table)
                        for column_name in column_list:
                            if key_column in column_name:
                                data["bid_form"] = {
                                    "page": [img_index],
                                    "table": parsed_table
                                }
                                return
                # Caption hit without a parsed table: record the page only.
                if kw_res:
                    data["bid_form"] = {
                        "page": [img_index]
                    }
        else:
            # The table came straight from the extracted table data.
            # NOTE(review): here "table" is the (columns, rows) tuple returned
            # by extract_table, unlike the list stored in the OCR branch.
            page_number, target_table = result
            data["bid_form"] = {
                "page": page_number,
                "table": target_table
            }
|
|
|
+
|
|
|
    def parse_perf(self, agent, data):
        """Fill ``data["perf_info"]`` with related-performance (类似业绩) tables.

        Uses the directly-extracted table when available; otherwise derives a
        starting page from the search metadata and OCR-scans pages from there,
        keeping any page that mentions "类似".
        """
        perf_meta = agent.search_perf_info()
        if perf_meta["table"] is not None:
            # A performance table was extracted directly.
            data["perf_info"].append({
                "perf_page": perf_meta["perf_page_number"],
                "perf_table": perf_meta["table"]
            })
        else:
            # Derive the page to start scanning from.
            center_page = 0
            if perf_meta["perf_page_number"] != -1:
                center_page = perf_meta["perf_page_number"]
            if len(perf_meta["qual_page_number"]) > 0:
                tmp = 10000
                for candidate_page in perf_meta["qual_page_number"]:
                    if candidate_page > agent.start_threshold:
                        tmp = min(tmp, candidate_page)
                # NOTE(review): min() keeps center_page at 0 when no perf page
                # was found, forcing a full-document scan — confirm intended
                # (max() would narrow the scan instead).
                center_page = min(center_page, tmp)
            scanned_dir = agent.pdf2img()
            img_list = glob.glob(os.path.join(scanned_dir, 'page-*.jpg'))
            for img_prefix in img_list:
                img_name = os.path.basename(img_prefix)
                # Skip derived images (seal-removed / ink variants).
                if ('roi' in img_name) or ('ink' in img_name):
                    continue
                img_index = int(img_name.split('-')[1].split('.')[0])
                if img_index >= center_page:
                    img_path = os.path.join(scanned_dir, img_name)
                    # 1st step: remove the red seal (cached on disk).
                    processed_path = os.path.join(scanned_dir, f'page-{img_index}_red_roi.jpg')
                    processed_folder = os.path.join(scanned_dir, 'processed')
                    os.makedirs(processed_folder, exist_ok=True)
                    if not os.path.exists(processed_path):
                        processed_img = remove_red_seal(img_path)
                        cv2.imwrite(processed_path, processed_img)
                    # 2nd step: OCR the page and look for the "类似" keyword.
                    content = self.ocr.get_content(image_path=processed_path)
                    image_info = content["rawjson"]["ret"]
                    if self.ocr.search(image_info, ['类似']):
                        # 3rd step: parse the table(s) on the page.
                        expectation = self.ocr.table_parse(image_path=processed_path, save_folder=processed_folder)
                        table_list = expectation['table']['content']
                        data["perf_info"].append({
                            "perf_page": img_index + 1,
                            "perf_table": table_list
                        })
|
|
|
+
|
|
|
    def parse_manager(self, agent, data, kw_meta=None):
        """Fill ``data["manager"]`` with project staffing / management tables.

        Uses directly-extracted tables when available; otherwise OCR-scans
        either the whole document (no anchors) or a spread of pages around the
        anchors from outlines/titles and the TOC keyword hits (``kw_meta``),
        keeping tables with a 职务/职称 header column.
        """
        keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表', '职务', '职称']
        meta = agent.find_itempeople_form()
        if len(meta["table_list"]) > 0:
            # Matching tables were extracted directly.
            data["manager"] = meta["table_list"]
        else:
            candidate_page_set = meta["candidate_page"]
            if len(candidate_page_set) == 0 and (kw_meta is None or len(kw_meta) == 0):
                # No anchors at all: brute-force every page.
                logger.info("查询候选项目经理为空, 开始进行全文档搜索")
                scanned_dir = agent.pdf2img()
                for index in range(0, agent.total_pages):
                    raw_page = os.path.join(scanned_dir, f'page-{index}.jpg')
                    processed_page = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
                    if not os.path.exists(processed_page):
                        processed_img = remove_red_seal(image_path=raw_page)
                        cv2.imwrite(processed_page, processed_img)
                    # OCR the seal-free page.
                    content = self.ocr.get_content(image_path=processed_page)
                    image_info = content["rawjson"]["ret"]
                    if self.ocr.search(image_info, keywords):
                        expectation = self.ocr.table_parse(image_path=processed_page, save_folder=scanned_dir)
                        table_list = expectation['table']['content']
                        if len(table_list) > 0:
                            for table in table_list:
                                column_list, parsed_table = agent.extract_table(table=table)
                                for column_name in column_list:
                                    if '职称' in column_name or '职务' in column_name:
                                        data["manager"].append(parsed_table)

            else:
                spread_set = set()
                # Spread 20 pages forward from each outline/title anchor.
                for candidate_page in candidate_page_set:
                    cnt = 0
                    while cnt <= 20 and candidate_page + cnt < agent.total_pages:
                        spread_set.add(candidate_page + cnt)
                        cnt += 1
                # Spread +/-5 pages around each TOC keyword hit.
                if kw_meta is not None and len(kw_meta) > 0:

                    for unit_meta in kw_meta:
                        label_page = unit_meta["label_page"]
                        if label_page.isdigit():
                            label_page = int(label_page)
                            cnt = -5
                            while cnt <= 5 and label_page + cnt < agent.total_pages:
                                spread_set.add(label_page + cnt)
                                cnt += 1

                # Parse tables on every candidate page.
                scanned_dir = agent.pdf2img()

                for candidate_img in spread_set:
                    candidate_path = os.path.join(scanned_dir, f'page-{candidate_img}.jpg')
                    expectation = self.ocr.table_parse(image_path=candidate_path, save_folder=scanned_dir)
                    table_list = expectation['table']['content']
                    if len(table_list) > 0:
                        for table in table_list:
                            column_list, parsed_table = agent.extract_table(table=table)
                            for column_name in column_list:
                                if '职称' in column_name or '职务' in column_name:
                                    data["manager"].append(parsed_table)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # [Test demo] — drives the full parse pipeline for one or more firms and
    # logs how long the whole run took.
    start_time = time.time()

    # NOTE: all paths below are environment-specific; adjust for your machine.
    firm_list = ['太原重工']
    # firm_list = ['湖北海光']
    for firm in firm_list:
        log_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/test_{firm}.log"
        # BUGFIX: create_logger() attaches fresh handlers to the *root* logger
        # on every call, so calling it once per firm duplicated every log line
        # from the second iteration onward. Clear stale handlers first.
        logging.getLogger().handlers.clear()
        # `logger` is assigned at module scope, so it is already global for any
        # function in this file that reads it; a `global` statement here is a no-op.
        logger = create_logger(log_path=log_path)

        # [Environment parameters]
        # OCR service endpoint
        url = "http://120.48.103.13:18000/ctr_ocr"
        # seal-OCR endpoint (Baidu AIP)
        base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token="
        # SECURITY: hard-coded credential checked into source — move this to an
        # environment variable or secret store and rotate the token.
        access_token = "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608"
        # full seal-OCR request URL
        seal_url = base_url + access_token
        # seal-OCR request headers
        headers = {'content-type': 'application/x-www-form-urlencoded'}

        # data_path: root directory holding every bidding firm's PDFs
        data_path = "/home/stf/miner_pdf/data/投标公司pdf"
        # test_data_path: root directory holding the test firms' PDFs
        test_data_path = "/home/stf/miner_pdf/interface/test_files"
        # test_out_path: output of the current optimized-code test run
        test_out_path = "/home/stf/miner_pdf/interface/outdir/test_out.json"
        unit_data_path = f"/home/stf/miner_pdf/interface/unit_test/{firm}"
        # unit_out_path = f"/home/stf/miner_pdf/interface/outdir/unit_{firm}.json"
        unit_out_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/unit_{firm}.json"
        # pipeline_out_path: output location after running the pipeline for ALL
        # firms — a JSON file with the locations of business licenses and
        # qualification certificates
        pipeline_out_path = "/home/stf/miner_pdf/interface/outdir/test_pipeline.json"
        # single_out_path: same content, but for a single firm's PDF parse
        single_out_path = "/home/stf/miner_pdf/interface/outdir/test_single.json"
        # ground_truth: JSON file with license/certificate positions for all
        # non-scanned firms
        ground_truth = "/home/stf/miner_pdf/ground_truth.json"
        # firm_excel_file: distinguishes whether a firm's PDF is a scan or not
        firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx"
        # NOTE(review): `df` is never used below in this block — kept because
        # the read also validates that the spreadsheet exists; confirm whether
        # it can be dropped.
        df = pd.read_excel(firm_excel_file)

        # Wrapped OCR client, with the seal-OCR agent attached.
        ocr = OcrAgent(url=url)
        ocr.integrate_sealagent(
            url=seal_url,
            headers=headers,
        )
        # Wrapped parse pipeline for this firm's directory.
        pipeline = PdfParse_pipeline(
            ocr=ocr,
            firm_dir=unit_data_path,
            out_path=unit_out_path,
        )

        # Run the pipeline.
        data = pipeline.parse_pipeline()

        # Report total elapsed time.
        # BUGFIX: `//` on a float yields a float ("2.0 min"); cast to int.
        cost_time = time.time() - start_time
        logger.info(f"processing {len(data)} documents, total cost {int(cost_time // 60)} min {cost_time % 60} sec ...")