zilliz.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import logging
  2. from typing import Any, Optional, Union
  3. from embedchain.config import ZillizDBConfig
  4. from embedchain.helpers.json_serializable import register_deserializable
  5. from embedchain.vectordb.base import BaseVectorDB
  6. try:
  7. from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
  8. MilvusClient, connections, utility)
  9. except ImportError:
  10. raise ImportError(
  11. "Zilliz requires extra dependencies. Install with `pip install --upgrade embedchain[milvus]`"
  12. ) from None
  13. logger = logging.getLogger(__name__)
  @register_deserializable
  class ZillizVectorDB(BaseVectorDB):
      """Vector database implementation backed by Zilliz / Milvus through ``pymilvus``.

      Each stored row has the fields ``id`` (VARCHAR primary key), ``text``,
      ``embeddings`` (FLOAT_VECTOR) and ``metadata`` (JSON).
      """

      # Rows sent per ``MilvusClient.insert`` call in ``add``: avoids one network
      # round trip per chunk while keeping each request reasonably small.
      _INSERT_BATCH_SIZE = 100

      def __init__(self, config: Optional[ZillizDBConfig] = None):
          """Initialize the database. Save the config and client as attributes.

          :param config: Database configuration class instance; a default
              ``ZillizDBConfig()`` is used when omitted.
          :type config: ZillizDBConfig
          """
          self.config = ZillizDBConfig() if config is None else config
          # High-level client used for insert / query / search / delete / drop.
          self.client = MilvusClient(
              uri=self.config.uri,
              token=self.config.token,
          )
          # ORM-style connection; `utility.has_collection` and `Collection(...)` in
          # _get_or_create_collection are called without an explicit alias and
          # presumably rely on this default connection.
          self.connection = connections.connect(
              uri=self.config.uri,
              token=self.config.token,
          )
          super().__init__(config=self.config)

      def _initialize(self):
          """
          This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
          So it can't be done in __init__ in one step.
          """
          self._get_or_create_collection(self.config.collection_name)

      def _get_or_create_db(self):
          """Get or create the database (the ``MilvusClient`` instance)."""
          return self.client

      def _get_or_create_collection(self, name):
          """Get or create a named collection and store it on ``self.collection``.

          A new collection gets the id/text/embeddings/metadata schema (with
          dynamic fields enabled) and an AUTOINDEX on ``embeddings`` using
          ``self.config.metric_type``.

          :param name: Name of the collection
          :type name: str
          :return: The collection object (also assigned to ``self.collection``).
          """
          if utility.has_collection(name):
              logger.info("[ZillizDB]: found an existing collection %s, make sure the auto-id is disabled.", name)
              self.collection = Collection(name)
          else:
              fields = [
                  FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=512),
                  FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
                  # Vector width must match what the embedder produces.
                  FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=self.embedder.vector_dimension),
                  FieldSchema(name="metadata", dtype=DataType.JSON),
              ]
              schema = CollectionSchema(fields, enable_dynamic_field=True)
              self.collection = Collection(name=name, schema=schema)
              index = {
                  "index_type": "AUTOINDEX",
                  "metric_type": self.config.metric_type,
              }
              self.collection.create_index("embeddings", index)
          return self.collection

      def get(
          self,
          ids: Optional[list[str]] = None,
          where: Optional[dict[str, Any]] = None,
          limit: Optional[int] = None,
      ):
          """
          Get existing documents (ids and metadata) present in the vector database.

          :param ids: Optional. Only return documents whose primary key is in this list.
          :type ids: list[str]
          :param where: Optional. Metadata equality filter; every key/value pair must match.
          :type where: dict[str, Any]
          :param limit: Optional. Maximum number of documents to return.
          :type limit: Optional[int]
          :return: ``{"ids": [...], "metadatas": [...]}`` with parallel lists.
          :rtype: dict[str, list]
          """
          data_ids = []
          metadatas = []
          if self.collection.num_entities == 0 or self.collection.is_empty:
              return {"ids": data_ids, "metadatas": metadatas}

          clauses = []
          if ids:
              # Milvus `in` needs a list literal of quoted strings: id in ["a", "b"].
              quoted_ids = ", ".join(self._quote_string(doc_id) for doc_id in ids)
              clauses.append(f"(id in [{quoted_ids}])")
          if where:
              clauses.append(self._generate_zilliz_filter(where))
          # Combine (not overwrite) so an ids filter and a where filter both apply.
          filter_ = " and ".join(clauses)

          query_kwargs = {}
          if limit is not None:
              query_kwargs["limit"] = limit
          results = self.client.query(
              collection_name=self.config.collection_name,
              filter=filter_,
              output_fields=["*"],
              **query_kwargs,
          )
          for res in results:
              data_ids.append(res.get("id"))
              metadatas.append(res.get("metadata", {}))
          return {"ids": data_ids, "metadatas": metadatas}

      def add(
          self,
          documents: list[str],
          metadatas: list[object],
          ids: list[str],
          **kwargs: Optional[dict[str, Any]],
      ):
          """Embed ``documents`` and add them to the database.

          ``ids``, ``documents``, ``metadatas`` and the computed embeddings are
          paired positionally (``zip`` stops at the shortest). Extra ``kwargs``
          are forwarded to ``MilvusClient.insert``.
          """
          embeddings = self.embedder.embedding_fn(documents)
          rows = [
              {"id": doc_id, "text": doc, "embeddings": embedding, "metadata": metadata}
              for doc_id, doc, metadata, embedding in zip(ids, documents, metadatas, embeddings)
          ]
          batch_size = self._INSERT_BATCH_SIZE
          for start in range(0, len(rows), batch_size):
              self.client.insert(
                  collection_name=self.config.collection_name,
                  data=rows[start : start + batch_size],
                  **kwargs,
              )
          self.collection.load()
          # A single flush suffices; the collection was previously flushed twice
          # (via the ORM object and via the client).
          self.client.flush(self.config.collection_name)

      def query(
          self,
          input_query: str,
          n_results: int,
          where: dict[str, Any],
          citations: bool = False,
          **kwargs: Optional[dict[str, Any]],
      ) -> Union[list[tuple[str, dict]], list[str]]:
          """
          Query contents from vector database based on vector similarity.

          :param input_query: query string
          :type input_query: str
          :param n_results: no of similar documents to fetch from database
          :type n_results: int
          :param where: metadata equality filter (may be empty/None for no filter)
          :type where: dict[str, Any]
          :param citations: when True, return each text together with its metadata
          :type citations: bool, default is False.
          :return: matching texts; with citations, ``(text, metadata)`` tuples where
              ``metadata`` is a copy of the stored metadata plus a ``"score"`` key
              holding the search distance.
          :rtype: list[str] if citations=False, otherwise list[tuple[str, dict]]
          """
          if self.collection.is_empty:
              return []

          query_vector = self.embedder.embedding_fn([input_query])[0]
          query_filter = self._generate_zilliz_filter(where)
          query_result = self.client.search(
              collection_name=self.config.collection_name,
              data=[query_vector],
              filter=query_filter,
              limit=n_results,
              output_fields=["*"],
              **kwargs,
          )

          contexts = []
          # One query vector was sent, so only the first result list is relevant.
          for hit in query_result[0]:
              entity = hit["entity"]
              context = entity["text"]
              if citations:
                  # Copy so the caller's dict is independent of the search result;
                  # `or {}` guards against a stored JSON null.
                  metadata = dict(entity.get("metadata") or {})
                  metadata["score"] = hit["distance"]
                  contexts.append((context, metadata))
              else:
                  contexts.append(context)
          return contexts

      def count(self) -> int:
          """
          Count number of documents/chunks embedded in the database.

          :return: number of documents
          :rtype: int
          """
          return self.collection.num_entities

      def reset(self, collection_names: Optional[list[str]] = None):
          """
          Resets the database. Deletes all embeddings irreversibly.

          :param collection_names: Optional. Collections to drop (names that do not
              exist are skipped). When omitted, the configured collection is dropped.
          """
          if not self.config.collection_name:
              return
          if collection_names:
              existing = set(self.client.list_collections())
              for collection_name in collection_names:
                  if collection_name in existing:
                      self.client.drop_collection(collection_name=collection_name)
                      existing.discard(collection_name)
          else:
              self.client.drop_collection(collection_name=self.config.collection_name)
          # Re-acquire the configured collection: it may have just been dropped
          # (either directly or because it was listed in collection_names).
          self._get_or_create_collection(self.config.collection_name)

      def set_collection_name(self, name: str):
          """
          Set the name of the collection. A collection is an isolated space for vectors.

          :param name: Name of the collection.
          :type name: str
          :raises TypeError: if ``name`` is not a string.
          """
          if not isinstance(name, str):
              raise TypeError("Collection name must be a string")
          self.config.collection_name = name

      @staticmethod
      def _quote_string(value: Any) -> str:
          """Render ``value`` as a double-quoted Milvus expression string literal.

          Backslashes and double quotes are escaped so the value cannot terminate
          the literal early.
          """
          escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
          return f'"{escaped}"'

      def _generate_zilliz_filter(self, where: Optional[dict[str, Any]]) -> str:
          """Build a Milvus boolean expression requiring ``metadata[key] == value`` for every pair.

          Values are compared as strings. Returns ``""`` (no filter) for an
          empty or ``None`` ``where``.
          """
          if not where:
              return ""
          return " and ".join(
              f"(metadata[{self._quote_string(key)}] == {self._quote_string(value)})" for key, value in where.items()
          )

      def delete(self, where: dict[str, Any]):
          """
          Delete the embeddings matching a metadata filter.

          Zilliz only supports deleting by primary key, so the matching ids are
          looked up first with ``get(where=where)`` and then deleted.

          :param where: metadata equality filter selecting the rows to delete
          :type where: dict[str, Any]
          """
          data = self.get(where=where)
          keys = data.get("ids", [])
          if keys:
              self.client.delete(collection_name=self.config.collection_name, pks=keys)