gmail.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
import base64
import hashlib
import logging
import os
from email import message_from_bytes
from email.errors import HeaderParseError
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from textwrap import dedent
from typing import Optional

from bs4 import BeautifulSoup

try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow
    from googleapiclient.discovery import build
except ImportError:
    raise ImportError(
        'Gmail requires extra dependencies. Install with `pip install --upgrade "embedchain[gmail]"`'
    ) from None

from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils.misc import clean_string
  21. class GmailReader:
  22. SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
  23. def __init__(self, query: str, service=None, results_per_page: int = 10):
  24. self.query = query
  25. self.service = service or self._initialize_service()
  26. self.results_per_page = results_per_page
  27. @staticmethod
  28. def _initialize_service():
  29. credentials = GmailReader._get_credentials()
  30. return build("gmail", "v1", credentials=credentials)
  31. @staticmethod
  32. def _get_credentials():
  33. if not os.path.exists("credentials.json"):
  34. raise FileNotFoundError("Missing 'credentials.json'. Download it from your Google Developer account.")
  35. creds = (
  36. Credentials.from_authorized_user_file("token.json", GmailReader.SCOPES)
  37. if os.path.exists("token.json")
  38. else None
  39. )
  40. if not creds or not creds.valid:
  41. if creds and creds.expired and creds.refresh_token:
  42. creds.refresh(Request())
  43. else:
  44. flow = InstalledAppFlow.from_client_secrets_file("credentials.json", GmailReader.SCOPES)
  45. creds = flow.run_local_server(port=8080)
  46. with open("token.json", "w") as token:
  47. token.write(creds.to_json())
  48. return creds
  49. def load_emails(self) -> list[dict]:
  50. response = self.service.users().messages().list(userId="me", q=self.query).execute()
  51. messages = response.get("messages", [])
  52. return [self._parse_email(self._get_email(message["id"])) for message in messages]
  53. def _get_email(self, message_id: str):
  54. raw_message = self.service.users().messages().get(userId="me", id=message_id, format="raw").execute()
  55. return base64.urlsafe_b64decode(raw_message["raw"])
  56. def _parse_email(self, raw_email) -> dict:
  57. mime_msg = message_from_bytes(raw_email)
  58. return {
  59. "subject": self._get_header(mime_msg, "Subject"),
  60. "from": self._get_header(mime_msg, "From"),
  61. "to": self._get_header(mime_msg, "To"),
  62. "date": self._format_date(mime_msg),
  63. "body": self._get_body(mime_msg),
  64. }
  65. @staticmethod
  66. def _get_header(mime_msg, header_name: str) -> str:
  67. return mime_msg.get(header_name, "")
  68. @staticmethod
  69. def _format_date(mime_msg) -> Optional[str]:
  70. date_header = GmailReader._get_header(mime_msg, "Date")
  71. return parsedate_to_datetime(date_header).isoformat() if date_header else None
  72. @staticmethod
  73. def _get_body(mime_msg) -> str:
  74. def decode_payload(part):
  75. charset = part.get_content_charset() or "utf-8"
  76. try:
  77. return part.get_payload(decode=True).decode(charset)
  78. except UnicodeDecodeError:
  79. return part.get_payload(decode=True).decode(charset, errors="replace")
  80. if mime_msg.is_multipart():
  81. for part in mime_msg.walk():
  82. ctype = part.get_content_type()
  83. cdispo = str(part.get("Content-Disposition"))
  84. if ctype == "text/plain" and "attachment" not in cdispo:
  85. return decode_payload(part)
  86. elif ctype == "text/html":
  87. return decode_payload(part)
  88. else:
  89. return decode_payload(mime_msg)
  90. return ""
  91. class GmailLoader(BaseLoader):
  92. def load_data(self, query: str):
  93. reader = GmailReader(query=query)
  94. emails = reader.load_emails()
  95. logging.info(f"Gmail Loader: {len(emails)} emails found for query '{query}'")
  96. data = []
  97. for email in emails:
  98. content = self._process_email(email)
  99. data.append({"content": content, "meta_data": email})
  100. return {"doc_id": self._generate_doc_id(query, data), "data": data}
  101. @staticmethod
  102. def _process_email(email: dict) -> str:
  103. content = BeautifulSoup(email["body"], "html.parser").get_text()
  104. content = clean_string(content)
  105. return dedent(
  106. f"""
  107. Email from '{email['from']}' to '{email['to']}'
  108. Subject: {email['subject']}
  109. Date: {email['date']}
  110. Content: {content}
  111. """
  112. )
  113. @staticmethod
  114. def _generate_doc_id(query: str, data: list[dict]) -> str:
  115. content_strings = [email["content"] for email in data]
  116. return hashlib.sha256((query + ", ".join(content_strings)).encode()).hexdigest()