gmail.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import base64
  2. import hashlib
  3. import logging
  4. import os
  5. from email import message_from_bytes
  6. from email.utils import parsedate_to_datetime
  7. from textwrap import dedent
  8. from typing import Optional
  9. from bs4 import BeautifulSoup
  10. try:
  11. from google.auth.transport.requests import Request
  12. from google.oauth2.credentials import Credentials
  13. from google_auth_oauthlib.flow import InstalledAppFlow
  14. from googleapiclient.discovery import build
  15. except ImportError:
  16. raise ImportError(
  17. 'Gmail requires extra dependencies. Install with `pip install --upgrade "embedchain[gmail]"`'
  18. ) from None
  19. from embedchain.loaders.base_loader import BaseLoader
  20. from embedchain.utils.misc import clean_string
  21. logger = logging.getLogger(__name__)
  22. class GmailReader:
  23. SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
  24. def __init__(self, query: str, service=None, results_per_page: int = 10):
  25. self.query = query
  26. self.service = service or self._initialize_service()
  27. self.results_per_page = results_per_page
  28. @staticmethod
  29. def _initialize_service():
  30. credentials = GmailReader._get_credentials()
  31. return build("gmail", "v1", credentials=credentials)
  32. @staticmethod
  33. def _get_credentials():
  34. if not os.path.exists("credentials.json"):
  35. raise FileNotFoundError("Missing 'credentials.json'. Download it from your Google Developer account.")
  36. creds = (
  37. Credentials.from_authorized_user_file("token.json", GmailReader.SCOPES)
  38. if os.path.exists("token.json")
  39. else None
  40. )
  41. if not creds or not creds.valid:
  42. if creds and creds.expired and creds.refresh_token:
  43. creds.refresh(Request())
  44. else:
  45. flow = InstalledAppFlow.from_client_secrets_file("credentials.json", GmailReader.SCOPES)
  46. creds = flow.run_local_server(port=8080)
  47. with open("token.json", "w") as token:
  48. token.write(creds.to_json())
  49. return creds
  50. def load_emails(self) -> list[dict]:
  51. response = self.service.users().messages().list(userId="me", q=self.query).execute()
  52. messages = response.get("messages", [])
  53. return [self._parse_email(self._get_email(message["id"])) for message in messages]
  54. def _get_email(self, message_id: str):
  55. raw_message = self.service.users().messages().get(userId="me", id=message_id, format="raw").execute()
  56. return base64.urlsafe_b64decode(raw_message["raw"])
  57. def _parse_email(self, raw_email) -> dict:
  58. mime_msg = message_from_bytes(raw_email)
  59. return {
  60. "subject": self._get_header(mime_msg, "Subject"),
  61. "from": self._get_header(mime_msg, "From"),
  62. "to": self._get_header(mime_msg, "To"),
  63. "date": self._format_date(mime_msg),
  64. "body": self._get_body(mime_msg),
  65. }
  66. @staticmethod
  67. def _get_header(mime_msg, header_name: str) -> str:
  68. return mime_msg.get(header_name, "")
  69. @staticmethod
  70. def _format_date(mime_msg) -> Optional[str]:
  71. date_header = GmailReader._get_header(mime_msg, "Date")
  72. return parsedate_to_datetime(date_header).isoformat() if date_header else None
  73. @staticmethod
  74. def _get_body(mime_msg) -> str:
  75. def decode_payload(part):
  76. charset = part.get_content_charset() or "utf-8"
  77. try:
  78. return part.get_payload(decode=True).decode(charset)
  79. except UnicodeDecodeError:
  80. return part.get_payload(decode=True).decode(charset, errors="replace")
  81. if mime_msg.is_multipart():
  82. for part in mime_msg.walk():
  83. ctype = part.get_content_type()
  84. cdispo = str(part.get("Content-Disposition"))
  85. if ctype == "text/plain" and "attachment" not in cdispo:
  86. return decode_payload(part)
  87. elif ctype == "text/html":
  88. return decode_payload(part)
  89. else:
  90. return decode_payload(mime_msg)
  91. return ""
  92. class GmailLoader(BaseLoader):
  93. def load_data(self, query: str):
  94. reader = GmailReader(query=query)
  95. emails = reader.load_emails()
  96. logger.info(f"Gmail Loader: {len(emails)} emails found for query '{query}'")
  97. data = []
  98. for email in emails:
  99. content = self._process_email(email)
  100. data.append({"content": content, "meta_data": email})
  101. return {"doc_id": self._generate_doc_id(query, data), "data": data}
  102. @staticmethod
  103. def _process_email(email: dict) -> str:
  104. content = BeautifulSoup(email["body"], "html.parser").get_text()
  105. content = clean_string(content)
  106. return dedent(
  107. f"""
  108. Email from '{email['from']}' to '{email['to']}'
  109. Subject: {email['subject']}
  110. Date: {email['date']}
  111. Content: {content}
  112. """
  113. )
  114. @staticmethod
  115. def _generate_doc_id(query: str, data: list[dict]) -> str:
  116. content_strings = [email["content"] for email in data]
  117. return hashlib.sha256((query + ", ".join(content_strings)).encode()).hexdigest()