chroma.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import logging
  2. from typing import Any, Optional, Union
  3. from chromadb import Collection, QueryResult
  4. from langchain.docstore.document import Document
  5. from tqdm import tqdm
  6. from embedchain.config import ChromaDbConfig
  7. from embedchain.helpers.json_serializable import register_deserializable
  8. from embedchain.vectordb.base import BaseVectorDB
  9. try:
  10. import chromadb
  11. from chromadb.config import Settings
  12. from chromadb.errors import InvalidDimensionException
  13. except RuntimeError:
  14. from embedchain.utils.misc import use_pysqlite3
  15. use_pysqlite3()
  16. import chromadb
  17. from chromadb.config import Settings
  18. from chromadb.errors import InvalidDimensionException
  19. @register_deserializable
  20. class ChromaDB(BaseVectorDB):
  21. """Vector database using ChromaDB."""
  22. BATCH_SIZE = 100
  23. def __init__(self, config: Optional[ChromaDbConfig] = None):
  24. """Initialize a new ChromaDB instance
  25. :param config: Configuration options for Chroma, defaults to None
  26. :type config: Optional[ChromaDbConfig], optional
  27. """
  28. if config:
  29. self.config = config
  30. else:
  31. self.config = ChromaDbConfig()
  32. self.settings = Settings(anonymized_telemetry=False)
  33. self.settings.allow_reset = self.config.allow_reset if hasattr(self.config, "allow_reset") else False
  34. if self.config.chroma_settings:
  35. for key, value in self.config.chroma_settings.items():
  36. if hasattr(self.settings, key):
  37. setattr(self.settings, key, value)
  38. if self.config.host and self.config.port:
  39. logging.info(f"Connecting to ChromaDB server: {self.config.host}:{self.config.port}")
  40. self.settings.chroma_server_host = self.config.host
  41. self.settings.chroma_server_http_port = self.config.port
  42. self.settings.chroma_api_impl = "chromadb.api.fastapi.FastAPI"
  43. else:
  44. if self.config.dir is None:
  45. self.config.dir = "db"
  46. self.settings.persist_directory = self.config.dir
  47. self.settings.is_persistent = True
  48. self.client = chromadb.Client(self.settings)
  49. super().__init__(config=self.config)
  50. def _initialize(self):
  51. """
  52. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  53. """
  54. if not self.embedder:
  55. raise ValueError(
  56. "Embedder not set. Please set an embedder with `_set_embedder()` function before initialization."
  57. )
  58. self._get_or_create_collection(self.config.collection_name)
  59. def _get_or_create_db(self):
  60. """Called during initialization"""
  61. return self.client
  62. @staticmethod
  63. def _generate_where_clause(where: dict[str, any]) -> dict[str, any]:
  64. # If only one filter is supplied, return it as is
  65. # (no need to wrap in $and based on chroma docs)
  66. if len(where.keys()) <= 1:
  67. return where
  68. where_filters = []
  69. for k, v in where.items():
  70. if isinstance(v, str):
  71. where_filters.append({k: v})
  72. return {"$and": where_filters}
  73. def _get_or_create_collection(self, name: str) -> Collection:
  74. """
  75. Get or create a named collection.
  76. :param name: Name of the collection
  77. :type name: str
  78. :raises ValueError: No embedder configured.
  79. :return: Created collection
  80. :rtype: Collection
  81. """
  82. if not hasattr(self, "embedder") or not self.embedder:
  83. raise ValueError("Cannot create a Chroma database collection without an embedder.")
  84. self.collection = self.client.get_or_create_collection(
  85. name=name,
  86. embedding_function=self.embedder.embedding_fn,
  87. )
  88. return self.collection
  89. def get(self, ids: Optional[list[str]] = None, where: Optional[dict[str, any]] = None, limit: Optional[int] = None):
  90. """
  91. Get existing doc ids present in vector database
  92. :param ids: list of doc ids to check for existence
  93. :type ids: list[str]
  94. :param where: Optional. to filter data
  95. :type where: dict[str, Any]
  96. :param limit: Optional. maximum number of documents
  97. :type limit: Optional[int]
  98. :return: Existing documents.
  99. :rtype: list[str]
  100. """
  101. args = {}
  102. if ids:
  103. args["ids"] = ids
  104. if where:
  105. args["where"] = self._generate_where_clause(where)
  106. if limit:
  107. args["limit"] = limit
  108. return self.collection.get(**args)
  109. def add(
  110. self,
  111. documents: list[str],
  112. metadatas: list[object],
  113. ids: list[str],
  114. ) -> Any:
  115. """
  116. Add vectors to chroma database
  117. :param documents: Documents
  118. :type documents: list[str]
  119. :param metadatas: Metadatas
  120. :type metadatas: list[object]
  121. :param ids: ids
  122. :type ids: list[str]
  123. """
  124. size = len(documents)
  125. if len(documents) != size or len(metadatas) != size or len(ids) != size:
  126. raise ValueError(
  127. "Cannot add documents to chromadb with inconsistent sizes. Documents size: {}, Metadata size: {},"
  128. " Ids size: {}".format(len(documents), len(metadatas), len(ids))
  129. )
  130. for i in tqdm(range(0, len(documents), self.BATCH_SIZE), desc="Inserting batches in chromadb"):
  131. self.collection.add(
  132. documents=documents[i : i + self.BATCH_SIZE],
  133. metadatas=metadatas[i : i + self.BATCH_SIZE],
  134. ids=ids[i : i + self.BATCH_SIZE],
  135. )
  136. @staticmethod
  137. def _format_result(results: QueryResult) -> list[tuple[Document, float]]:
  138. """
  139. Format Chroma results
  140. :param results: ChromaDB query results to format.
  141. :type results: QueryResult
  142. :return: Formatted results
  143. :rtype: list[tuple[Document, float]]
  144. """
  145. return [
  146. (Document(page_content=result[0], metadata=result[1] or {}), result[2])
  147. for result in zip(
  148. results["documents"][0],
  149. results["metadatas"][0],
  150. results["distances"][0],
  151. )
  152. ]
  153. def query(
  154. self,
  155. input_query: list[str],
  156. n_results: int,
  157. where: dict[str, any],
  158. citations: bool = False,
  159. **kwargs: Optional[dict[str, Any]],
  160. ) -> Union[list[tuple[str, dict]], list[str]]:
  161. """
  162. Query contents from vector database based on vector similarity
  163. :param input_query: list of query string
  164. :type input_query: list[str]
  165. :param n_results: no of similar documents to fetch from database
  166. :type n_results: int
  167. :param where: to filter data
  168. :type where: dict[str, Any]
  169. :param citations: we use citations boolean param to return context along with the answer.
  170. :type citations: bool, default is False.
  171. :raises InvalidDimensionException: Dimensions do not match.
  172. :return: The content of the document that matched your query,
  173. along with url of the source and doc_id (if citations flag is true)
  174. :rtype: list[str], if citations=False, otherwise list[tuple[str, str, str]]
  175. """
  176. try:
  177. result = self.collection.query(
  178. query_texts=[
  179. input_query,
  180. ],
  181. n_results=n_results,
  182. where=self._generate_where_clause(where),
  183. **kwargs,
  184. )
  185. except InvalidDimensionException as e:
  186. raise InvalidDimensionException(
  187. e.message()
  188. + ". This is commonly a side-effect when an embedding function, different from the one used to add the"
  189. " embeddings, is used to retrieve an embedding from the database."
  190. ) from None
  191. results_formatted = self._format_result(result)
  192. contexts = []
  193. for result in results_formatted:
  194. context = result[0].page_content
  195. if citations:
  196. metadata = result[0].metadata
  197. metadata["score"] = result[1]
  198. contexts.append((context, metadata))
  199. else:
  200. contexts.append(context)
  201. return contexts
  202. def set_collection_name(self, name: str):
  203. """
  204. Set the name of the collection. A collection is an isolated space for vectors.
  205. :param name: Name of the collection.
  206. :type name: str
  207. """
  208. if not isinstance(name, str):
  209. raise TypeError("Collection name must be a string")
  210. self.config.collection_name = name
  211. self._get_or_create_collection(self.config.collection_name)
  212. def count(self) -> int:
  213. """
  214. Count number of documents/chunks embedded in the database.
  215. :return: number of documents
  216. :rtype: int
  217. """
  218. return self.collection.count()
  219. def delete(self, where):
  220. return self.collection.delete(where=self._generate_where_clause(where))
  221. def reset(self):
  222. """
  223. Resets the database. Deletes all embeddings irreversibly.
  224. """
  225. # Delete all data from the collection
  226. try:
  227. self.client.delete_collection(self.config.collection_name)
  228. except ValueError:
  229. raise ValueError(
  230. "For safety reasons, resetting is disabled. "
  231. "Please enable it by setting `allow_reset=True` in your ChromaDbConfig"
  232. ) from None
  233. # Recreate
  234. self._get_or_create_collection(self.config.collection_name)
  235. # Todo: Automatically recreating a collection with the same name cannot be the best way to handle a reset.
  236. # A downside of this implementation is, if you have two instances,
  237. # the other instance will not get the updated `self.collection` attribute.
  238. # A better way would be to create the collection if it is called again after being reset.
  239. # That means, checking if collection exists in the db-consuming methods, and creating it if it doesn't.
  240. # That's an extra steps for all uses, just to satisfy a niche use case in a niche method. For now, this will do.