# slack.py
import hashlib
import logging
import os
import ssl
from typing import Any, Optional

import certifi

from embedchain.loaders.base_loader import BaseLoader
from embedchain.utils.misc import clean_string
  9. SLACK_API_BASE_URL = "https://www.slack.com/api/"
  10. logger = logging.getLogger(__name__)
  11. class SlackLoader(BaseLoader):
  12. def __init__(self, config: Optional[dict[str, Any]] = None):
  13. super().__init__()
  14. self.config = config if config else {}
  15. if "base_url" not in self.config:
  16. self.config["base_url"] = SLACK_API_BASE_URL
  17. self.client = None
  18. self._setup_loader(self.config)
  19. def _setup_loader(self, config: dict[str, Any]):
  20. try:
  21. from slack_sdk import WebClient
  22. except ImportError as e:
  23. raise ImportError(
  24. "Slack loader requires extra dependencies. \
  25. Install with `pip install --upgrade embedchain[slack]`"
  26. ) from e
  27. if os.getenv("SLACK_USER_TOKEN") is None:
  28. raise ValueError(
  29. "SLACK_USER_TOKEN environment variables not provided. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
  30. )
  31. logger.info(f"Creating Slack Loader with config: {config}")
  32. # get slack client config params
  33. slack_bot_token = os.getenv("SLACK_USER_TOKEN")
  34. ssl_cert = ssl.create_default_context(cafile=certifi.where())
  35. base_url = config.get("base_url", SLACK_API_BASE_URL)
  36. headers = config.get("headers")
  37. # for Org-Wide App
  38. team_id = config.get("team_id")
  39. self.client = WebClient(
  40. token=slack_bot_token,
  41. base_url=base_url,
  42. ssl=ssl_cert,
  43. headers=headers,
  44. team_id=team_id,
  45. )
  46. logger.info("Slack Loader setup successful!")
  47. @staticmethod
  48. def _check_query(query):
  49. if not isinstance(query, str):
  50. raise ValueError(
  51. f"Invalid query passed to Slack loader, found: {query}. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
  52. )
  53. def load_data(self, query):
  54. self._check_query(query)
  55. try:
  56. data = []
  57. data_content = []
  58. logger.info(f"Searching slack conversations for query: {query}")
  59. results = self.client.search_messages(
  60. query=query,
  61. sort="timestamp",
  62. sort_dir="desc",
  63. count=self.config.get("count", 100),
  64. )
  65. messages = results.get("messages")
  66. num_message = len(messages)
  67. logger.info(f"Found {num_message} messages for query: {query}")
  68. matches = messages.get("matches", [])
  69. for message in matches:
  70. url = message.get("permalink")
  71. text = message.get("text")
  72. content = clean_string(text)
  73. message_meta_data_keys = ["iid", "team", "ts", "type", "user", "username"]
  74. metadata = {}
  75. for key in message.keys():
  76. if key in message_meta_data_keys:
  77. metadata[key] = message.get(key)
  78. metadata.update({"url": url})
  79. data.append(
  80. {
  81. "content": content,
  82. "meta_data": metadata,
  83. }
  84. )
  85. data_content.append(content)
  86. doc_id = hashlib.md5((query + ", ".join(data_content)).encode()).hexdigest()
  87. return {
  88. "doc_id": doc_id,
  89. "data": data,
  90. }
  91. except Exception as e:
  92. logger.warning(f"Error in loading slack data: {e}")
  93. raise ValueError(
  94. f"Error in loading slack data: {e}. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
  95. ) from e