import ast
import concurrent.futures
import json
import logging
import os
from typing import Any, Optional, Union

import requests
import yaml
from tqdm import tqdm

from mem0 import Mem0

from embedchain.cache import (
    Config,
    ExactMatchEvaluation,
    SearchDistanceEvaluation,
    cache,
    gptcache_data_manager,
    gptcache_pre_function,
)
from embedchain.client import Client
from embedchain.config import AppConfig, CacheConfig, ChunkerConfig, Mem0Config
from embedchain.core.db.database import get_session, init_db, setup_engine
from embedchain.core.db.models import DataSource
from embedchain.embedchain import EmbedChain
from embedchain.embedder.base import BaseEmbedder
from embedchain.embedder.openai import OpenAIEmbedder
from embedchain.evaluation.base import BaseMetric
from embedchain.evaluation.metrics import AnswerRelevance, ContextRelevance, Groundedness
from embedchain.factory import EmbedderFactory, LlmFactory, VectorDBFactory
from embedchain.helpers.json_serializable import register_deserializable
from embedchain.llm.base import BaseLlm
from embedchain.llm.openai import OpenAILlm
from embedchain.telemetry.posthog import AnonymousTelemetry
from embedchain.utils.evaluation import EvalData, EvalMetric
from embedchain.utils.misc import validate_config
from embedchain.vectordb.base import BaseVectorDB
from embedchain.vectordb.chroma import ChromaDB

logger = logging.getLogger(__name__)


@register_deserializable
class App(EmbedChain):
    """
    EmbedChain App lets you create an LLM-powered app for your unstructured
    data by defining your chosen data source, embedding model,
    and vector database.
    """

    def __init__(
        self,
        id: str = None,
        name: str = None,
        config: AppConfig = None,
        db: BaseVectorDB = None,
        embedding_model: BaseEmbedder = None,
        llm: BaseLlm = None,
        config_data: dict = None,
        auto_deploy: bool = False,
        chunker: ChunkerConfig = None,
        cache_config: CacheConfig = None,
        memory_config: Mem0Config = None,
        log_level: int = logging.WARN,
    ):
- """
- Initialize a new `App` instance.
- :param config: Configuration for the pipeline, defaults to None
- :type config: AppConfig, optional
- :param db: The database to use for storing and retrieving embeddings, defaults to None
- :type db: BaseVectorDB, optional
- :param embedding_model: The embedding model used to calculate embeddings, defaults to None
- :type embedding_model: BaseEmbedder, optional
- :param llm: The LLM model used to calculate embeddings, defaults to None
- :type llm: BaseLlm, optional
- :param config_data: Config dictionary, defaults to None
- :type config_data: dict, optional
- :param auto_deploy: Whether to deploy the pipeline automatically, defaults to False
- :type auto_deploy: bool, optional
- :raises Exception: If an error occurs while creating the pipeline
- """
        if id and config_data:
            raise Exception("Cannot provide both id and config. Please provide only one of them.")

        if id and name:
            raise Exception("Cannot provide both id and name. Please provide only one of them.")

        if name and config:
            raise Exception("Cannot provide both name and config. Please provide only one of them.")

        # Initialize the metadata db for the app
        setup_engine(database_uri=os.environ.get("EMBEDCHAIN_DB_URI"))
        init_db()

        self.auto_deploy = auto_deploy
        # Store the dict config as an attribute to be able to send it to the platform
        self.config_data = config_data if (config_data and validate_config(config_data)) else None
        self.client = None
        # pipeline_id from the backend
        self.id = None
        self.chunker = ChunkerConfig(**chunker) if chunker else None
        self.cache_config = cache_config
        self.memory_config = memory_config

        self.config = config or AppConfig()
        self.name = self.config.name
        self.config.id = self.local_id = "default-app-id" if self.config.id is None else self.config.id
        if id is not None:
            # Init client first since user is trying to fetch the pipeline
            # details from the platform
            self._init_client()
            pipeline_details = self._get_pipeline(id)
            self.config.id = self.local_id = pipeline_details["metadata"]["local_id"]
            self.id = id

        if name is not None:
            self.name = name
        self.embedding_model = embedding_model or OpenAIEmbedder()
        self.db = db or ChromaDB()
        self.llm = llm or OpenAILlm()
        self._init_db()

        # Session for the metadata db
        self.db_session = get_session()

        # Initialize the cache if a cache config was provided
        if self.cache_config is not None:
            self._init_cache()

        # Initialize the memory client if a memory config was provided
        self.mem0_client = None
        if self.memory_config is not None:
            self.mem0_client = Mem0(api_key=self.memory_config.api_key)

        # Send anonymous telemetry
        self._telemetry_props = {"class": self.__class__.__name__}
        self.telemetry = AnonymousTelemetry(enabled=self.config.collect_metrics)
        self.telemetry.capture(event_name="init", properties=self._telemetry_props)

        self.user_asks = []
        if self.auto_deploy:
            self.deploy()

    def _init_db(self):
        """
        Initialize the vector database.
        """
        self.db._set_embedder(self.embedding_model)
        self.db._initialize()
        self.db.set_collection_name(self.db.config.collection_name)

    def _init_cache(self):
        if self.cache_config.similarity_eval_config.strategy == "exact":
            similarity_eval_func = ExactMatchEvaluation()
        else:
            similarity_eval_func = SearchDistanceEvaluation(
                max_distance=self.cache_config.similarity_eval_config.max_distance,
                positive=self.cache_config.similarity_eval_config.positive,
            )
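
        # Wire the app's own embedder into GPTCache: the pre-function extracts
        # the prompt, the embedder turns it into a cache key, and the
        # similarity evaluator chosen above decides whether a stored answer
        # counts as a hit.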
        cache.init(
            pre_embedding_func=gptcache_pre_function,
            embedding_func=self.embedding_model.to_embeddings,
            data_manager=gptcache_data_manager(vector_dimension=self.embedding_model.vector_dimension),
            similarity_evaluation=similarity_eval_func,
            config=Config(**self.cache_config.init_config.as_dict()),
        )
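
    # A hypothetical cache section as it might appear in a config dict (key
    # names are assumptions inferred from the CacheConfig usage above, shown
    # only to illustrate the two similarity strategies):
    #
    #   cache:
    #     similarity_evaluation:
    #       strategy: distance   # or "exact"
    #       max_distance: 1.0
    #       positive: false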

    def _init_client(self):
        """
        Initialize the client.
        """
        config = Client.load_config()
        if config.get("api_key"):
            self.client = Client()
        else:
            api_key = input(
                "🔑 Enter your Embedchain API key. You can find the API key at https://app.embedchain.ai/settings/keys/ \n"  # noqa: E501
            )
            self.client = Client(api_key=api_key)

    def _get_pipeline(self, id):
        """
        Get an existing pipeline from the platform.
        """
        print("🛠️ Fetching pipeline details from the platform...")
        url = f"{self.client.host}/api/v1/pipelines/{id}/cli/"
        r = requests.get(
            url,
            headers={"Authorization": f"Token {self.client.api_key}"},
        )
        if r.status_code == 404:
            raise Exception(f"❌ Pipeline with id {id} not found!")

        print(
            f"🎉 Pipeline loaded successfully! Pipeline url: https://app.embedchain.ai/pipelines/{r.json()['id']}\n"  # noqa: E501
        )
        return r.json()

    def _create_pipeline(self):
        """
        Create a pipeline on the platform.
        """
        print("🛠️ Creating pipeline on the platform...")
        # self.config_data is a dict. Pass it to the backend under the key 'yaml_config'
        payload = {
            "yaml_config": json.dumps(self.config_data),
            "name": self.name,
            "local_id": self.local_id,
        }
        url = f"{self.client.host}/api/v1/pipelines/cli/create/"
        r = requests.post(
            url,
            json=payload,
            headers={"Authorization": f"Token {self.client.api_key}"},
        )
        if r.status_code not in [200, 201]:
            raise Exception(f"❌ Error occurred while creating pipeline. API response: {r.text}")

        if r.status_code == 200:
            print(
                f"🎉🎉🎉 Existing pipeline found! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n"  # noqa: E501
            )
        elif r.status_code == 201:
            print(
                f"🎉🎉🎉 Pipeline created successfully! View your pipeline: https://app.embedchain.ai/pipelines/{r.json()['id']}\n"  # noqa: E501
            )
        return r.json()

    def _get_presigned_url(self, data_type, data_value):
        payload = {"data_type": data_type, "data_value": data_value}
        r = requests.post(
            f"{self.client.host}/api/v1/pipelines/{self.id}/cli/presigned_url/",
            json=payload,
            headers={"Authorization": f"Token {self.client.api_key}"},
        )
        r.raise_for_status()
        return r.json()

    def _upload_file_to_presigned_url(self, presigned_url, file_path):
        try:
            with open(file_path, "rb") as file:
                response = requests.put(presigned_url, data=file)
                response.raise_for_status()
                return response.status_code == 200
        except Exception as e:
            logger.exception(f"Error occurred during file upload: {str(e)}")
            print("❌ Error occurred during file upload!")
            return False

    def _upload_data_to_pipeline(self, data_type, data_value, metadata=None):
        payload = {
            "data_type": data_type,
            "data_value": data_value,
            "metadata": metadata,
        }
        try:
            self._send_api_request(f"/api/v1/pipelines/{self.id}/cli/add/", payload)
            # Print the local file path if the user uploaded a local file
            printed_value = (metadata or {}).get("file_path") or data_value
            print(f"✅ Data of type: {data_type}, value: {printed_value} added successfully.")
        except Exception as e:
            print(f"❌ Error occurred during data upload for type {data_type}! Error: {str(e)}")

    def _send_api_request(self, endpoint, payload):
        url = f"{self.client.host}{endpoint}"
        headers = {"Authorization": f"Token {self.client.api_key}"}
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        return response

    def _process_and_upload_data(self, data_hash, data_type, data_value):
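        # Local absolute paths are first pushed to S3 via a presigned URL and
        # the pipeline receives that URL; everything else (e.g. web URLs,
        # qna_pair tuples) is sent to the pipeline as-is.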
        if os.path.isabs(data_value):
            presigned_url_data = self._get_presigned_url(data_type, data_value)
            presigned_url = presigned_url_data["presigned_url"]
            s3_key = presigned_url_data["s3_key"]
            if self._upload_file_to_presigned_url(presigned_url, file_path=data_value):
                metadata = {"file_path": data_value, "s3_key": s3_key}
                data_value = presigned_url
            else:
                logger.error(f"File upload failed for hash: {data_hash}")
                return False
        else:
            if data_type == "qna_pair":
                data_value = list(ast.literal_eval(data_value))
            metadata = {}

        try:
            self._upload_data_to_pipeline(data_type, data_value, metadata)
            self._mark_data_as_uploaded(data_hash)
            return True
        except Exception:
            print(f"❌ Error occurred during data upload for hash {data_hash}!")
            return False

    def _mark_data_as_uploaded(self, data_hash):
        self.db_session.query(DataSource).filter_by(hash=data_hash, app_id=self.local_id).update({"is_uploaded": 1})

    def get_data_sources(self):
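        # Returns one dict per row in the local metadata db, e.g.
        # {"data_type": ..., "data_value": ..., "metadata": ...}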
        data_sources = self.db_session.query(DataSource).filter_by(app_id=self.local_id).all()
        results = []
        for row in data_sources:
            results.append({"data_type": row.type, "data_value": row.value, "metadata": row.meta_data})
        return results

    def deploy(self):
        if self.client is None:
            self._init_client()

        pipeline_data = self._create_pipeline()
        self.id = pipeline_data["id"]

        results = self.db_session.query(DataSource).filter_by(app_id=self.local_id, is_uploaded=0).all()
        if len(results) > 0:
            print("🛠️ Adding data to your pipeline...")
        for result in results:
            # Use the model's `type`/`value` columns, consistent with get_data_sources()
            data_hash, data_type, data_value = result.hash, result.type, result.value
            self._process_and_upload_data(data_hash, data_type, data_value)

        # Send anonymous telemetry
        self.telemetry.capture(event_name="deploy", properties=self._telemetry_props)
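
    # Illustrative flow (assumes an Embedchain platform API key is available):
    #
    #   app = App.from_config(config_path="config.yaml")
    #   app.add("https://www.example.com")
    #   app.deploy()  # creates the pipeline and uploads any pending data sources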

    @classmethod
    def from_config(
        cls,
        config_path: Optional[str] = None,
        config: Optional[dict[str, Any]] = None,
        auto_deploy: bool = False,
        yaml_path: Optional[str] = None,
    ):
- """
- Instantiate a App object from a configuration.
- :param config_path: Path to the YAML or JSON configuration file.
- :type config_path: Optional[str]
- :param config: A dictionary containing the configuration.
- :type config: Optional[dict[str, Any]]
- :param auto_deploy: Whether to deploy the app automatically, defaults to False
- :type auto_deploy: bool, optional
- :param yaml_path: (Deprecated) Path to the YAML configuration file. Use config_path instead.
- :type yaml_path: Optional[str]
- :return: An instance of the App class.
- :rtype: App
- """
        # Backward compatibility for yaml_path
        if yaml_path and not config_path:
            config_path = yaml_path

        if config_path and config:
            raise ValueError("Please provide only one of config_path or config.")

        config_data = None

        if config_path:
            file_extension = os.path.splitext(config_path)[1]
            with open(config_path, "r", encoding="UTF-8") as file:
                if file_extension in [".yaml", ".yml"]:
                    config_data = yaml.safe_load(file)
                elif file_extension == ".json":
                    config_data = json.load(file)
                else:
                    raise ValueError("config_path must be a path to a YAML or JSON file.")
        elif config and isinstance(config, dict):
            config_data = config
        else:
            logger.error(
                "No config provided; falling back to defaults. Provide either a config file path (YAML or JSON) or a config dictionary."  # noqa: E501
            )
            config_data = {}
        # Validate the config
        validate_config(config_data)

        app_config_data = config_data.get("app", {}).get("config", {})
        vector_db_config_data = config_data.get("vectordb", {})
        embedding_model_config_data = config_data.get("embedding_model", config_data.get("embedder", {}))
        memory_config_data = config_data.get("memory", {})
        llm_config_data = config_data.get("llm", {})
        chunker_config_data = config_data.get("chunker", {})
        cache_config_data = config_data.get("cache", None)

        app_config = AppConfig(**app_config_data)
        memory_config = Mem0Config(**memory_config_data) if memory_config_data else None

        vector_db_provider = vector_db_config_data.get("provider", "chroma")
        vector_db = VectorDBFactory.create(vector_db_provider, vector_db_config_data.get("config", {}))

        if llm_config_data:
            # Initialize the metadata db for the app here since LlmFactory
            # needs it to initialize the LLM memory
            setup_engine(database_uri=os.environ.get("EMBEDCHAIN_DB_URI"))
            init_db()
            llm_provider = llm_config_data.get("provider", "openai")
            llm = LlmFactory.create(llm_provider, llm_config_data.get("config", {}))
        else:
            llm = None

        embedding_model_provider = embedding_model_config_data.get("provider", "openai")
        embedding_model = EmbedderFactory.create(
            embedding_model_provider, embedding_model_config_data.get("config", {})
        )

        if cache_config_data is not None:
            cache_config = CacheConfig.from_config(cache_config_data)
        else:
            cache_config = None

        return cls(
            config=app_config,
            llm=llm,
            db=vector_db,
            embedding_model=embedding_model,
            config_data=config_data,
            auto_deploy=auto_deploy,
            chunker=chunker_config_data,
            cache_config=cache_config,
            memory_config=memory_config,
        )
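
    # A minimal YAML config as from_config expects it (section names come from
    # the lookups above; the provider and model values are illustrative
    # assumptions):
    #
    #   app:
    #     config:
    #       name: my-app
    #   llm:
    #     provider: openai
    #     config:
    #       model: gpt-4o-mini
    #   vectordb:
    #     provider: chroma
    #   embedding_model:
    #     provider: openai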

    def _eval(self, dataset: list[EvalData], metric: Union[BaseMetric, str]):
        """
        Evaluate the app on a dataset for a given metric.
        """
        metric_str = metric.name if isinstance(metric, BaseMetric) else metric
        eval_class_map = {
            EvalMetric.CONTEXT_RELEVANCY.value: ContextRelevance,
            EvalMetric.ANSWER_RELEVANCY.value: AnswerRelevance,
            EvalMetric.GROUNDEDNESS.value: Groundedness,
        }

        if metric_str in eval_class_map:
            return eval_class_map[metric_str]().evaluate(dataset)

        # Handle the case for custom metrics
        if isinstance(metric, BaseMetric):
            return metric.evaluate(dataset)
        else:
            raise ValueError(f"Invalid metric: {metric}")
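
    # A custom metric is any BaseMetric subclass that exposes a name and an
    # evaluate() method; a hypothetical sketch (BaseMetric's exact constructor
    # may differ):
    #
    #   class AnswerLength(BaseMetric):
    #       name = "answer_length"
    #
    #       def evaluate(self, dataset: list[EvalData]):
    #           return sum(len(d.answer) for d in dataset) / len(dataset)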

    def evaluate(
        self,
        questions: Union[str, list[str]],
        metrics: Optional[list[Union[BaseMetric, str]]] = None,
        num_workers: int = 4,
    ):
- """
- Evaluate the app on a question.
- param: questions: A question or a list of questions to evaluate.
- type: questions: Union[str, list[str]]
- param: metrics: A list of metrics to evaluate. Defaults to all metrics.
- type: metrics: Optional[list[Union[BaseMetric, str]]]
- param: num_workers: Number of workers to use for parallel processing.
- type: num_workers: int
- return: A dictionary containing the evaluation results.
- rtype: dict
- """
- if "OPENAI_API_KEY" not in os.environ:
- raise ValueError("Please set the OPENAI_API_KEY environment variable with permission to use `gpt4` model.")

        queries, answers, contexts = [], [], []
        if isinstance(questions, list):
            with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
                future_to_data = {executor.submit(self.query, q, citations=True): q for q in questions}
                for future in tqdm(
                    concurrent.futures.as_completed(future_to_data),
                    total=len(future_to_data),
                    desc="Getting answer and contexts for questions",
                ):
                    question = future_to_data[future]
                    queries.append(question)
                    answer, context = future.result()
                    answers.append(answer)
                    # Keep only the text of each cited source
                    contexts.append(list(map(lambda x: x[0], context)))
        else:
            answer, context = self.query(questions, citations=True)
            queries = [questions]
            answers = [answer]
            contexts = [list(map(lambda x: x[0], context))]

        metrics = metrics or [
            EvalMetric.CONTEXT_RELEVANCY.value,
            EvalMetric.ANSWER_RELEVANCY.value,
            EvalMetric.GROUNDEDNESS.value,
        ]

        logger.info(f"Collecting data from {len(queries)} questions for evaluation...")
        dataset = []
        for q, a, c in zip(queries, answers, contexts):
            dataset.append(EvalData(question=q, answer=a, contexts=c))

        logger.info(f"Evaluating {len(dataset)} data points...")
        result = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
            future_to_metric = {executor.submit(self._eval, dataset, metric): metric for metric in metrics}
            for future in tqdm(
                concurrent.futures.as_completed(future_to_metric),
                total=len(future_to_metric),
                desc="Evaluating metrics",
            ):
                metric = future_to_metric[future]
                if isinstance(metric, BaseMetric):
                    result[metric.name] = future.result()
                else:
                    result[metric] = future.result()

        if self.config.collect_metrics:
            telemetry_props = self._telemetry_props
            metrics_names = []
            for metric in metrics:
                if isinstance(metric, BaseMetric):
                    metrics_names.append(metric.name)
                else:
                    metrics_names.append(metric)
            telemetry_props["metrics"] = metrics_names
            self.telemetry.capture(event_name="evaluate", properties=telemetry_props)
        return result
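
    # Illustrative call (assumes data has already been added to the app and
    # that the string metric names match the EvalMetric enum values):
    #
    #   scores = app.evaluate(
    #       ["What is Embedchain?", "How do I add data?"],
    #       metrics=["groundedness"],
    #   )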
|