chroma_db.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import logging
  2. from typing import Dict, List, Optional
  3. from chromadb import Collection, QueryResult
  4. from langchain.docstore.document import Document
  5. from embedchain.config import ChromaDbConfig
  6. from embedchain.helper_classes.json_serializable import register_deserializable
  7. from embedchain.vectordb.base_vector_db import BaseVectorDB
  8. try:
  9. import chromadb
  10. from chromadb.config import Settings
  11. from chromadb.errors import InvalidDimensionException
  12. except RuntimeError:
  13. from embedchain.utils import use_pysqlite3
  14. use_pysqlite3()
  15. import chromadb
  16. from chromadb.config import Settings
  17. from chromadb.errors import InvalidDimensionException
  18. @register_deserializable
  19. class ChromaDB(BaseVectorDB):
  20. """Vector database using ChromaDB."""
  def __init__(self, config: Optional[ChromaDbConfig] = None):
      """Build the Chroma settings and create the client.

      A client/server connection is configured when both ``host`` and ``port``
      are set on the config; otherwise a persistent local store is configured.

      :param config: Configuration options for Chroma, defaults to None
      :type config: Optional[ChromaDbConfig], optional
      """
      # A falsy config (including None) is replaced by a default one.
      self.config = config or ChromaDbConfig()

      self.settings = Settings()
      # Apply user-supplied overrides; keys that Settings does not define are skipped.
      for key, value in (self.config.chroma_settings or {}).items():
          if hasattr(self.settings, key):
              setattr(self.settings, key, value)

      use_remote_server = bool(self.config.host and self.config.port)
      if not use_remote_server:
          # Local mode: persist to the configured directory, falling back to "db".
          if self.config.dir is None:
              self.config.dir = "db"
          self.settings.persist_directory = self.config.dir
          self.settings.is_persistent = True
      else:
          logging.info(f"Connecting to ChromaDB server: {self.config.host}:{self.config.port}")
          self.settings.chroma_server_host = self.config.host
          self.settings.chroma_server_http_port = self.config.port
          self.settings.chroma_api_impl = "chromadb.api.fastapi.FastAPI"

      self.client = chromadb.Client(self.settings)
      # The base class receives the resolved config (never None).
      super().__init__(config=self.config)
  47. def _initialize(self):
  48. """
  49. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  50. """
  51. if not self.embedder:
  52. raise ValueError("Embedder not set. Please set an embedder with `set_embedder` before initialization.")
  53. self._get_or_create_collection(self.config.collection_name)
  54. def _get_or_create_db(self):
  55. """Called during initialization"""
  56. return self.client
  57. def _get_or_create_collection(self, name: str) -> Collection:
  58. """
  59. Get or create a named collection.
  60. :param name: Name of the collection
  61. :type name: str
  62. :raises ValueError: No embedder configured.
  63. :return: Created collection
  64. :rtype: Collection
  65. """
  66. if not hasattr(self, "embedder") or not self.embedder:
  67. raise ValueError("Cannot create a Chroma database collection without an embedder.")
  68. self.collection = self.client.get_or_create_collection(
  69. name=name,
  70. embedding_function=self.embedder.embedding_fn,
  71. )
  72. return self.collection
  73. def get(self, ids: List[str], where: Dict[str, any]) -> List[str]:
  74. """
  75. Get existing doc ids present in vector database
  76. :param ids: list of doc ids to check for existence
  77. :type ids: List[str]
  78. :param where: Optional. to filter data
  79. :type where: Dict[str, any]
  80. :return: Existing documents.
  81. :rtype: List[str]
  82. """
  83. existing_docs = self.collection.get(
  84. ids=ids,
  85. where=where, # optional filter
  86. )
  87. return set(existing_docs["ids"])
  88. def add(self, documents: List[str], metadatas: List[object], ids: List[str]):
  89. """
  90. Add vectors to chroma database
  91. :param documents: Documents
  92. :type documents: List[str]
  93. :param metadatas: Metadatas
  94. :type metadatas: List[object]
  95. :param ids: ids
  96. :type ids: List[str]
  97. """
  98. self.collection.add(documents=documents, metadatas=metadatas, ids=ids)
  99. def _format_result(self, results: QueryResult) -> list[tuple[Document, float]]:
  100. """
  101. Format Chroma results
  102. :param results: ChromaDB query results to format.
  103. :type results: QueryResult
  104. :return: Formatted results
  105. :rtype: list[tuple[Document, float]]
  106. """
  107. return [
  108. (Document(page_content=result[0], metadata=result[1] or {}), result[2])
  109. for result in zip(
  110. results["documents"][0],
  111. results["metadatas"][0],
  112. results["distances"][0],
  113. )
  114. ]
  115. def query(self, input_query: List[str], n_results: int, where: Dict[str, any]) -> List[str]:
  116. """
  117. Query contents from vector data base based on vector similarity
  118. :param input_query: list of query string
  119. :type input_query: List[str]
  120. :param n_results: no of similar documents to fetch from database
  121. :type n_results: int
  122. :param where: to filter data
  123. :type where: Dict[str, any]
  124. :raises InvalidDimensionException: Dimensions do not match.
  125. :return: The content of the document that matched your query.
  126. :rtype: List[str]
  127. """
  128. try:
  129. result = self.collection.query(
  130. query_texts=[
  131. input_query,
  132. ],
  133. n_results=n_results,
  134. where=where,
  135. )
  136. except InvalidDimensionException as e:
  137. raise InvalidDimensionException(
  138. e.message()
  139. + ". This is commonly a side-effect when an embedding function, different from the one used to add the embeddings, is used to retrieve an embedding from the database." # noqa E501
  140. ) from None
  141. results_formatted = self._format_result(result)
  142. contents = [result[0].page_content for result in results_formatted]
  143. return contents
  144. def set_collection_name(self, name: str):
  145. """
  146. Set the name of the collection. A collection is an isolated space for vectors.
  147. :param name: Name of the collection.
  148. :type name: str
  149. """
  150. self.config.collection_name = name
  151. self._get_or_create_collection(self.config.collection_name)
  152. def count(self) -> int:
  153. """
  154. Count number of documents/chunks embedded in the database.
  155. :return: number of documents
  156. :rtype: int
  157. """
  158. return self.collection.count()
  159. def reset(self):
  160. """
  161. Resets the database. Deletes all embeddings irreversibly.
  162. """
  163. # Delete all data from the database
  164. try:
  165. self.client.reset()
  166. except ValueError:
  167. raise ValueError(
  168. "For safety reasons, resetting is disabled."
  169. 'Please enable it by including `chromadb_settings={"allow_reset": True}` in your ChromaDbConfig'
  170. ) from None
  171. # Recreate
  172. self._get_or_create_collection(self.config.collection_name)
  173. # Todo: Automatically recreating a collection with the same name cannot be the best way to handle a reset.
  174. # A downside of this implementation is, if you have two instances,
  175. # the other instance will not get the updated `self.collection` attribute.
  176. # A better way would be to create the collection if it is called again after being reset.
  177. # That means, checking if collection exists in the db-consuming methods, and creating it if it doesn't.
  178. # That's an extra steps for all uses, just to satisfy a niche use case in a niche method. For now, this will do.