# -*- coding: utf-8 -*-
# @Author: privacy
# @Date:   2024-06-11 13:43:14
# @Last Modified by:   privacy
# @Last Modified time: 2024-09-27 14:08:30
"""Parse textmind OCR results.

Node layout as noted by the original author (not validated by this module):

[Node]
    node_id: int
    text: str
    node_type:
    parent: int
    children: list
    para_type:
    [position]
        pageno: int
        layout_index: int
        box: list
"""
import json
import os
import re

import pandas as pd

# Layout types whose text must not be merged into a page's body content.
_NON_CONTENT_TYPES = frozenset({'table', 'image', 'seal', 'head_tail'})

# Removes literal "\n" escape sequences and spaces from a markdown table row.
_CELL_NOISE_RE = re.compile(r"\\n| ")

# get_ocr_new() result key -> suffix of the JSON file it is saved to.
# The 'images' key is intentionally not saved (it is always empty).
_OUTPUT_SUFFIXES = {
    'title': 'title',
    'outline': 'outlines',
    'contents': 'content',
    'tables': 'tables',
}

# Name of the per-project folder that holds the bid documents.
_BID_FOLDER = '投标文件'

_DEFAULT_RUN_DIR = '/mnt/d/Work_PWS/24y_04_SanXia/data/甲方提供材料/30份数据整理'
_DEFAULT_DATASET_DIR = '/mnt/d/Work_PWS/24y_04_SanXia/data/甲方提供材料/20241122-4'


def json2json(path):
    """Load the JSON document stored at ``path`` (UTF-8) and return it."""
    with open(path, 'r', encoding='utf-8') as fp:
        return json.load(fp)


def paese_content(layouts: list):
    """Join the body text of one page's layouts into a single string.

    (The misspelled name is kept because callers depend on it.)

    Layouts of type table / image / seal / head_tail and layouts with empty
    text are skipped.  Double newlines are collapsed once and every space is
    removed from the joined text.

    Returns ``pd.NA`` when ``layouts`` is empty.
    """
    if not layouts:
        return pd.NA
    # The original test chained "!=" with "or", which is always true and let
    # every layout type through; a membership test expresses the exclusion.
    contents = [
        layout['text']
        for layout in layouts
        if layout['type'] not in _NON_CONTENT_TYPES and layout['text']
    ]
    return "\n".join(contents).replace('\n\n', '\n').replace(' ', '')


def parse_table_name(tables: list, layouts: list):
    """Guess caption(s) for the tables of one page.

    A layout whose ``children`` contains a table's ``layout_id`` contributes
    its text as that table's name.  When no such parent exists, the layouts
    located before the (last matched) table layout are scanned for text
    containing '表'; if none is found, the text of the layout immediately
    before the table is used.

    Returns the names joined with ';' (possibly an empty string), or
    ``pd.NA`` when ``tables`` is empty.
    """
    if not tables:
        return pd.NA

    # text -> children; a later layout with the same text overrides the value.
    children_by_text = {
        layout['text']: layout['children']
        for layout in layouts
        if layout['children']
    }
    table_layout_ids = [table['layout_id'] for table in tables]

    table_names = [
        text
        for layout_id in table_layout_ids
        for text, children in children_by_text.items()
        if layout_id in children
    ]

    if not table_names:
        layout_ids = [layout['layout_id'] for layout in layouts]
        # Position of the table layout; the last table that can be located
        # wins, and 0 means "not found / nothing precedes it".
        index_ = 0
        for table_layout_id in table_layout_ids:
            if table_layout_id in layout_ids:
                index_ = layout_ids.index(table_layout_id)
        table_names = [
            layout['text'] for layout in layouts[:index_] if '表' in layout['text']
        ]
        if not table_names and index_ > 0:
            table_names.append(layouts[index_ - 1]['text'])
    return ";".join(table_names)


def parse_title(layouts: list):
    """Pick a title for one page.

    First pass: the first layout that is a title (``type == 'title'`` or
    'title' occurring in ``sub_type``), is not a header/footer and has text.
    Second pass: the first plain 'text' layout shorter than 30 characters.
    Newlines are stripped from the returned text.

    Returns ``pd.NA`` when nothing qualifies.
    """
    if not layouts:
        return pd.NA

    for layout in layouts:
        is_title = layout['type'] == 'title' or 'title' in layout['sub_type']
        if is_title and layout['text'] and layout['type'] != 'head_tail':
            text = layout['text'].replace("\n", "")
            if text:
                return text

    for layout in layouts:
        if layout['type'] != 'text' or not layout['text']:
            continue
        text = layout['text'].replace("\n", "")
        if text and len(text) < 30:
            return text
    return pd.NA


def parse_table(markdown: str):
    """Split a markdown table into a list of rows, each a list of cell strings.

    Literal ``\\n`` sequences and spaces are removed from every row before the
    row is split on '|'.  Separator rows are not treated specially.
    """
    return [
        _CELL_NOISE_RE.sub("", line).strip('|').split('|')
        for line in markdown.split('\n')
    ]


def get_ocr_new(raw: dict, pretty: bool = False):
    '''Parse a textmind result.

    Args:
        raw: decoded textmind JSON; ``raw['pages']`` is read, and each page is
            expected to carry ``text`` (plus ``page_num``, ``layouts`` and
            ``tables`` when ``pretty`` is true).  ``raw`` is not modified.
        pretty: when false, return the pages as a DataFrame; when true,
            return a dict of record lists.

    Returns:
        A ``pandas.DataFrame`` of the pages that have text (without the
        ``page_id`` column), or, with ``pretty=True``, a dict with the keys
        ``title``, ``outline``, ``contents``, ``tables`` and ``images``
        (``images`` is always an empty list).
    '''
    # Copy each page instead of deleting 'page_id' in place, so the caller's
    # data stays intact and a missing 'page_id' is not an error.
    nodes = [
        {key: value for key, value in node.items() if key != 'page_id'}
        for node in raw['pages']
        if node['text']
    ]
    df = pd.DataFrame(nodes)
    if not pretty:
        return df
    if df.empty:
        # No usable page: the column selections below would raise KeyError.
        return {"title": [], "outline": [], "contents": [], "tables": [], "images": []}

    content_df = df[['page_num']].rename(columns={'page_num': 'page_number'})
    content_df['text'] = df['layouts'].apply(paese_content)
    content = content_df.dropna().to_dict('records')

    title_df = df[['page_num']].rename(columns={'page_num': 'page_number'})
    title_df['title'] = df['layouts'].apply(parse_title)
    # box / node_type / para_type describe the FIRST layout of the page, which
    # is not necessarily the layout the title text was taken from.
    title_df['box'] = df['layouts'].apply(lambda x: x[0]['position'] if x else pd.NA)
    title_df['node_type'] = df['layouts'].apply(lambda x: x[0]['type'] if x else pd.NA)
    title_df['para_type'] = df['layouts'].apply(lambda x: x[0]['sub_type'] if x else pd.NA)
    title_df['text'] = title_df['title']
    title_df = title_df.dropna()
    outline = title_df.to_dict('records')
    # 'title' records are the outline records plus the surviving row index.
    title_df = title_df.assign(seq_num=title_df.index)
    title = title_df.to_dict('records')

    table_df = df[['page_num']].rename(columns={'page_num': 'page_numbers'})
    table_df['page_numbers'] = table_df['page_numbers'].apply(lambda num: [num])
    # Only the first table of each page is parsed.
    table_df['table'] = df['tables'].apply(
        lambda x: parse_table(x[0]['markdown']) if x else pd.NA
    )
    table_df['table_name'] = df.apply(
        lambda row: parse_table_name(row['tables'], row['layouts']), axis=1
    )
    table = table_df.dropna().to_dict('records')

    return {"title": title, "outline": outline, "contents": content, "tables": table, "images": []}


def _save_results(result: dict, out_dir: str, prefix: str) -> None:
    """Write each saved section of ``result`` to ``<out_dir>/<prefix>_<suffix>.json``."""
    for key, suffix in _OUTPUT_SUFFIXES.items():
        if key not in result:
            continue
        out_path = os.path.join(out_dir, f'{prefix}_{suffix}.json')
        with open(out_path, 'w', encoding='utf-8') as fo:
            json.dump(result[key], fo, ensure_ascii=False)


def _convert_textmind_file(src_path: str, out_dir: str, prefix: str) -> None:
    """Parse one textmind JSON file and save its sections next to it.

    Raises:
        ValueError: when parsing or saving fails; the failing path is printed
            and the original exception is attached as the cause.
    """
    with open(src_path, 'r', encoding='utf-8') as fp:
        raw = json.load(fp)
    try:
        _save_results(get_ocr_new(raw=raw, pretty=True), out_dir, prefix)
    except Exception as err:
        print(src_path)
        raise ValueError("stop") from err


def run(basepath=_DEFAULT_RUN_DIR):
    """Convert every ``*textmind.json`` under ``<basepath>/*/投标文件/textmind``.

    Outputs are written into the same ``textmind`` directory and named after
    the input file without its ``.json`` extension, e.g.
    ``X_textmind_title.json``.
    """
    for save_file in os.listdir(basepath):
        textmind_save_dir = os.path.join(basepath, save_file, _BID_FOLDER, 'textmind')
        if not os.path.isdir(textmind_save_dir):
            continue
        for bidder_name in os.listdir(textmind_save_dir):
            if not bidder_name.endswith('textmind.json'):
                continue
            textmind_result_path = os.path.join(textmind_save_dir, bidder_name)
            print("textmind_result_path ", textmind_result_path)
            # bidder_name[:-5] drops the ".json" extension.
            _convert_textmind_file(textmind_result_path, textmind_save_dir, bidder_name[:-5])


def parse_datasets(base_dir=_DEFAULT_DATASET_DIR):
    """Convert every ``*_textmind*`` file under ``<base_dir>/*/投标文件/textmind``.

    The bidder name is the part of the file name before the first '_';
    outputs are named ``<bidder>_bidding_<section>.json`` and written into
    the same ``textmind`` directory.  Tender documents (招标文件) are not
    processed.
    """
    for base_folders in os.listdir(base_dir):
        folder = os.path.join(base_dir, base_folders, _BID_FOLDER)
        if not os.path.isdir(folder):
            continue
        print("folder:", folder)
        textmind_result_path = os.path.join(folder, 'textmind')
        if not os.path.isdir(textmind_result_path):
            continue
        for textmind_json in os.listdir(textmind_result_path):
            if '_textmind' not in textmind_json:
                continue
            bidderUnit = textmind_json.split("_")[0]  # strips "_textmind.json"
            textmind_file_path = os.path.join(textmind_result_path, textmind_json)
            _convert_textmind_file(
                textmind_file_path, textmind_result_path, f'{bidderUnit}_bidding'
            )


if __name__ == '__main__':
    # Call parse_datasets() instead to process the 20241122-4 batch.
    run()