tools.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. # @Author: privacy
  3. # @Date: 2024-06-11 13:43:14
  4. # @Last Modified by: privacy
  5. # @Last Modified time: 2024-09-30 11:29:16
  6. import os
  7. import re
  8. import json
  9. from enum import Enum, auto
  10. from typing import Any, Optional, List
  11. import pandas as pd
  12. from pdfminer.pdftypes import PDFObjRef
  13. from pdfminer.pdfdocument import PDFDocument
  14. from pdfminer.pdfpage import PDFPage, LITERAL_PAGE
  15. import pdfplumber
  16. def comment_clean(comment: str):
  17. '''
  18. 对LLM返回结果进行清洗
  19. Args:
  20. comment: LLM返回结果
  21. Returns:
  22. comment: 清洗后的LLM返回结果
  23. '''
  24. if not comment:
  25. return comment
  26. # score_rating = re.search(r"\'([A-D])\'",comment)
  27. comment = re.sub(r'=<LevelEnum\.[A-D]:|>', '', comment)
  28. return comment
  29. def check_scan_pdf(file_path: str) -> bool:
  30. """
  31. 测试PDF文件是否为扫描件
  32. Args:
  33. file_path: 文件地址
  34. Returns:
  35. bool: 是否为扫描件
  36. """
  37. probability_page = 0
  38. with pdfplumber.open(file_path) as pdf:
  39. page_num = len(pdf.pages)
  40. for page in pdf.pages:
  41. content = page.extract_text()
  42. if len(content) > 50:
  43. probability_page += 1
  44. if (probability_page / page_num) > 0.1:
  45. return False
  46. return True
  47. def num_to_chinese(num: int) -> str:
  48. """
  49. 数字转中文
  50. Args:
  51. num: 待转数字
  52. Returns:
  53. 数字的中文表示
  54. """
  55. chinese_num = ['零', '一', '二', '三', '四', '五', '六', '七', '八', '九']
  56. chinese_unit = ['', '十', '百', '千', '万']
  57. if num == 0:
  58. return chinese_num[0]
  59. res = ''
  60. unit_index = 0
  61. while num > 0:
  62. digit = num % 10
  63. if digit != 0:
  64. res = chinese_num[digit] + chinese_unit[unit_index] + res
  65. elif not res.startswith(chinese_num[0]):
  66. res = chinese_num[0] + res
  67. num //= 10
  68. unit_index += 1
  69. return res.replace('一十', '十').rstrip('零')
  70. def chinese_to_num(chinese_num: str) -> int:
  71. """
  72. 中文转数字
  73. Args:
  74. chinese_num: 待转中文
  75. Returns:
  76. 数字
  77. """
  78. number_map = {'零': 0, '一': 1, '二': 2, '三': 3, '四': 4, '五': 5, '六': 6, '七': 7, '八': 8, '九': 9}
  79. # 单位映射
  80. unit_map = {'十': 10, '百': 100, '千': 1000, '万': 10000}
  81. output = 0
  82. unit = 1
  83. num = 0
  84. for index, cn_num in enumerate(chinese_num):
  85. if cn_num in number_map:
  86. # 数字
  87. num = number_map[cn_num]
  88. # 最后的个位数字
  89. if index == len(chinese_num) - 1:
  90. output = output + num
  91. elif cn_num in unit_map:
  92. # 单位
  93. unit = unit_map[cn_num]
  94. # 累加
  95. output = output + num * unit
  96. num = 0
  97. else:
  98. raise ValueError(f"{cn_num} 不在转化范围内")
  99. return output
  100. def next_chinese_num(chinese_num: str) -> str:
  101. """
  102. 中文数字加一
  103. Args:
  104. chinese_num: 待加中文数字
  105. Returns:
  106. 加一后的中文
  107. """
  108. num = chinese_to_num(chinese_num)
  109. return num_to_chinese(num + 1)
  110. def filter_images(image_list: list, start_page: int, end_page: int) -> List[dict]:
  111. """
  112. 从已解析的图片中筛选出指定页面的图片
  113. Args:
  114. image_list: 图片列表
  115. start_page: 起始页码
  116. end_page: 终止页码
  117. Returns:
  118. 从起始页码到终止页码间的图片列表
  119. """
  120. df = pd.DataFrame(image_list)
  121. return df.query(f''' {start_page} <= page_number <= {end_page} ''').to_dict(orient='records')
  122. def filter_tables(table_list: list, start_page: int, end_page: int) -> List[dict]:
  123. """
  124. 从已解析的表格中筛选出指定页面的表格
  125. Args:
  126. table_list: 表格列表
  127. start_page: 起始页码
  128. end_page: 终止页码
  129. Returns:
  130. 从起始页码到终止页码间的表格列表
  131. """
  132. return [table for table in table_list if (start_page <= min(table['page_numbers'])) and (end_page >= max(table['page_numbers']))]
  133. def filter_content(content_list: list, start_page: int, end_page: int) -> List[dict]:
  134. """
  135. 从已解析的内容中筛选出指定页面的内容
  136. Args:
  137. content_list: 内容列表
  138. start_page: 起始页码
  139. end_page: 终止页码
  140. Returns:
  141. 从起始页码到终止页码间的内容列表
  142. """
  143. return [content for content in content_list if (start_page <= content['page_number']) and (end_page >= content['page_number'])]
  144. def rmb_to_digit(rmb_str: str):
  145. digit_map = {'零': 0, '壹': 1, '贰': 2, '叁': 3, '肆': 4, '伍': 5, '陆': 6, '柒': 7, '捌': 8, '玖': 9}
  146. unit_map = {'分': 0.01, '角': 0.1, '元': 1, '拾': 10, '佰': 100, '仟': 1000, '万': 10000, '亿': 100000000}
  147. digit = 0
  148. total = 0
  149. tmp = 0
  150. for char in rmb_str:
  151. if char in digit_map:
  152. digit = digit_map[char]
  153. elif char in unit_map:
  154. if digit + tmp:
  155. total += (tmp + digit) * unit_map[char]
  156. tmp = digit = 0
  157. else:
  158. total *= unit_map[char]
  159. else:
  160. tmp = digit
  161. total += tmp + digit
  162. return '{:.2f}'.format(total)
  163. def match_price_zhs(text: str) -> List[str]:
  164. pattern = (r"[壹,贰,叁,肆,伍,陆,柒,捌,玖,拾,佰,仟][壹,贰,叁,肆,伍,陆,柒,捌,玖,拾,佰,仟,元,角,万,分,百,整,零]+"
  165. r"[壹,贰,叁,肆,伍,陆,柒,捌,玖,拾,佰,仟,元,角,万,分,百,整,零]")
  166. temp = re.findall(pattern, text)
  167. for i in range(len(temp)):
  168. if temp[i].endswith('整元') or temp[i].endswith('角元') or temp[i].endswith('分元') or temp[i].endswith('元元'):
  169. temp[i] = temp[i][:-1]
  170. return temp
  171. def match_price_num(text: str) -> List[str]:
  172. pattern = (r"(?:\b(?:[BS]/\.|R(?:D?\$|p))|\b(?:[TN]T|[CJZ])\$|Дин\.|\b(?:Bs|Ft|Gs|K[Mč]|Lek|B[Zr]|k[nr]|[PQLSR]|лв|"
  173. r"ден|RM|MT|lei|zł|USD|GBP|EUR|JPY|CHF|SEK|DKK|NOK|SGD|HKD|AUD|TWD|NZD|CNY|KRW|INR|CAD|VEF|EGP|THB|IDR|"
  174. r"PKR|MYR|PHP|MXN|VND|CZK|HUF|PLN|TRY|ZAR|ILS|ARS|CLP|BRL|RUB|QAR|AED|COP|PEN|CNH|KWD|SAR)|\$[Ub]|"
  175. r"[^\w\s])\s?(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{1,2})?(?!\.?\d)")
  176. return re.findall(pattern, text)
  177. def match_duration(text: str) -> List[str]:
  178. pattern = r"[1-9]+[\d]日历天"
  179. return re.findall(pattern, text)
  180. def match_quality(text: str) -> List[str]:
  181. pattern = r"工程质量.+"
  182. return re.findall(pattern, text)
  183. class PDFRefType(Enum):
  184. """PDF reference type."""
  185. PDF_OBJ_REF = auto()
  186. DICTIONARY = auto()
  187. LIST = auto()
  188. NAMED_REF = auto()
  189. UNK = auto() # fallback
  190. class RefPageNumberResolver:
  191. """PDF Reference to page number resolver.
  192. .. note::
  193. Remote Go-To Actions (see 12.6.4.3 in
  194. `https://www.adobe.com/go/pdfreference/`__)
  195. are out of the scope of this resolver.
  196. Attributes:
  197. document (:obj:`pdfminer.pdfdocument.PDFDocument`):
  198. The document that contains the references.
  199. objid_to_pagenum (:obj:`dict[int, int]`):
  200. Mapping from an object id to the number of the page that contains
  201. that object.
  202. """
  203. def __init__(self, document: PDFDocument):
  204. self.document = document
  205. # obj_id -> page_number
  206. self.objid_to_pagenum: dict[int, int] = {
  207. page.pageid: page_num
  208. for page_num, page in enumerate(PDFPage.create_pages(document), 1)
  209. }
  210. @classmethod
  211. def get_ref_type(cls, ref: Any) -> PDFRefType:
  212. """Get the type of a PDF reference."""
  213. if isinstance(ref, PDFObjRef):
  214. return PDFRefType.PDF_OBJ_REF
  215. elif isinstance(ref, dict) and "D" in ref:
  216. return PDFRefType.DICTIONARY
  217. elif isinstance(ref, list) and any(isinstance(e, PDFObjRef) for e in ref):
  218. return PDFRefType.LIST
  219. elif isinstance(ref, bytes):
  220. return PDFRefType.NAMED_REF
  221. else:
  222. return PDFRefType.UNK
  223. @classmethod
  224. def is_ref_page(cls, ref: Any) -> bool:
  225. """Check whether a reference is of type '/Page'.
  226. Args:
  227. ref (:obj:`Any`):
  228. The PDF reference.
  229. Returns:
  230. :obj:`bool`: :obj:`True` if the reference references
  231. a page, :obj:`False` otherwise.
  232. """
  233. return isinstance(ref, dict) and "Type" in ref and ref["Type"] is LITERAL_PAGE
  234. def resolve(self, ref: Any) -> Optional[int]:
  235. """Resolve a PDF reference to a page number recursively.
  236. Args:
  237. ref (:obj:`Any`):
  238. The PDF reference.
  239. Returns:
  240. :obj:`Optional[int]`: The page number or :obj:`None`
  241. if the reference could not be resolved (e.g., remote Go-To
  242. Actions or malformed references).
  243. """
  244. ref_type = self.get_ref_type(ref)
  245. if ref_type is PDFRefType.PDF_OBJ_REF and self.is_ref_page(ref.resolve()):
  246. return self.objid_to_pagenum.get(ref.objid)
  247. elif ref_type is PDFRefType.PDF_OBJ_REF:
  248. return self.resolve(ref.resolve())
  249. if ref_type is PDFRefType.DICTIONARY:
  250. return self.resolve(ref["D"])
  251. if ref_type is PDFRefType.LIST:
  252. # Get the PDFObjRef in the list (usually first element).
  253. return self.resolve(next(filter(lambda e: isinstance(e, PDFObjRef), ref)))
  254. if ref_type is PDFRefType.NAMED_REF:
  255. return self.resolve(self.document.get_dest(ref))
  256. return None # PDFRefType.UNK
  257. class BaseMethods:
  258. ''' base methods class
  259. '''
  260. def __init__(self) -> None:
  261. pass
  262. def pandas_read_xls(self, file_path: str, sheetname: str = "Sheet1"):
  263. ''' 读取xls文件方法
  264. '''
  265. return pd.read_excel(file_path, sheet_name=sheetname)
  266. def json_read(self, file_path: str):
  267. ''' 读取json文件方法
  268. '''
  269. with open(file_path, "r", encoding='utf-8') as fp:
  270. return json.load(fp)
  271. def save_file(self, save_data: list, save_path: str, file_format: str):
  272. ''' 保存文件
  273. '''
  274. if file_format == "json":
  275. with open(save_path, 'w', encoding='utf-8') as sf:
  276. sf.write(json.dumps(save_data, ensure_ascii=False))
  277. elif file_format == "xlsx" or file_format == "xls":
  278. with pd.ExcelWriter(save_path) as fp:
  279. save_data.to_excel(fp, sheet_name="Sheet1")
  280. elif file_format == 'txt':
  281. with open(save_path, 'w', encoding='utf-8') as tx:
  282. for data in save_data:
  283. tx.write(data + "\n")
  284. def traverse_file(self, dirpath: str):
  285. '''
  286. 遍历文件夹下文件
  287. '''
  288. filename = tuple()
  289. for root, dir, files in os.walk(dirpath):
  290. for name in files:
  291. filename = filename.__add__((name,))
  292. return filename
  293. class TitleLevelJudge:
  294. def __init__(self, titles: List[str]):
  295. self.titles = titles
  296. self.levels = self.judge_title_level(self.titles)
  297. @classmethod
  298. def judge_title_level(cls, titles: List[str]) -> List[int]:
  299. """
  300. 判断标题的等级
  301. 规则1000. 默认第一个标题的等级为 1
  302. 往下遍历标题
  303. 判断标题是否在正则表达式中,如果在,使用 规则1100.,如果不在,使用 规则1200.
  304. 规则1100. 判断标题使用的正则表达式是否为上个标题使用的正则表达式,如果是,则使用 规则1110. 如果否则使用 规则1120.
  305. 规则1110. 当前标题和上一个标题在同一个等级
  306. 规则1120. 向上查找,标题等级依次降低,如果找到,则使用 规则1121., 如果提升,则使用 规则1122.
  307. 规则1121. 使用找的标题等级
  308. 规则1122. 标题等级提升
  309. 规则1200. 特殊标题,标题等级提升
  310. """
  311. # 定义用于提取标题结构的正则表达式
  312. patterns = [
  313. r'^第[一二三四五六七八九十百]+章', # 例如:“第一章”
  314. r'^第[一二三四五六七八九十百]+条', # 例如:“第一条”
  315. r'^第[一二三四五六七八九十百]+部分', # 例如:“第一部分”
  316. r'^第\d+章', # 例如:“第3章”
  317. r'^第 \d+ 章', # 例如:“第 3 章”
  318. r'^第\d+条', # 例如:“第3条”
  319. r'^第 \d+ 条', # 例如:“第 3 条”
  320. r'^第\d+部分', # 例如:“第3部分”
  321. r'^第 \d+ 部分', # 例如:“第 3 部分”
  322. r'^([一二三四五六七八九十百]+)', # 例如:“(一)”
  323. r'^([\d]+)', # 例如:“(1)”
  324. r'^[一二三四五六七八九十百]+、', # 例如:“一、”
  325. r'^[一二三四五六七八九十百]+)', # 例如:“一)”
  326. r'^[一二三四五六七八九十百]+\)', # 例如:“一)”
  327. r'^\d+、', # 例如:“1、”
  328. r'^\d+)', # 例如:“1)”
  329. r'^\d+\)', # 例如:“1)”
  330. r'^\d+-\d+', # 例如:“5-2”
  331. r'^\d+\.\d+\.\d+\.\d+\.\d+\.\d+\.\d+\.\d+', # 例如:“1.1.1.1”
  332. r'^\d+\.\d+\.\d+\.\d+\.\d+\.\d+\.\d+', # 例如:“1.1.1.1”
  333. r'^\d+\.\d+\.\d+\.\d+\.\d+\.\d+', # 例如:“1.1.1.1”
  334. r'^\d+\.\d+\.\d+\.\d+\.\d+', # 例如:“1.1.1.1”
  335. r'^\d+\.\d+\.\d+\.\d+', # 例如:“1.1.1.1”
  336. r'^\d+\.\d+\.\d+', # 例如:“1.1.1”
  337. r'^\d+\.\d+', # 例如:“1.1”
  338. r'^\d+\.', # 例如:“1.”
  339. r'^文件 [一二三四五六七八九十百]+', # 例如:“文件 一”
  340. r'^附件 [一二三四五六七八九十百]+', # 例如:“附件 一”
  341. r'^附录 [一二三四五六七八九十百]+', # 例如:“附录 一”
  342. r'^文件[一二三四五六七八九十百]+', # 例如:“文件一”
  343. r'^附件[一二三四五六七八九十百]+', # 例如:“附件一”
  344. r'^附录[一二三四五六七八九十百]+', # 例如:“附录一”
  345. r'^文件 \d', # 例如:“文件 1”
  346. r'^附件 \d', # 例如:“附件 1”
  347. r'^附录 \d', # 例如:“附录 1”
  348. r'^文件\d', # 例如:“文件1”
  349. r'^附件\d', # 例如:“附件1”
  350. r'^附录\d', # 例如:“附录1”
  351. r'^图', # 例如:“图:1”
  352. r'图$', # 例如:“示例图”
  353. r'^表', # 例如:“表:1”
  354. r'^附表', # 例如:“附表:1”
  355. r'表$', # 例如:“示例表”
  356. r'函$', # 例如:“合规承诺函”
  357. r'承诺书$', # 例如:“合规承诺书”
  358. r'证书$', # 例如:“投标人专利证书”
  359. r'专利$', # 例如:“发明专利”
  360. r'^[一二三四五六七八九十百]+', # 例如:“一”
  361. r'^\d+', # 例如:“1”
  362. r'.*?' # 任意匹配
  363. ]
  364. # 初始化标题等级列表
  365. level_list = []
  366. pattern_list = []
  367. # 遍历所有标题
  368. for title in titles:
  369. # 遍历所有结构模式
  370. for i, pattern in enumerate(patterns):
  371. if re.match(pattern, title):
  372. current_pattern = i + 1
  373. break
  374. # 规则1000. 默认第一个标题的等级为 1
  375. if not level_list:
  376. current_level = 1
  377. # 判断标题是否在正则表达式中
  378. elif current_pattern in pattern_list:
  379. # 规则1100. 判断标题使用的正则表达式是否为上个标题使用的正则表达式
  380. if current_pattern == pattern_list[-1]:
  381. # 当前标题和上一个标题在同一个等级
  382. current_level = level_list[-1]
  383. # 规则1120. 向上查找,标题等级依次降低
  384. else:
  385. # 上一个等级
  386. tl = level_list[-1]
  387. for p, l in zip(pattern_list[::-1], level_list[::-1]):
  388. if (current_pattern == p) and current_level < (tl + 12):
  389. current_level = l
  390. break
  391. # 规则1200. 特殊标题,标题等级提升
  392. else:
  393. current_level = level_list[-1] + 1
  394. # 将当前标题的等级添加到列表中
  395. pattern_list.append(current_pattern)
  396. level_list.append(current_level)
  397. return level_list
  398. def find_next_title(self, current_title: str) -> Optional[str]:
  399. # 获取当前标题的索引
  400. current_index = self.titles.index(current_title)
  401. # 从当前标题的下一个标题开始遍历
  402. for i in range(current_index + 1, len(self.titles)):
  403. # 如果下一个标题的等级小于等于当前标题的等级,则返回该标题
  404. if self.levels[i] <= self.levels[current_index]:
  405. return self.titles[i]
  406. # 如果没有找到满足条件的标题,则返回None
  407. return None