123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import hashlib
- import logging
- import os
- import ssl
- from typing import Any, Optional
- import certifi
- from embedchain.loaders.base_loader import BaseLoader
- from embedchain.utils.misc import clean_string
- SLACK_API_BASE_URL = "https://www.slack.com/api/"
- class SlackLoader(BaseLoader):
- def __init__(self, config: Optional[dict[str, Any]] = None):
- super().__init__()
- self.config = config if config else {}
- if "base_url" not in self.config:
- self.config["base_url"] = SLACK_API_BASE_URL
- self.client = None
- self._setup_loader(self.config)
- def _setup_loader(self, config: dict[str, Any]):
- try:
- from slack_sdk import WebClient
- except ImportError as e:
- raise ImportError(
- "Slack loader requires extra dependencies. \
- Install with `pip install --upgrade embedchain[slack]`"
- ) from e
- if os.getenv("SLACK_USER_TOKEN") is None:
- raise ValueError(
- "SLACK_USER_TOKEN environment variables not provided. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
- )
- logging.info(f"Creating Slack Loader with config: {config}")
- # get slack client config params
- slack_bot_token = os.getenv("SLACK_USER_TOKEN")
- ssl_cert = ssl.create_default_context(cafile=certifi.where())
- base_url = config.get("base_url", SLACK_API_BASE_URL)
- headers = config.get("headers")
- # for Org-Wide App
- team_id = config.get("team_id")
- self.client = WebClient(
- token=slack_bot_token,
- base_url=base_url,
- ssl=ssl_cert,
- headers=headers,
- team_id=team_id,
- )
- logging.info("Slack Loader setup successful!")
- @staticmethod
- def _check_query(query):
- if not isinstance(query, str):
- raise ValueError(
- f"Invalid query passed to Slack loader, found: {query}. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
- )
- def load_data(self, query):
- self._check_query(query)
- try:
- data = []
- data_content = []
- logging.info(f"Searching slack conversations for query: {query}")
- results = self.client.search_messages(
- query=query,
- sort="timestamp",
- sort_dir="desc",
- count=self.config.get("count", 100),
- )
- messages = results.get("messages")
- num_message = len(messages)
- logging.info(f"Found {num_message} messages for query: {query}")
- matches = messages.get("matches", [])
- for message in matches:
- url = message.get("permalink")
- text = message.get("text")
- content = clean_string(text)
- message_meta_data_keys = ["iid", "team", "ts", "type", "user", "username"]
- meta_data = {}
- for key in message.keys():
- if key in message_meta_data_keys:
- meta_data[key] = message.get(key)
- meta_data.update({"url": url})
- data.append(
- {
- "content": content,
- "meta_data": meta_data,
- }
- )
- data_content.append(content)
- doc_id = hashlib.md5((query + ", ".join(data_content)).encode()).hexdigest()
- return {
- "doc_id": doc_id,
- "data": data,
- }
- except Exception as e:
- logging.warning(f"Error in loading slack data: {e}")
- raise ValueError(
- f"Error in loading slack data: {e}. Check `https://docs.embedchain.ai/data-sources/slack` to learn more." # noqa:E501
- ) from e
|