'''
Bidding-document pre-review.

1. Parses the ``all_tables.json`` result produced by Bidding_document_extract.
'''
import logging
import os
import re
from pprint import pprint

import requests
import uvicorn
from fastapi import FastAPI

from tools import BaseMethods

# from bidding_document_extract.get_Bidding_info import PdfExtractAttr_
# from bidding_document_extract.get_bidding_info import PdfExtractAttr

chinese_num_map = {
    '零': 0,
    '一': 1,
    '二': 2,
    '三': 3,
    '四': 4,
    '五': 5,
    '六': 6,
    '七': 7,
    '八': 8,
    '九': 9,
    '十': 10
}

# The two CJK patterns differ on purpose: the first scoring page joins every
# CJK run of a tag, continuation pages keep only the first run.  They are kept
# separate so the produced dictionary keys stay exactly as before.
# NOTE(review): the differing ranges (\u9fa5 vs \u9fff) look accidental - confirm.
_CJK_JOIN_RE = re.compile(r"[\u4e00-\u9fa5]+")
_CJK_FIRST_RE = re.compile("[\u4e00-\u9fff]+")


def create_logger(log_path):
    '''
    Configure the root logger to write to a log file and to the console.

    Args:
        log_path: Path of the log file.  The file handler is opened with
            mode ``'w'``, so an existing file is truncated.

    Returns:
        The root ``logging.Logger`` with both handlers attached.
    '''
    log_dir = os.path.dirname(log_path)
    if log_dir:
        # FileHandler does not create missing parent directories.
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')

    # Handler that writes the log file.
    file_handler = logging.FileHandler(
        filename=log_path, mode='w')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)

    # Handler that echoes records to the console.
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)
    logger.addHandler(console)

    return logger


log_path = "code/logs/logs.log"
logger = create_logger(log_path=log_path)


def _clean_cell(value):
    '''Return a table cell without surrounding whitespace or newlines ('' when empty/None).'''
    if not value:
        return ''
    return value.strip().replace("\n", "")


def _score_entry(row, factor_index, criteria_index, weights_index):
    '''Build one scoring record from a table row; "权重" is added only when the cell is non-empty.'''
    factor, criteria, weights = row[factor_index], row[criteria_index], row[weights_index]
    entry = {"评分因素": _clean_cell(factor), "评分标准": _clean_cell(criteria)}
    if weights:
        entry["权重"] = _clean_cell(weights)
    return entry


def _collect_bidder_notice(rows, bidder_know):
    '''Append the rows after the header to ``bidder_know``, keyed by the first cell of each row.'''
    for row in rows[1:]:
        if '条' in row:
            continue
        # Known bug (see the logged message): tables that are only partly
        # bordered yield short rows, which is why indexing is guarded here.
        try:
            clause_number = row[0]
            if clause_number:
                entries = bidder_know.setdefault(clause_number, [])
                entries.append({"条款名称": row[1], "编列内容": row[2]})
        except (IndexError, TypeError):
            logger.error('该文件中的投标人须知前附表部分表格没有边框,只有中间部分表格存在边框,提取代码认为只有边框存在才被判定为表格内容')


def _chinese_numeral_to_int(numeral, digit_map):
    '''
    Convert a Chinese numeral from 零 to 九十九 to an int.

    Args:
        numeral: Text such as '八', '十一' or '二十三'.
        digit_map: Mapping of the single numerals 零..十 to their values.

    Returns:
        The integer value, or None when the text cannot be parsed.
    '''
    if numeral in digit_map:
        return digit_map[numeral]
    if '十' in numeral and len(numeral) <= 3:
        tens_text, _, units_text = numeral.partition('十')
        tens = digit_map.get(tens_text) if tens_text else 1
        units = digit_map.get(units_text) if units_text else 0
        if tens is None or units is None or tens > 9 or units > 9:
            return None
        return tens * 10 + units
    return None


class DocumentPreReview():
    '''Loads extracted bidding/tender JSON data and runs pre-review checks on it.'''

    def __init__(self) -> None:
        self.bm = BaseMethods()
        # self.agent_ = PdfExtractAttr_(file_path=self.file_path)
        # self.agent = PdfExtractAttr(file_path=self.file_path)
        self.Bidding_tables = self.get_Bidding_table()
        self.contexts = self.get_Bidding_contexts()
        # get_announcement reads self.contexts, so it must run after it is set.
        self.announcement = self.get_announcement()
        self.Bidding_context = self.get_Bidding_json()
        self.tender_context = self.get_tender_context()
        self.chinese_num_map = chinese_num_map

    def get_Bidding_contexts(self, file_path: str = 'data/预审查数据/contexts.json'):
        '''
        Read the per-page contexts JSON.

        Args:
            file_path: JSON file handed to ``BaseMethods.json_read``.

        Returns:
            Whatever ``json_read`` returns for that file.
        '''
        contexts = self.bm.json_read(file_path)
        return contexts

    def get_Bidding_table(self, file_path: str = "data/预审查数据/三峡左岸及电源电站中央空调系统监控系统改造招标文件-发售版-table.json"):
        '''
        Read the extracted table data.

        Args:
            file_path: JSON file handed to ``BaseMethods.json_read``
                (an earlier default was "data/预审查数据/all_tables.json").

        Returns:
            Whatever ``json_read`` returns for that file.
        '''
        all_tables = self.bm.json_read(file_path)
        return all_tables

    def get_Bidding_json(self, file_path: str = "data/预审查数据/Bidding_contents_2022-2025年度三峡电站9台机组检修密封加工制作重新招标招标文件印刷版.json"):
        '''
        Read the bidding-document content JSON.

        Args:
            file_path: JSON file handed to ``BaseMethods.json_read``.

        Returns:
            Whatever ``json_read`` returns for that file.
        '''
        Bidding_context = self.bm.json_read(file_path)
        return Bidding_context

    def get_tender_context(self, file_path: str = "data/预审查数据/南方电网数字研究院有限公司_bidding_content.json"):
        '''
        Read the tender (bidder side) content JSON.

        Args:
            file_path: JSON file handed to ``BaseMethods.json_read``.

        Returns:
            Whatever ``json_read`` returns for that file.
        '''
        tender_context = self.bm.json_read(file_path)
        return tender_context

    def _scrutinize_judge(self, tag: str):
        '''
        Decide whether a clause cell names a scoring section.

        Args:
            tag: Cell text; ``None`` or '' is accepted and yields False.

        Returns:
            True when at least three of the keywords
            商务 / 技术 / 报价 / 评审 / 评分 / 标准 occur in ``tag``.
        '''
        if not tag:
            return False
        scrutinize_tuple = ("商务", "技术", "报价", "评审", "评分", "标准")
        hit_num = sum(1 for scru in scrutinize_tuple if scru in tag)
        return hit_num >= 3

    def _section_tag(self, row, base_index, first_run_only):
        '''
        Pick the section-tag cell of a scoring row and reduce it to CJK text.

        The cell at ``base_index + 1`` wins when non-empty; otherwise the cell
        at ``base_index + 2`` is used if it passes ``_scrutinize_judge``;
        otherwise the cell at ``base_index``.

        Args:
            row: One table row (list of cells).
            base_index: Column index of the "条款号" header.
            first_run_only: True keeps only the first CJK run (continuation
                pages); False joins all CJK runs (first scoring page).

        Returns:
            The reduced tag, or '' when the chosen cell is empty or has no
            CJK characters.
        '''
        if row[base_index + 1]:
            tag = row[base_index + 1]
        elif self._scrutinize_judge(row[base_index + 2]):
            tag = row[base_index + 2]
        else:
            tag = row[base_index]
        if not tag:
            return ''
        tag = tag.strip().replace("\n", "")
        if first_run_only:
            runs = _CJK_FIRST_RE.findall(tag)
            return runs[0] if runs else ''
        return ''.join(_CJK_JOIN_RE.findall(tag))

    def get_table(self):
        '''
        Parse ``self.Bidding_tables`` into the three structures used for review.

        Each element of ``self.Bidding_tables`` is read through the keys
        'table_name', 'page_numbers', 'title_len' and 'table'.

        Returns:
            A tuple ``(tag_dict, bidder_know, scrutinize_dict)``:
            ``tag_dict`` maps 形式评审标准 / 资格评审标准 / 响应性评审标准 to lists
            of {"评审因素", "评审标准"} records; ``bidder_know`` maps the first
            cell of each bidder-notice row to {"条款名称", "编列内容"} records;
            ``scrutinize_dict`` maps scoring-section tags to
            {"评分因素", "评分标准"[, "权重"]} records.

        Raises:
            ValueError: when an expected header cell (e.g. "条款号") is absent
                from a header row.
        '''
        all_tables = self.Bidding_tables

        # Pre-review criteria found in the bidding document.
        tag_sign = ''
        tag_list = ("形式评审标准", "资格评审标准", "响应性评审标准")
        tag_dict = {tag: [] for tag in tag_list}

        # Detailed scoring tables found in the bidding document.
        # scrutinize_tuple = ("商务部分评分标准","技术部分评审标准","技术部分评分标准","投标报价评审标准","报价部分评审标准","报价评分标准","报价部分评分标准")
        scrutinize_dict = {}
        scrutinize_page = 0
        scrutinize_index = 0
        scrutinize_Initial_title_len = 0  # marker for the detailed-review position

        record_page = 0
        bidder_know = {}  # instructions-to-bidders front table

        # Column indices; defined up front so continuation pages can never hit
        # an unbound name when no header row was seen.
        regulation_number_index = 0
        evaluation_factor_index = 0
        evaluation_criteria_index = 0
        weights_index = 0

        for partial_form in all_tables:
            table_name = partial_form['table_name']
            page_number = partial_form['page_numbers']
            title_len = partial_form['title_len']
            tables = partial_form["table"]
            current_page = page_number[0]

            if '投标人须知前附表' == table_name:
                record_page = current_page
            if current_page < record_page + 3:
                _collect_bidder_notice(tables, bidder_know)

            # Normalise unconditionally: the old gate tested for '评标方法'
            # while the comparison is against '评标办法前附表', so names with
            # stray whitespace/newlines could never match.
            if _clean_cell(table_name) == "评标办法前附表":
                table_page_num = current_page
                inital_data = tables[0]
                # Locate the columns from the header row.
                regulation_number_index = inital_data.index("条款号")
                evaluation_factor_index = inital_data.index("评审因素")
                evaluation_criteria_index = inital_data.index("评审标准")
                for row in tables[1:]:
                    tag = row[regulation_number_index + 1]
                    if tag:
                        tag = tag.strip().replace("\n", "")
                    if tag:
                        # Rows with an empty tag cell inherit the last tag.
                        tag_sign = tag
                    if tag_sign in tag_dict:
                        tag_dict[tag_sign].append({
                            "评审因素": _clean_cell(row[evaluation_factor_index]),
                            "评审标准": _clean_cell(row[evaluation_criteria_index]),
                        })
                    if '评分因素' in row or '评分标准' in row:
                        scrutinize_page = table_page_num
                        scrutinize_Initial_title_len = title_len
                if not scrutinize_page:
                    # No scoring header in this table: expect it on the next page.
                    scrutinize_page = table_page_num + 1

            # ---- detailed scoring ("scrutinize") tables ----
            if scrutinize_page == current_page:
                regulation_number_index = 0
                evaluation_factor_index = 0
                evaluation_criteria_index = 0
                weights_index = 0
                if not scrutinize_Initial_title_len:
                    scrutinize_Initial_title_len = title_len
                for row in tables:
                    if '评分因素' in row and '评分标准' in row:
                        regulation_number_index = row.index("条款号")
                        evaluation_factor_index = row.index("评分因素")
                        evaluation_criteria_index = row.index("评分标准")
                        weights_index = row.index("权重")
                        tag_sign = ''
                        scrutinize_index = tables.index(row)
                # NOTE(review): scrutinize_index is not reset per table and a
                # header in row 0 is treated as "not found" - confirm intent.
                if scrutinize_index:
                    for row in tables[scrutinize_index + 1:]:
                        tag = self._section_tag(
                            row, regulation_number_index, first_run_only=False)
                        if tag and self._scrutinize_judge(tag):
                            tag_sign = tag
                        value = _score_entry(
                            row, evaluation_factor_index,
                            evaluation_criteria_index, weights_index)
                        scrutinize_dict.setdefault(tag_sign, []).append(value)
                        if '报价' in tag_sign and '标准' in tag_sign:
                            # The price section is the last one: stop here.
                            scrutinize_dict = {key: value for key, value in scrutinize_dict.items() if value}
                            scrutinize_Initial_title_len = 0
                            break

            elif (scrutinize_page
                  and current_page - scrutinize_page in (1, 2)
                  and '报价' not in tag_sign):
                # Continuation of the scoring table on the next one or two
                # pages.  Requiring a located scrutinize_page keeps ordinary
                # tables on pages 1-2 from being parsed as scoring rows.
                difference_value = scrutinize_Initial_title_len - title_len
                if scrutinize_Initial_title_len:
                    # Shift the columns by the difference in title length.
                    # NOTE(review): the shift accumulates for every table
                    # matched here - confirm this is intended.
                    evaluation_factor_index -= difference_value
                    evaluation_criteria_index -= difference_value
                    weights_index -= difference_value
                for row in tables:
                    if not row[2]:
                        # Row without a value in column 2: treated as the tail
                        # of the previous record's "评分标准" text.
                        entries = scrutinize_dict.get(tag_sign)
                        if entries:
                            entries[-1]['评分标准'] += row[3] or ''
                        continue
                    tag = self._section_tag(
                        row, regulation_number_index, first_run_only=True)
                    if tag and self._scrutinize_judge(tag):
                        tag_sign = tag
                    try:
                        value = _score_entry(
                            row, evaluation_factor_index,
                            evaluation_criteria_index, weights_index)
                    except IndexError:
                        # Shifted columns fell outside this row: skip it
                        # rather than re-using the previous row's values.
                        logger.warning(
                            "scoring row on page %s is shorter than expected, skipped: %s",
                            current_page, row)
                        continue
                    scrutinize_dict.setdefault(tag_sign, []).append(value)
                    if '报价' in tag_sign and '标准' in tag_sign:
                        scrutinize_dict = {key: value for key, value in scrutinize_dict.items() if value}
                        scrutinize_Initial_title_len = 0
                        break

        # pprint(tag_dict)
        pprint(scrutinize_dict)
        # pprint(bidder_know)
        return tag_dict, bidder_know, scrutinize_dict

    def get_announcement(self) -> str:
        '''
        Collect the bidding announcement text.

        Looks at ``self.contexts[2:8]`` for the first entry whose text starts
        with "第一章" and concatenates the texts from there up to (excluding)
        the first entry whose text starts with "第二章".

        Returns:
            The concatenated text, or '' when no entry starts with "第一章".
        '''
        announcement_contexts = self.contexts[2:8]
        for index, announcement in enumerate(announcement_contexts):
            if not announcement['text'].startswith("第一章"):
                continue
            collected = []
            for following in announcement_contexts[index:]:
                if following["text"].startswith("第二章"):
                    break
                collected.append(following["text"])
            return ''.join(collected)
        return ''

    def contexts_extract(self, evaluation_criteria: str):
        '''
        Extract the body text of the chapter referenced by a review criterion.

        The chapter title is built from the first "第…章" match and the first
        “…” quoted text of ``evaluation_criteria``.  Chapter headings are
        collected from entries on pages below 4 once an entry reading "目录"
        has been seen; the chapter body runs from the entry equal to the title
        to the entry equal to the next collected heading.

        Args:
            evaluation_criteria: Criterion text such as
                '符合第八章“投标文件格式”的要求'.

        Returns:
            ``{title: {page_number: [text, ...]}}``.

        Raises:
            IndexError: when ``evaluation_criteria`` has no "第…章" or no
                “…” part.
        '''
        comp1 = re.compile("(第.*?章)")
        comp2 = re.compile("“(.*?)”")
        title = comp1.findall(evaluation_criteria)[0] + comp2.findall(evaluation_criteria)[0]
        comp3 = re.compile("第(.*?)章")

        title_list = []
        format_index, sta_page = -1, -1
        sign = True
        title_next = ''
        for position, context in enumerate(self.Bidding_context):
            # Walk the bidding-document content.
            text = context['text'].strip().replace(" ", "")
            if text == '目录':
                sta_page = context['page_number']
            if sta_page != -1 and context['page_number'] < 4:
                finder = comp3.findall(context['text'])
                if finder and sign:
                    heading = context['text'].split(' ')[0]
                    if title_list:
                        # Stop collecting once chapter numbers decrease.
                        # Numerals above 十 are parsed as well, so catalogues
                        # with more than ten chapters are not cut short;
                        # unparsable numerals count as 0.
                        previous_num = _chinese_numeral_to_int(
                            comp3.findall(title_list[-1])[0], self.chinese_num_map) or 0
                        current_num = _chinese_numeral_to_int(
                            finder[0], self.chinese_num_map) or 0
                        if previous_num > current_num:
                            sign = False
                        else:
                            title_list.append(heading)
                    else:
                        title_list.append(heading)
            if text == title and format_index == -1:
                format_index = position
                break

        if title in title_list:
            title_index = title_list.index(title)
            if title_index != len(title_list) - 1:
                title_next = title_list[title_index + 1]
        else:
            # Without a known next heading the body runs to the document end.
            logger.warning("chapter title %s not found in the collected catalogue", title)

        file_format = {title: {}}
        for context in self.Bidding_context[format_index + 1:]:
            text = context['text'].strip().replace(" ", "").replace("\n", "——>")
            if title_next and title_next == text:
                break
            if context['page_number'] not in file_format[title]:
                file_format[title][context['page_number']] = []
            file_format[title][context['page_number']].append(context['text'])
        return file_format

    def _format_hit_rate(self, file_format):
        '''
        Compare the required file format with ``self.tender_context``.

        Args:
            file_format: Result of ``contexts_extract``.

        Returns:
            ``round(hits / pages, 4)`` where ``pages`` is the number of page
            lists in ``file_format`` and ``hits`` counts format lines found
            among the lines of tender entries; 0.0 when there are no pages.
        '''
        chinese_map_list = list(self.chinese_num_map)
        catelogue_list = []
        tender_start = 0
        catelogue_value = ''
        catelogue_value_next = ''
        add_index = 0
        hit_nums = 0
        numbers = 0
        for format_values in file_format.values():
            for page_lines in format_values.values():
                numbers += 1
                catelogue_update_sign = False
                first_value = page_lines[0].replace(" ", "").replace("\n", "")
                if '目录' == first_value:
                    # Catalogue page: keep lines containing a Chinese numeral.
                    for line in page_lines[1:]:
                        if line not in catelogue_list and any(
                                numeral in line for numeral in chinese_map_list):
                            catelogue_list.append(line)

                if catelogue_list and not tender_start:
                    catelogue = catelogue_list[0]
                    # Literal prefix test: catalogue text may contain regex
                    # metacharacters, so it must not be compiled as a pattern.
                    for position, tender_page in enumerate(self.tender_context):
                        if tender_page['text'].startswith(catelogue):
                            tender_start = position
                            break

                if first_value in catelogue_list:
                    catelogue_update_sign = True
                    catelogue_value = first_value
                    catelogue_index = catelogue_list.index(catelogue_value)
                    if catelogue_list[-1] != catelogue_value:
                        catelogue_value_next = catelogue_list[catelogue_index + 1]
                    else:
                        catelogue_value_next = catelogue_value

                if catelogue_value:
                    hit_num = 0
                    if catelogue_update_sign:
                        tender_start += add_index
                        add_index = 0
                    for tender_index, tender_contents in enumerate(self.tender_context[tender_start:]):
                        tender_lines = tender_contents['text'].split("\n")
                        if tender_lines[0] == catelogue_value_next:
                            add_index = tender_index
                            break
                        for value in page_lines:
                            if value in tender_lines:
                                hit_num += 1
                    hit_nums += hit_num
        if not numbers:
            return 0.0
        return round(hit_nums / numbers, 4)

    def formal_criteria(self, review_criteria_list: list):
        '''
        Evaluate the formal review criteria (形式评审标准).

        Only the '投标文件格式' factor is implemented; the other factors are
        placeholders.

        Args:
            review_criteria_list: Records with the keys '评审因素' and
                '评审标准', for example
                [{'评审因素': '投标人名称', '评审标准': '与营业执照书一致'},
                 {'评审因素': '投标文件格式', '评审标准': '符合第八章“投标文件格式”的要求'},
                 {'评审因素': '联合体投标人(如有)', '评审标准': '不适用'},
                 {'评审因素': '报价唯一', '评审标准': '只能有一个有效报价'}]

        Returns:
            Dict mapping each evaluated factor to ``(passed, criteria)``;
            ``passed`` is True when the format hit rate exceeds 0.70.
        '''
        formal_result = {}
        for review_criteria in review_criteria_list:
            evaluation_factor = review_criteria['评审因素']
            evaluation_criteria = review_criteria['评审标准']
            if '投标人名称' in evaluation_factor or '供应商名称' in evaluation_factor:
                # TODO: the bidder name in the tender must equal the name on
                # the supplied business licence (营业执照) or qualification
                # certificate (资质证书).
                pass

            elif '报价函签字盖章' in evaluation_factor or '投标文件封面、投标函签字盖章' in evaluation_factor:
                # TODO: require the signature of the legal representative or
                # authorised agent, or the presence of the company seal.
                pass

            elif '投标文件格式' in evaluation_factor:
                file_format = self.contexts_extract(evaluation_criteria)
                pprint(file_format)
                # The tender passes as long as it contains the content of the
                # required file format.
                hit_rate = self._format_hit_rate(file_format)
                formal_result[evaluation_factor] = (hit_rate > 0.70, evaluation_criteria)

            elif '联合体投标人' in evaluation_factor:
                if '不适用' in evaluation_criteria:
                    continue

            elif '报价唯一' in evaluation_factor:
                # TODO: extract and compare the total quoted price at three
                # places in the tender.
                pass
        return formal_result

    def qualification_criteria(self, review_criteria_list: list, bidder_know: dict):
        '''
        Evaluate the qualification review criteria (资格评审标准).

        For factors containing '资质' whose criterion refers to a clause of the
        "投标人须知" chapter, the clause is looked up in ``bidder_know``; when
        its content refers to "招标公告", the requirement text following the
        first '资质' in ``self.announcement`` is printed.

        Args:
            review_criteria_list: Records with the keys '评审因素' and '评审标准'.
            bidder_know: Clause-number -> records mapping from ``get_table``.

        Returns:
            None.  Criteria that do not match the expected wording are
            skipped with a warning.
        '''
        chapter_pattern = re.compile('(第.*?章)')
        quoted_pattern = re.compile('“(.*?)”')
        clause_pattern = re.compile(r'第([\d+\.]+)项规定')

        for review_criteria in review_criteria_list:
            evaluation_factor = review_criteria['评审因素']
            evaluation_criteria = review_criteria['评审标准']

            if '营业执照' in evaluation_factor:
                # TODO: read the business term from the licence; "long-term"
                # is acceptable; warn when only a start date is present.
                continue
            if '资质' not in evaluation_factor:
                continue

            chapters = chapter_pattern.findall(evaluation_criteria)
            quoted = quoted_pattern.findall(evaluation_criteria)
            clauses = clause_pattern.findall(evaluation_criteria)
            if not (chapters and quoted and clauses):
                logger.warning(
                    "qualification criterion has no chapter/clause reference: %s",
                    evaluation_criteria)
                continue

            chapter_name = chapters[0] + quoted[0]
            stipulation = clauses[0]
            if '投标人须知' not in chapter_name:
                continue

            bidder_data = bidder_know.get(stipulation, None)
            if not bidder_data:
                continue  # needs rework

            list_content = bidder_data[0]['编列内容']
            if not list_content or '招标公告' not in list_content:
                continue

            # Assumes the '资质' heading wording in the announcement is stable.
            cert_index = self.announcement.find('资质')
            if cert_index == -1:
                logger.warning("no '资质' section found in the announcement")
                continue
            requirements = re.findall(":(.*?)\\n", self.announcement[cert_index:cert_index + 500])
            if not requirements:
                logger.warning("no requirement text found after '资质' in the announcement")
                continue
            cert_required = requirements[0]
            print(cert_required)
            # TODO: LLM step - design a prompt; the content can be tried on
            # the hosted GLM-4 model to find a suitable one.

    def responsive_criteria(self, review_criteria_list: list, bidder_know: dict):
        '''
        Evaluate the responsive review criteria (响应性评审).

        Currently a placeholder: every record is read and contract-related
        items are skipped; nothing is returned.

        Args:
            review_criteria_list: Records with the keys '评审因素' and '评审标准'.
            bidder_know: Clause-number -> records mapping (not used yet).
        '''
        for review_criteria in review_criteria_list:
            evaluation_factor = review_criteria['评审因素']
            evaluation_criteria = review_criteria['评审标准']
            if evaluation_factor == '权利义务' or '合同' in evaluation_criteria:
                # Contract-related items are not processed.
                continue

    def content_parsing(self):
        '''
        Run the aggregate analysis: parse the tables, then the qualification review.

        Uses this instance rather than a module-level object, so it also
        works when the module is imported.
        '''
        tag_dict, bidder_know, scrutinize_dict = self.get_table()
        # {}
        # self.formal_criteria(tag_dict['形式评审标准'])
        self.qualification_criteria(tag_dict['资格评审标准'], bidder_know)


app = FastAPI()


# Route paths need a leading "/" to match incoming request paths.
@app.post('/get_pre_review')
def get_pre_review():
    '''Placeholder endpoint; returns a dict with a single empty key and value.'''
    result = {
        "": ""
    }
    return result


if __name__ == '__main__':
    dpr = DocumentPreReview()
    dpr.get_table()
    # dpr.content_parsing()
    # Example of checking a single formal criterion:
    # formal_review_criteria = [
    #     {'评审因素': '投标文件格式', '评审标准': '符合第八章“投标文件格式”的要求'}
    #     # {'评审因素': '投标文件格式', '评审标准': '符合第四章“合同条款及格式”规定'}
    # ]
    # dpr.formal_criteria(formal_review_criteria)