123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- import logging
- from typing import Any, Optional, Union
- from embedchain.config import ZillizDBConfig
- from embedchain.helpers.json_serializable import register_deserializable
- from embedchain.vectordb.base import BaseVectorDB
- try:
- from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
- MilvusClient, connections, utility)
- except ImportError:
- raise ImportError(
- "Zilliz requires extra dependencies. Install with `pip install --upgrade embedchain[milvus]`"
- ) from None
@register_deserializable
class ZillizVectorDB(BaseVectorDB):
    """Vector database implementation backed by Zilliz / Milvus (via pymilvus)."""

    def __init__(self, config: Optional[ZillizDBConfig] = None):
        """Initialize the database. Save the config and client as attributes.

        :param config: Database configuration class instance. A default
            ``ZillizDBConfig()`` is used when omitted.
        :type config: Optional[ZillizDBConfig]
        """
        self.config = ZillizDBConfig() if config is None else config

        # High-level client used for insert / query / search / delete / drop.
        self.client = MilvusClient(
            uri=self.config.uri,
            token=self.config.token,
        )
        # Connection registered for the `utility` and `Collection` calls made in
        # `_get_or_create_collection`, which are not routed through `self.client`.
        self.connection = connections.connect(
            uri=self.config.uri,
            token=self.config.token,
        )

        super().__init__(config=self.config)

    def _initialize(self):
        """
        This method is needed because the `embedder` attribute needs to be set externally
        before the collection can be initialized, so it can't be done in `__init__` in one step.
        """
        self._get_or_create_collection(self.config.collection_name)

    def _get_or_create_db(self):
        """Get or create the database.

        :return: The Milvus client created in `__init__`.
        """
        return self.client

    def _get_or_create_collection(self, name):
        """
        Get or create a named collection and store it on `self.collection`.

        A new collection is created with the fields `id`, `text`, `embeddings`
        and `metadata`, and an AUTOINDEX index on `embeddings`.

        :param name: Name of the collection
        :type name: str
        :return: The existing or newly created collection.
        """
        if utility.has_collection(name):
            logging.info(f"[ZillizDB]: found an existing collection {name}, make sure the auto-id is disabled.")
            self.collection = Collection(name)
            return self.collection

        fields = [
            FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=512),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
            # The vector dimension comes from the embedder, which is why this
            # cannot run before the embedder has been attached.
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=self.embedder.vector_dimension),
            FieldSchema(name="metadata", dtype=DataType.JSON),
        ]
        schema = CollectionSchema(fields, enable_dynamic_field=True)
        self.collection = Collection(name=name, schema=schema)

        index = {
            "index_type": "AUTOINDEX",
            "metric_type": self.config.metric_type,
        }
        self.collection.create_index("embeddings", index)
        return self.collection

    def get(self, ids: Optional[list[str]] = None, where: Optional[dict[str, Any]] = None, limit: Optional[int] = None):
        """
        Get existing doc ids (and their metadata) present in the vector database.

        When both `ids` and `where` are given, a document must satisfy both.

        :param ids: list of doc ids to check for existence
        :type ids: Optional[list[str]]
        :param where: Optional. metadata key/value pairs to filter data
        :type where: Optional[dict[str, Any]]
        :param limit: Optional. maximum number of documents; forwarded to the
            query only when it is not None
        :type limit: Optional[int]
        :return: Existing documents as `{"ids": [...], "metadatas": [...]}`.
        :rtype: dict[str, list]
        """
        if self.collection.is_empty:
            return {"ids": [], "metadatas": []}

        conditions = []
        if ids:
            # Milvus expects a bracketed list of quoted literals: id in ["a", "b"]
            quoted_ids = ", ".join(f'"{self._escape_filter_string(doc_id)}"' for doc_id in ids)
            conditions.append(f"id in [{quoted_ids}]")
        where_filter = self._generate_zilliz_filter(where)
        if where_filter:
            conditions.append(where_filter)
        filter_ = " and ".join(conditions)

        # Only forward `limit` when the caller asked for one, so the default
        # call is unchanged.
        query_kwargs = {}
        if limit is not None:
            query_kwargs["limit"] = limit

        results = self.client.query(
            collection_name=self.config.collection_name,
            filter=filter_,
            output_fields=["*"],
            **query_kwargs,
        )

        data_ids = [res.get("id") for res in results]
        metadatas = [res.get("metadata", {}) for res in results]
        return {"ids": data_ids, "metadatas": metadatas}

    def add(
        self,
        documents: list[str],
        metadatas: list[object],
        ids: list[str],
        **kwargs: Optional[dict[str, Any]],
    ):
        """
        Embed the documents and add them to the database.

        :param documents: Texts to embed and store.
        :type documents: list[str]
        :param metadatas: Metadata stored alongside each document.
        :type metadatas: list[object]
        :param ids: Primary keys of the documents.
        :type ids: list[str]
        :param kwargs: Extra keyword arguments forwarded to the client's `insert`.
        """
        embeddings = self.embedder.embedding_fn(documents)

        for doc_id, doc, metadata, embedding in zip(ids, documents, metadatas, embeddings):
            row = {"id": doc_id, "text": doc, "embeddings": embedding, "metadata": metadata}
            self.client.insert(collection_name=self.config.collection_name, data=row, **kwargs)

        # Load and flush once, after all rows have been inserted.
        self.collection.load()
        self.collection.flush()
        self.client.flush(self.config.collection_name)

    def query(
        self,
        input_query: list[str],
        n_results: int,
        where: Optional[dict[str, Any]],
        citations: bool = False,
        **kwargs: Optional[dict[str, Any]],
    ) -> Union[list[tuple[str, dict]], list[str]]:
        """
        Query contents from vector database based on vector similarity

        :param input_query: query to embed; it is wrapped in a single-element
            list before being passed to the embedding function
        :type input_query: list[str]
        :param n_results: no of similar documents to fetch from database
        :type n_results: int
        :param where: to filter data; None or an empty dict means no filter
        :type where: Optional[dict[str, Any]]
        :param citations: we use citations boolean param to return context along with the answer.
        :type citations: bool, default is False.
        :param kwargs: Extra keyword arguments forwarded to the client's `search`.
        :return: The content of the documents that matched your query; with
            `citations=True`, each entry is a `(text, metadata)` tuple whose
            metadata additionally carries the match `score`.
        :rtype: list[str], if citations=False, otherwise list[tuple[str, dict]]
        """
        if self.collection.is_empty:
            return []

        query_vector = self.embedder.embedding_fn([input_query])[0]
        query_filter = self._generate_zilliz_filter(where)

        search_results = self.client.search(
            collection_name=self.config.collection_name,
            data=[query_vector],
            filter=query_filter,
            limit=n_results,
            output_fields=["*"],
            **kwargs,
        )
        # One query vector was sent, so only the first result set is relevant.
        hits = search_results[0]

        contexts = []
        for hit in hits:
            entity = hit["entity"]
            context = entity["text"]
            if citations:
                # Tolerate entities stored without metadata.
                metadata = entity.get("metadata") or {}
                metadata["score"] = hit["distance"]
                contexts.append((context, metadata))
            else:
                contexts.append(context)
        return contexts

    def count(self) -> int:
        """
        Count number of documents/chunks embedded in the database.

        :return: number of documents
        :rtype: int
        """
        return self.collection.num_entities

    def reset(self, collection_names: list[str] = None):
        """
        Resets the database. Deletes all embeddings irreversibly.

        Does nothing when no collection name is configured. When
        `collection_names` is given, each listed collection that exists is
        dropped. Otherwise the configured collection is dropped and re-created.

        :param collection_names: Optional. Names of the collections to drop.
        :type collection_names: Optional[list[str]]
        """
        if not self.config.collection_name:
            return

        if collection_names:
            # Fetch the existing collections once instead of once per name.
            existing = set(self.client.list_collections())
            for collection_name in collection_names:
                if collection_name in existing:
                    self.client.drop_collection(collection_name=collection_name)
                    # A repeated name must not be dropped twice.
                    existing.discard(collection_name)
        else:
            self.client.drop_collection(collection_name=self.config.collection_name)
            self._get_or_create_collection(self.config.collection_name)

    def set_collection_name(self, name: str):
        """
        Set the name of the collection. A collection is an isolated space for vectors.

        :param name: Name of the collection.
        :type name: str
        :raises TypeError: If `name` is not a string.
        """
        if not isinstance(name, str):
            raise TypeError("Collection name must be a string")
        self.config.collection_name = name

    @staticmethod
    def _escape_filter_string(value: Any) -> str:
        """Escape backslashes and double quotes so `value` can sit inside a
        double-quoted literal of a filter expression."""
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    def _generate_zilliz_filter(self, where: Optional[dict[str, str]]):
        """
        Build a filter expression matching every key/value pair against the
        `metadata` JSON field.

        :param where: Metadata key/value pairs; None or empty yields no filter.
        :type where: Optional[dict[str, str]]
        :return: The `and`-joined expression, or an empty string.
        :rtype: str
        """
        if not where:
            return ""
        escape = self._escape_filter_string
        operands = [f'(metadata["{escape(key)}"] == "{escape(value)}")' for key, value in where.items()]
        return " and ".join(operands)

    def delete(self, where: dict[str, Any]):
        """
        Delete the embeddings matching `where` from the DB.

        The matching primary keys are looked up with `get` first and then
        deleted by key.

        :param where: Metadata key/value pairs selecting the entries to delete.
        :type where: dict[str, Any]
        """
        data = self.get(where=where)
        keys = data.get("ids", [])
        if keys:
            self.client.delete(collection_name=self.config.collection_name, pks=keys)
|