docs_site_loader.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import logging
  2. from urllib.parse import urljoin, urlparse
  3. import requests
  4. from bs4 import BeautifulSoup
  5. from embedchain.loaders.base_loader import BaseLoader
  6. class DocsSiteLoader(BaseLoader):
  7. def __init__(self):
  8. self.visited_links = set()
  9. def _get_child_links_recursive(self, url):
  10. parsed_url = urlparse(url)
  11. base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
  12. current_path = parsed_url.path
  13. response = requests.get(url)
  14. if response.status_code != 200:
  15. logging.info(f"Failed to fetch the website: {response.status_code}")
  16. return
  17. soup = BeautifulSoup(response.text, "html.parser")
  18. all_links = [link.get("href") for link in soup.find_all("a")]
  19. child_links = [link for link in all_links if link and link.startswith(current_path) and link != current_path]
  20. absolute_paths = [urljoin(base_url, link) for link in child_links]
  21. for link in absolute_paths:
  22. if link not in self.visited_links:
  23. self.visited_links.add(link)
  24. self._get_child_links_recursive(link)
  25. def _get_all_urls(self, url):
  26. self.visited_links = set()
  27. self._get_child_links_recursive(url)
  28. urls = [link for link in self.visited_links if urlparse(link).netloc == urlparse(url).netloc]
  29. return urls
  30. def _load_data_from_url(self, url):
  31. response = requests.get(url)
  32. if response.status_code != 200:
  33. logging.info(f"Failed to fetch the website: {response.status_code}")
  34. return []
  35. soup = BeautifulSoup(response.content, "html.parser")
  36. selectors = [
  37. "article.bd-article",
  38. 'article[role="main"]',
  39. "div.md-content",
  40. 'div[role="main"]',
  41. "div.container",
  42. "div.section",
  43. "article",
  44. "main",
  45. ]
  46. output = []
  47. for selector in selectors:
  48. element = soup.select_one(selector)
  49. if element:
  50. content = element.prettify()
  51. break
  52. else:
  53. content = soup.get_text()
  54. soup = BeautifulSoup(content, "html.parser")
  55. ignored_tags = [
  56. "nav",
  57. "aside",
  58. "form",
  59. "header",
  60. "noscript",
  61. "svg",
  62. "canvas",
  63. "footer",
  64. "script",
  65. "style",
  66. ]
  67. for tag in soup(ignored_tags):
  68. tag.decompose()
  69. content = " ".join(soup.stripped_strings)
  70. output.append(
  71. {
  72. "content": content,
  73. "meta_data": {"url": url},
  74. }
  75. )
  76. return output
  77. def load_data(self, url):
  78. all_urls = self._get_all_urls(url)
  79. output = []
  80. for u in all_urls:
  81. output.extend(self._load_data_from_url(u))
  82. return output