extract_financial_report.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. # @Author: privacy
  3. # @Date: 2024-06-11 13:43:14
  4. # @Last Modified by: privacy
  5. # @Last Modified time: 2024-09-05 15:04:14
  6. import os
  7. import re
  8. import datetime
  9. from tqdm import tqdm
  10. from celery_tasks.ocr import find_current_row
  11. from celery_tasks.commonprocess import pic_ocr
  12. from celery_tasks.instance_locate import get_instances_by_title
  13. def is_price(word: str) -> bool:
  14. pattern = (
  15. r"(?:\b(?:[BS]/\.|R(?:D?\$|p))|\b(?:[TN]T|[CJZ])\$|Дин\.|\b(?:Bs|Ft|Gs"
  16. r"|K[Mč]|Lek|B[Zr]|k[nr]|[PQLSR]|лв|ден|RM|MT|lei|zł|USD|GBP|EUR|JPY"
  17. r"|CHF|SEK|DKK|NOK|SGD|HKD|AUD|TWD|NZD|CNY|KRW|INR|CAD|VEF|EGP|THB|IDR"
  18. r"|PKR|MYR|PHP|MXN|VND|CZK|HUF|PLN|TRY|ZAR|ILS|ARS|CLP|BRL|RUB|QAR|AED"
  19. r"|COP|PEN|CNH|KWD|SAR)|\$[Ub]|"
  20. r"[^\w\s])\s?(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{1,2})?(?!\.?\d)"
  21. )
  22. char_set = set('1234567890,.')
  23. if re.fullmatch(pattern, word):
  24. return True
  25. elif sum([0 if s in char_set else 1 for s in word]) == 0:
  26. return True
  27. else:
  28. return False
  29. def extract_financial_report(title_list: list, table_list: list, image_list: list, year: int) -> list:
  30. """
  31. 财报解析
  32. Args:
  33. path:
  34. title_list: 标题列表
  35. table_list: 表格列表
  36. image_list: 图片列表
  37. year: 年份
  38. Returns:
  39. results
  40. """
  41. instances = get_instances_by_title(
  42. title_list=title_list,
  43. table_list=table_list,
  44. image_list=image_list,
  45. instances=[
  46. '财务状况', '{}年审计报告'.format(year - 1),
  47. '{}年审计报告'.format(year - 2)
  48. ]
  49. )
  50. results = []
  51. for item in instances:
  52. if item['page_number'] >= item['end_page']:
  53. print('Wrong titles extracted at {}'.format(item['title']))
  54. elif item['tables']:
  55. table_name = [t['table_name'] for t in item['tables']]
  56. profits = []
  57. for table in item['tables']:
  58. profit = []
  59. for row in table['table']:
  60. if list(filter(lambda x: re.match(r'.*利润.*', x) is not None, row)):
  61. profit.append(row)
  62. profits.append(profit)
  63. results.append({
  64. 'title': table_name,
  65. 'result': profits,
  66. 'pages': [i['page_numbers'] for i in item['tables']],
  67. 'chapter': item['title']
  68. })
  69. elif item.get('images'):
  70. print('未找到表格 图片识别中')
  71. print(item.get('images'))
  72. pages = [
  73. img['page_number'] for img in item.get('images')
  74. ]
  75. ocr_results = [
  76. pic_ocr.apply_async(kwargs={'image_path': img['image_name']}).get(timeout=30)['rawjson']['ret']
  77. for img in item.get('images')
  78. ]
  79. candidate = []
  80. rows = []
  81. print('结果分析中')
  82. for i, ret in tqdm(enumerate(ocr_results)):
  83. for res in ret:
  84. if re.match(r'.*(净利润).*', res['word']) is not None:
  85. top = res['rect']['top']
  86. bottom = res['rect']['top'] - res['rect']['height']
  87. candidate.append(
  88. {
  89. 'page': pages[i],
  90. 'text': res['word'],
  91. 'top': top,
  92. 'bottom': bottom,
  93. }
  94. )
  95. rows.append(find_current_row(ret, top, bottom))
  96. for it in candidate:
  97. print('定位:\t{}\t定位词:\t{}'.format(it['page'], it['text']))
  98. for i, row in enumerate(rows):
  99. title = []
  100. profits = []
  101. for w in row:
  102. if is_price(w['word']):
  103. profits.append(w['word'])
  104. else:
  105. title.append(w['word'])
  106. if title and profits:
  107. results.append({
  108. 'chapter': item['title'],
  109. 'page': candidate[i]['page'],
  110. 'title': title,
  111. 'result': profits
  112. })
  113. return results
  114. if __name__ == '__main__':
  115. import json
  116. from settings import title_n_path, table_list_path, image_path
  117. with open(title_n_path, 'r', encoding='utf-8') as fp:
  118. title_list = json.load(fp)
  119. with open(table_list_path, 'r', encoding='utf-8') as fp:
  120. table_list = json.load(fp)
  121. with open(image_path, 'r', encoding='utf-8') as fp:
  122. image_list = json.load(fp)
  123. y = datetime.datetime.now().year
  124. print(
  125. extract_financial_report(
  126. title_list=title_list,
  127. table_list=table_list,
  128. image_list=image_list,
  129. year=2022
  130. )
  131. )