youtube_channel.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import concurrent.futures
  2. import hashlib
  3. import logging
  4. from tqdm import tqdm
  5. from embedchain.loaders.base_loader import BaseLoader
  6. from embedchain.loaders.youtube_video import YoutubeVideoLoader
  7. class YoutubeChannelLoader(BaseLoader):
  8. """Loader for youtube channel."""
  9. def load_data(self, channel_name):
  10. try:
  11. import yt_dlp
  12. except ImportError as e:
  13. raise ValueError(
  14. "YoutubeLoader requires extra dependencies. Install with `pip install --upgrade 'embedchain[youtube_channel]'`" # noqa: E501
  15. ) from e
  16. data = []
  17. data_urls = []
  18. youtube_url = f"https://www.youtube.com/{channel_name}/videos"
  19. youtube_video_loader = YoutubeVideoLoader()
  20. def _get_yt_video_links():
  21. try:
  22. ydl_opts = {
  23. "quiet": True,
  24. "extract_flat": True,
  25. }
  26. with yt_dlp.YoutubeDL(ydl_opts) as ydl:
  27. info_dict = ydl.extract_info(youtube_url, download=False)
  28. if "entries" in info_dict:
  29. videos = [entry["url"] for entry in info_dict["entries"]]
  30. return videos
  31. except Exception:
  32. logging.error(f"Failed to fetch youtube videos for channel: {channel_name}")
  33. return []
  34. def _load_yt_video(video_link):
  35. try:
  36. each_load_data = youtube_video_loader.load_data(video_link)
  37. if each_load_data:
  38. return each_load_data.get("data")
  39. except Exception as e:
  40. logging.error(f"Failed to load youtube video {video_link}: {e}")
  41. return None
  42. def _add_youtube_channel():
  43. video_links = _get_yt_video_links()
  44. logging.info("Loading videos from youtube channel...")
  45. with concurrent.futures.ThreadPoolExecutor() as executor:
  46. # Submitting all tasks and storing the future object with the video link
  47. future_to_video = {
  48. executor.submit(_load_yt_video, video_link): video_link for video_link in video_links
  49. }
  50. for future in tqdm(
  51. concurrent.futures.as_completed(future_to_video), total=len(video_links), desc="Processing videos"
  52. ):
  53. video = future_to_video[future]
  54. try:
  55. results = future.result()
  56. if results:
  57. data.extend(results)
  58. data_urls.extend([result.get("meta_data").get("url") for result in results])
  59. except Exception as e:
  60. logging.error(f"Failed to process youtube video {video}: {e}")
  61. _add_youtube_channel()
  62. doc_id = hashlib.sha256((youtube_url + ", ".join(data_urls)).encode()).hexdigest()
  63. return {
  64. "doc_id": doc_id,
  65. "data": data,
  66. }