12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931 |
- # 在pdf_miner的基础上进行优化
- # 标准包导入
- import os
- import re
- import json
- import re
- import shutil
- import pandas as pd
- import pdb
- import base64
- from io import BytesIO
- from pprint import pprint
- from paddleocr import PPStructure, draw_structure_result, save_structure_res
- from pypdf import PdfReader
- from pdf2image import convert_from_path
- # 第三方包导入
- import numpy as np
- import pandas as pd
- import cv2
- import torch
- import glob
- import logging
- import requests
- import time
- import datetime
- import subprocess
- from tqdm import tqdm
- from tooklit import RefPageNumberResolver
- from get_info import PdfExtractAttr
- from get_info import is_title, export_image, _save_jpeg, _save_jpeg2000, _save_bmp, main_parse, table_parse, load_json
- from PIL import Image
- from pdfminer.image import ImageWriter
- from tooklit import remove_red_seal, remove_blue_seal
- # tools function
def create_logger(log_path):
    """Configure the root logger to write to ``log_path`` and to the console.

    Records at INFO and above go to the log file (truncated on open and
    written as UTF-8, because this module logs Chinese text) and to a console
    stream handler.  Calling the function again replaces the handlers that a
    previous call installed instead of stacking new ones, so each record is
    emitted once per destination.

    Args:
        log_path: Path of the log file; it is opened in ``'w'`` mode.

    Returns:
        The configured root ``logging.Logger``.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')

    # Drop (and close) only the handlers a previous call of this function
    # added; handlers installed by anyone else are left alone.
    for stale in list(logger.handlers):
        if getattr(stale, '_installed_by_create_logger', False):
            logger.removeHandler(stale)
            stale.close()

    # Handler that writes records to the log file.
    file_handler = logging.FileHandler(
        filename=log_path, mode='w', encoding='utf-8')
    file_handler.setLevel(logging.INFO)

    # Handler that echoes records to the console.  It accepts DEBUG, but the
    # logger level set above still filters out anything below INFO.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)

    for handler in (file_handler, console):
        handler.setFormatter(formatter)
        handler._installed_by_create_logger = True
        logger.addHandler(handler)
    return logger
- # 页面信息缓存
class PageBuffer():
    """In-memory cache of per-page information, keyed by page."""

    def __init__(self):
        # Maps a page key to whatever information was stored for that page.
        self.page_cache = {}

    def query(self, page):
        """Return the information cached for ``page``, or ``None`` if absent.

        ``-1`` doubles as the "missing" sentinel, so a page whose cached value
        equals ``-1`` is reported as ``None`` as well.
        """
        cached = self.page_cache.get(page, -1)
        return None if cached == -1 else cached
class SealAgent():
    """Client for an HTTP seal (stamp) recognition service."""

    def __init__(self, url, headers):
        self.url = url
        self.headers = headers

    def get_content(self, image_path):
        """POST the base64-encoded image to the service and return its JSON.

        Args:
            image_path: Path of the image file to upload.

        Returns:
            The decoded JSON response, or ``None`` when the request fails or
            the response body is not valid JSON (the failure is logged).

        Raises:
            OSError: If the image file cannot be opened or read.
        """
        # Read through a context manager so the file handle is always closed.
        with open(image_path, 'rb') as image_file:
            img = base64.b64encode(image_file.read())
        params = {"image": img}
        try:
            response = requests.post(url=self.url, data=params, headers=self.headers)
            return response.json()
        except (requests.RequestException, ValueError):
            # Best effort: report the failure and let the caller carry on.
            logger.info(f"当前图像:{image_path}在印章识别ocr接口中网络不稳定 ...")
            return None

    def seal_parse(self, image_path):
        """Recognise the seals on an image and group them by owner kind.

        A seal whose text contains '公司' is recorded under ``firm_seals``;
        every other seal is recorded under ``indiv_seals``.

        Args:
            image_path: Path of the image file to analyse.

        Returns:
            ``{"firm_seals": [...], "indiv_seals": [...]}``.  Both lists are
            empty when the service could not be reached or reported no seals.
        """
        meta = {
            "firm_seals": [],
            "indiv_seals": []
        }
        content = self.get_content(image_path=image_path)
        # get_content() yields None after a failed request; treat that as
        # "no seals found" instead of failing on None["result_num"].
        if not content or content["result_num"] == 0:
            return meta
        for seal_info in content["result"]:
            seal_type = seal_info["type"]
            seal_content = seal_info["major"]["words"].strip().replace(' ', '')
            if '公司' in seal_content:
                meta['firm_seals'].append({
                    "seal_type": seal_type,
                    "firm_name": seal_content
                })
            else:
                meta['indiv_seals'].append({
                    "seal_type": seal_type,
                    "indiv_name": seal_content
                })
        return meta
- # ocr外部接口
class OcrAgent():
    def __init__(self, url):
        """Store the OCR endpoint plus the patterns and thresholds used later.

        Args:
            url: Endpoint that ``get_content`` posts images to.
        """
        # Patterns used to recognise each kind of licence / certificate.
        # Insertion order matters: ``judge`` / ``judge_pro`` walk this dict
        # and return on the first pattern that matches.
        certificate_patterns = {
            "business_license" : r'营业执照',
            "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
            "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
            "qualtifications" : r'\b[\u4e00-\u9fff]*证书',
            "proof": r'\b[\u4e00-\u9fff]*证明',
        }
        # "<date>至<date>" or "<date>至长期", matched against a whole OCR line
        # by ``find_license_datetime``.
        validity_pattern = r'\d{4}年\d{1,2}月\d{1,2}日至(?:\d{4}年\d{1,2}月\d{1,2}日|长期)'

        self.url = url
        self.datetime_re = validity_pattern
        self.re_dict = certificate_patterns
        # Minimum dark-pixel ratio that ``signature_recognition`` accepts as
        # handwriting.
        self.sign_threshold = 0.05
        # Minimum ``font_size`` that ``font_judge`` accepts.
        self.font_threshold = 39
- # 集成印章ocr
def integrate_sealagent(self, url, headers):
    """Attach a seal-recognition client built from ``url`` and ``headers``.

    The client is stored on ``self.sealagent``, which
    ``signature_recognition`` uses.
    """
    seal_client = SealAgent(url=url, headers=headers)
    self.sealagent = seal_client
- # 获取图像的ocr信息
def get_content(self, image_path):
    """Upload an image to the OCR service and return the decoded JSON reply.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        The JSON body of the response, as returned by ``response.json()``.

    Raises:
        ValueError: If the image cannot be read, the request fails, or the
            reply is not valid JSON.  The underlying error is attached as
            ``__cause__`` so the real reason is not lost.
    """
    try:
        with open(image_path, 'rb') as image_file:
            files = {"file": ("image.jpg", image_file, "image/jpeg")}
            response = requests.post(self.url, files=files)
        return response.json()
    # Catch Exception rather than everything, so KeyboardInterrupt and
    # SystemExit still propagate; callers keep seeing ValueError.
    except Exception as err:
        raise ValueError(f"传入图像{image_path}已损坏") from err
def judge_pro(self, image_path: str, firm_name: str):
    """Decide whether a page image is a licence/certificate of ``firm_name``.

    ``image_path`` must look like ``./test/page-0.jpg``: the page number is
    the text between the last ``-`` and the first following ``.`` of the
    file name.

    Args:
        image_path: Path of the page image.
        firm_name: Company name that must appear in the OCR text.

    Returns:
        ``None`` when the company name is not found.  Otherwise a dict with
        the keys ``qualtified``, ``matched``, ``license_name``,
        ``license_page``, ``start_datetime`` and ``end_datetime``; the
        validity fields are filled by ``find_license_datetime`` (business
        licence) or ``find_certificate_datetime`` (any other pattern).
    """
    file_name = image_path.split('/')[-1]
    logger.info(f'processing img: {file_name}')
    page_number = file_name.split('-')[-1].split('.')[0]

    ocr_lines = self.get_content(image_path=image_path)["rawjson"]["ret"]

    # The company name has to be present on the page.
    if not self.search(image_info=ocr_lines, key_list=[firm_name]):
        return None

    response_item = {
        "qualtified": None,             # whether the page is a certificate
        "matched": True,                # the company name was found
        "license_name": None,           # certificate name
        "license_page": page_number,    # page holding the certificate
        "start_datetime": None,         # start of validity
        "end_datetime": None,           # end of validity
    }

    # Try each certificate pattern in order and stop at the first hit.
    for kind, pattern in self.re_dict.items():
        is_business_license = kind == 'business_license'
        matcher = self.re_match if is_business_license else self.re_search
        hit = matcher(image_info=ocr_lines, format=pattern)
        if not hit:
            continue
        response_item["qualtified"] = True
        response_item["license_name"] = hit
        locate_dates = (
            self.find_license_datetime
            if is_business_license
            else self.find_certificate_datetime
        )
        return locate_dates(image_info=ocr_lines, response_item=response_item)
    return response_item
- # 判断图像是否为某公司的营业执照或资质证书信息,并返回提取到的信息
def judge(self, image_path: str, firm_name: str):
    """Decide whether an extracted image is a licence/certificate of a firm.

    ``image_path`` must look like ``./test/image_page_12_0.jpg``, where
    ``12`` is the PDF page the image came from and ``0`` is the index of the
    image on that page; the page number is the second-to-last ``_``-separated
    field of the file name.

    Args:
        image_path: Path of the extracted image.
        firm_name: Company name that must appear in the OCR text.

    Returns:
        ``None`` when the company name is not found.  Otherwise a dict with
        the keys ``qualtified``, ``matched``, ``license_name``,
        ``license_page``, ``start_datetime`` and ``end_datetime``; the
        validity fields are filled by ``find_license_datetime`` (business
        licence) or ``find_certificate_datetime`` (any other pattern).
    """
    image_prefix = image_path.split('/')[-1]
    logger.info(f'processing img: {image_prefix}')
    page_number = image_prefix.split('_')[-2]
    response_item = {
        "qualtified": None,             # whether the page is a certificate
        "matched": None,                # whether the company name was found
        "license_name": None,           # certificate name
        "license_page": page_number,    # page holding the certificate
        "start_datetime": None,         # start of validity
        "end_datetime": None            # end of validity
    }
    content = self.get_content(image_path=image_path)
    image_info = content["rawjson"]["ret"]

    # The company name has to be present.  ``search`` takes ``key_list``
    # (a list of keys); the previous ``key=firm_name`` call raised TypeError.
    if not self.search(image_info=image_info, key_list=[firm_name]):
        return None
    response_item['matched'] = True

    # Try each certificate pattern in order and stop at the first hit.
    for key, pattern in self.re_dict.items():
        is_business_license = key == 'business_license'
        if is_business_license:
            match_name = self.re_match(image_info=image_info, format=pattern)
        else:
            match_name = self.re_search(image_info=image_info, format=pattern)
        if not match_name:
            continue
        response_item["qualtified"] = True
        response_item["license_name"] = match_name
        if is_business_license:
            return self.find_license_datetime(image_info=image_info, response_item=response_item)
        return self.find_certificate_datetime(image_info=image_info, response_item=response_item)
    return response_item
- # 资质证书有效期定位
def find_certificate_datetime(self, image_info, response_item):
    """Locate the validity period of a qualification certificate.

    First records where date keywords (e.g. '有效期', '发证日期') sit on the
    page, then scans the OCR lines for date strings and assigns them to
    ``response_item['start_datetime']`` / ``response_item['end_datetime']``.
    Nothing is assigned when no date keyword is present.

    NOTE(review): the indentation of this file was lost, so the nesting
    below is a reconstruction -- confirm it against the upstream source.

    Args:
        image_info: OCR line dicts; the code reads ``word``, ``rect``
            (``left`` / ``top`` / ``width`` / ``height``) and ``charset``
            (per-character dicts with ``word`` and ``rect``).
        response_item: Result dict, updated in place.

    Returns:
        The same ``response_item``.
    """
    # Keyword groups
    start_keywords = ['颁发日期', '发证日期', '生效日期']
    end_keywords = ['终止日期']
    priority_keywords = ['有效期', '使用期限', '有效日期']
    keywords_list = ['有效期', '使用期限', '有效日期', '终止日期', '颁发日期', '发证日期', '生效日期']
    # Date patterns: "YYYY年M月D日" (optionally a range joined by 至) and "YYYY-M-D"
    format = r'(?:[自至])?\d{4}年\d{1,2}月\d{1,2}日(?:至)?(?:\d{4}年\d{1,2}月\d{1,2}日)?'
    special_format = r'\d{4}-\d{1,2}-\d{1,2}'
    # Pass 1: check whether any date keyword is present and remember its position
    flag = False
    keyword_dict = {}
    for info in image_info:
        word = info['word']
        left = info['rect']['left']
        top = info['rect']['top']
        width = info['rect']['width']
        height = info['rect']['height']
        for keyword in keywords_list:
            # This certificate contains a date keyword
            if keyword in word:
                flag = True
                charset_list = info['charset']
                for char_dc in charset_list:
                    # The right edge of the keyword is the right edge of its last character
                    if char_dc['word'] == keyword[-1]:
                        right = char_dc['rect']['left'] + char_dc['rect']['width']
                        keyword_dict[keyword] = {
                            "left": left,
                            "top": top,
                            "right": right
                        }


    if flag:
        # Pass 2: collect date strings and assign them to start / end
        for info in image_info:
            word = info['word']
            if '年' in word or re.search(r'\d', word):
                left = info['rect']['left']
                top = info['rect']['top']
                width = info['rect']['width']
                if '年' in word:
                    find_list = re.findall(pattern=format, string=word)
                else:
                    find_list = re.findall(pattern=special_format, string=word)
                if len(find_list) == 1:
                    find_string = find_list[0]
                    if '至' in find_string:
                        # A single match that already holds "start至end"
                        start_prefix = find_string.split('至')[0].replace('自', '')
                        end_prefix = find_string.split('至')[-1]
                        if '年' in start_prefix:
                            response_item['start_datetime'] = start_prefix
                        if end_prefix != '':
                            response_item['end_datetime'] = end_prefix
                        return response_item
                    # Without '至', assign the date by position relative to the recorded keywords
                    else:
                        for k, k_info in keyword_dict.items():
                            k_left = k_info['left']
                            k_right = k_info['right']
                            k_top = k_info['top']
                            # The date shares the keyword's left edge
                            # NOTE(review): `and` binds tighter than `or`, so a priority
                            # keyword overwrites end_datetime even when it is already set;
                            # only end_keywords are guarded by the `is None` check.
                            if left == k_left:
                                if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None:
                                    response_item['end_datetime'] = find_string
                                elif k in start_keywords and response_item['start_datetime'] is None:
                                    response_item['start_datetime'] = find_string
                                break
                            # The date lies to the right of, and not above, the keyword
                            elif left >= k_right and top >= k_top:
                                if (k in priority_keywords) or (k in end_keywords) and response_item['end_datetime'] is None:
                                    response_item['end_datetime'] = find_string
                                elif k in start_keywords and response_item['start_datetime'] is None:
                                    response_item['start_datetime'] = find_string

                elif len(find_list) == 2:
                    # Two dates on one line: first is the start, last is the end
                    start_prefix = find_list[0].replace('自', '')
                    end_prefix = find_list[-1].replace('至', '')
                    if response_item['start_datetime'] is None:
                        response_item['start_datetime'] = start_prefix
                    if response_item['end_datetime'] is None:
                        response_item['end_datetime'] = end_prefix

                else:
                    # Zero or more than two matches: log the line and move on
                    logger.info(f'wrong word: {word} ...')
            else:
                continue

    return response_item
-
- # 营业执照有效期定位
def find_license_datetime(self, image_info, response_item):
    """Fill the licence id and validity period of a business licence.

    Each OCR line is checked in turn:

    * a 25-character line starting with '证照编号:' or a 20-character
      all-digit line sets ``response_item['id']``;
    * a line matching ``self.datetime_re`` is split on '至' into
      ``start_datetime`` and ``end_datetime``;
    * a line that is exactly '长期' sets both dates to '长期'.

    Args:
        image_info: OCR line dicts; only ``word`` is read.
        response_item: Result dict, updated in place.

    Returns:
        The same ``response_item``.
    """
    for entry in image_info:
        text = entry['word']
        all_digits = text.isdigit()
        labelled_id = text.startswith('证照编号:') and len(text) == 25
        bare_id = all_digits and len(text) == 20

        if labelled_id or bare_id:
            # Strip the 5-character label when the id is not a bare number.
            response_item['id'] = text if all_digits else text[5:]
            continue

        if re.match(self.datetime_re, text):
            parts = text.split('至')
            response_item['start_datetime'] = parts[0]
            response_item['end_datetime'] = parts[-1]
            continue

        if text == '长期':
            response_item['start_datetime'] = '长期'
            response_item['end_datetime'] = '长期'
    return response_item
- # 在目录中找到正文pos右侧对应的数字标签
def digit_label(self, image_info, pos: dict):
    """Find the number that sits on the same row as ``pos``.

    Among the OCR lines that contain a digit and start at or to the right of
    ``pos['left']``, the one whose top edge is vertically closest to
    ``pos['top']`` is chosen (the first such line wins ties; lines 10000 or
    more units away are ignored).

    Args:
        image_info: OCR line dicts with ``word`` and ``rect``.
        pos: Anchor box with ``left``, ``right``, ``top`` and ``bottom``.

    Returns:
        The text after the last '.' when the chosen word contains a '.', the
        word itself when it is all digits, otherwise ``None``.
    """
    anchor_left, anchor_right, anchor_top, anchor_bottom = (
        pos['left'], pos['right'], pos['top'], pos['bottom']
    )

    best_gap = 10000
    best_word = ""
    for entry in image_info:
        text = entry['word']
        box = entry['rect']
        x, y, w, h = box['left'], box['top'], box['width'], box['height']
        has_digit = any(ch.isdigit() for ch in text)
        if not (has_digit and x >= anchor_left):
            continue
        gap = abs(y - anchor_top)
        if gap < best_gap:
            best_gap, best_word = gap, text

    # Reduce the chosen word to its page label.
    if '.' in best_word:
        return best_word.split('.')[-1]
    if best_word.isdigit():
        return best_word
    return None
-
-
- # 在image_info中搜寻word中包含key_list的内容,并打包信息返回
def pack_search(self, image_info, key_list):
    """Collect every (OCR line, key) pair where the key occurs in the line.

    Spaces are removed from each line's text before matching.

    Args:
        image_info: OCR line dicts with ``word`` and ``rect``.
        key_list: Substrings to look for.

    Returns:
        A list of dicts ``{"word", "contain_key", "bbox"}``, one per matching
        key, where ``bbox`` holds ``left``, ``right``, ``top``, ``bottom``,
        ``width`` and ``height``.
    """
    hits = []
    for entry in image_info:
        text = entry['word'].strip().replace(' ', '')
        box = entry['rect']
        x, y, w, h = box['left'], box['top'], box['width'], box['height']
        x_end, y_end = x + w, y + h
        hits.extend(
            {
                "word": text,
                "contain_key": key,
                # A fresh bbox dict per hit, so results never alias each other.
                "bbox": {
                    "left": x,
                    "right": x_end,
                    "top": y,
                    "bottom": y_end,
                    "width": w,
                    "height": h,
                },
            }
            for key in key_list
            if key in text
        )
    return hits
- # 在image_info中搜寻word中包含key_list的内容
def search(self, image_info, key_list):
    """Report whether any key occurs in any OCR line.

    Spaces are removed from each line's text before matching.

    Args:
        image_info: OCR line dicts; only ``word`` is read.
        key_list: Substrings to look for.

    Returns:
        ``True`` on the first hit, ``False`` when nothing matches.
    """
    normalised = (
        entry['word'].strip().replace(' ', '') for entry in image_info
    )
    return any(key in text for text in normalised for key in key_list)
-
- # 精确匹配key_list中的内容
def exact_search(self, image_info, key_list):
    """Find OCR lines whose text equals one of the keys exactly.

    Spaces are removed from each line's text before comparing.

    Args:
        image_info: OCR line dicts with ``word`` and ``rect``.
        key_list: Strings to compare against.

    Returns:
        A list of ``{"keyword": text, "font_size": rect height}`` dicts, one
        per (line, key) pair that matches.
    """
    cleaned = (
        (entry, entry['word'].strip().replace(' ', '')) for entry in image_info
    )
    return [
        {"keyword": text, "font_size": entry['rect']['height']}
        for entry, text in cleaned
        for key in key_list
        if key == text
    ]
- # 在image_info中使用re.search搜寻满足{format}正则的信息
def re_search(self, image_info, format):
    """Return the first substring, in line order, that ``format`` finds.

    Args:
        image_info: OCR line dicts; only ``word`` is read.
        format: Regular expression applied with ``re.search``.

    Returns:
        The matched text of the first line with a match, or ``False``.
    """
    attempts = (re.search(format, entry['word']) for entry in image_info)
    first_hit = next((found for found in attempts if found), None)
    return first_hit.group(0) if first_hit else False
- # 在image_info中使用re.match搜寻满足{format}正则的信息
def re_match(self, image_info, format):
    """Return the first line whose text starts with a match of ``format``.

    Args:
        image_info: OCR line dicts; only ``word`` is read.
        format: Regular expression applied with ``re.match`` (anchored at
            the start of the text).

    Returns:
        The whole text of the first matching line, or ``False``.
    """
    texts = (entry['word'] for entry in image_info)
    return next((text for text in texts if re.match(format, text)), False)
- # 用于识别固定位置是否有公司法人签名或公司盖章
def signature_recognition(self, image_path: str):
    """Decide whether a page carries a company seal or a legal-representative signature.

    Steps, in order:

    1. Ask ``self.sealagent`` for seals; any firm or individual seal means
       ``True``.
    2. Produce (or reuse, if already on disk) copies of the page with the red
       and the blue seal removed, written next to ``image_path`` with the
       suffixes ``_red_roi`` / ``_blue_roi``.
    3. OCR both copies and run ``identify`` on the red one, then on the blue
       one if the red one returned ``False``.

    NOTE(review): ``self.sealagent`` only exists after
    ``integrate_sealagent`` has been called.
    NOTE(review): file names are split with ``[:-4]`` / ``[-4:]``, which
    assumes a three-character extension such as ``.jpg``.
    NOTE(review): the indentation of this file was lost, so the nesting
    below is a reconstruction -- confirm it against the upstream source.

    Args:
        image_path: Path of the page image, using '/' separators.

    Returns:
        ``True`` when a seal or signature is detected, otherwise ``False``.
    """
    # First ask the seal service whether the page carries a seal
    meta = self.sealagent.seal_parse(image_path=image_path)
    if len(meta["firm_seals"]) > 0 or len(meta["indiv_seals"]) > 0:
        logger.info("检测到当前页面具备印章 ...")
        return True
    keywords = ['投标函', '(法定代表人CA电子印章)','(法定代表人CA电子印章或签字)', '(签字)', '法定代表人或其委托代理人:', '法定代表人:']
    # Positions of the keywords found on the page.  NOTE(review): this dict is
    # shared by both identify() calls below, so keywords found on the
    # red-removed image are still present when the blue-removed image is
    # examined -- confirm that is intended.
    key_pos = {}
    image_prefix = os.path.dirname(image_path)
    image_name = image_path.split('/')[-1][:-4]
    removed_red_image_name = image_name + '_red_roi' + image_path.split('/')[-1][-4:]
    removed_blue_image_name = image_name + '_blue_roi' + image_path.split('/')[-1][-4:]
    red_ink_image_name = image_name + '_red_ink' + image_path.split('/')[-1][-4:]
    blue_ink_image_name = image_name + '_blue_ink' + image_path.split('/')[-1][-4:]
    removed_red_image_path = os.path.join(image_prefix, removed_red_image_name)
    removed_blue_image_path = os.path.join(image_prefix, removed_blue_image_name)
    red_ink_image_path = os.path.join(image_prefix, red_ink_image_name)
    blue_ink_image_path = os.path.join(image_prefix, blue_ink_image_name)
    # Reuse the seal-removed images when they already exist on disk
    if not os.path.exists(removed_red_image_path):
        removed_red_seal_img = remove_red_seal(image_path=image_path)
        cv2.imwrite(removed_red_image_path, removed_red_seal_img)
    else:
        removed_red_seal_img = cv2.imread(removed_red_image_path)
    if not os.path.exists(removed_blue_image_path):
        removed_blue_seal_img = remove_blue_seal(image_path=image_path)
        cv2.imwrite(removed_blue_image_path, removed_blue_seal_img)
    else:
        removed_blue_seal_img = cv2.imread(removed_blue_image_path)
    red_content = self.get_content(image_path=removed_red_image_path)
    red_image_info = red_content["rawjson"]["ret"]
    blue_content = self.get_content(image_path=removed_blue_image_path)
    blue_image_info = blue_content["rawjson"]["ret"]
    def identify(image_info, input_img, out_path):
        # Look for the signature on one seal-removed image.  ``out_path`` is
        # where ink_recognition() saves the cropped signature region.
        for info in image_info:
            word = info['word'].replace(' ', '')
            left = info['rect']['left']
            top = info['rect']['top']
            width = info['rect']['width']
            height = info['rect']['height']
            right = left + width
            bottom = top + height
            # Record the first keyword (in list order) contained in this line
            for keyword in keywords:
                if keyword in word:
                    key_pos[keyword] = {
                        "word": word,
                        "left": left,
                        "right": right,
                        "top": top,
                        "bottom": bottom
                    }
                    break

        # Without any of the keywords ('投标函', '法定代表人', ...) there is nothing to check
        if len(key_pos) == 0:
            return False

        # Case 1: both a "legal representative" label and a trailing
        # "(seal / signature)" marker were found
        if ((key_pos.get('法定代表人:') is not None) or (key_pos.get('法定代表人或其委托代理人:') is not None)) and \
            ((key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None)):
            if key_pos.get('法定代表人或其委托代理人:') is not None:
                l_info = key_pos['法定代表人或其委托代理人:']
                l_cnt = 13
                l_string = '法定代表人或其委托代理人:'
            else:
                l_info = key_pos['法定代表人:']
                l_cnt = 6
                l_string = '法定代表人:'
            if key_pos.get('(法定代表人CA电子印章)') is not None:
                r_info = key_pos['(法定代表人CA电子印章)']
                r_string = '(法定代表人CA电子印章)'
            elif key_pos.get('(法定代表人CA电子印章或签字)') is not None:
                r_info = key_pos['(法定代表人CA电子印章或签字)']
                r_string = '(法定代表人CA电子印章或签字)'
            else:
                r_info = key_pos['(签字)']
                r_string = '(签字)'

            # The signature should lie between the label and the marker
            l = l_info['right']
            l_word = l_info['word']
            r = r_info['left']
            r_word = r_info['word']
            t = max(l_info['top'], r_info['top'])
            b = min(l_info['bottom'], r_info['bottom']) - 5
            # Extra text after the label, or around the marker, is taken as
            # a recognised signature
            if l_word[-l_cnt:] != l_string or r_word != r_string:
                return True
            else:
                # Otherwise measure the dark-pixel ratio of the gap
                black_ratio = self.ink_recognition(
                    input_img=input_img,
                    out_path=out_path,
                    meta={
                        "left": l,
                        "right": r,
                        "top": t,
                        "bottom": b
                    }
                )
                if black_ratio >= self.sign_threshold:
                    return True
                return False
        # Case 2: only the "(seal / signature)" marker was found
        elif (key_pos.get('(法定代表人CA电子印章)') is not None) or (key_pos.get('(法定代表人CA电子印章或签字)') is not None) or (key_pos.get('(签字)') is not None):
            # The signature should already be part of the recognised line
            if key_pos.get('(法定代表人CA电子印章)') is not None:
                key = key_pos['(法定代表人CA电子印章)']
            elif key_pos.get('(法定代表人CA电子印章或签字)') is not None:
                key = key_pos['(法定代表人CA电子印章或签字)']
            elif key_pos.get('(签字)') is not None:
                key = key_pos['(签字)']
            key_word = key['word']
            # Whatever remains after removing every known keyword counts as the signature
            key_word = key_word.replace('(法定代表人CA电子印章)','').replace('(法定代表人CA电子印章或签字)', '').replace('(签字)','').replace('法定代表人或其委托代理人:', '').replace('法定代表人:', '')
            if key_word != '':
                return True
            return False
        # Case 3: only the "法定代表人:" label was found
        elif key_pos.get('法定代表人:') is not None:
            # The signature is to the right of the label or already part of the line
            word = key_pos['法定代表人:']['word']
            # NOTE(review): the inspected box starts at the label's *left*
            # edge and is 100 units wide, so it covers the label itself --
            # confirm that is intended.
            l = key_pos['法定代表人:']['left']
            r = l + 100
            t = key_pos['法定代表人:']['top']
            b = key_pos['法定代表人:']['bottom'] - 5
            if word[-6:] != '法定代表人:':
                return True
            else:
                black_ratio = self.ink_recognition(
                    input_img=input_img,
                    out_path=out_path,
                    meta={
                        "left": l,
                        "right": r,
                        "top": t,
                        "bottom": b
                    }
                )
                if black_ratio >= self.sign_threshold:
                    return True
                return False
        # Case 4: only the "法定代表人或其委托代理人:" label was found
        elif key_pos.get('法定代表人或其委托代理人:') is not None:
            # The signature is to the right of the label or already part of the line
            word = key_pos['法定代表人或其委托代理人:']['word']
            l = key_pos['法定代表人或其委托代理人:']['left']
            r = l + 100
            t = key_pos['法定代表人或其委托代理人:']['top']
            b = key_pos['法定代表人或其委托代理人:']['bottom'] - 5
            if word[-13:] != '法定代表人或其委托代理人:':
                return True
            else:
                black_ratio = self.ink_recognition(
                    input_img=input_img,
                    out_path=out_path,
                    meta={
                        "left": l,
                        "right": r,
                        "top": t,
                        "bottom": b
                    }

                )
                if black_ratio >= self.sign_threshold:
                    return True
                return False
        else:
            # Only '投标函' was found: no signature location to inspect
            return False

    # The blue-removed image is only examined when the red-removed one gives False
    return identify(red_image_info, removed_red_seal_img, red_ink_image_path) \
        or identify(blue_image_info, removed_blue_seal_img, blue_ink_image_path)
- # 用于判断固定位置的长方形框内是否存在签名字迹
-
- # 用于识别图像固定位置黑色字迹所占比例,并将该位置的图像截取保存
def ink_recognition(self, input_img, out_path, meta: dict):
    """Measure how much of a rectangular region is covered by dark ink.

    The region is cropped from ``input_img`` and saved to ``out_path``; a
    copy of the page with the region outlined is saved next to it (same
    name with ``*`` inserted before the last four characters).  The crop is
    then converted to grayscale and every pixel at or below 90% of the Otsu
    threshold is counted as ink.

    Args:
        input_img: Image array indexed as ``[row, column, channel]``.  It is
            modified in place: the inspected rectangle is drawn on it.
        out_path: Where the cropped region is written.
        meta: Region bounds with the keys ``left``, ``right``, ``top`` and
            ``bottom``.

    Returns:
        The fraction of ink pixels in the region, or ``0.0`` when the region
        is empty.
    """
    left = meta["left"]
    right = meta["right"]
    top = meta["top"]
    bottom = meta["bottom"]
    # Copy the region: a plain slice is a view onto input_img, so the debug
    # rectangle drawn below would otherwise paint over the border pixels of
    # the very crop that is saved and measured.
    crop_img = input_img[top:bottom, left:right, :].copy()
    # Outline the inspected region on the full image (colour (255, 255, 0), line width 2).
    cv2.rectangle(input_img, (left, top), (right, bottom), (255, 255, 0), 2)
    test_path = out_path[:-4] + '*' + out_path[-4:]
    if crop_img.size == 0:
        logger.info("Error: crop_img is empty")
        return 0.0

    cv2.imwrite(out_path, crop_img)
    cv2.imwrite(test_path, input_img)

    gray_img = cv2.cvtColor(crop_img, cv2.COLOR_BGR2GRAY)
    # Otsu picks the threshold; 90% of it is used as the ink cut-off.
    thresh, _ = cv2.threshold(gray_img, 0, 255, cv2.THRESH_OTSU)
    filter_condition = int(thresh * 0.90)
    # Inverted binary threshold: ink pixels become non-zero.
    _, black_thresh = cv2.threshold(gray_img, filter_condition, 255, cv2.THRESH_BINARY_INV)

    total_pixels = black_thresh.size
    black_pixels = np.count_nonzero(black_thresh)
    return black_pixels / total_pixels
- # 用于判别字体大小
def font_judge(self, kw_search_meta):
    """Report whether any matched keyword is printed large enough.

    Args:
        kw_search_meta: Dicts with ``keyword`` and ``font_size``, in the
            shape produced by ``exact_search``.  An empty list means no
            keyword was found, i.e. the page is not relevant.

    Returns:
        ``True`` as soon as one entry has ``font_size`` at or above
        ``self.font_threshold``; ``False`` otherwise (including for empty
        input -- previously the no-large-font case fell through and
        returned ``None``).
    """
    for meta in kw_search_meta:
        keyword = meta["keyword"]
        font_size = meta["font_size"]
        logger.info(f"keyword:{keyword} has font_size: {font_size}")
        if font_size >= self.font_threshold:
            return True
    return False
-
-
- # 基于paddlepaddle的table ocr接口
def table_parse(self, image_path: str, save_folder: str = ''):
    """Run PaddleOCR layout analysis on an image and sort the regions by type.

    Args:
        image_path: Path of the image to analyse.
        save_folder: When non-empty, the raw result is also written there
            via ``save_structure_res`` under the image's base name (text
            before the first '.').

    Returns:
        A dict with:

        * ``table``: ``title`` / ``title_confidence`` (from 'title' and
          'table_caption' regions) and ``content`` / ``content_confidence``
          (each table parsed from its HTML into a list of rows);
        * ``figure``: ``content`` / ``content_confidence`` and ``caption`` /
          ``caption_confidence``;
        * ``page_numbers``: always an empty list here;
        * ``others``: every region of any other type, unchanged.
    """
    # Function-scope import: the module itself only imports BytesIO from io.
    from io import StringIO

    # Reuse one engine per agent instead of rebuilding it for every image
    # (constructing it is presumably expensive -- it is the model wrapper).
    table_engine = getattr(self, '_table_engine', None)
    if table_engine is None:
        table_engine = self._table_engine = PPStructure(show_log=True)

    img = cv2.imread(image_path)
    result = table_engine(img)
    expectation = {
        "table": {
            "title": [],
            "title_confidence": [],
            "content": [],
            "content_confidence": [],
        },
        "figure": {
            "content": [],
            "content_confidence": [],
            "caption": [],
            "caption_confidence": [],
        },
        "page_numbers": [],
        "others": []
    }
    for res in result:
        region_type = res['type']
        if region_type == 'title' or region_type == 'table_caption':
            # Only the first text line of a title region is kept.
            if len(res['res']) > 0:
                expectation['table']['title_confidence'].append(res['res'][0]['confidence'])
                expectation['table']['title'].append(res['res'][0]['text'])
        elif region_type == 'table':
            expectation['table']['content_confidence'].append(res['score'])
            # Wrap the markup in StringIO: passing literal HTML straight to
            # read_html is deprecated in recent pandas releases.
            table_frame = pd.read_html(StringIO(res['res']['html']))[0]
            expectation['table']['content'].append(table_frame.values.tolist())
        elif region_type == 'figure':
            expectation['figure']['content_confidence'].append(res['score'])
            expectation['figure']['content'].append(res['res'])
        elif region_type == 'figure_caption':
            expectation['figure']['caption_confidence'].append(res['score'])
            expectation['figure']['caption'].append(res['res'])
        else:
            expectation['others'].append(res)

    if save_folder:
        # Stored as save_folder/<image base name>
        save_structure_res(result, save_folder, os.path.basename(image_path).split('.')[0])

    return expectation
- # 提供pdf解析,并基于提取文本信息进行位置匹配
- class PdfMatcher(PdfExtractAttr):
- # file_path为提供的pdf文件路径
def __init__(self, file_path: str):
    """Parse a bid PDF and load (or build) its cached extraction results.

    ``file_path`` is expected to look like ``.../<firm name>/<bid name>.pdf``
    with '/' separators: the parent directory name is taken as the company
    name and the file name minus its last four characters as the bid name.
    Extraction results live in a folder named after the bid, next to the PDF.

    Args:
        file_path: Path of the bid PDF.
    """
    super(PdfMatcher, self).__init__(
        file_path=file_path
    )
    path_parts = file_path.split('/')

    # Bid document path, bid name, bid data folder and company name.
    self.document = file_path
    self.bid_name = path_parts[-1][:-4]
    self.bid_dir = os.path.join(os.path.dirname(file_path), self.bid_name)
    self.firm_name = path_parts[-2]

    # Headings: built with main_parse() the first time, then read from disk.
    title_path = os.path.join(self.bid_dir, "title.json")
    if not os.path.exists(title_path):
        self.main_parse(pdf_path=file_path, title_path=title_path)
    self.title = load_json(title_path)

    # Outline entries and text blocks.
    self.outline = self.parse_outline(
        out_path=os.path.join(self.bid_dir, "outlines.json")
    )
    self.details = self.parse_text(
        out_path=os.path.join(self.bid_dir, "all_texts.json")
    )

    # Tables: reuse the cached JSON when present, otherwise extract them.
    table_path = os.path.join(self.bid_dir, "all_tables.json")
    self.table = (
        load_json(table_path)
        if os.path.exists(table_path)
        else self.parse_table_pro(table_path=table_path)
    )

    # Filtering thresholds.
    self.start_threshold = 10
    self.distance_threshold = 6
    self.search_threshold = 20

    # Total number of pages.
    self.total_pages = self.count_pages()

    # Patterns used to recognise each kind of licence / certificate.
    self.license_dict = {
        "business_license" : r'营业执照',
        "deposit": r'^(?:开户许可证|[\u4e00-\u9fff]+存款账户[\u4e00-\u9fff]+)$',
        "production_license": r'\b[\u4e00-\u9fff]*许可证\b',
        "qualtifications" : r'\b[\u4e00-\u9fff]*证书',
        "proof": r'\b[\u4e00-\u9fff]*证明',
    }
- # 在title中找寻包含keyword的信息
- # digit_limit表明是否使用数字限制
def search_in_title(self, keyword, digit_limit=False):
    """Find the headings in ``self.title`` that contain ``keyword``.

    Spaces are removed from each heading before matching.

    Args:
        keyword: Substring to look for.
        digit_limit: When true, a heading is skipped if the character right
            before or right after the first occurrence of ``keyword`` is a
            Chinese numeral (一 to 十).

    Returns:
        A list of ``{"page_number": ..., "text": ...}`` dicts, one per
        matching heading, in heading order.
    """
    digits = "一二三四五六七八九十"
    meta = []
    for title_block in self.title:
        block_text = title_block['text'].replace(' ', '').strip()
        start = block_text.find(keyword)
        if start == -1:
            continue
        if digit_limit:
            end = start + len(keyword)
            # Empty string when the keyword touches either end of the heading.
            before = block_text[start - 1] if start > 0 else ''
            after = block_text[end] if end < len(block_text) else ''
            # ``'' in digits`` is True in Python, hence the truthiness guards.
            if (before and before in digits) or (after and after in digits):
                continue
        # Matches are collected in both modes; previously the append sat in
        # an ``else`` branch, so one of the two modes never returned any.
        meta.append({
            "page_number": title_block["page_number"],
            "text": block_text
        })
    return meta
-
- # 在outline中找寻包含keywords的信息
def search_in_outline(self, keyword):
    """Find the outline entries in ``self.outline`` that contain ``keyword``.

    Spaces are removed from each entry's text before matching.

    The entry text is read from the ``text`` key when present and from
    ``title`` otherwise: the other methods of this class that walk
    ``self.outline`` read ``title``, so accepting both keeps this method
    usable with either entry shape.
    NOTE(review): confirm which key ``parse_outline`` actually emits.

    Args:
        keyword: Substring to look for.

    Returns:
        A list of ``{"page_number": ..., "text": ...}`` dicts, one per
        matching entry, in outline order.

    Raises:
        KeyError: If an entry has neither a ``text`` nor a ``title`` key.
    """
    meta = []
    for outline_block in self.outline:
        raw_text = outline_block['text'] if 'text' in outline_block else outline_block['title']
        block_text = raw_text.replace(' ', '').strip()
        if keyword in block_text:
            meta.append({
                "page_number": outline_block["page_number"],
                "text": block_text
            })
    return meta
-
- # 用于定位营业执照、资质证书的页面范围
def search_license_interval(self, necessity_interval=None):
    """Locate the page ranges likely to hold business licences / certificates.

    Candidate ranges are opened by headings (``self.title``) and outline
    entries (``self.outline``) that contain one of the section keywords, and
    closed by the next heading / entry.  The candidates, plus any
    ``necessity_interval`` supplied by the caller, are sorted and
    overlapping ranges are merged.

    NOTE(review): the indentation of this file was lost, so the nesting
    below is a reconstruction -- confirm it against the upstream source.

    Args:
        necessity_interval: Optional list of ``(start, end)`` page tuples
            that must be included in the search.

    Returns:
        A list of merged ``(start, end)`` tuples sorted by start page.
        Candidates whose end lies before their start are dropped.
        NOTE(review): when candidates exist but all of them are dropped,
        the result is ``[(-1, -1)]``; this matches the previous behaviour.
    """
    # Section keywords used for the fuzzy match
    keywords = ['资格审查资料','资格审查材料','其它材料','其他材料','其他资料','附件', '影印件']

    search_interval = []

    # --- Pass 1: headings from title.json ---
    left_pos = -1   # start page of the range currently open, -1 if none
    right_pos = -1  # end page of the range closed most recently
    for title_block in self.title:
        block_text = title_block['text'].replace(' ', '').strip()
        # TODO: match the certificate patterns in self.license_dict first.

        # Close the open range at this heading, unless the heading itself
        # still mentions '证书'.
        if left_pos != -1 and '证书' not in block_text:
            right_pos = title_block['page_number']
            search_interval.append((left_pos, right_pos))
            left_pos = -1
        for keyword in keywords:
            if keyword in block_text:
                if '.' in block_text:
                    # A heading such as "附件....12" looks like a table-of-
                    # contents line; the trailing number is the target page.
                    tail = block_text.split('.')[-1]
                    if tail.isdecimal():
                        # int() instead of eval(): eval() raised SyntaxError
                        # on numbers with a leading zero such as "08".
                        left_pos = min(title_block['page_number'], int(tail))
                else:
                    left_pos = title_block['page_number']

    # A range still open after the last heading is recorded as-is.
    if left_pos != -1:
        search_interval.append((left_pos, right_pos))
    # Reset before the outline pass so no stale page number leaks into it.
    left_pos = -1
    right_pos = -1

    # --- Pass 2: entries from outlines.json ---
    for outline_block in self.outline:
        if left_pos != -1:
            right_pos = outline_block["page_number"]
            right_pos = right_pos if right_pos is not None else -1
            search_interval.append((left_pos, right_pos))
            left_pos = -1
        outline_text = outline_block['title'].strip()
        for keyword in keywords:
            if keyword in outline_text:
                if outline_block["page_number"] is not None:
                    left_pos = outline_block["page_number"]

    if left_pos != -1:
        search_interval.append((left_pos, right_pos))
    if necessity_interval is not None:
        search_interval += necessity_interval

    # --- Merge overlapping candidates ---
    search_interval.sort()
    logger.info(f"search_interval: {search_interval} ...")
    merge_interval = []
    if len(search_interval) > 0:
        left = -1
        right = -1
        for l, r in search_interval:
            if r < l:
                # Range that was never closed properly.
                continue
            if left == -1 and right == -1:
                # First usable range.
                left, right = l, r
            elif l <= right and r > right:
                # Overlaps the current range and extends it.
                right = r
            elif l <= right:
                # Fully contained in the current range.
                continue
            else:
                # Disjoint: emit the current range and start a new one.
                merge_interval.append((left, right))
                left, right = l, r
        merge_interval.append((left, right))
    return merge_interval
- # 用于定位相关业绩的页面范围
def search_perf_info(self, ):
    """Locate the "similar project / performance" table or its page.

    Lookup order: extracted tables, then outline entries, then titles.
    The first source that yields a hit ends the search.

    Returns:
        dict with ``perf_page_number`` (-1 when unknown), ``qual_page_number``
        (set of pages whose heading mentions a qualification-review keyword)
        and ``table`` (the matched table, or None).
    """
    qual_keywords = ['资格审查资料','资格审查材料']
    meta = {
        "perf_page_number": -1,
        "qual_page_number": set(),
        "table": None
    }

    def _squash(raw):
        # Drop surrounding blanks, line breaks and inner spaces.
        return raw.strip().replace("\n", "").replace(" ", "")

    def _looks_like_perf(name):
        return ('类似' in name) and (('项目' in name) or ('业绩' in name))

    def _scan_headings(blocks, text_key):
        """Record qualification pages; return True once a perf heading is found."""
        for block in blocks:
            page = block["page_number"]
            heading = _squash(block[text_key])
            for kw in qual_keywords:
                if kw in heading:
                    meta["qual_page_number"].add(page)
            if _looks_like_perf(heading):
                meta["perf_page_number"] = page
                return True
        return False

    # 1) a table whose name already identifies it
    for table_block in self.table:
        pages = table_block["page_numbers"]
        if _looks_like_perf(_squash(table_block["table_name"])):
            meta["perf_page_number"] = pages
            meta["table"] = table_block["table"]
            return meta

    # 2) fuzzy match in the outline
    if _scan_headings(self.outline, "title"):
        return meta

    # 3) fuzzy match in the titles
    _scan_headings(self.title, "text")
    return meta
- # 返回可能为营业执照或资质证书的图像集
def find_candidate_images(self):
    """Collect extracted images that may be licenses or certificates.

    Each interval from ``search_license_interval`` that starts after
    ``self.start_threshold`` is widened by ``self.search_threshold`` pages on
    both sides; an open right bound (-1) is treated as ``start + 20``.

    Returns:
        list of image paths (unordered).
    """
    found = set()
    merge_intervals = self.search_license_interval()
    logger.info(f"merge_intervals: {merge_intervals}")

    for start_page, end_page in merge_intervals:
        # Intervals that begin inside the front matter are ignored.
        if start_page <= self.start_threshold:
            continue
        last_page = start_page + 20 if end_page == -1 else end_page
        found = self.image_regularization(
            start_page=max(0, start_page - self.search_threshold),
            end_page=last_page + self.search_threshold,
            candidate_images=found,
        )

    return list(found)
- # 使用正则查询符合格式的图像
def image_regularization(self, start_page: int, end_page:int, candidate_images: set):
    """Add the image files of pages ``start_page..end_page`` to ``candidate_images``.

    For every page the glob pattern ``self.image_format.format(page)`` is
    resolved inside ``self.image_dir``; files ending in '.unk' are ignored.
    The passed-in set is updated in place and also returned.
    """
    for page in range(start_page, end_page + 1):
        pattern = os.path.join(self.image_dir, self.image_format.format(page))
        candidate_images.update(
            path for path in glob.glob(pattern) if not path.endswith('.unk')
        )
    return candidate_images
- # 返回可能为营业执照或资质证书的pdf2img图像集
def find_candidate_images_pro(self, necessity_interval=None):
    """Collect rendered page images that may be licenses or certificates.

    Pages come from the intervals returned by ``search_license_interval``
    (optionally extended by ``necessity_interval``). For each page both the
    raw render ``page-N.jpg`` and its red-seal-free copy
    ``page-N_red_roi.jpg`` are candidates; the copy is created on demand.

    Args:
        necessity_interval: optional list of ``(left, right)`` page tuples
            forwarded to ``search_license_interval``.

    Returns:
        Sorted list of image paths that actually exist on disk.
    """
    scanned_dir = self.pdf2img()
    candidate_images = set()
    merge_intervals = self.search_license_interval(necessity_interval=necessity_interval)
    logger.info(f"merge_intervals: {merge_intervals}")

    for start_page, end_page in merge_intervals:
        # Intervals that begin inside the front matter are ignored.
        if start_page <= self.start_threshold:
            continue
        # Open right bound: look at the next 20 pages.
        if end_page == -1:
            end_page = start_page + 20
        for index in range(start_page, end_page + 1):
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
            if os.path.exists(img_path) and (not os.path.exists(processed_img_path)):
                processed_img = remove_red_seal(image_path=img_path)
                cv2.imwrite(processed_img_path, processed_img)
            # The interval may run past the last rendered page; only hand
            # files that exist to the downstream OCR step.
            for path in (img_path, processed_img_path):
                if os.path.exists(path):
                    candidate_images.add(path)

    return sorted(candidate_images)
- # 在表格数据中查询是否提取到投标报价表的数据
def find_bid_quotation_form(self):
    """Find the bid quotation summary table among the extracted tables.

    A table matches when its name contains one of the known titles, or when
    its header (the first row, plus following rows while the header is a
    multi-row one marked by '其中') contains the '增值税金额' column.

    Returns:
        ``(page_numbers, self.extract_table(table))`` for the first matching
        table, or ``None`` when no table matches.
    """
    keywords = ["投标报价总表", "投标报价汇总表"]
    key_column = '增值税金额'

    def _header_has_key_column(rows):
        """Scan header rows for ``key_column``; always terminates."""
        for row in rows:
            cells = [
                cell.replace("\n", "").replace(" ", "").strip()
                for cell in row
                if cell is not None
            ]
            if any(key_column in cell for cell in cells):
                return True
            # Only a row containing '其中' announces a further header row.
            # The previous counter-based loop spun forever on a later row
            # that had neither '其中' nor the key column.
            if not any('其中' in cell for cell in cells):
                return False
        return False

    for table_block in self.table:
        page_number = table_block["page_numbers"]
        table_name = table_block["table_name"].replace(' ', '')
        rows = table_block["table"]
        # First try the table name, then the column names.
        if any(keyword in table_name for keyword in keywords) or _header_has_key_column(rows):
            parsed_table = self.extract_table(table=rows)
            return page_number, parsed_table
    # No quotation table among the extracted tables
    return None
-
- # 在表格数据中查询是否提取到拟投入本项目人员配备情况表 or 项目管理机构组成表的数据
def find_itempeople_form(self):
    """Find the project staffing / management-organisation table.

    Tables are checked first: a table whose name contains one of the known
    titles ends the search immediately; a table whose first row has a
    '职务' or '职称' column is collected as a candidate. If no table name
    matched, outline entries and titles are scanned for section headings and
    their pages are returned as candidates.

    Returns:
        dict with ``candidate_page`` (set of int pages) and ``table_list``
        (list of ``{"page_number": ..., "table": ...}``).
    """
    keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表']
    sec_keywords = ['拟投入本项目人员配备情况表', '项目管理机构', '项目管理机构组成表']
    meta = {
        "candidate_page": set(),
        "table_list": [],
    }

    def _squash(raw):
        return raw.strip().replace("\n", "").replace(" ", "")

    def _resolve_page(text, page_number):
        """Prefer the page printed after the last '.', else the block's own page."""
        if '.' in text:
            tail = text.split('.')[-1]
            if tail.isdigit():
                try:
                    # int() instead of eval(): safe, and accepts "012".
                    return int(tail)
                except ValueError:
                    pass
        return page_number

    def _collect_pages(blocks, text_key):
        for block in blocks:
            page_number = block["page_number"]
            text = _squash(block[text_key])
            if any(sec_keyword in text for sec_keyword in sec_keywords):
                page = _resolve_page(text, page_number)
                # Entries without a usable page cannot be searched later.
                if page is not None:
                    meta["candidate_page"].add(page)

    for table_block in self.table:
        if len(table_block["table"]) == 0:
            continue
        page_number = table_block["page_numbers"]
        table_name = _squash(table_block["table_name"])
        if any(keyword in table_name for keyword in keywords):
            meta["table_list"].append({
                "page_number": page_number,
                "table": table_block["table"]
            })
            # A table identified by name is authoritative.
            return meta

        for column_name in table_block["table"][0]:
            if column_name is None:
                continue
            column_name = _squash(column_name)
            if '职务' in column_name or '职称' in column_name:
                meta["table_list"].append({
                    "page_number": page_number,
                    "table": table_block["table"]
                })
                break

    # Locate the management-organisation section in the outline, then the titles.
    _collect_pages(self.outline, "title")
    _collect_pages(self.title, "text")
    return meta
-
- # 用于解析提取到的表格信息
def extract_table(self, table):
    """Normalise an extracted table into (column names, rows).

    Header cells are gathered row by row until at least as many distinct
    names as the first row has cells were found; cells containing '其中' and
    repeated names are skipped. Every row after the last header row is copied,
    truncated to the width of the first row.

    Returns:
        ``(column_list, new_table)`` where ``new_table[0]`` is ``column_list``;
        ``([], [])`` for an empty table.
    """
    total_rows = len(table)
    if total_rows == 0:
        return [], []
    width = len(table[0])

    # Step 1: complete the column names
    header = []
    header_row = 0
    while len(header) < width and header_row < total_rows:
        for cell in table[header_row]:
            # NOTE(review): str() turns a None cell into the name 'None'.
            label = str(cell).strip().replace("\n", "").replace(" ", "")
            if '其中' in label or label in header:
                continue
            header.append(label)
        if len(header) < width:
            header_row += 1

    # Step 2: copy the data rows
    body = [
        [table[r][c] for c in range(width)]
        for r in range(header_row + 1, total_rows)
    ]
    return header, [header] + body
- # 查询pdf总页数
def count_pages(self):
    """Return the number of pages of the PDF at ``self.file_path``."""
    pages = PdfReader(self.file_path).pages
    return len(pages)
- # 用于自动创建pdf->image的scanned文件夹
def pdf2img(self):
    """Render the PDF to per-page JPEGs and return the output directory.

    The images are written to ``<self.bid_dir>/scanned`` as ``page-<i>.jpg``
    (``i`` counts from 0). If that directory already exists nothing is
    rendered. A failed conversion is logged, not raised.
    """
    scanned_dir = os.path.join(self.bid_dir, 'scanned')
    if os.path.exists(scanned_dir):
        # A previous run already produced the directory; reuse it.
        logger.info(f"检测到当前投标文件{self.bid_dir}存在扫描文件夹 ...")
        return scanned_dir

    os.makedirs(scanned_dir, exist_ok=True)
    logger.info(f"开始转换pdf2img页面")
    convert_start_time = time.time()
    try:
        rendered_pages = convert_from_path(pdf_path=self.document)
        for page_no, page_image in enumerate(rendered_pages):
            target = os.path.join(scanned_dir, f'page-{page_no}.jpg')
            page_image.save(target, 'JPEG')
        logger.info("convert successfully !")
    except subprocess.CalledProcessError as e:
        logger.info(f"convert failure: {e}")
    convert_cost_time = time.time() - convert_start_time
    logger.info(f"转化pdf2img花费{convert_cost_time // 60} min {convert_cost_time % 60} sec ...")
    return scanned_dir
class PdfParse_pipeline():
    """Batch pipeline that extracts structured bid information from firm PDFs.

    ``firm_dir`` holds one sub-directory per firm, each containing bid PDFs.
    Every PDF is analysed with a ``PdfMatcher`` plus the supplied OCR agent,
    and the aggregated result is written to ``out_path`` as JSON.
    """

    def __init__(self,
                 ocr,       # OCR interface
                 firm_dir,  # directory that stores every firm
                 out_path,  # output JSON path
                 ):
        self.ocr = ocr
        self.firm_dir = firm_dir
        self.out_path = out_path

    # ------------------------------------------------------------------ #
    # small private helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _as_page(value):
        """Return ``value`` as an int page number, or None if it is not one."""
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, str) and value.isdigit():
            try:
                return int(value)
            except ValueError:
                return None
        return None

    @staticmethod
    def _ensure_seal_free(raw_path, processed_path):
        """Create the red-seal-free copy of a page image if it is missing."""
        if not os.path.exists(processed_path):
            processed_img = remove_red_seal(image_path=raw_path)
            cv2.imwrite(processed_path, processed_img)

    def _append_license(self, data, image_path, license_path, firm_name):
        """OCR one image and record it in ``data["license_list"]`` if it qualifies."""
        try:
            response = self.ocr.judge_pro(image_path=image_path, firm_name=firm_name)
            if response is None or response['qualtified'] is None:
                return
            data["license_list"].append({
                "license_name": response["license_name"],
                "license_path": license_path,
                "license_page": response["license_page"],
                "start_datetime": response["start_datetime"],
                "end_datetime": response["end_datetime"]
            })
        except ValueError as e:
            print(e)

    def _collect_manager_tables(self, agent, data, image_path, save_folder):
        """Parse tables on one page; keep those with a '职称' / '职务' column."""
        expectation = self.ocr.table_parse(image_path=image_path, save_folder=save_folder)
        for table in expectation['table']['content']:
            column_list, parsed_table = agent.extract_table(table=table)
            # any(): a table with both columns is recorded once, not twice.
            if any('职称' in name or '职务' in name for name in column_list):
                data["manager"].append(parsed_table)

    # ------------------------------------------------------------------ #
    # pipeline entry points
    # ------------------------------------------------------------------ #
    def parse_pipeline(self):
        """Process every ``*.pdf`` of every firm and dump the result to ``out_path``.

        Returns:
            dict mapping firm name to the data of its (last processed) PDF.
        """
        data = {}

        for firm_name in tqdm(os.listdir(self.firm_dir)):
            logger.info(f'processing firm {firm_name} ...')
            firm_path = os.path.join(self.firm_dir, firm_name)
            for bid_name in tqdm(os.listdir(firm_path)):
                if not bid_name.endswith('.pdf'):
                    continue
                document = os.path.join(firm_path, bid_name)
                # Working directory named after the PDF (extension removed)
                bid_dir = os.path.join(firm_path, bid_name[:-4])
                os.makedirs(bid_dir, exist_ok=True)
                data[firm_name] = self.parse_single_document(pdf_path=document)

        # Persist the collected data
        with open(self.out_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        return data

    def parse_single_document(self, pdf_path: str):
        """Analyse one bid PDF and return its result dict.

        Only the table-of-contents keyword search and the manager stage are
        currently active; the other stages are disabled but still log their
        banner lines.
        """
        agent = PdfMatcher(file_path=pdf_path)
        firm_name = agent.firm_name  # kept for the disabled license stage
        total_pages = agent.total_pages
        data = {
            "necessity_interval": [],
            # whether the bid letter carries a signature or seal
            "has_signature_or_seal": False,
            "formatting_img": None,
            # qualification certificates & business license
            "license_list":[],
            # bid quotation summary table
            "bid_form": None,
            # related performance tables
            "perf_info": [],
            # project manager information
            "manager": [],
            "kw_meta": {}
        }
        logger.info("start finding the kw info in directory ...")
        kw_meta = self.find_kw_from_dc(agent=agent, data=data, total_pages=total_pages)
        logger.info("start processing the nextiter information ...")
        # disabled: iter = self.parse_nextiter(agent=agent, data=data, total_pages=total_pages)
        logger.info("start judging the signature & seal information ...")
        # disabled: self.parse_bid(agent=agent, data=data, total_pages=total_pages)
        logger.info("start finding license information ...")
        # disabled: self.parse_license(agent=agent, data=data, iter=iter, firm_name=firm_name)
        logger.info("start finding bid form ...")
        # disabled: self.parse_bid_form(agent=agent, data=data)
        logger.info("start finding perf information ...")
        # disabled: self.parse_perf(agent=agent, data=data)
        logger.info("start finding manager information ...")
        self.parse_manager(agent=agent, data=data, kw_meta=kw_meta["manager"])

        return data

    def find_kw_from_dc(self, agent, data, total_pages):
        """Search the table of contents for keywords and their page labels.

        The first 20 pages (or fewer) are OCR'd; once a page containing '目录'
        has been seen, that page and the following ones are searched.

        Returns:
            dict mapping keyword group to a list of
            ``{"element", "word", "label_page"}`` dicts; also stored in
            ``data["kw_meta"]``.
        """
        keywords = {
            "manager": ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表']
        }
        meta = {kw: [] for kw in keywords}
        scanned_dir = agent.pdf2img()
        # The table of contents is normally within the first 20 pages.
        end = 20 if total_pages > 20 else total_pages
        is_enter = False
        for index in range(0, end):
            logger.info(f"find kw from index {index} ...")
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
            self._ensure_seal_free(img_path, processed_img_path)
            # OCR the seal-free page
            content = self.ocr.get_content(image_path=processed_img_path)
            image_info = content["rawjson"]["ret"]

            if not is_enter and self.ocr.search(image_info, '目录'):
                # First page of the table of contents
                is_enter = True
            if not is_enter:
                continue

            for kw, elements in keywords.items():
                pack_info = self.ocr.pack_search(image_info=image_info, key_list=elements)
                logger.info(pack_info)
                for info in pack_info:
                    word = info["word"]
                    contain_key = info["contain_key"]
                    pos = info["bbox"]
                    if word and word[-1].isdigit():
                        # The page label is part of the recognised word.
                        label_page = word.split('.')[-1]
                    else:
                        label_page = self.ocr.digit_label(image_info=image_info, pos=pos)
                    meta[kw].append(
                        {
                            "element": contain_key,
                            "word": word,
                            "label_page": label_page
                        }
                    )

        data["kw_meta"] = meta
        return meta

    def parse_nextiter(self, agent, data, total_pages):
        """For each review-section keyword in the TOC, derive the next chapter index.

        Returns:
            list of ``{"current_key", "next_index"}`` dicts, also stored in
            ``data["iter"]``.
        """
        # A table of contents normally contains this word.
        keyword = '目录'
        # Section keywords whose following chapter has to be located
        iter_keywords = {
            '1': ['资格审查资料', '资格审查材料'],
            '2': ['其他材料', '其它材料', '其他资料', '其它资料'],
            '3': ['附件'],
            '4': ['影印件']
        }

        def find_next(current_index):
            """Return the index that follows ``current_index`` (Arabic or Chinese numeral)."""
            logger.info(f"processing current_index: {current_index}")
            cycle = {
                "一": "二",
                "二": "三",
                "三": "四",
                "四": "五",
                "五": "六",
                "六": "七",
                "七": "八",
                "八": "九",
                "九": "十",
                "十": "二",
            }
            if not current_index:
                # Nothing to increment (previously an IndexError).
                return ""
            if current_index.isdigit():
                try:
                    # int() instead of eval(): eval("01") is a SyntaxError.
                    return str(int(current_index) + 1)
                except ValueError:
                    return ""
            if len(current_index) == 1:
                if current_index not in cycle:
                    raise ValueError(f"筛选current index {current_index} 有误 ...")
                return "十一" if current_index == "十" else cycle[current_index]

            last = current_index[-1]
            if last == '九':
                # Carry: e.g. "十九" -> "二十"
                if current_index[0] in cycle:
                    return cycle[current_index[0]] + '十'
                return ""
            if last == '十':
                return current_index + '一'
            if last in cycle:
                return current_index[:-1] + cycle[last]
            return ""

        def refine(string: str):
            """Extract the chapter index of ``string`` and return the following index."""
            digit_keywords = "123456789一二三四五六七八九十"
            string = string.strip().replace(' ', '').replace('(', '').replace(')', '').replace('(', '').replace(')', '')
            if not any(digit_kw in string for digit_kw in digit_keywords):
                return ""

            if '、' in string and '章' in string:
                index_string = string.split('、')[0]
                current_index = ""
                next_index = ""
                is_start = False
                for c in index_string:
                    if c == "第":
                        is_start = True
                    elif (not is_start) and c in digit_keywords:
                        is_start = True
                        current_index += c
                    elif c == "章":
                        next_index = find_next(current_index)
                    elif is_start and c in digit_keywords:
                        current_index += c
                return next_index

            if '、' in string:
                return find_next(string.split('、')[0])

            if '章' in string and '第' in string:
                l = string.find('第')
                r = string.find('章')
                return find_next(string[l+1:r])

            return ""

        def find_ocr_index(image_info, bbox: dict):
            """Find the numbered OCR word left of ``bbox`` that is vertically closest."""
            candidate_distance = 10000
            candidate_word = ""
            keywords = "123456789一二三四五六七八九十"
            match_left = bbox['left']
            match_right = bbox['right']
            match_top = bbox['top']
            for info in image_info:
                word = info['word'].replace(' ', '')
                left = info['rect']['left']
                top = info['rect']['top']
                right = left + info['rect']['width']
                if not any(kw in word for kw in keywords):
                    continue
                if left < match_left and right < match_right:
                    distance = abs(top - match_top)
                    if distance < candidate_distance:
                        candidate_word = word
                        candidate_distance = distance
            return {
                "candidate_word": candidate_word,
                "candidate_distance": candidate_distance,
            }

        chapter_iters = []
        scanned_dir = agent.pdf2img()
        # The table of contents is normally within the first 20 pages.
        end = 20 if total_pages > 20 else total_pages
        is_enter = False
        for index in range(0, end):
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
            self._ensure_seal_free(img_path, processed_img_path)
            content = self.ocr.get_content(image_path=processed_img_path)
            image_info = content["rawjson"]["ret"]

            if not is_enter and self.ocr.search(image_info, keyword):
                is_enter = True
            if not is_enter:
                continue

            for cover_keywords in iter_keywords.values():
                hits = self.ocr.pack_search(image_info, cover_keywords)
                for hit in hits:
                    word = hit['word']
                    logger.info(f"processing iter word: {word}")
                    contain_key = hit['contain_key']
                    bbox = hit['bbox']
                    if '、' in word or ('章' in word and '第' in word):
                        # The word itself carries its chapter number.
                        next_index = refine(word)
                        if next_index != "":
                            chapter_iters.append({
                                "current_key": contain_key,
                                "next_index": next_index
                            })
                    else:
                        # Otherwise use the nearest numbered word to its left.
                        nearest = find_ocr_index(image_info, bbox)
                        next_index = refine(nearest["candidate_word"])
                        chapter_iters.append({
                            "current_key": contain_key,
                            "next_index": next_index
                        })
        data["iter"] = chapter_iters
        return chapter_iters

    def parse_bid(self, agent, data, total_pages):
        """Check whether the bid-letter page carries a signature or seal.

        Sets ``data["has_signature_or_seal"]`` and ``data["formatting_img"]``
        on the first qualifying page.
        """
        # TODO: the bid letter mostly appears early, so only pages 0..30 are searched.
        # Pages are numbered 0..total_pages-1; never go past the last one.
        last_page = min(30, total_pages - 1)
        scanned_dir = agent.pdf2img()
        key_list = ['一、投标函及投标函附录', '1投标函及投标函附录', '1、投标函及投标函附录', '投标函及投标函附录', '投标函', '一、投标函', '1.投标函', '1投标函', '一投标函', '(一)投标函', '(一)投标函', '(一)、投标函', '(一)、投标函']

        for index in range(0, last_page + 1):
            img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
            # Is this page a bid letter (and not the table of contents)?
            content = self.ocr.get_content(image_path=img_path)
            image_info = content["rawjson"]["ret"]
            kw_search_meta = self.ocr.exact_search(image_info, key_list)
            kw_search_res = self.ocr.font_judge(kw_search_meta)
            ol_search_res = self.ocr.search(image_info, ['目录'])
            if (not kw_search_res) or ol_search_res:
                continue

            if self.ocr.signature_recognition(image_path=img_path):
                data["has_signature_or_seal"] = True
                data["formatting_img"] = img_path
                return

    def parse_license(self, agent, iter, data, firm_name):
        """Find license / certificate images and record them in ``data["license_list"]``.

        ``iter`` is the list produced by ``parse_nextiter``; each entry yields
        a page interval from the section keyword's title to the next
        chapter's title. (The parameter name shadows the builtin but is part
        of the public signature.)
        """
        necessity_interval = []
        for unit_iter in iter:
            contain_key = unit_iter["current_key"]
            next_index = unit_iter["next_index"]
            kw_title_meta = agent.search_in_title(contain_key)
            iter_title_meta = agent.search_in_title(next_index, digit_limit=True)
            left = 10000
            right = -1
            left_kw = ""
            right_kw = ""
            # Right bound: the last next-chapter title outside the front matter
            for iter_meta in iter_title_meta:
                page_number = iter_meta["page_number"]
                if page_number >= 20 and page_number > right:
                    right = page_number
                    right_kw = iter_meta["text"]
            if right == -1:
                right = agent.total_pages
            # Left bound: the first keyword title within [20, right]
            for kw_meta in kw_title_meta:
                page_number = kw_meta["page_number"]
                if page_number < 20 or page_number > right:
                    continue
                if page_number < left:
                    left = page_number
                    left_kw = kw_meta["text"]
            if left == 10000:
                continue
            necessity_interval.append((left, right))
            data["necessity_interval"].append(
                {
                    "left_kw": left_kw,
                    "right_kw": right_kw,
                    "left_page": left,
                    "right_page": right
                }
            )

        candidate_images = agent.find_candidate_images_pro(necessity_interval=necessity_interval)
        logger.info(candidate_images)
        if len(candidate_images) == 0:
            # No candidates: fall back to scanning every page.
            scanned_dir = agent.pdf2img()
            for index in range(0, agent.total_pages):
                img_path = os.path.join(scanned_dir, f'page-{index}.jpg')
                processed_img_path = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
                self._ensure_seal_free(img_path, processed_img_path)
                self._append_license(data, processed_img_path, img_path, firm_name)
        else:
            for img in candidate_images:
                self._append_license(data, img, img, firm_name)

    def parse_bid_form(self, agent, data):
        """Fill ``data["bid_form"]`` with the bid quotation summary table."""
        result = agent.find_bid_quotation_form()
        if result is not None:
            page_number, target_table = result
            data["bid_form"] = {
                "page": page_number,
                "table": target_table
            }
            return

        # Nothing among the extracted tables: OCR the rendered pages.
        scanned_dir = agent.pdf2img()
        key_column = '增值税金额'
        for img_prefix in glob.glob(os.path.join(scanned_dir, '*.jpg')):
            img_name = os.path.basename(img_prefix)
            # Skip derived images
            if ('roi' in img_name) or ('ink' in img_name):
                continue
            img_index = int(img_name.split('-')[1].split('.')[0])
            if img_index > 50:
                continue
            img_path = os.path.join(scanned_dir, img_name)
            # TODO: use OCR hits of "投标报价汇总表" as an additional signal
            expectation = self.ocr.table_parse(image_path=img_path, save_folder=scanned_dir)
            content = self.ocr.get_content(image_path=img_path)
            image_info = content["rawjson"]["ret"]
            kw_res = self.ocr.search(image_info=image_info, key_list=['投标报价汇总表'])
            for table in expectation['table']['content']:
                column_list, parsed_table = agent.extract_table(table=table)
                for column_name in column_list:
                    if key_column in column_name:
                        data["bid_form"] = {
                            "page": [img_index],
                            "table": parsed_table
                        }
                        return
            if kw_res:
                # Title found but no usable table: remember the page only.
                data["bid_form"] = {
                    "page": [img_index]
                }

    def parse_perf(self, agent, data):
        """Fill ``data["perf_info"]`` with similar-project performance tables."""
        perf_meta = agent.search_perf_info()
        if perf_meta["table"] is not None:
            data["perf_info"].append({
                "perf_page": perf_meta["perf_page_number"],
                "perf_table": perf_meta["table"]
            })
            return

        center_page = 0
        if perf_meta["perf_page_number"] != -1:
            center_page = perf_meta["perf_page_number"]
        if len(perf_meta["qual_page_number"]) > 0:
            tmp = 10000
            for candidate_page in perf_meta["qual_page_number"]:
                if candidate_page > agent.start_threshold:
                    tmp = min(tmp, candidate_page)
            # NOTE(review): when no perf page is known center_page is 0, so
            # this min() keeps 0 and every page is scanned — confirm intent.
            center_page = min(center_page, tmp)
        scanned_dir = agent.pdf2img()
        for img_prefix in glob.glob(os.path.join(scanned_dir, 'page-*.jpg')):
            img_name = os.path.basename(img_prefix)
            if ('roi' in img_name) or ('ink' in img_name):
                continue
            img_index = int(img_name.split('-')[1].split('.')[0])
            if img_index < center_page:
                continue
            img_path = os.path.join(scanned_dir, img_name)
            # Step 1: remove the red seal
            processed_path = os.path.join(scanned_dir, f'page-{img_index}_red_roi.jpg')
            processed_folder = os.path.join(scanned_dir, 'processed')
            os.makedirs(processed_folder, exist_ok=True)
            if not os.path.exists(processed_path):
                processed_img = remove_red_seal(img_path)
                cv2.imwrite(processed_path, processed_img)
            # Step 2: OCR keyword search
            content = self.ocr.get_content(image_path=processed_path)
            image_info = content["rawjson"]["ret"]
            if self.ocr.search(image_info, ['类似']):
                # Step 3: table recognition
                expectation = self.ocr.table_parse(image_path=processed_path, save_folder=processed_folder)
                data["perf_info"].append({
                    "perf_page": img_index + 1,
                    "perf_table": expectation['table']['content']
                })

    def parse_manager(self, agent, data, kw_meta=None):
        """Fill ``data["manager"]`` with project-staffing tables.

        Args:
            agent: matcher providing ``find_itempeople_form``, ``pdf2img``,
                ``extract_table`` and ``total_pages``.
            data: result dict; ``data["manager"]`` is replaced or extended.
            kw_meta: optional TOC hits from ``find_kw_from_dc``; each
                ``label_page`` seeds a +/-5 page search window.
        """
        keywords = ['拟投入本项目人员配备情况表', '项目管理机构组成表', '项目管理机构成员', '项目管理组成表', '职务', '职称']
        meta = agent.find_itempeople_form()
        if len(meta["table_list"]) > 0:
            # Matching tables were already extracted.
            data["manager"] = meta["table_list"]
            return

        candidate_page_set = meta["candidate_page"]
        if len(candidate_page_set) == 0 and (kw_meta is None or len(kw_meta) == 0):
            logger.info("查询候选项目经理为空, 开始进行全文档搜索")
            scanned_dir = agent.pdf2img()
            for index in range(0, agent.total_pages):
                raw_page = os.path.join(scanned_dir, f'page-{index}.jpg')
                processed_page = os.path.join(scanned_dir, f'page-{index}_red_roi.jpg')
                self._ensure_seal_free(raw_page, processed_page)
                content = self.ocr.get_content(image_path=processed_page)
                image_info = content["rawjson"]["ret"]
                if self.ocr.search(image_info, keywords):
                    self._collect_manager_tables(agent, data, processed_page, scanned_dir)
            return

        last_page = agent.total_pages - 1
        spread_set = set()
        # Up to 20 pages after each candidate heading
        for candidate_page in candidate_page_set:
            start = self._as_page(candidate_page)
            if start is None:
                continue
            spread_set.update(range(max(0, start), min(start + 20, last_page) + 1))
        # +/- 5 pages around each page label found in the TOC; negative
        # page numbers are clipped because no such image exists.
        for unit_meta in (kw_meta or []):
            label_page = self._as_page(unit_meta["label_page"])
            if label_page is None:
                continue
            spread_set.update(range(max(0, label_page - 5), min(label_page + 5, last_page) + 1))

        scanned_dir = agent.pdf2img()
        for candidate_img in spread_set:
            candidate_path = os.path.join(scanned_dir, f'page-{candidate_img}.jpg')
            self._collect_manager_tables(agent, data, candidate_path, scanned_dir)
if __name__ == "__main__":
    # [test demo] Runs the pipeline for each firm in firm_list.
    start_time = time.time()

    # Adjust the paths below to your own environment.
    # `logger` is assigned at module level here, so the functions above see
    # it as a global; a `global` statement is not needed at this scope.
    firm_list = ['太原重工']
    # firm_list = ['湖北海光']
    for firm in firm_list:
        log_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/test_{firm}.log"
        logger = create_logger(log_path=log_path)
        # [environment parameters]
        # OCR service url
        url = "http://120.48.103.13:18000/ctr_ocr"
        # seal OCR base url
        base_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/seal?access_token="
        # seal OCR access token.
        # SECURITY: a credential is committed in source. Prefer supplying it
        # through the BAIDU_OCR_ACCESS_TOKEN environment variable; the literal
        # is kept only as a backward-compatible fallback and should be
        # rotated and removed.
        access_token = os.environ.get(
            "BAIDU_OCR_ACCESS_TOKEN",
            "24.6bbe9987c6bd19ba65e4402917811657.2592000.1724573148.282335-86574608",
        )
        # seal request url
        seal_url = base_url + access_token
        # seal OCR request headers
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        # root directory of all bidding firms
        data_path = "/home/stf/miner_pdf/data/投标公司pdf"
        # root directory of the test firms
        test_data_path = "/home/stf/miner_pdf/interface/test_files"
        # output of the code currently being tuned
        test_out_path = "/home/stf/miner_pdf/interface/outdir/test_out.json"
        unit_data_path = f"/home/stf/miner_pdf/interface/unit_test/{firm}"
        # unit_out_path = f"/home/stf/miner_pdf/interface/outdir/unit_{firm}.json"
        unit_out_path = f"/home/stf/miner_pdf/interface/test_outdir/manager_test/unit_{firm}.json"
        # JSON output (license / certificate locations) of a full pipeline run
        pipeline_out_path = "/home/stf/miner_pdf/interface/outdir/test_pipeline.json"
        # JSON output (license / certificate locations) of a single-firm run
        single_out_path = "/home/stf/miner_pdf/interface/outdir/test_single.json"
        # ground truth: license / certificate pages of the non-scanned PDFs
        ground_truth = "/home/stf/miner_pdf/ground_truth.json"
        # spreadsheet telling scanned from non-scanned PDFs per firm
        firm_excel_file = "/home/stf/miner_pdf/data/certificate.xlsx"
        df = pd.read_excel(firm_excel_file)
        # wrapped OCR interface
        ocr = OcrAgent(url=url)
        ocr.integrate_sealagent(
            url=seal_url,
            headers=headers
        )
        # wrapped pipeline
        pipeline = PdfParse_pipeline(
            ocr=ocr,
            firm_dir=unit_data_path,
            out_path=unit_out_path,
        )

        # start
        data = pipeline.parse_pipeline()

        # report the elapsed time (measured from script start)
        cost_time = time.time() - start_time
        logger.info(f"processing {len(data)} documents, total cost {cost_time // 60} min {cost_time % 60} sec ...")
|