# youtube_channel.py
import concurrent.futures
import hashlib
import logging

from tqdm import tqdm

from embedchain.loaders.base_loader import BaseLoader
from embedchain.loaders.youtube_video import YoutubeVideoLoader

  7. logger = logging.getLogger(__name__)
  8. class YoutubeChannelLoader(BaseLoader):
  9. """Loader for youtube channel."""
  10. def load_data(self, channel_name):
  11. try:
  12. import yt_dlp
  13. except ImportError as e:
  14. raise ValueError(
  15. "YoutubeChannelLoader requires extra dependencies. Install with `pip install yt_dlp==2023.11.14 youtube-transcript-api==0.6.1`" # noqa: E501
  16. ) from e
  17. data = []
  18. data_urls = []
  19. youtube_url = f"https://www.youtube.com/{channel_name}/videos"
  20. youtube_video_loader = YoutubeVideoLoader()
  21. def _get_yt_video_links():
  22. try:
  23. ydl_opts = {
  24. "quiet": True,
  25. "extract_flat": True,
  26. }
  27. with yt_dlp.YoutubeDL(ydl_opts) as ydl:
  28. info_dict = ydl.extract_info(youtube_url, download=False)
  29. if "entries" in info_dict:
  30. videos = [entry["url"] for entry in info_dict["entries"]]
  31. return videos
  32. except Exception:
  33. logger.error(f"Failed to fetch youtube videos for channel: {channel_name}")
  34. return []
  35. def _load_yt_video(video_link):
  36. try:
  37. each_load_data = youtube_video_loader.load_data(video_link)
  38. if each_load_data:
  39. return each_load_data.get("data")
  40. except Exception as e:
  41. logger.error(f"Failed to load youtube video {video_link}: {e}")
  42. return None
  43. def _add_youtube_channel():
  44. video_links = _get_yt_video_links()
  45. logger.info("Loading videos from youtube channel...")
  46. with concurrent.futures.ThreadPoolExecutor() as executor:
  47. # Submitting all tasks and storing the future object with the video link
  48. future_to_video = {
  49. executor.submit(_load_yt_video, video_link): video_link for video_link in video_links
  50. }
  51. for future in tqdm(
  52. concurrent.futures.as_completed(future_to_video), total=len(video_links), desc="Processing videos"
  53. ):
  54. video = future_to_video[future]
  55. try:
  56. results = future.result()
  57. if results:
  58. data.extend(results)
  59. data_urls.extend([result.get("meta_data").get("url") for result in results])
  60. except Exception as e:
  61. logger.error(f"Failed to process youtube video {video}: {e}")
  62. _add_youtube_channel()
  63. doc_id = hashlib.sha256((youtube_url + ", ".join(data_urls)).encode()).hexdigest()
  64. return {
  65. "doc_id": doc_id,
  66. "data": data,
  67. }