"""Extract profit figures from the financial sections of a bid-document PDF.

Chapters are located by title. When a chapter already carries parsed tables,
the rows mentioning profit are collected directly; otherwise the chapter's
page images are sent to a remote OCR service and the text rows containing
the net-profit keyword are split into label words and numeric words.
"""
import os
import re
import time
from re import match
from typing import Optional

from tqdm import tqdm

from scan_dir import scan_dir
from instance_locate import get_instances_by_title
from ocr_api import OcrAgent, find_current_row
import datetime

# Endpoint of the OCR service used when a chapter has no parsed tables.
OCR_ENDPOINT = "http://120.48.103.13:18000/ctr_ocr"

# Currency marker (symbol, ISO code or any punctuation character) followed by
# an amount with optional thousands separators and up to two decimals.
_PRICE_RE = re.compile(
    r"(?:\b(?:[BS]/\.|R(?:D?\$|p))|\b(?:[TN]T|[CJZ])\$|Дин\.|\b(?:Bs|Ft|Gs|K[Mč]|Lek|B[Zr]|k[nr]|[PQLSR]|лв"
    r"|ден|RM|MT|lei|zł|USD|GBP|EUR|JPY|CHF|SEK|DKK|NOK|SGD|HKD|AUD|TWD|NZD|CNY|KRW|INR|CAD|VEF|EGP|THB|IDR"
    r"|PKR|MYR|PHP|MXN|VND|CZK|HUF|PLN|TRY|ZAR|ILS|ARS|CLP|BRL|RUB|QAR|AED|COP|PEN|CNH|KWD|SAR)|\$[Ub]|"
    r"[^\w\s])\s?(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{1,2})?(?!\.?\d)"
)
_DIGITS = frozenset('1234567890')
_NUMERIC_CHARS = _DIGITS | frozenset(',.')

_PROFIT_RE = re.compile(r'.*利润.*')
_NET_PROFIT_RE = re.compile(r'.*(净利润).*')
_IMAGE_SUFFIXES = ('.jpg', '.png')


def is_price(word: str) -> bool:
    """Return True when *word* looks like a monetary amount.

    A word qualifies if it fully matches the currency pattern, or if it is a
    bare number made only of digits, commas and dots. The bare-number form
    must contain at least one digit, so the empty string and stray
    punctuation such as "." or "," are not reported as prices.
    """
    if _PRICE_RE.fullmatch(word):
        return True
    chars = set(word)
    return chars <= _NUMERIC_CHARS and not chars.isdisjoint(_DIGITS)


def _page_number(filename: str) -> Optional[int]:
    """Return the integer in the third '_'-separated field, or None if absent."""
    try:
        return int(filename.split('_')[2])
    except (IndexError, ValueError):
        return None


def _chapter_images(item: dict) -> list:
    """List image file names in the chapter's image directory within its page range."""
    image_dir = item['image_loc']
    first_page, last_page = item['page_number'], item['end_page']
    selected = []
    for name in os.listdir(image_dir):
        # Check the extension first so unrelated files never reach the
        # page-number parser.
        if not name.endswith(_IMAGE_SUFFIXES):
            continue
        page = _page_number(name)
        if page is None or not first_page <= page <= last_page:
            continue
        if os.path.isfile(os.path.join(image_dir, name)):
            selected.append(name)
    return selected


def _profit_rows(table_rows) -> list:
    """Return the rows in which at least one string cell mentions profit."""
    return [
        row for row in table_rows
        # Non-string cells (e.g. None) cannot match and are skipped.
        if any(isinstance(cell, str) and _PROFIT_RE.match(cell) for cell in row)
    ]


def _extract_from_tables(item: dict) -> dict:
    """Build one result entry from the tables already attached to a chapter."""
    tables = item['tables']
    return {
        'title': [t['table_name'] for t in tables],
        # One list per table, kept even when empty so it lines up with 'title'.
        'result': [_profit_rows(t['table']) for t in tables],
        'pages': [t['page_numbers'] for t in tables],
        'chapter': item['title'],
    }


def _extract_from_images(item: dict, ocr_agent) -> list:
    """OCR the chapter's page images and return entries for net-profit rows."""
    images = _chapter_images(item)
    print('未找到表格 图片识别中')
    ocr_results = [
        ocr_agent.get_content(os.path.join(item['image_loc'], name))['rawjson']['ret']
        for name in tqdm(images)
    ]

    print('结果分析中')
    hits = []  # (candidate, row) pairs, in discovery order
    for page, ret in tqdm(zip(images, ocr_results), total=len(images)):
        for res in ret:
            if _NET_PROFIT_RE.match(res['word']) is None:
                continue
            top = res['rect']['top']
            # NOTE(review): with a top-left image origin the lower edge would be
            # top + height; kept as written because find_current_row's
            # coordinate convention is not visible here - confirm.
            bottom = res['rect']['top'] - res['rect']['height']
            candidate = {
                'page': page,
                'text': res['word'],
                'top': top,
                'bottom': bottom,
            }
            hits.append((candidate, find_current_row(ret, top, bottom)))

    for candidate, _ in hits:
        print('定位:\t{}\t定位词:\t{}'.format(candidate['page'], candidate['text']))

    entries = []
    for candidate, row in hits:
        title = []
        profits = []
        for w in row:
            (profits if is_price(w['word']) else title).append(w['word'])
        # A row is only useful when it has both a label and at least one amount.
        if title and profits:
            entries.append({
                'chapter': item['title'],
                'page': candidate['page'],
                'title': title,
                'result': profits,
            })
    return entries


def extract_financial_report(path: str, year: Optional[int] = None):
    """Collect profit-related rows from the financial chapters of a PDF.

    Chapters are looked up by the titles '财务状况' and the audit reports of
    the two years preceding *year*.

    Args:
        path: Path of the PDF, passed to ``get_instances_by_title``.
        year: Reference year; defaults to the current calendar year.

    Returns:
        A list of dicts. Chapters with parsed tables yield one dict with keys
        'title', 'result', 'pages' and 'chapter'. Chapters processed through
        OCR yield one dict per net-profit row with keys 'chapter', 'page',
        'title' and 'result'.
    """
    if year is None:
        # The previous default of None crashed on `year - 1`.
        year = datetime.datetime.now().year

    instances = get_instances_by_title(
        path,
        ['财务状况', '{}年审计报告'.format(year - 1), '{}年审计报告'.format(year - 2)],
    )
    results = []
    ocr_agent = OcrAgent(OCR_ENDPOINT)
    for item in instances:
        if item['tables']:
            results.append(_extract_from_tables(item))
        elif item['page_number'] >= item['end_page']:
            print('Wrong titles extracted at {}'.format(item['title']))
        else:
            # An earlier PaddleOCR-based page classifier was abandoned in
            # favour of the remote OCR service.
            results.extend(_extract_from_images(item, ocr_agent))
    return results


if __name__ == '__main__':
    os.environ["TRANSFORMERS_OFFLINE"] = '1'
    # To process a whole directory instead, iterate over
    # scan_dir(<directory>, 'pdf') and call extract_financial_report per file.
    print(extract_financial_report(
        '/home/zzh/ocr/pdf/美华建设有限公司/投标文件111.pdf',
        2022
    ))