substack.py

import hashlib
import logging
import time

import requests

from embedchain.helper.json_serializable import register_deserializable
from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils import is_readable


@register_deserializable
class SubstackLoader(BaseLoader):
    """
    This loader takes a Substack sitemap URL as input, collects the post
    URLs listed in it, and loads the content of each page.
    """
    def load_data(self, url: str):
        try:
            from bs4 import BeautifulSoup
            from bs4.builder import ParserRejectedMarkup
        except ImportError:
            raise ImportError(
                'Substack requires extra dependencies. Install with `pip install --upgrade "embedchain[dataloaders]"`'
            ) from None

        output = []
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "xml")
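        # A Substack sitemap entry typically looks like (illustrative):
        #     <url><loc>https://example.substack.com/p/some-post-slug</loc></url>
        # so post links are identified by the "/p/" path segment below.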
        links = [link.text for link in soup.find_all("loc") if link.parent.name == "url" and "/p/" in link.text]
        if len(links) == 0:
            links = [link.text for link in soup.find_all("loc") if "/p/" in link.text]

        doc_id = hashlib.sha256((" ".join(links) + url).encode()).hexdigest()
        def serialize_response(soup: BeautifulSoup):
            data = {}

            # find_all always returns a list; the post title is taken from the
            # second <h1>, so guard the index accordingly instead of checking
            # only that the list is non-empty.
            h1_els = soup.find_all("h1")
            if len(h1_els) > 1:
                data["title"] = h1_els[1].text

            description_el = soup.find("meta", {"name": "description"})
            if description_el is not None:
                data["description"] = description_el["content"]

            content_el = soup.find("div", {"class": "available-content"})
            if content_el is not None:
                data["content"] = content_el.text

            like_btn = soup.find("div", {"class": "like-button-container"})
            if like_btn is not None:
                no_of_likes_div = like_btn.find("div", {"class": "label"})
                if no_of_likes_div is not None:
                    data["no_of_likes"] = no_of_likes_div.text

            return data
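        # serialize_response yields a plain dict; for a typical post it has
        # the shape (field values illustrative, keys from the code above):
        #     {"title": ..., "description": ..., "content": ..., "no_of_likes": ...}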
        def load_link(link: str):
            try:
                each_load_data = requests.get(link)
                each_load_data.raise_for_status()
                # Parse the page that was just fetched, not the sitemap response.
                soup = BeautifulSoup(each_load_data.text, "html.parser")
                data = str(serialize_response(soup))
                if is_readable(data):
                    return data
                else:
                    logging.warning(f"Page is not readable (too many invalid characters): {link}")
            except ParserRejectedMarkup as e:
                logging.error(f"Failed to parse {link}: {e}")
            return None
        for link in links:
            data = load_link(link)
            if data:
                output.append({"content": data, "meta_data": {"url": link}})
            # TODO: allow users to configure this
            time.sleep(1.0)  # added to avoid rate limiting

        return {"doc_id": doc_id, "data": output}
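

# Usage sketch (illustrative): drive the loader directly with a public
# Substack sitemap URL. The address below is hypothetical; any publication's
# /sitemap.xml should work.
#
#     loader = SubstackLoader()
#     result = loader.load_data("https://example.substack.com/sitemap.xml")
#     print(result["doc_id"], len(result["data"]))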