zilliz.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import logging
  2. from typing import Any, Dict, List, Optional, Tuple, Union
  3. from embedchain.config import ZillizDBConfig
  4. from embedchain.helpers.json_serializable import register_deserializable
  5. from embedchain.vectordb.base import BaseVectorDB
  6. try:
  7. from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
  8. MilvusClient, connections, utility)
  9. except ImportError:
  10. raise ImportError(
  11. "Zilliz requires extra dependencies. Install with `pip install --upgrade embedchain[milvus]`"
  12. ) from None
  13. @register_deserializable
  14. class ZillizVectorDB(BaseVectorDB):
  15. """Base class for vector database."""
  16. def __init__(self, config: ZillizDBConfig = None):
  17. """Initialize the database. Save the config and client as an attribute.
  18. :param config: Database configuration class instance.
  19. :type config: ZillizDBConfig
  20. """
  21. if config is None:
  22. self.config = ZillizDBConfig()
  23. else:
  24. self.config = config
  25. self.client = MilvusClient(
  26. uri=self.config.uri,
  27. token=self.config.token,
  28. )
  29. self.connection = connections.connect(
  30. uri=self.config.uri,
  31. token=self.config.token,
  32. )
  33. super().__init__(config=self.config)
  34. def _initialize(self):
  35. """
  36. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  37. So it's can't be done in __init__ in one step.
  38. """
  39. self._get_or_create_collection(self.config.collection_name)
  40. def _get_or_create_db(self):
  41. """Get or create the database."""
  42. return self.client
  43. def _get_or_create_collection(self, name):
  44. """
  45. Get or create a named collection.
  46. :param name: Name of the collection
  47. :type name: str
  48. """
  49. if utility.has_collection(name):
  50. logging.info(f"[ZillizDB]: found an existing collection {name}, make sure the auto-id is disabled.")
  51. self.collection = Collection(name)
  52. else:
  53. fields = [
  54. FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=512),
  55. FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
  56. FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=self.embedder.vector_dimension),
  57. ]
  58. schema = CollectionSchema(fields, enable_dynamic_field=True)
  59. self.collection = Collection(name=name, schema=schema)
  60. index = {
  61. "index_type": "AUTOINDEX",
  62. "metric_type": self.config.metric_type,
  63. }
  64. self.collection.create_index("embeddings", index)
  65. return self.collection
  66. def get(self, ids: Optional[List[str]] = None, where: Optional[Dict[str, any]] = None, limit: Optional[int] = None):
  67. """
  68. Get existing doc ids present in vector database
  69. :param ids: list of doc ids to check for existence
  70. :type ids: List[str]
  71. :param where: Optional. to filter data
  72. :type where: Dict[str, Any]
  73. :param limit: Optional. maximum number of documents
  74. :type limit: Optional[int]
  75. :return: Existing documents.
  76. :rtype: Set[str]
  77. """
  78. if ids is None or len(ids) == 0 or self.collection.num_entities == 0:
  79. return {"ids": []}
  80. if not (self.collection.is_empty):
  81. filter = f"id in {ids}"
  82. results = self.client.query(
  83. collection_name=self.config.collection_name, filter=filter, output_fields=["id"]
  84. )
  85. results = [res["id"] for res in results]
  86. return {"ids": set(results)}
  87. def add(
  88. self,
  89. embeddings: List[List[float]],
  90. documents: List[str],
  91. metadatas: List[object],
  92. ids: List[str],
  93. skip_embedding: bool,
  94. **kwargs: Optional[Dict[str, any]],
  95. ):
  96. """Add to database"""
  97. if not skip_embedding:
  98. embeddings = self.embedder.embedding_fn(documents)
  99. for id, doc, metadata, embedding in zip(ids, documents, metadatas, embeddings):
  100. data = {**metadata, "id": id, "text": doc, "embeddings": embedding}
  101. self.client.insert(collection_name=self.config.collection_name, data=data, **kwargs)
  102. self.collection.load()
  103. self.collection.flush()
  104. self.client.flush(self.config.collection_name)
  105. def query(
  106. self,
  107. input_query: List[str],
  108. n_results: int,
  109. where: Dict[str, any],
  110. skip_embedding: bool,
  111. citations: bool = False,
  112. **kwargs: Optional[Dict[str, Any]],
  113. ) -> Union[List[Tuple[str, Dict]], List[str]]:
  114. """
  115. Query contents from vector data base based on vector similarity
  116. :param input_query: list of query string
  117. :type input_query: List[str]
  118. :param n_results: no of similar documents to fetch from database
  119. :type n_results: int
  120. :param where: to filter data
  121. :type where: str
  122. :raises InvalidDimensionException: Dimensions do not match.
  123. :param citations: we use citations boolean param to return context along with the answer.
  124. :type citations: bool, default is False.
  125. :return: The content of the document that matched your query,
  126. along with url of the source and doc_id (if citations flag is true)
  127. :rtype: List[str], if citations=False, otherwise List[Tuple[str, str, str]]
  128. """
  129. if self.collection.is_empty:
  130. return []
  131. if not isinstance(where, str):
  132. where = None
  133. output_fields = ["*"]
  134. if skip_embedding:
  135. query_vector = input_query
  136. query_result = self.client.search(
  137. collection_name=self.config.collection_name,
  138. data=query_vector,
  139. limit=n_results,
  140. output_fields=output_fields,
  141. **kwargs,
  142. )
  143. else:
  144. input_query_vector = self.embedder.embedding_fn([input_query])
  145. query_vector = input_query_vector[0]
  146. query_result = self.client.search(
  147. collection_name=self.config.collection_name,
  148. data=[query_vector],
  149. limit=n_results,
  150. output_fields=output_fields,
  151. **kwargs,
  152. )
  153. query_result = query_result[0]
  154. contexts = []
  155. for query in query_result:
  156. data = query["entity"]
  157. score = query["distance"]
  158. context = data["text"]
  159. if "embeddings" in data:
  160. data.pop("embeddings")
  161. if citations:
  162. data["score"] = score
  163. contexts.append(tuple((context, data)))
  164. else:
  165. contexts.append(context)
  166. return contexts
  167. def count(self) -> int:
  168. """
  169. Count number of documents/chunks embedded in the database.
  170. :return: number of documents
  171. :rtype: int
  172. """
  173. return self.collection.num_entities
  174. def reset(self, collection_names: List[str] = None):
  175. """
  176. Resets the database. Deletes all embeddings irreversibly.
  177. """
  178. if self.config.collection_name:
  179. if collection_names:
  180. for collection_name in collection_names:
  181. if collection_name in self.client.list_collections():
  182. self.client.drop_collection(collection_name=collection_name)
  183. else:
  184. self.client.drop_collection(collection_name=self.config.collection_name)
  185. self._get_or_create_collection(self.config.collection_name)
  186. def set_collection_name(self, name: str):
  187. """
  188. Set the name of the collection. A collection is an isolated space for vectors.
  189. :param name: Name of the collection.
  190. :type name: str
  191. """
  192. if not isinstance(name, str):
  193. raise TypeError("Collection name must be a string")
  194. self.config.collection_name = name
  195. def delete(self, keys: Union[list, str, int]):
  196. """
  197. Delete the embeddings from DB. Zilliz only support deleting with keys.
  198. :param keys: Primary keys of the table entries to delete.
  199. :type keys: Union[list, str, int]
  200. """
  201. self.client.delete(
  202. collection_name=self.config.collection_name,
  203. pks=keys,
  204. )