1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import threading
- import uuid
- import yaml
- from embedchain.config import PipelineConfig
- from embedchain.embedchain import EmbedChain
- from embedchain.embedder.base import BaseEmbedder
- from embedchain.embedder.openai import OpenAIEmbedder
- from embedchain.factory import EmbedderFactory, VectorDBFactory
- from embedchain.helper.json_serializable import register_deserializable
- from embedchain.vectordb.base import BaseVectorDB
- from embedchain.vectordb.chroma import ChromaDB
@register_deserializable
class Pipeline(EmbedChain):
    """
    EmbedChain pipeline lets you create a LLM powered app for your unstructured
    data by defining a pipeline with your chosen data source, embedding model,
    and vector database.
    """

    def __init__(self, config: PipelineConfig = None, db: BaseVectorDB = None, embedding_model: BaseEmbedder = None):
        """
        Initialize a new `Pipeline` instance.

        :param config: Configuration for the pipeline, defaults to None
        :type config: PipelineConfig, optional
        :param db: The database to use for storing and retrieving embeddings, defaults to None
        :type db: BaseVectorDB, optional
        :param embedding_model: The embedding model used to calculate embeddings, defaults to None
        :type embedding_model: BaseEmbedder, optional
        """
        super().__init__()
        self.config = config or PipelineConfig()
        self.name = self.config.name
        self.id = self.config.id or str(uuid.uuid4())
        self.embedding_model = embedding_model or OpenAIEmbedder()
        self.db = db or ChromaDB()
        self._initialize_db()
        self.user_asks = []  # legacy attribute kept for backward compatibility
        # NOTE(review): when config.id is unset, s_id is generated independently
        # of self.id, so the two differ — confirm whether they are meant to be
        # the same identifier or a separate session id.
        self.s_id = self.config.id or str(uuid.uuid4())
        self.u_id = self._load_or_generate_user_id()

        # Fire-and-forget telemetry so initialization is not blocked on network I/O.
        thread_telemetry = threading.Thread(target=self._send_telemetry_event, args=("pipeline_init",))
        thread_telemetry.start()

    def _initialize_db(self):
        """
        Wire the embedder into the vector database, initialize it, and point it
        at this pipeline's collection (named after the pipeline).
        """
        self.db._set_embedder(self.embedding_model)
        self.db._initialize()
        self.db.set_collection_name(self.name)

    def search(self, query, num_documents=3):
        """
        Search for similar documents related to the query in the vector database.

        :param query: The text to search for.
        :param num_documents: Maximum number of documents to return, defaults to 3.
        :return: Whatever the underlying vector database's `query` returns for
            the matching documents, scoped to this pipeline via `app_id`.
        """
        # Scope results to this pipeline's documents only.
        where = {"app_id": self.id}
        return self.db.query(
            query,
            n_results=num_documents,
            where=where,
            skip_embedding=False,
        )

    @classmethod
    def from_config(cls, yaml_path: str):
        """
        Instantiate a Pipeline object from a YAML configuration file.

        :param yaml_path: Path to the YAML configuration file.
        :type yaml_path: str
        :return: An instance of the Pipeline class.
        :rtype: Pipeline
        """
        with open(yaml_path, "r", encoding="utf-8") as file:
            # safe_load returns None for an empty file; fall back to an empty
            # mapping so the .get() lookups below don't raise AttributeError.
            config_data = yaml.safe_load(file) or {}

        pipeline_config_data = config_data.get("pipeline", {})
        db_config_data = config_data.get("vectordb", {})
        embedding_model_config_data = config_data.get("embedding_model", {})

        pipeline_config = PipelineConfig(**pipeline_config_data)

        # Each section defaults its provider when the YAML omits it.
        db_provider = db_config_data.get("provider", "chroma")
        db = VectorDBFactory.create(db_provider, db_config_data.get("config", {}))

        embedding_model_provider = embedding_model_config_data.get("provider", "openai")
        embedding_model = EmbedderFactory.create(
            embedding_model_provider, embedding_model_config_data.get("config", {})
        )
        return cls(config=pipeline_config, db=db, embedding_model=embedding_model)
|