zilliz.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
from typing import Any, Dict, List, Optional

from embedchain.config import ZillizDBConfig
from embedchain.helper.json_serializable import register_deserializable
from embedchain.vectordb.base import BaseVectorDB

try:
    from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
                          MilvusClient, connections, utility)
except ImportError:
    raise ImportError(
        "Zilliz requires extra dependencies. Install with `pip install --upgrade embedchain[milvus]`"
    ) from None
  12. @register_deserializable
  13. class ZillizVectorDB(BaseVectorDB):
  14. """Base class for vector database."""
  15. def __init__(self, config: ZillizDBConfig = None):
  16. """Initialize the database. Save the config and client as an attribute.
  17. :param config: Database configuration class instance.
  18. :type config: ZillizDBConfig
  19. """
  20. if config is None:
  21. self.config = ZillizDBConfig()
  22. else:
  23. self.config = config
  24. self.client = MilvusClient(
  25. uri=self.config.uri,
  26. token=self.config.token,
  27. )
  28. self.connection = connections.connect(
  29. uri=self.config.uri,
  30. token=self.config.token,
  31. )
  32. super().__init__(config=self.config)
  33. def _initialize(self):
  34. """
  35. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  36. So it's can't be done in __init__ in one step.
  37. """
  38. self._get_or_create_collection(self.config.collection_name)
  39. def _get_or_create_db(self):
  40. """Get or create the database."""
  41. return self.client
  42. def _get_or_create_collection(self, name):
  43. """
  44. Get or create a named collection.
  45. :param name: Name of the collection
  46. :type name: str
  47. """
  48. if utility.has_collection(name):
  49. self.collection = Collection(name)
  50. else:
  51. fields = [
  52. FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=512),
  53. FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
  54. FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=self.embedder.vector_dimension),
  55. ]
  56. schema = CollectionSchema(fields, enable_dynamic_field=True)
  57. self.collection = Collection(name=name, schema=schema)
  58. index = {
  59. "index_type": "AUTOINDEX",
  60. "metric_type": self.config.metric_type,
  61. }
  62. self.collection.create_index("embeddings", index)
  63. return self.collection
  64. def get(self, ids: Optional[List[str]] = None, where: Optional[Dict[str, any]] = None, limit: Optional[int] = None):
  65. """
  66. Get existing doc ids present in vector database
  67. :param ids: list of doc ids to check for existence
  68. :type ids: List[str]
  69. :param where: Optional. to filter data
  70. :type where: Dict[str, Any]
  71. :param limit: Optional. maximum number of documents
  72. :type limit: Optional[int]
  73. :return: Existing documents.
  74. :rtype: Set[str]
  75. """
  76. if ids is None or len(ids) == 0 or self.collection.num_entities == 0:
  77. return {"ids": []}
  78. if not (self.collection.is_empty):
  79. filter = f"id in {ids}"
  80. results = self.client.query(
  81. collection_name=self.config.collection_name, filter=filter, output_fields=["id"]
  82. )
  83. results = [res["id"] for res in results]
  84. return {"ids": set(results)}
  85. def add(
  86. self,
  87. embeddings: List[List[float]],
  88. documents: List[str],
  89. metadatas: List[object],
  90. ids: List[str],
  91. skip_embedding: bool,
  92. ):
  93. """Add to database"""
  94. if not skip_embedding:
  95. embeddings = self.embedder.embedding_fn(documents)
  96. for id, doc, metadata, embedding in zip(ids, documents, metadatas, embeddings):
  97. data = {**metadata, "id": id, "text": doc, "embeddings": embedding}
  98. self.client.insert(collection_name=self.config.collection_name, data=data)
  99. self.collection.load()
  100. self.collection.flush()
  101. self.client.flush(self.config.collection_name)
  102. def query(self, input_query: List[str], n_results: int, where: Dict[str, any], skip_embedding: bool) -> List[str]:
  103. """
  104. Query contents from vector data base based on vector similarity
  105. :param input_query: list of query string
  106. :type input_query: List[str]
  107. :param n_results: no of similar documents to fetch from database
  108. :type n_results: int
  109. :param where: to filter data
  110. :type where: str
  111. :raises InvalidDimensionException: Dimensions do not match.
  112. :return: The content of the document that matched your query.
  113. :rtype: List[str]
  114. """
  115. if self.collection.is_empty:
  116. return []
  117. if not isinstance(where, str):
  118. where = None
  119. if skip_embedding:
  120. query_vector = input_query
  121. query_result = self.client.search(
  122. collection_name=self.config.collection_name,
  123. data=query_vector,
  124. limit=n_results,
  125. output_fields=["text"],
  126. )
  127. else:
  128. input_query_vector = self.embedder.embedding_fn([input_query])
  129. query_vector = input_query_vector[0]
  130. query_result = self.client.search(
  131. collection_name=self.config.collection_name,
  132. data=[query_vector],
  133. limit=n_results,
  134. output_fields=["text"],
  135. )
  136. doc_list = []
  137. for query in query_result:
  138. doc_list.append(query[0]["entity"]["text"])
  139. return doc_list
  140. def count(self) -> int:
  141. """
  142. Count number of documents/chunks embedded in the database.
  143. :return: number of documents
  144. :rtype: int
  145. """
  146. return self.collection.num_entities
  147. def reset(self, collection_names: List[str] = None):
  148. """
  149. Resets the database. Deletes all embeddings irreversibly.
  150. """
  151. if self.config.collection_name:
  152. if collection_names:
  153. for collection_name in collection_names:
  154. if collection_name in self.client.list_collections():
  155. self.client.drop_collection(collection_name=collection_name)
  156. else:
  157. self.client.drop_collection(collection_name=self.config.collection_name)
  158. self._get_or_create_collection(self.config.collection_name)
  159. def set_collection_name(self, name: str):
  160. """
  161. Set the name of the collection. A collection is an isolated space for vectors.
  162. :param name: Name of the collection.
  163. :type name: str
  164. """
  165. if not isinstance(name, str):
  166. raise TypeError("Collection name must be a string")
  167. self.config.collection_name = name