123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- # -*- coding: utf-8 -*-
- # @Author: privacy
- # @Date: 2024-06-11 13:43:14
- # @Last Modified by: privacy
- # @Last Modified time: 2024-09-29 16:33:17
- import os
- import re
- from tqdm import tqdm
- from celery_tasks.ocr import find_current_row
- from celery_tasks.commonprocess import pic_ocr
- from celery_tasks.instance_locate import get_instances_by_title
- def is_price(word: str) -> bool:
- pattern = (
- r"(?:\b(?:[BS]/\.|R(?:D?\$|p))|\b(?:[TN]T|[CJZ])\$|Дин\.|\b(?:Bs|Ft|Gs"
- r"|K[Mč]|Lek|B[Zr]|k[nr]|[PQLSR]|лв|ден|RM|MT|lei|zł|USD|GBP|EUR|JPY"
- r"|CHF|SEK|DKK|NOK|SGD|HKD|AUD|TWD|NZD|CNY|KRW|INR|CAD|VEF|EGP|THB|IDR"
- r"|PKR|MYR|PHP|MXN|VND|CZK|HUF|PLN|TRY|ZAR|ILS|ARS|CLP|BRL|RUB|QAR|AED"
- r"|COP|PEN|CNH|KWD|SAR)|\$[Ub]|"
- r"[^\w\s])\s?(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{1,2})?(?!\.?\d)"
- )
- char_set = set('1234567890,.')
- if re.fullmatch(pattern, word):
- return True
- elif sum([0 if s in char_set else 1 for s in word]) == 0:
- return True
- else:
- return False
- def extract_financial_report(title_list: list, table_list: list, image_list: list, year: int) -> list:
- """
- 财报解析
- Args:
- path:
- title_list: 标题列表
- table_list: 表格列表
- image_list: 图片列表
- year: 年份
- Returns:
- results
- """
- instances = get_instances_by_title(
- title_list=title_list,
- table_list=table_list,
- image_list=image_list,
- instances=[
- '财务状况',
- '近年财务状况表',
- '{}年审计报告'.format(year - 1),
- '{}年审计报告'.format(year - 2)
- ]
- )
- results = []
- for item in instances:
- if item['page_number'] >= item['end_page']:
- print('Wrong titles extracted at {}'.format(item['title']))
- elif item['tables']:
- table_name = [t['table_name'] for t in item['tables']]
- profits = []
- for table in item['tables']:
- profit = []
- for row in table['table']:
- if list(filter(lambda x: re.match(r'.*利润.*', x) is not None, row)):
- profit.append(row)
- profits.append(profit)
- results.append({
- 'title': table_name,
- 'result': profits,
- 'pages': [i['page_numbers'] for i in item['tables']],
- 'chapter': item['title']
- })
- elif item.get('images'):
- print('未找到表格 图片识别中')
- print(item.get('images'))
- pages = [
- img['page_number'] for img in item.get('images')
- ]
- ocr_results = [
- pic_ocr.apply_async(kwargs={'image_path': img['image_name']}).get(timeout=30)['rawjson']['ret']
- for img in item.get('images')
- ]
- candidate = []
- rows = []
- print('结果分析中')
- for i, ret in tqdm(enumerate(ocr_results)):
- for res in ret:
- if re.match(r'.*(净利润).*', res['word']) is not None:
- top = res['rect']['top']
- bottom = res['rect']['top'] - res['rect']['height']
- candidate.append(
- {
- 'page': pages[i],
- 'text': res['word'],
- 'top': top,
- 'bottom': bottom,
- }
- )
- rows.append(find_current_row(ret, top, bottom))
- for it in candidate:
- print('定位:\t{}\t定位词:\t{}'.format(it['page'], it['text']))
- for i, row in enumerate(rows):
- title = []
- profits = []
- for w in row:
- if is_price(w['word']):
- profits.append(w['word'])
- else:
- title.append(w['word'])
- if title and profits:
- results.append({
- 'chapter': item['title'],
- 'page': candidate[i]['page'],
- 'title': title,
- 'result': profits
- })
- return results
- if __name__ == '__main__':
- import json
- import datetime
- from settings import title_n_path, table_list_path, image_path
- with open(title_n_path, 'r', encoding='utf-8') as fp:
- title_list = json.load(fp)
- with open(table_list_path, 'r', encoding='utf-8') as fp:
- table_list = json.load(fp)
- with open(image_path, 'r', encoding='utf-8') as fp:
- image_list = json.load(fp)
- y = datetime.datetime.now().year
- print(
- extract_financial_report(
- title_list=title_list,
- table_list=table_list,
- image_list=image_list,
- year=2022
- )
- )
|