123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # -*- coding: utf-8 -*-
- # @Author: privacy
- # @Date: 2024-06-11 13:43:14
- # @Last Modified by: privacy
- # @Last Modified time: 2024-12-03 10:50:09
- import re
- import json
- import pandas as pd
- import os
- """
- textmind 结果解析
- [Node]
- node_id: int
- text: str
- node_type: <text|title|contents|head_tail|table|image>
- parent: int
- children: list
- para_type: <text|title_1|title_2|title_3|title_4|title_5|title_6|contents|head_tail|table|image>
- [position]
- pageno: int
- layout_index: int
- box: list
- """
def json2json(path):
    """Load a UTF-8 encoded JSON file and return the decoded object.

    Args:
        path: Path of the JSON file to read.

    Returns:
        The Python object produced by decoding the file content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # The context manager closes the handle even when decoding fails;
    # the previous bare open(...).read() left the file open.
    with open(path, 'r', encoding='utf-8') as fp:
        return json.load(fp)
def paese_content(layouts: list):
    """Concatenate the text of a page's layouts, skipping non-text blocks.

    The function name keeps its historical spelling ("paese") because
    callers reference it by that name.

    Args:
        layouts: Layout dicts of one page; each is read for its
            'sub_type' and 'text' keys.

    Returns:
        ``pd.NA`` when ``layouts`` is empty or None. Otherwise the joined
        text of every layout whose sub_type is not 'table', 'image' or
        'seal', with double newlines collapsed to one and all spaces
        removed.
    """
    if not layouts:
        return pd.NA
    # The original test chained "!=" comparisons with "or", which is true
    # for every value and therefore filtered nothing. A membership test
    # expresses the intended exclusion.
    skipped_sub_types = ('table', 'image', 'seal')
    contents = [
        layout['text']
        for layout in layouts
        if layout['sub_type'] not in skipped_sub_types
    ]
    return "".join(contents).replace('\n\n', '\n').replace(' ', '')
def parse_table_name(tables: list, images: list, layouts: list):
    """Collect candidate table names found on one page.

    Args:
        tables: Tables detected on the page; only its truthiness is used.
        images: Image dicts, each read for its 'content_layouts' list.
        layouts: Page-level layout dicts, read for 'sub_type' and 'text'.

    Returns:
        ``pd.NA`` when the page has no tables. Otherwise a ';'-joined
        string of the text (newlines and spaces removed) of every layout
        whose sub_type is 'table_title' or 'head_tail', taken first from
        ``layouts`` and then from each image's 'content_layouts'.
    """
    if not tables:
        return pd.NA
    name_sub_types = ('table_title', 'head_tail')
    table_names = [
        re.sub("\n| ", "", layout['text'])
        for layout in (layouts or ())
        if layout['sub_type'] in name_sub_types
    ]
    for image in (images or ()):
        for content_layout in image['content_layouts']:
            # Test the image's own layout. The original compared the
            # leftover `layout` variable from the page-level loop, which
            # gave wrong matches and a NameError when `layouts` was empty.
            if content_layout['sub_type'] in name_sub_types:
                table_names.append(re.sub("\n| ", "", content_layout['text']))
    return ";".join(table_names)
def parse_title(layouts: list):
    """Parse the title of a page from its layouts.

    Args:
        layouts: Layout dicts of one page, read for 'type' and 'text'.

    Returns:
        The text (newlines removed) of the first layout whose type is
        'title'. If there is none, the text of the first layout when it
        is non-empty and shorter than 15 characters. ``pd.NA`` in every
        other case, including an empty ``layouts``.
    """
    if not layouts:
        return pd.NA
    for layout in layouts:
        if layout['type'] == 'title':
            return re.sub("\n", "", layout['text'])
    # Fallback: a short first line is treated as the title.
    # NOTE(review): the original looped over all layouts looking for any
    # text but then always read layouts[0]; that first-layout behavior is
    # kept here. Confirm whether "first non-empty layout" was intended.
    first_text = layouts[0]['text']
    if first_text and len(first_text) < 15:
        return re.sub("\n", "", first_text)
    # Always return pd.NA (never an implicit None) so callers see a
    # single missing-value marker.
    return pd.NA
def parse_table(markdown: str):
    """Split a markdown table string into a list of rows of cell strings.

    Each line of ``markdown`` becomes one row: literal backslash-n
    sequences and spaces are removed, leading/trailing '|' characters are
    stripped, and the remainder is split on '|'. Every line is kept,
    including separator rows and empty lines.
    """
    return [
        re.sub(r"\\n| ", "", raw_line).strip('|').split('|')
        for raw_line in markdown.split('\n')
    ]
def get_ocr_new(raw: dict, pretty: bool = False):
    """Parse a textmind result.

    Args:
        raw: Decoded textmind result. ``raw['pages']`` is iterated; each
            page dict is read for 'text' and, when ``pretty`` is true,
            for 'page_num', 'layouts', 'tables' and 'images'.
        pretty: When false, return the page DataFrame as is. When true,
            return the structured dict described below.

    Returns:
        With ``pretty=False``: a DataFrame with one row per page that has
        non-empty text, without the 'page_id' column.

        With ``pretty=True``: a dict with the keys
        'title' (title records including 'seq_num', the page row index),
        'outline' (the same records without 'seq_num'),
        'contents' (page_number/text records),
        'tables' (page_numbers/table/table_name records; only the first
        table of each page is parsed) and 'images' (always an empty list).
        Rows containing a missing value are dropped from each record list.
    """
    # Build copies without 'page_id' instead of deleting the key in place,
    # so the caller's `raw` is left untouched and the function can be
    # called again on the same input.
    nodes = [
        {key: value for key, value in page.items() if key != 'page_id'}
        for page in raw['pages']
        if page['text']
    ]
    df = pd.DataFrame(nodes)
    if not pretty:
        return df
    if df.empty:
        # No usable pages: there are no columns to select from, so return
        # the empty structure directly.
        return {"title": [], "outline": [], "contents": [], "tables": [], "images": []}

    # Page contents.
    content_df = df.loc[:, ['page_num']].rename(columns={'page_num': 'page_number'})
    content_df['text'] = df['layouts'].apply(paese_content)
    content = content_df.dropna().to_dict('records')

    # Titles / outline: metadata is taken from the first layout of a page.
    title_df = df.loc[:, ['page_num']].rename(columns={'page_num': 'page_number'})
    title_df['title'] = df['layouts'].apply(parse_title)
    title_df['parent'] = df['layouts'].apply(lambda x: x[0]['parent'] if x else pd.NA)
    title_df['node_type'] = df['layouts'].apply(lambda x: x[0]['type'] if x else pd.NA)
    title_df['para_type'] = df['layouts'].apply(lambda x: x[0]['sub_type'] if x else pd.NA)
    title_df['text'] = title_df['title']
    title_df = title_df.dropna()
    outline = title_df.to_dict('records')
    # 'seq_num' is the surviving row index, so it may have gaps.
    title = title_df.assign(seq_num=title_df.index).to_dict('records')

    # Tables.
    table_df = df.loc[:, ['page_num']].rename(columns={'page_num': 'page_numbers'})
    table_df['page_numbers'] = table_df['page_numbers'].apply(lambda x: [x])
    table_df['table'] = df['tables'].apply(
        lambda x: parse_table(x[0]['markdown']) if x else pd.NA
    )
    table_df['table_name'] = df.apply(
        lambda x: parse_table_name(x['tables'], x['images'], x['layouts']), axis=1
    )
    table = table_df.dropna().to_dict('records')

    return {"title": title, "outline": outline, "contents": content, "tables": table, "images": []}
if __name__ == '__main__':
    # Manual run: load one locally stored textmind result file, parse it
    # in pretty mode and print the extracted tables.
    sample_path = 'D:\\desktop\\三峡水利\\data\\0预审查初审详审测试数据\\textmind_result\\安徽德通智联科技有限公司_textmind.txt'
    with open(sample_path, 'r', encoding='utf-8') as fp:
        raw = json.load(fp)
    data = get_ocr_new(raw=raw, pretty=True)
    print(data['tables'])
- # basepath = '/mnt/d/Work_PWS/24y_04_SanXia/data/甲方提供材料/30份数据整理'
- # for save_file in os.listdir(basepath):
- # save_file_path = os.path.join(basepath, save_file)
- # for save_file_name in os.listdir(save_file_path):
- # if '投标文件' == save_file_name:
- # save_file_name_path = os.path.join(save_file_path,save_file_name)
- # textmind_save_dir = os.path.join(save_file_name_path,'textmind')
- # for bidder_name in os.listdir(textmind_save_dir):
- # if bidder_name[-13:] != 'textmind.json': continue
- # textmind_result_path = os.path.join(textmind_save_dir, bidder_name)
- # with open(textmind_result_path, 'r', encoding='utf-8') as fp:
- # raw = json.load(fp)
- # try:
- # raw = get_ocr_new(raw=raw, pretty=True)
- # for k, v in raw.items():
- # if k == 'title':
- # with open(f'{textmind_save_dir}/{bidder_name[:-5]}_title.json', 'w', encoding='utf-8') as fo:
- # json.dump(v, fo, ensure_ascii=False)
- # elif k == 'outline':
- # with open(f'{textmind_save_dir}/{bidder_name[:-5]}_outlines.json', 'w', encoding='utf-8') as fo:
- # json.dump(v, fo, ensure_ascii=False)
- # elif k == 'contents':
- # with open(f'{textmind_save_dir}/{bidder_name[:-5]}_content.json', 'w', encoding='utf-8') as fo:
- # json.dump(v, fo, ensure_ascii=False)
- # elif k == 'tables':
- # with open(f'{textmind_save_dir}/{bidder_name[:-5]}_tables.json', 'w', encoding='utf-8') as fo:
- # json.dump(v, fo, ensure_ascii=False)
- # except Exception:
- # print(textmind_result_path)
- # raise ValueError("stop")
|