# Framework that parses every PDF file and extracts information for testing.
# PdfExtractAttr is the base class that extracts information from a PDF;
# subclasses implement the matching functionality on top of it.
# Standard library imports
import base64
import datetime
import glob
import json
import logging
import os
import pdb
import re
import shutil
import time
from io import BytesIO
from pprint import pprint

# Third-party imports
import cn_clip.clip as clip
import cv2
import numpy as np
import pandas as pd
import requests
import torch
from cn_clip.clip import load_from_name, available_models
from pdfminer.image import ImageWriter
from PIL import Image
from tqdm import tqdm

# Project-local imports
from tools import RefPageNumberResolver
from get_info import PdfExtractAttr
from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json
# Global environment
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Name of the cn_clip checkpoint handed to load_from_name() below.
clip_version = "ViT-B-16"
# Loaded once at import time; both objects are used by PdfMatcher.get_similarity().
model, preprocess = load_from_name(clip_version)
# Switch the model to evaluation mode.
model.eval()
# Log file handed to create_logger() when the module-level logger is built.
log_path = "/home/stf/miner_pdf/info.log"
# Logging
def create_logger(log_path):
    """Configure the root logger to write to ``log_path`` and to the console.

    The file handler truncates the file (``mode='w'``) and writes UTF-8 so that
    the Chinese messages emitted by this module are stored correctly whatever
    the system locale is.  Calling the function again with the same path does
    not attach a second pair of handlers, so messages are never duplicated.

    Args:
        log_path: Path of the log file to (re)create.

    Returns:
        The root ``logging.Logger``, with level ``INFO``.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')

    # Handlers installed by this function are tagged with a name so that a
    # repeated call can recognise them (other handlers are left untouched).
    existing_names = {handler.get_name() for handler in logger.handlers}

    # Handler that writes the log file.
    file_name = 'create_logger:file:' + os.path.abspath(log_path)
    if file_name not in existing_names:
        file_handler = logging.FileHandler(
            filename=log_path, mode='w', encoding='utf-8')
        file_handler.set_name(file_name)
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        logger.addHandler(file_handler)

    # Handler that echoes the log to the console.
    console_name = 'create_logger:console'
    if console_name not in existing_names:
        console = logging.StreamHandler()
        console.set_name(console_name)
        console.setLevel(logging.DEBUG)
        console.setFormatter(formatter)
        logger.addHandler(console)
    return logger
# Module-wide logger used by the classes and the script below.
logger = create_logger(log_path=log_path)
# External OCR interface
class OcrAgent():
    """Client for the remote OCR endpoint plus certificate/signature heuristics.

    The methods read the OCR answer as ``content["rawjson"]["ret"]``: a list of
    dicts carrying a ``word`` string, a ``rect`` box (``left``, ``top``,
    ``width``, ``height``) and, for certificates, a per-character ``charset``
    list whose items have their own ``word`` and ``rect``.
    """

    # Keyword groups used by find_certificate_datetime().
    _START_KEYWORDS = ('颁发日期', '发证日期', '生效日期')
    _END_KEYWORDS = ('终止日期',)
    _PRIORITY_KEYWORDS = ('有效期', '使用期限', '有效日期')

    def __init__(self, url):
        """Store the OCR endpoint and the patterns used to classify pages.

        Args:
            url: URL of the OCR service that images are posted to.
        """
        self.url = url
        self.datetime_re = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)'
        # Regular expressions for the different licence/certificate types.
        self.re_dict = {
            "business_license": r'营业执照',
            "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
            "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
            "qualtifications": r'\b[\u4e00-\u9fff]*证书',
            "proof": r'\b[\u4e00-\u9fff]*证明',
        }
        # Minimum ratio of dark pixels for a region to count as signed.
        self.sign_threshold = 0.05

    def get_content(self, image_path):
        """Post the image to the OCR service and return the decoded JSON.

        Raises:
            ValueError: If the file cannot be read, the request fails or the
                answer is not JSON.  The original error is chained as the cause.
        """
        try:
            with open(image_path, 'rb') as image_file:
                files = {"file": ("image.jpg", image_file, "image/jpeg")}
                # Use the endpoint given to the constructor rather than a
                # module-level ``url`` that only exists when run as a script.
                response = requests.post(self.url, files=files)
            return response.json()
        except Exception as err:
            raise ValueError(f"传入图像{image_path}已损坏") from err

    def remove_red_seal(self, image_path):
        """Return a 3-channel binarised copy of the image with red seals removed.

        The red channel is thresholded with Otsu's method, which turns red ink
        (bright in the red channel) white while keeping dark ink black.

        Raises:
            ValueError: If OpenCV cannot read ``image_path``.
        """
        input_img = cv2.imread(image_path)
        if input_img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"传入图像{image_path}已损坏")
        # Split the channels; only the red one is needed.
        _, _, red_c = cv2.split(input_img)
        # Let Otsu's method pick the threshold ...
        thresh, _ = cv2.threshold(red_c, 0, 255, cv2.THRESH_OTSU)
        # ... which can be scaled here if it needs tuning.
        filter_condition = int(thresh * 1.0)
        _, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY)
        # Back to three channels.
        result_img = np.expand_dims(red_thresh, axis=2)
        return np.concatenate((result_img, result_img, result_img), axis=-1)

    def judge(self, image_path: str, firm_name: str):
        """Decide whether the image is a business licence or another certificate.

        Returns:
            ``None`` when ``firm_name`` does not occur in the OCR text;
            otherwise a dict with the keys ``qualtified``, ``matched``,
            ``license_name``, ``start_datetime`` and ``end_datetime``
            (``qualtified`` stays ``None`` when no certificate pattern matches).
        """
        image_prefix = image_path.split('/')[-1]
        logger.info(f'processing img: {image_prefix}')

        response_item = {
            "qualtified": None,       # whether the page is a certificate
            "matched": None,          # whether the firm name was found
            "license_name": None,     # certificate name
            "start_datetime": None,   # start of validity
            "end_datetime": None      # end of validity
        }
        content = self.get_content(image_path=image_path)
        image_info = content["rawjson"]["ret"]
        # The firm name must be present on the page.
        if not self.search(image_info=image_info, key=firm_name):
            return None
        response_item['matched'] = True

        # Try the patterns in declaration order; the first hit wins.
        for key, pattern in self.re_dict.items():
            if key == 'business_license':
                match_name = self.re_match(image_info=image_info, format=pattern)
                find_datetime = self.find_license_datetime
            else:
                match_name = self.re_search(image_info=image_info, format=pattern)
                find_datetime = self.find_certificate_datetime
            if match_name:
                response_item["qualtified"] = True
                response_item["license_name"] = match_name
                return find_datetime(image_info=image_info, response_item=response_item)
        return response_item

    def _assign_date(self, response_item, keyword, date_string):
        """Store ``date_string`` as start or end date depending on ``keyword``.

        Priority keywords always overwrite the end date; end keywords and
        start keywords only fill a value that is still ``None``.  (This is the
        grouping Python applied to the original un-parenthesised ``or``/``and``.)
        """
        if keyword in self._PRIORITY_KEYWORDS or (
                keyword in self._END_KEYWORDS and response_item['end_datetime'] is None):
            response_item['end_datetime'] = date_string
        elif keyword in self._START_KEYWORDS and response_item['start_datetime'] is None:
            response_item['start_datetime'] = date_string

    # TODO locate the validity period of qualification certificates
    def find_certificate_datetime(self, image_info, response_item):
        """Fill ``start_datetime``/``end_datetime`` of a certificate.

        Dates are only looked for when at least one date keyword occurs on the
        page.  A ``...至...`` range in a single word is taken directly; a lone
        date is attributed through the position of the date keywords.

        Returns:
            ``response_item`` (updated in place).
        """
        keywords_list = self._PRIORITY_KEYWORDS + self._END_KEYWORDS + self._START_KEYWORDS
        date_pattern = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?'
        special_pattern = r'\d{4}-\d{1,2}-\d{1,2}'

        # Record where each date keyword sits on the page.
        keyword_dict = {}
        for info in image_info:
            word = info['word']
            rect = info['rect']
            for keyword in keywords_list:
                if keyword not in word:
                    continue
                # Fall back to the right edge of the whole word so ``right`` is
                # never unbound (or left over from a previous word) when the
                # per-character list lacks the keyword's final character.
                right = rect['left'] + rect['width']
                for char_dc in info.get('charset') or []:
                    if char_dc['word'] == keyword[-1]:
                        right = char_dc['rect']['left'] + char_dc['rect']['width']
                keyword_dict[keyword] = {
                    "left": rect['left'],
                    "top": rect['top'],
                    "right": right
                }

        if not keyword_dict:
            return response_item

        for info in image_info:
            word = info['word']
            has_year = '年' in word
            if not (has_year or re.search(r'\d', word)):
                continue
            left = info['rect']['left']
            top = info['rect']['top']
            find_list = re.findall(pattern=date_pattern if has_year else special_pattern, string=word)
            logger.info(f'word {word} has find_list{find_list}')

            if len(find_list) == 1:
                find_string = find_list[0]
                if '至' in find_string:
                    parts = find_string.split('至')
                    start_prefix = parts[0].replace('自', '')
                    end_prefix = parts[-1]
                    if '年' in start_prefix:
                        response_item['start_datetime'] = start_prefix
                    if end_prefix != '':
                        response_item['end_datetime'] = end_prefix
                    return response_item
                # Without "至", use position and the known keywords to decide
                # which date this is.
                for keyword, k_info in keyword_dict.items():
                    if left == k_info['left']:
                        # Same left edge as a keyword: that keyword decides.
                        self._assign_date(response_item, keyword, find_string)
                        break
                    if left >= k_info['right'] and top >= k_info['top']:
                        self._assign_date(response_item, keyword, find_string)
            elif len(find_list) == 2:
                start_prefix = find_list[0].replace('自', '')
                end_prefix = find_list[-1].replace('至', '')
                if response_item['start_datetime'] is None:
                    response_item['start_datetime'] = start_prefix
                if response_item['end_datetime'] is None:
                    response_item['end_datetime'] = end_prefix
            else:
                logger.info(f'wrong word: {word} ...')

        return response_item

    # Find the id and the dates on a business licence
    def find_license_datetime(self, image_info, response_item):
        """Fill ``id``, ``start_datetime`` and ``end_datetime`` from a licence.

        Returns:
            ``response_item`` (updated in place).
        """
        for info in image_info:
            word = info['word']
            # id: either "证照编号:" followed by 20 characters or 20 bare digits
            if (word.startswith('证照编号:') and len(word) == 25) or (word.isdigit() and len(word) == 20):
                response_item['id'] = word if word.isdigit() else word[5:]
            elif re.match(self.datetime_re, word):
                parts = word.split('至')
                response_item['start_datetime'] = parts[0]
                response_item['end_datetime'] = parts[-1]
            elif word == '长期':
                response_item['start_datetime'] = response_item['end_datetime'] = '长期'
        return response_item

    # Look for a word of image_info that contains key
    def search(self, image_info, key):
        """Return True when any OCR word contains ``key``."""
        return any(key in info['word'] for info in image_info)

    # Use re.search to find text in image_info that satisfies {format}
    def re_search(self, image_info, format):
        """Return the first matched substring, or False when nothing matches."""
        for info in image_info:
            match = re.search(format, info['word'])
            if match:
                return match.group(0)
        return False

    # Use re.match to find text in image_info that satisfies {format}
    def re_match(self, image_info, format):
        """Return the first word matching at its start, or False otherwise."""
        for info in image_info:
            word = info['word']
            if re.match(format, word):
                return word
        return False

    def _has_ink(self, input_img, out_path, region):
        """Return True when the ink ratio of ``region`` reaches sign_threshold."""
        black_ratio = self.ink_recognition(input_img=input_img, out_path=out_path, meta=region)
        return black_ratio >= self.sign_threshold

    # Recognise whether the legal representative signed at the expected place
    def signature_recognition(self, image_path: str):
        """Return True when the page appears to carry a signature.

        The red seal is removed first (the result is cached next to the image
        as ``<name>_roi<ext>``), the cleaned image is sent to OCR, and the
        position of the signature labels decides which region is inspected
        for ink; the inspected crop is saved as ``<name>_ink<ext>``.
        """
        keywords = ['投标函', '(法定代表人CA电子印章)', '(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:']
        agent_label = '法定代表人或其委托代理人:'
        legal_label = '法定代表人:'
        sign_marks = ('(法定代表人CA电子印章)', '(法定代表人CA电子印章或签字)', '(签字)')

        # Derived files live in the same directory as the source image.
        image_dir = os.path.dirname(image_path)
        image_name, image_ext = os.path.splitext(os.path.basename(image_path))
        removed_image_path = os.path.join(image_dir, image_name + '_roi' + image_ext)
        ink_image_path = os.path.join(image_dir, image_name + '_ink' + image_ext)

        # Reuse the cached de-sealed image when it exists and is readable,
        # otherwise build it and write it where get_content() will read it.
        removed_seal_img = cv2.imread(removed_image_path) if os.path.exists(removed_image_path) else None
        if removed_seal_img is None:
            removed_seal_img = self.remove_red_seal(image_path=image_path)
            cv2.imwrite(removed_image_path, removed_seal_img)

        content = self.get_content(image_path=removed_image_path)
        image_info = content["rawjson"]["ret"]
        key_pos = {}
        for info in image_info:
            word = info['word']
            rect = info['rect']
            for keyword in keywords:
                if keyword in word:
                    # Only the first keyword (in list order) found in a word is
                    # recorded; a later word overwrites an earlier one.
                    key_pos[keyword] = {
                        "word": word,
                        "left": rect['left'],
                        "right": rect['left'] + rect['width'],
                        "top": rect['top'],
                        "bottom": rect['top'] + rect['height']
                    }
                    break

        # None of the keywords is on the page.
        if not key_pos:
            return False

        left_label = next((label for label in (agent_label, legal_label) if label in key_pos), None)
        right_mark = next((mark for mark in sign_marks if mark in key_pos), None)

        if left_label is not None and right_mark is not None:
            # The signature should sit between the label and the mark.
            l_info = key_pos[left_label]
            r_info = key_pos[right_mark]
            if not l_info['word'].endswith(left_label) or r_info['word'] != right_mark:
                # Extra recognised text next to the label/mark: treated as signed.
                return True
            region = {
                "left": l_info['right'],
                "right": r_info['left'],
                "top": max(l_info['top'], r_info['top']),
                "bottom": min(l_info['bottom'], r_info['bottom']) - 5
            }
            return self._has_ink(removed_seal_img, ink_image_path, region)

        if right_mark is not None:
            # Only the mark was found: any text left after stripping the known
            # labels is taken as the signature.
            remainder = key_pos[right_mark]['word']
            for token in sign_marks + (agent_label, legal_label):
                remainder = remainder.replace(token, '')
            return remainder != ''

        # Only a label was found ('法定代表人:' is checked first): the
        # signature is either part of the word or to the right of it.
        only_label = legal_label if legal_label in key_pos else agent_label
        if only_label in key_pos:
            info = key_pos[only_label]
            if not info['word'].endswith(only_label):
                return True
            # Inspect the 100 px to the RIGHT of the label; starting at the
            # label's left edge would measure the ink of the label itself.
            region = {
                "left": info['right'],
                "right": info['right'] + 100,
                "top": info['top'],
                "bottom": info['bottom'] - 5
            }
            return self._has_ink(removed_seal_img, ink_image_path, region)
        return False

    # Decide whether a fixed rectangle contains handwriting
    def ink_recognition(self, input_img, out_path, meta: dict):
        """Return the ratio of dark pixels inside the rectangle ``meta``.

        Args:
            input_img: Image array indexed as ``[row, column, channel]``.
            out_path: Where the cropped region is written for inspection.
            meta: Dict with the keys ``left``, ``right``, ``top``, ``bottom``.

        Returns:
            Dark-pixel ratio between 0 and 1; ``0.0`` for an empty rectangle.
        """
        crop_img = input_img[meta["top"]:meta["bottom"], meta["left"]:meta["right"], :]
        if crop_img.size == 0:
            # Degenerate rectangle (e.g. right <= left): nothing can be signed
            # there, and OpenCV would fail on an empty image.
            return 0.0
        cv2.imwrite(out_path, crop_img)

        gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY)
        thresh, _ = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU)
        filter_condition = int(thresh * 0.90)
        # Inverted binarisation: dark pixels become non-zero.
        _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV)

        return np.count_nonzero(black_thresh) / black_thresh.size
# External seal-OCR interface, used to extract the seals found on a page
# (the signature check is integrated alongside it)
class seal_agent():
    """Thin client for a seal-recognition HTTP endpoint."""

    def __init__(self,
                 base_url: str,
                 access_token: str,
                 headers: dict,
                 ):
        """Store the connection settings.

        Args:
            base_url: Endpoint URL; the access token is appended to it as is.
            access_token: Token concatenated to ``base_url``.
            headers: HTTP headers sent with every request.
        """
        self.base_url = base_url
        self.access_token = access_token
        self.headers = headers
        self.request_url = base_url + access_token

    def seal_recognition(self, img_path):
        """Send the image, base64-encoded, and return the decoded JSON answer.

        Returns:
            The parsed JSON body, or ``{}`` when the response is falsy
            (``requests`` treats error status codes as falsy).
        """
        # Read through a context manager so the file handle is always closed.
        with open(img_path, 'rb') as f:
            img = base64.b64encode(f.read())
        params = {"image": img}
        response = requests.post(self.request_url, data=params, headers=self.headers)
        return response.json() if response else {}
class PdfMatcher(PdfExtractAttr):
    '''Locate licence and certificate images inside a bid PDF.

    NOTE(review): ``exact_match`` and ``search_qualtification_certificate``
    read ``self.bl_tamplate``, ``self.qc_tamplate`` and ``self.qc_threshold``,
    none of which is assigned in this class.  Presumably they come from
    ``PdfExtractAttr`` or must be set by the caller -- TODO confirm.
    '''

    def __init__(self, file_path: str):
        """Load (or build) the title, outline, text and table data of a bid.

        Args:
            file_path: Path ``.../<firm name>/<bid name>.pdf``; the derived
                data is expected in the sibling directory ``<bid name>/``.
        """
        super(PdfMatcher, self).__init__(
            file_path=file_path
        )
        # Bid name (file name without its extension)
        self.bid_name = os.path.splitext(os.path.basename(file_path))[0]
        # Directory holding the data extracted from the bid
        self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name)
        # Firm name (name of the parent directory)
        self.firm_name = os.path.basename(os.path.dirname(file_path))
        # title list
        self.title = load_json(os.path.join(self.bid_dir, "title.json"))
        # outline list
        self.outline = self.parse_outline(out_path=os.path.join(self.bid_dir, "outlines.json"))
        # text list
        self.details = self.parse_text(out_path=os.path.join(self.bid_dir, "all_texts.json"))
        # table list: reuse the cached file when present
        table_path = os.path.join(self.bid_dir, "all_tables.json")
        if os.path.exists(table_path):
            self.table = load_json(table_path)
        else:
            self.table = self.parse_table(out_path=table_path)
        # The original stored the parsed tables under ``tables`` and the cached
        # ones under ``table``; both names now always exist.
        self.tables = self.table
        # image directory
        self.image_dir = os.path.join(self.bid_dir, "extracted_images")
        # glob pattern of the images extracted from one page
        # NOTE(review): "image_page_1*" also matches pages 10-19, 100, ...;
        # the real file-name scheme is not visible here, so it is left as is.
        self.image_format = "image_page_{}*"
        # image filter thresholds
        self.start_threshold = 10
        self.distance_threshold = 6
        self.search_threshold = 20
        self.match_threshold = 44.0
        self.degrade_threshold = 42.0

    @staticmethod
    def _toc_page(block_text):
        """Return the page number ending a dotted table-of-contents line, or None."""
        if '.' not in block_text:
            return None
        tail = block_text.split('.')[-1]
        # int() instead of eval(): safe on OCR text and accepts "007".
        return int(tail) if tail.isdecimal() else None

    def _collect_intervals(self, keywords):
        """Return raw ``(start_page, end_page)`` pairs opened by ``keywords``.

        An interval opens at a title/outline entry containing a keyword and
        closes at the next entry (for titles: the next one without '证书').
        ``end_page`` is ``-1`` when the end is unknown.
        """
        intervals = []

        # locate in title.json
        left_pos = -1
        for title_block in self.title:
            block_text = title_block['text'].replace(' ', '').strip()
            page_number = title_block['page_number']
            # Close the pending interval first.
            if left_pos != -1 and '证书' not in block_text:
                intervals.append((left_pos, page_number))
                left_pos = -1
            if any(keyword in block_text for keyword in keywords):
                # A dotted TOC line gives the target page; otherwise the
                # block's own page is used.  (The nesting of the original
                # ``else`` was lost, so the fallback covers both readings.)
                toc_page = self._toc_page(block_text)
                left_pos = page_number if toc_page is None else min(page_number, toc_page)
        if left_pos != -1:
            # Still open after the last title: the end is unknown.
            intervals.append((left_pos, -1))

        # locate in outlines.json
        left_pos = -1
        for outline_block in self.outline or []:
            page_number = outline_block["page_number"]
            if left_pos != -1:
                intervals.append((left_pos, -1 if page_number is None else page_number))
                left_pos = -1
            outline_text = outline_block['title'].strip()
            if page_number is not None and any(keyword in outline_text for keyword in keywords):
                left_pos = page_number
        if left_pos != -1:
            intervals.append((left_pos, -1))
        return intervals

    @staticmethod
    def _merge_intervals(intervals):
        """Merge overlapping intervals; keep open-ended ones not already covered.

        Pairs whose end precedes their start are dropped, except ``(l, -1)``
        which marks an unknown end and is kept for the callers to expand.
        """
        merged = []
        for left, right in sorted(pair for pair in intervals if pair[1] >= pair[0]):
            if merged and left <= merged[-1][1]:
                # max(): a contained interval must not shrink the merged one.
                merged[-1] = (merged[-1][0], max(merged[-1][1], right))
            else:
                merged.append((left, right))
        for start in sorted({left for left, right in intervals if right == -1}):
            if not any(low <= start <= high for low, high in merged):
                merged.append((start, -1))
        merged.sort()
        return merged

    def _images_for_intervals(self, intervals):
        """Return the images found around every interval past start_threshold."""
        candidate_images = set()
        for start_page, end_page in intervals:
            if start_page <= self.start_threshold:
                continue
            if end_page == -1:
                # Unknown end: assume the section spans 20 pages.
                end_page = start_page + 20
            self.image_regularization(
                start_page=max(0, start_page - self.search_threshold),
                end_page=end_page + self.search_threshold,
                candidate_images=candidate_images)
        return list(candidate_images)

    def search_interval(self):
        '''Locate the page ranges that may hold licences and certificates.'''
        # fuzzy location through keywords
        keywords = ['资格审查资料', '资格审查材料', '其它材料', '其他材料', '其他资料', '附件', '影印件']
        return self._merge_intervals(self._collect_intervals(keywords))

    def find_candidate_images(self):
        """Return the list of image paths lying in the ranges of search_interval()."""
        return self._images_for_intervals(self.search_interval())

    # Locate the business-licence image
    def locate_business_license(self):
        '''Locate the business licence and return the matching image paths.

        Returns ``None`` when no keyword is found or no image is available.
        '''
        keywords = ["资格审查资料", "其它资格审查材料", "资格审查材料"]
        candidate_pages = []
        center_pages = []
        # locate in title.json
        for title_block in self.title:
            block_text = title_block['text'].replace(' ', '').strip()
            if not any(keyword in block_text for keyword in keywords):
                continue
            toc_page = self._toc_page(block_text)
            if toc_page is not None:
                center_pages.append(toc_page)
            candidate_pages.append(title_block['page_number'])
        # locate in outlines.json
        for outline_block in self.outline or []:
            page_number = outline_block["page_number"]
            outline_text = outline_block['title'].strip()
            # Entries without a page cannot serve as an anchor.
            if page_number is not None and any(keyword in outline_text for keyword in keywords):
                center_pages.append(page_number)

        # information match
        if not center_pages and not candidate_pages:
            return None
        filter_pages = set()
        if not center_pages:
            filter_pages.update(candidate_pages)
        else:
            # center pages are anchors and are all kept
            filter_pages.update(center_pages)
            # a candidate page close to an anchor adds the page halfway between
            for candidate_page in candidate_pages:
                if candidate_page <= self.start_threshold:
                    continue
                for center_page in center_pages:
                    distance = abs(candidate_page - center_page)
                    if distance <= self.distance_threshold:
                        filter_pages.add(min(candidate_page, center_page) + distance // 2)

        # gather the images around every selected page
        candidate_images = set()
        for filter_page in filter_pages:
            self.image_regularization(
                start_page=max(filter_page - self.search_threshold, 0),
                end_page=filter_page + self.search_threshold,
                candidate_images=candidate_images)
        return self.exact_match(candidate_images=list(candidate_images))

    # Locate the qualification certificates
    def locate_qualtification_certificate(self):
        '''Return the list of images recognised as qualification certificates.'''
        # fuzzy location through keywords
        keywords = ['资格审查资料', '资格审查材料', '其它材料', '其他材料', '影印件']
        merge_interval = self._merge_intervals(self._collect_intervals(keywords))
        candidate_images = self._images_for_intervals(merge_interval)
        return self.search_qualtification_certificate(candidate_images=candidate_images)

    # Collect the images whose name fits the page pattern
    def image_regularization(self, start_page: int, end_page: int, candidate_images: set):
        """Add the images of pages ``start_page..end_page`` to ``candidate_images``.

        Files ending in ``.unk`` are ignored.  The set is updated in place and
        also returned.
        """
        for index in range(start_page, end_page + 1):
            pattern = os.path.join(self.image_dir, self.image_format.format(index))
            candidate_images.update(
                file for file in glob.glob(pattern) if not file.endswith('.unk'))
        return candidate_images

    def exact_match(self, candidate_images: list):
        '''Select the images whose similarity to ``self.bl_tamplate`` is high enough.

        ``match_threshold`` is tried first; when nothing passes it, the lower
        ``degrade_threshold`` is used.  Returns ``None`` for an empty input.
        '''
        if len(candidate_images) == 0:
            return None
        scores = [
            self.get_similarity(image_path=image_path, tamplate=self.bl_tamplate).cpu().numpy()
            for image_path in candidate_images
        ]
        sim_list = np.array(scores).reshape(len(scores))
        target_list = []
        for threshold in (self.match_threshold, self.degrade_threshold):
            target_list = [path for path, cos_sim in zip(candidate_images, sim_list) if cos_sim > threshold]
            if target_list:
                break
        return target_list

    def search_qualtification_certificate(self, candidate_images: list):
        '''Select the images whose similarity to ``self.qc_tamplate`` exceeds ``self.qc_threshold``.

        Returns ``None`` for an empty input.
        '''
        if len(candidate_images) == 0:
            return None
        scores = [
            self.get_similarity(image_path=image_path, tamplate=self.qc_tamplate).cpu().numpy()
            for image_path in candidate_images
        ]
        sim_list = np.array(scores).reshape(len(scores))
        return [path for path, cos_sim in zip(candidate_images, sim_list) if cos_sim > self.qc_threshold]

    def get_similarity(self, image_path, tamplate):
        """Return ``logits_per_image`` of the module-level model for image vs. text."""
        # Close the image file as soon as it has been preprocessed.
        with Image.open(image_path) as pil_image:
            image = preprocess(pil_image).unsqueeze(0).to(device)
        text = clip.tokenize([tamplate]).to(device)
        with torch.no_grad():
            logits_per_image, _ = model.get_similarity(image, text)
        return logits_per_image
if __name__ == '__main__':
    # Script: for every firm listed in the Excel sheet, locate candidate
    # licence/certificate images in its bid PDFs, classify them through OCR,
    # dump the result as JSON and score it against the ground truth.
    # (Kept at module level on purpose: the names below stay module globals.)
    url = "http://120.48.103.13:18000/ctr_ocr"
    base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token="
    # SECURITY NOTE: this token is committed in clear text.  It can be
    # overridden through the environment; the literal is only kept as a
    # fallback so existing runs keep working, and should be rotated/removed.
    access_token = os.environ.get(
        "BAIDU_OCR_ACCESS_TOKEN",
        "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608")
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    data_path = "/home/stf/miner_pdf/data/投标公司pdf"
    out_path = "/home/stf/miner_pdf/test.json"
    ground_truth = "/home/stf/miner_pdf/ground_truth.json"
    firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx"
    df = pd.read_excel(firm_excel_file)

    ocr = OcrAgent(url=url)
    seal_ocr = seal_agent(base_url=base_url, access_token=access_token, headers=headers)
    unscanned_firm_list = df[(df['是否为扫描件'] == '否')]['公司名称'].tolist()
    scanned_firm_list = df[(df['是否为扫描件'] == '是')]['公司名称'].tolist()
    # Originally only the non-scanned firms were processed; now all of them are.
    all_firm_list = unscanned_firm_list + scanned_firm_list
    data = {}
    start_time = time.time()

    for firm_name in all_firm_list:
        firm_path = os.path.join(data_path, firm_name)
        # One result list per firm, shared by all of its bid documents.
        data[firm_name] = {"license_list": []}
        if not os.path.isdir(firm_path):
            logger.info(f'firm directory {firm_path} does not exist, skipped ...')
            continue
        for bid_name in tqdm(os.listdir(firm_path)):
            if not bid_name.endswith('.pdf'):
                continue
            # bid directory and its image directory
            bid_dir = os.path.join(firm_path, bid_name[:-4])
            os.makedirs(bid_dir, exist_ok=True)
            image_dir = os.path.join(bid_dir, 'extracted_images')
            os.makedirs(image_dir, exist_ok=True)
            document = os.path.join(firm_path, bid_name)
            logger.info(f'processing document {document} ...')

            # Disabled extraction step that produces the files PdfMatcher loads
            # (title.json, outlines.json, all_texts.json, all_tables.json and
            # the extracted images); re-enable it to regenerate them:
            # title_path = os.path.join(bid_dir, 'title.json')
            # main_parse(pdf_path=document, title_path=title_path, image_dir=image_dir)
            # extractor = PdfExtractAttr(file_path=document)
            # extractor.parse_outline(out_path=os.path.join(bid_dir, 'outlines.json'))
            # extractor.parse_text(out_path=os.path.join(bid_dir, 'all_texts.json'))
            # extractor.parse_table(out_path=os.path.join(bid_dir, 'all_tables.json'))

            agent = PdfMatcher(document)
            candidate_images = agent.find_candidate_images()
            if len(candidate_images) == 0:
                logger.info(f'current firm: {firm_name} is unqualtified ...')
                continue
            for img in candidate_images:
                try:
                    response = ocr.judge(image_path=img, firm_name=firm_name)
                except ValueError as e:
                    logger.info(e)
                    continue
                if response is None or response['qualtified'] is None:
                    continue
                data[firm_name]["license_list"].append({
                    "license_name": response["license_name"],
                    "license_path": img,
                    # judge() does not report a page number at present, so
                    # read it leniently instead of raising KeyError.
                    "license_page": response.get("page_number"),
                    "start_datetime": response["start_datetime"],
                    "end_datetime": response["end_datetime"]
                })

    # Store the collected data in out_path
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

    # Accuracy test against the ground truth
    business_lic_num = 0
    certificate_lic_num = 0
    business_lic_cnt = 0
    certificate_lic_cnt = 0
    true_data = load_json(ground_truth)
    for firm_name in true_data:
        y_data = true_data[firm_name]['license_list']
        # A firm absent from the run simply has no prediction.
        pred_data = data.get(firm_name, {}).get('license_list', [])
        for lic in y_data:
            if lic['license_name'] == '营业执照':
                business_lic_num += 1
                # Each ground-truth entry counts at most once.
                if any(pred['license_name'] == '营业执照' and pred['license_path'] == lic['license_path']
                       for pred in pred_data):
                    business_lic_cnt += 1
            elif lic['license_name'] == '资质证书':
                certificate_lic_num += 1
                if any(pred['license_path'] == lic['license_path'] for pred in pred_data):
                    certificate_lic_cnt += 1
            else:
                logger.info('wrong truth data of {}'.format(lic['license_path']))
    # Guard the ratios against an empty ground truth.
    if business_lic_num:
        logger.info(f'营业执照识别准确率为:{business_lic_cnt / business_lic_num}')
    else:
        logger.info('营业执照识别准确率为:N/A')
    if certificate_lic_num:
        logger.info(f'资质证书识别率为:{certificate_lic_cnt / certificate_lic_num}')
    else:
        logger.info('资质证书识别率为:N/A')

    delta = time.time() - start_time
    # All firms are processed, so report that count rather than the unscanned one.
    logger.info(f"total firm {len(all_firm_list)} cost time: {delta // 60} min {delta % 60} sec ...")