substack.py

import hashlib
import logging
import time
from xml.etree import ElementTree

import requests

from embedchain.helpers.json_serializable import register_deserializable
from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils.misc import is_readable


@register_deserializable
class SubstackLoader(BaseLoader):
    """
    This loader is used to load data from Substack URLs.
    """

    def load_data(self, url: str):
        try:
            from bs4 import BeautifulSoup
            from bs4.builder import ParserRejectedMarkup
        except ImportError:
            raise ImportError(
                'Substack requires extra dependencies. Install with `pip install --upgrade "embedchain[dataloaders]"`'
            ) from None
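
        # Substack publications expose a sitemap.xml listing every post, so
        # normalize the root URL to point at it.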
        if not url.endswith("sitemap.xml"):
            url = url + "/sitemap.xml"

        output = []
        response = requests.get(url)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise ValueError(
                f"Failed to load {url}: {e}. Please use the root substack URL. For example, https://example.substack.com"
            )
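
        # Sanity-check that the response is well-formed XML before scraping it;
        # a non-sitemap page (e.g. an HTML error page) fails here with a clear message.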
        try:
            ElementTree.fromstring(response.content)
        except ElementTree.ParseError:
            raise ValueError(
                f"Failed to parse {url}. Please use the root substack URL. For example, https://example.substack.com"
            )

        soup = BeautifulSoup(response.text, "xml")
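        # Post URLs contain the "/p/" path segment; prefer <loc> entries nested
        # under <url>, falling back to any <loc> for sitemap-index layouts.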
        links = [link.text for link in soup.find_all("loc") if link.parent.name == "url" and "/p/" in link.text]
        if len(links) == 0:
            links = [link.text for link in soup.find_all("loc") if "/p/" in link.text]
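        # Derive a stable document id from the set of post links plus the sitemap URL.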
        doc_id = hashlib.sha256((" ".join(links) + url).encode()).hexdigest()
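
        # Extract title, description, body text, and like count from a rendered post page.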
        def serialize_response(soup: BeautifulSoup):
            data = {}

            h1_els = soup.find_all("h1")
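            # Assumption: on Substack post pages the first <h1> is typically the
            # publication name and the second is the post title.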
            if h1_els is not None and len(h1_els) > 1:
                data["title"] = h1_els[1].text

            description_el = soup.find("meta", {"name": "description"})
            if description_el is not None:
                data["description"] = description_el["content"]

            content_el = soup.find("div", {"class": "available-content"})
            if content_el is not None:
                data["content"] = content_el.text

            like_btn = soup.find("div", {"class": "like-button-container"})
            if like_btn is not None:
                no_of_likes_div = like_btn.find("div", {"class": "label"})
                if no_of_likes_div is not None:
                    data["no_of_likes"] = no_of_likes_div.text

            return data
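
        # Fetch a single post, serialize it, and return its text if it looks readable.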
        def load_link(link: str):
            try:
                substack_data = requests.get(link)
                substack_data.raise_for_status()
                soup = BeautifulSoup(substack_data.text, "html.parser")
                data = serialize_response(soup)
                data = str(data)
                if is_readable(data):
                    return data
                else:
                    logging.warning(f"Page is not readable (too many invalid characters): {link}")
            except ParserRejectedMarkup as e:
                logging.error(f"Failed to parse {link}: {e}")
            except requests.RequestException as e:
                # Skip a post that fails to download instead of aborting the whole crawl.
                logging.error(f"Failed to fetch {link}: {e}")
            return None

        for link in links:
            data = load_link(link)
            if data:
                output.append({"content": data, "meta_data": {"url": link}})
            # TODO: allow users to configure this
            time.sleep(1.0)  # added to avoid rate limiting

        return {"doc_id": doc_id, "data": output}
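

# Minimal usage sketch (illustrative, not part of the loader): assumes a public
# Substack publication and network access. load_data returns a dict of the form
# {"doc_id": <sha256 hex>, "data": [{"content": ..., "meta_data": {"url": ...}}]}.
if __name__ == "__main__":
    loader = SubstackLoader()
    result = loader.load_data("https://example.substack.com")
    print(f"doc_id: {result['doc_id']}")
    for entry in result["data"]:
        print(entry["meta_data"]["url"])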