sitemap.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import concurrent.futures
  2. import hashlib
  3. import logging
  4. from urllib.parse import urlparse
  5. import requests
  6. from tqdm import tqdm
  7. try:
  8. from bs4 import BeautifulSoup
  9. from bs4.builder import ParserRejectedMarkup
  10. except ImportError:
  11. raise ImportError(
  12. 'Sitemap requires extra dependencies. Install with `pip install --upgrade "embedchain[dataloaders]"`'
  13. ) from None
  14. from embedchain.helpers.json_serializable import register_deserializable
  15. from embedchain.loaders.base_loader import BaseLoader
  16. from embedchain.loaders.web_page import WebPageLoader
  17. @register_deserializable
  18. class SitemapLoader(BaseLoader):
  19. """
  20. This method takes a sitemap URL as input and retrieves
  21. all the URLs to use the WebPageLoader to load content
  22. of each page.
  23. """
  24. def load_data(self, sitemap_url):
  25. output = []
  26. web_page_loader = WebPageLoader()
  27. if urlparse(sitemap_url).scheme not in ["file", "http", "https"]:
  28. raise ValueError("Not a valid URL.")
  29. if urlparse(sitemap_url).scheme in ["http", "https"]:
  30. response = requests.get(sitemap_url)
  31. response.raise_for_status()
  32. else:
  33. with open(sitemap_url, "r") as file:
  34. soup = BeautifulSoup(file, "xml")
  35. links = [link.text for link in soup.find_all("loc") if link.parent.name == "url"]
  36. if len(links) == 0:
  37. links = [link.text for link in soup.find_all("loc")]
  38. doc_id = hashlib.sha256((" ".join(links) + sitemap_url).encode()).hexdigest()
  39. def load_web_page(link):
  40. try:
  41. loader_data = web_page_loader.load_data(link)
  42. return loader_data.get("data")
  43. except ParserRejectedMarkup as e:
  44. logging.error(f"Failed to parse {link}: {e}")
  45. return None
  46. with concurrent.futures.ThreadPoolExecutor() as executor:
  47. future_to_link = {executor.submit(load_web_page, link): link for link in links}
  48. for future in tqdm(concurrent.futures.as_completed(future_to_link), total=len(links), desc="Loading pages"):
  49. link = future_to_link[future]
  50. try:
  51. data = future.result()
  52. if data:
  53. output.extend(data)
  54. except Exception as e:
  55. logging.error(f"Error loading page {link}: {e}")
  56. return {"doc_id": doc_id, "data": output}