beehiiv.py

import hashlib
import logging
import time
from xml.etree import ElementTree

import requests

from embedchain.helpers.json_serializable import register_deserializable
from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils.misc import is_readable

logger = logging.getLogger(__name__)


@register_deserializable
class BeehiivLoader(BaseLoader):
    """
    This loader is used to load data from Beehiiv URLs.
    """

    def load_data(self, url: str):
        try:
            from bs4 import BeautifulSoup
            from bs4.builder import ParserRejectedMarkup
        except ImportError:
            raise ImportError(
                'Beehiiv requires extra dependencies. Install with `pip install --upgrade "embedchain[dataloaders]"`'
            ) from None

        if not url.endswith("sitemap.xml"):
            url = url + "/sitemap.xml"

        output = []
        # we need to set this as a header to avoid 403
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 "
                "Safari/537.36"
            ),
        }
        response = requests.get(url, headers=headers)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise ValueError(
                f"""
                Failed to load {url}: {e}. Please use the root Beehiiv URL. For example, https://example.beehiiv.com
                """
            )

        try:
            ElementTree.fromstring(response.content)
        except ElementTree.ParseError:
            raise ValueError(
                f"""
                Failed to parse {url}. Please use the root Beehiiv URL. For example, https://example.beehiiv.com
                """
            )
        soup = BeautifulSoup(response.text, "xml")
        # collect post URLs from the sitemap: Beehiiv post links contain "/p/"
        links = [link.text for link in soup.find_all("loc") if link.parent.name == "url" and "/p/" in link.text]
        if len(links) == 0:
            links = [link.text for link in soup.find_all("loc") if "/p/" in link.text]

        # derive a stable document id from the discovered links and the sitemap URL
        doc_id = hashlib.sha256((" ".join(links) + url).encode()).hexdigest()

        def serialize_response(soup: BeautifulSoup):
            data = {}

            h1_el = soup.find("h1")
            if h1_el is not None:
                data["title"] = h1_el.text

            description_el = soup.find("meta", {"name": "description"})
            if description_el is not None:
                data["description"] = description_el["content"]

            content_el = soup.find("div", {"id": "content-blocks"})
            if content_el is not None:
                data["content"] = content_el.text

            return data

        def load_link(link: str):
            try:
                beehiiv_data = requests.get(link, headers=headers)
                beehiiv_data.raise_for_status()

                soup = BeautifulSoup(beehiiv_data.text, "html.parser")
                data = serialize_response(soup)
                data = str(data)
                if is_readable(data):
                    return data
                else:
                    logger.warning(f"Page is not readable (too many invalid characters): {link}")
            except ParserRejectedMarkup as e:
                logger.error(f"Failed to parse {link}: {e}")
            return None

        for link in links:
            data = load_link(link)
            if data:
                output.append({"content": data, "meta_data": {"url": link}})
            # TODO: allow users to configure this
            time.sleep(1.0)  # added to avoid rate limiting

        return {"doc_id": doc_id, "data": output}
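
Usage sketch (illustrative, not part of beehiiv.py): the loader can be called directly with the root URL of a publication. The import path assumes the file lives at embedchain/loaders/beehiiv.py, as its own imports suggest, and the publication URL is a placeholder.

from embedchain.loaders.beehiiv import BeehiivLoader

loader = BeehiivLoader()
# fetches <root>/sitemap.xml, then scrapes each "/p/" post it finds
result = loader.load_data("https://example.beehiiv.com")  # placeholder URL

print(result["doc_id"])  # sha256 over the discovered links plus the sitemap URL
for item in result["data"]:
    print(item["meta_data"]["url"])  # source post URL
    print(item["content"][:200])     # stringified {"title", "description", "content"} dict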