# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2024-06-11 13:43:14
# @Last Modified by: privacy
# @Last Modified time: 2024-09-27 14:08:30
- import re
- import json
- import pandas as pd
- import os
"""
Parsing of textmind results.

[Node]
node_id: int
text: str
node_type: <text|title|contents|head_tail|table|image>
parent: int
children: list
para_type: <text|title_1|title_2|title_3|title_4|title_5|title_6|contents|head_tail|table|image>
[position]
pageno: int
layout_index: int
box: list
"""
def json2json(path):
    """Load and return the JSON document stored at ``path``.

    Args:
        path: Location of a UTF-8 encoded JSON file.

    Returns:
        The decoded JSON value produced by ``json.load`` for that file.
    """
    # The context manager closes the handle even when decoding fails; the
    # previous bare ``open(...).read()`` never closed it explicitly.
    with open(path, 'r', encoding='utf-8') as fp:
        return json.load(fp)
def paese_content(layouts:list):
    """Concatenate the text of one page's textual layouts.

    Args:
        layouts: Layout dicts of a single page; each one is read through
            its ``'type'`` and ``'text'`` keys.

    Returns:
        ``pd.NA`` when ``layouts`` is empty. Otherwise the kept texts joined
        with newlines, with double newlines collapsed once and every space
        removed.
    """
    if not layouts:
        return pd.NA
    # The original condition chained ``!=`` comparisons with ``or``, which
    # holds for every possible type, so nothing was ever excluded. A
    # membership test expresses the intended filter.
    skipped_types = ('table', 'image', 'seal', 'head_tail')
    contents = [
        layout['text']
        for layout in layouts
        if layout['type'] not in skipped_types and layout['text']
    ]
    return "\n".join(contents).replace('\n\n', '\n').replace(' ', '')
def parse_table_name(tables:list, layouts:list):
    """Work out a name for the table(s) found on one page.

    Lookup order:
      1. the text of every layout that lists a table's ``layout_id`` among
         its ``children``;
      2. otherwise, every layout text containing '表' that appears in
         ``layouts`` before the (last located) table;
      3. otherwise, the text of the layout immediately before that table.

    Args:
        tables: Table dicts; only their ``'layout_id'`` key is read.
        layouts: Layout dicts of the same page; their ``'layout_id'``,
            ``'text'`` and ``'children'`` keys are read.

    Returns:
        ``pd.NA`` when ``tables`` is empty, otherwise the collected names
        joined with ';' (an empty string when nothing was found).
    """
    if not tables:
        return pd.NA

    table_layout_ids = [table['layout_id'] for table in tables]

    # Step 1: scan the layouts directly. The previous implementation keyed a
    # dict by layout text, so two parents sharing the same text overwrote
    # each other and a table could lose its parent.
    table_names = [
        layout['text']
        for table_layout_id in table_layout_ids
        for layout in layouts
        if layout['children'] and table_layout_id in layout['children']
    ]

    if not table_names:
        # Position of the table inside ``layouts``; when several tables are
        # located, the last one located wins (as before). Stays 0 when no
        # table id matches any layout.
        index_ = 0
        for table_layout_id in table_layout_ids:
            for position, layout in enumerate(layouts):
                if layout['layout_id'] == table_layout_id:
                    index_ = position
                    break

        # Step 2: any earlier layout whose text mentions '表'. Non-string
        # texts (e.g. None) are skipped instead of raising TypeError.
        table_names = [
            layout['text']
            for layout in layouts[:index_]
            if isinstance(layout['text'], str) and '表' in layout['text']
        ]

        # Step 3: fall back to the layout right before the table.
        if not table_names and index_ > 0:
            previous_text = layouts[index_ - 1]['text']
            if isinstance(previous_text, str):
                table_names.append(previous_text)

    return ";".join(table_names)
-
def parse_title(layouts:list, max_text_len:int = 30):
    """Pick a title for one page from its layouts.

    The first layout that is a title (``type == 'title'`` or a ``sub_type``
    containing 'title'), is not a header/footer and has text wins. When no
    such layout exists, the first plain ``text`` layout shorter than
    ``max_text_len`` characters is used instead.

    Args:
        layouts: Layout dicts of a single page; their ``'type'``,
            ``'sub_type'`` and ``'text'`` keys are read.
        max_text_len: Exclusive length limit for the plain-text fallback.
            Defaults to the previously hard-coded 30.

    Returns:
        The chosen text with newlines removed, or ``pd.NA`` when ``layouts``
        is empty or nothing qualifies.
    """
    if not layouts:
        return pd.NA

    for layout in layouts:
        # ``sub_type`` may be None or absent; treat that as "no sub type"
        # instead of failing on the membership test.
        is_title = layout['type'] == 'title' or 'title' in (layout.get('sub_type') or '')
        if is_title and layout['text'] and layout['type'] != 'head_tail':
            text = layout['text'].replace("\n", "")
            if text:
                return text

    for layout in layouts:
        if layout['type'] == 'text' and layout['text']:
            text = layout['text'].replace("\n", "")
            if text and len(text) < max_text_len:
                return text

    return pd.NA
def parse_table(markdown:str):
    """Split a pipe-delimited markdown table into rows of cell strings.

    Literal ``\\n`` sequences (backslash + 'n') and spaces are removed from
    every line before it is split on '|'.

    Args:
        markdown: Table text, one row per line.

    Returns:
        A list of rows, each a list of cell strings. Lines that are empty
        after cleaning are skipped, so a trailing newline no longer yields
        a spurious ``['']`` row.
    """
    table = []
    for line in markdown.split('\n'):
        cleaned = re.sub(r"\\n| ", "", line)
        if not cleaned:
            continue
        table.append(cleaned.strip('|').split('|'))
    return table
def get_ocr_new(raw:dict, pretty: bool = False):
    """Convert a raw textmind result into a DataFrame or structured records.

    Args:
        raw: Decoded textmind JSON. ``raw['pages']`` is iterated; every page
            is read through its ``'text'`` key and, when ``pretty`` is set,
            through its ``'page_num'``, ``'layouts'`` and ``'tables'`` keys.
            The input is not modified.
        pretty: When false, return the page-level DataFrame unchanged.

    Returns:
        With ``pretty=False``: a DataFrame holding one row per page whose
        ``text`` is non-empty, without the ``page_id`` field.
        With ``pretty=True``: a dict with the keys ``title``, ``outline``,
        ``contents`` and ``tables`` (lists of record dicts) plus ``images``
        (always an empty list).
    """
    # Copy each page without ``page_id`` instead of deleting the key in
    # place: the caller's data stays intact and a page lacking the key no
    # longer raises KeyError.
    nodes = [
        {key: value for key, value in page.items() if key != 'page_id'}
        for page in raw['pages']
        if page['text']
    ]
    df = pd.DataFrame(nodes)
    if not pretty:
        return df
    if df.empty:
        # No usable page: the column selections below would raise KeyError.
        return {"title": [], "outline": [], "contents": [], "tables": [], "images": []}

    layouts = df['layouts']

    def first_layout_value(key):
        # Value of ``key`` on each page's first layout, NA for pages
        # without layouts.
        return layouts.apply(lambda items: items[0][key] if items else pd.NA)

    # Page texts; pages without content are dropped.
    content_df = df.loc[:, ['page_num']]
    content_df['text'] = layouts.apply(lambda items: paese_content(items))
    content_df = content_df.rename(columns={'page_num': 'page_number'})
    content_df.dropna(inplace=True)
    content = content_df.to_dict('records')

    # Titles; rows with any missing field are dropped.
    title_df = df.loc[:, ['page_num']]
    title_df = title_df.rename(columns={'page_num': 'page_number'})
    title_df['title'] = layouts.apply(lambda items: parse_title(items))
    title_df['box'] = first_layout_value('position')
    title_df['node_type'] = first_layout_value('type')
    title_df['para_type'] = first_layout_value('sub_type')
    title_df['text'] = title_df['title']
    title_df.dropna(inplace=True)
    outline = title_df.to_dict('records')

    # ``title`` is the outline plus the surviving row index as sequence
    # number.
    title_df['seq_num'] = title_df.index
    title = title_df.to_dict('records')

    # Tables. NOTE(review): only the first table of each page is parsed,
    # while the table name is computed from all tables of the page --
    # confirm that this is intended.
    table_df = df.loc[:, ['page_num']]
    table_df['page_num'] = table_df['page_num'].apply(lambda num: [num])
    table_df = table_df.rename(columns={'page_num': 'page_numbers'})
    table_df['table'] = df['tables'].apply(
        lambda items: parse_table(items[0]['markdown']) if items else pd.NA
    )
    table_df['table_name'] = df.apply(
        lambda row: parse_table_name(row['tables'], row['layouts']), axis=1
    )
    table_df.dropna(inplace=True)
    table = table_df.to_dict('records')

    return {"title": title, "outline": outline, "contents": content, "tables": table, "images": []}
-
def run(basepath: str = '/mnt/d/Work_PWS/24y_04_SanXia/data/甲方提供材料/30份数据整理'):
    """Parse every textmind result below ``basepath`` and write the parts out.

    For each sub-directory of ``basepath`` the folder
    ``投标文件/textmind`` is scanned for files ending in ``textmind.json``.
    Each one is parsed with ``get_ocr_new(..., pretty=True)`` and its
    ``title``, ``outline``, ``contents`` and ``tables`` parts are written
    next to it as ``<name>_title.json``, ``<name>_outlines.json``,
    ``<name>_content.json`` and ``<name>_tables.json``.

    Args:
        basepath: Root directory holding one folder per project. Defaults
            to the previously hard-coded location.

    Raises:
        ValueError: ``"stop"`` when parsing or writing a result fails; the
            underlying exception is attached as ``__cause__``.
    """
    # Result key -> suffix of the output file.
    output_suffixes = {
        'title': 'title',
        'outline': 'outlines',
        'contents': 'content',
        'tables': 'tables',
    }
    for save_file in os.listdir(basepath):
        # Stray files in the base directory (or projects without the
        # expected folder) are skipped instead of crashing os.listdir.
        textmind_save_dir = os.path.join(basepath, save_file, '投标文件', 'textmind')
        if not os.path.isdir(textmind_save_dir):
            continue
        for bidder_name in os.listdir(textmind_save_dir):
            if not bidder_name.endswith('textmind.json'):
                continue
            textmind_result_path = os.path.join(textmind_save_dir, bidder_name)
            print("textmind_result_path ",textmind_result_path)
            with open(textmind_result_path, 'r', encoding='utf-8') as fp:
                raw = json.load(fp)
            try:
                parsed = get_ocr_new(raw=raw, pretty=True)
                for key, suffix in output_suffixes.items():
                    # bidder_name[:-5] strips the '.json' extension.
                    out_path = f'{textmind_save_dir}/{bidder_name[:-5]}_{suffix}.json'
                    with open(out_path, 'w', encoding='utf-8') as fo:
                        json.dump(parsed[key], fo, ensure_ascii=False)
            except Exception as err:
                # Name the offending file, then abort the batch while
                # keeping the real cause attached for debugging.
                print(textmind_result_path)
                raise ValueError("stop") from err
def parse_datasets(base_dir: str = '/mnt/d/Work_PWS/24y_04_SanXia/data/甲方提供材料/20241122-4'):
    """Parse the bidder textmind results of every project below ``base_dir``.

    For each sub-directory of ``base_dir`` the folder ``投标文件/textmind``
    is scanned for files whose name contains ``_textmind``. Each one is
    parsed with ``get_ocr_new(..., pretty=True)`` and its ``title``,
    ``outline``, ``contents`` and ``tables`` parts are written to the same
    folder as ``<bidder>_bidding_title.json``,
    ``<bidder>_bidding_outlines.json``, ``<bidder>_bidding_content.json``
    and ``<bidder>_bidding_tables.json``, where ``<bidder>`` is the file
    name up to its first underscore.

    Args:
        base_dir: Root directory holding one folder per project. Defaults
            to the previously hard-coded location.

    Raises:
        ValueError: ``"stop"`` when parsing or writing a result fails; the
            underlying exception is attached as ``__cause__``.
    """
    # Result key -> middle part of the output file name.
    output_names = {
        'title': 'bidding_title',
        'outline': 'bidding_outlines',
        'contents': 'bidding_content',
        'tables': 'bidding_tables',
    }
    for base_folders in os.listdir(base_dir):
        # Only the bid-document folder is processed; the tender-document
        # folder was previously walked just to fill unused variables.
        folder = os.path.join(base_dir, base_folders, '投标文件')
        if not os.path.isdir(folder):
            continue
        print("folder:", folder)
        textmind_result_path = os.path.join(folder, 'textmind')
        if not os.path.isdir(textmind_result_path):
            continue
        for textmind_json in os.listdir(textmind_result_path):
            if '_textmind' not in textmind_json:
                continue
            # NOTE(review): a bidder name that itself contains '_' is cut
            # at its first underscore -- confirm names never contain one.
            bidderUnit = textmind_json.split("_")[0]
            textmind_file_path = os.path.join(textmind_result_path, textmind_json)
            with open(textmind_file_path, 'r', encoding='utf-8') as fp:
                raw = json.load(fp)
            try:
                parsed = get_ocr_new(raw=raw, pretty=True)
                for key, name in output_names.items():
                    out_path = f'{textmind_result_path}/{bidderUnit}_{name}.json'
                    with open(out_path, 'w', encoding='utf-8') as fo:
                        json.dump(parsed[key], fo, ensure_ascii=False)
            except Exception as err:
                # Report the failing file (the directory alone does not
                # identify it) and keep the real cause attached.
                print(textmind_file_path)
                raise ValueError("stop") from err
if __name__ == '__main__':
    # Script entry point: process the dataset handled by run(). Call
    # parse_datasets() here instead to process the other dataset layout.
    run()
|