youtube_channel.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import concurrent.futures
  2. import hashlib
  3. import logging
  4. from embedchain.loaders.base_loader import BaseLoader
  5. from embedchain.loaders.youtube_video import YoutubeVideoLoader
  6. class YoutubeChannelLoader(BaseLoader):
  7. """Loader for youtube channel."""
  8. def load_data(self, channel_name):
  9. try:
  10. import yt_dlp
  11. except ImportError as e:
  12. raise ValueError(
  13. "YoutubeLoader requires extra dependencies. Install with `pip install --upgrade 'embedchain[youtube_channel]'`" # noqa: E501
  14. ) from e
  15. data = []
  16. data_urls = []
  17. youtube_url = f"https://www.youtube.com/{channel_name}/videos"
  18. youtube_video_loader = YoutubeVideoLoader()
  19. def _get_yt_video_links():
  20. try:
  21. ydl_opts = {
  22. "quiet": True,
  23. "extract_flat": True,
  24. }
  25. with yt_dlp.YoutubeDL(ydl_opts) as ydl:
  26. info_dict = ydl.extract_info(youtube_url, download=False)
  27. if "entries" in info_dict:
  28. videos = [entry["url"] for entry in info_dict["entries"]]
  29. return videos
  30. except Exception:
  31. logging.error(f"Failed to fetch youtube videos for channel: {channel_name}")
  32. return []
  33. def _load_yt_video(video_link):
  34. try:
  35. each_load_data = youtube_video_loader.load_data(video_link)
  36. if each_load_data:
  37. return each_load_data.get("data")
  38. except Exception as e:
  39. logging.error(f"Failed to load youtube video {video_link}: {e}")
  40. return None
  41. def _add_youtube_channel():
  42. video_links = _get_yt_video_links()
  43. with concurrent.futures.ThreadPoolExecutor() as executor:
  44. future_to_video = {
  45. executor.submit(_load_yt_video, video_link): video_link for video_link in video_links
  46. } # noqa: E501
  47. for future in concurrent.futures.as_completed(future_to_video):
  48. video = future_to_video[future]
  49. try:
  50. results = future.result()
  51. if results:
  52. data.extend(results)
  53. data_urls.extend([result.get("meta_data").get("url") for result in results])
  54. except Exception as e:
  55. logging.error(f"Failed to process youtube video {video}: {e}")
  56. _add_youtube_channel()
  57. doc_id = hashlib.sha256((youtube_url + ", ".join(data_urls)).encode()).hexdigest()
  58. return {
  59. "doc_id": doc_id,
  60. "data": data,
  61. }