chroma.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. import logging
  2. from typing import Any, Optional, Union
  3. from chromadb import Collection, QueryResult
  4. from langchain.docstore.document import Document
  5. from tqdm import tqdm
  6. from embedchain.config import ChromaDbConfig
  7. from embedchain.helpers.json_serializable import register_deserializable
  8. from embedchain.vectordb.base import BaseVectorDB
  9. try:
  10. import chromadb
  11. from chromadb.config import Settings
  12. from chromadb.errors import InvalidDimensionException
  13. except RuntimeError:
  14. from embedchain.utils.misc import use_pysqlite3
  15. use_pysqlite3()
  16. import chromadb
  17. from chromadb.config import Settings
  18. from chromadb.errors import InvalidDimensionException
  19. @register_deserializable
  20. class ChromaDB(BaseVectorDB):
  21. """Vector database using ChromaDB."""
  22. BATCH_SIZE = 100
  23. def __init__(self, config: Optional[ChromaDbConfig] = None):
  24. """Initialize a new ChromaDB instance
  25. :param config: Configuration options for Chroma, defaults to None
  26. :type config: Optional[ChromaDbConfig], optional
  27. """
  28. if config:
  29. self.config = config
  30. else:
  31. self.config = ChromaDbConfig()
  32. self.settings = Settings(anonymized_telemetry=False)
  33. self.settings.allow_reset = self.config.allow_reset if hasattr(self.config, "allow_reset") else False
  34. if self.config.chroma_settings:
  35. for key, value in self.config.chroma_settings.items():
  36. if hasattr(self.settings, key):
  37. setattr(self.settings, key, value)
  38. if self.config.host and self.config.port:
  39. logging.info(f"Connecting to ChromaDB server: {self.config.host}:{self.config.port}")
  40. self.settings.chroma_server_host = self.config.host
  41. self.settings.chroma_server_http_port = self.config.port
  42. self.settings.chroma_api_impl = "chromadb.api.fastapi.FastAPI"
  43. else:
  44. if self.config.dir is None:
  45. self.config.dir = "db"
  46. self.settings.persist_directory = self.config.dir
  47. self.settings.is_persistent = True
  48. self.client = chromadb.Client(self.settings)
  49. super().__init__(config=self.config)
  50. def _initialize(self):
  51. """
  52. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  53. """
  54. if not self.embedder:
  55. raise ValueError(
  56. "Embedder not set. Please set an embedder with `_set_embedder()` function before initialization."
  57. )
  58. self._get_or_create_collection(self.config.collection_name)
  59. def _get_or_create_db(self):
  60. """Called during initialization"""
  61. return self.client
  62. @staticmethod
  63. def _generate_where_clause(where: dict[str, any]) -> dict[str, any]:
  64. # If only one filter is supplied, return it as is
  65. # (no need to wrap in $and based on chroma docs)
  66. if where is None:
  67. return {}
  68. if len(where.keys()) <= 1:
  69. return where
  70. where_filters = []
  71. for k, v in where.items():
  72. if isinstance(v, str):
  73. where_filters.append({k: v})
  74. return {"$and": where_filters}
  75. def _get_or_create_collection(self, name: str) -> Collection:
  76. """
  77. Get or create a named collection.
  78. :param name: Name of the collection
  79. :type name: str
  80. :raises ValueError: No embedder configured.
  81. :return: Created collection
  82. :rtype: Collection
  83. """
  84. if not hasattr(self, "embedder") or not self.embedder:
  85. raise ValueError("Cannot create a Chroma database collection without an embedder.")
  86. self.collection = self.client.get_or_create_collection(
  87. name=name,
  88. embedding_function=self.embedder.embedding_fn,
  89. )
  90. return self.collection
  91. def get(self, ids: Optional[list[str]] = None, where: Optional[dict[str, any]] = None, limit: Optional[int] = None):
  92. """
  93. Get existing doc ids present in vector database
  94. :param ids: list of doc ids to check for existence
  95. :type ids: list[str]
  96. :param where: Optional. to filter data
  97. :type where: dict[str, Any]
  98. :param limit: Optional. maximum number of documents
  99. :type limit: Optional[int]
  100. :return: Existing documents.
  101. :rtype: list[str]
  102. """
  103. args = {}
  104. if ids:
  105. args["ids"] = ids
  106. if where:
  107. args["where"] = self._generate_where_clause(where)
  108. if limit:
  109. args["limit"] = limit
  110. return self.collection.get(**args)
  111. def add(
  112. self,
  113. documents: list[str],
  114. metadatas: list[object],
  115. ids: list[str],
  116. ) -> Any:
  117. """
  118. Add vectors to chroma database
  119. :param documents: Documents
  120. :type documents: list[str]
  121. :param metadatas: Metadatas
  122. :type metadatas: list[object]
  123. :param ids: ids
  124. :type ids: list[str]
  125. """
  126. size = len(documents)
  127. if len(documents) != size or len(metadatas) != size or len(ids) != size:
  128. raise ValueError(
  129. "Cannot add documents to chromadb with inconsistent sizes. Documents size: {}, Metadata size: {},"
  130. " Ids size: {}".format(len(documents), len(metadatas), len(ids))
  131. )
  132. for i in tqdm(range(0, len(documents), self.BATCH_SIZE), desc="Inserting batches in chromadb"):
  133. self.collection.add(
  134. documents=documents[i : i + self.BATCH_SIZE],
  135. metadatas=metadatas[i : i + self.BATCH_SIZE],
  136. ids=ids[i : i + self.BATCH_SIZE],
  137. )
  138. @staticmethod
  139. def _format_result(results: QueryResult) -> list[tuple[Document, float]]:
  140. """
  141. Format Chroma results
  142. :param results: ChromaDB query results to format.
  143. :type results: QueryResult
  144. :return: Formatted results
  145. :rtype: list[tuple[Document, float]]
  146. """
  147. return [
  148. (Document(page_content=result[0], metadata=result[1] or {}), result[2])
  149. for result in zip(
  150. results["documents"][0],
  151. results["metadatas"][0],
  152. results["distances"][0],
  153. )
  154. ]
  155. def query(
  156. self,
  157. input_query: list[str],
  158. n_results: int,
  159. where: Optional[dict[str, any]] = None,
  160. raw_filter: Optional[dict[str, any]] = None,
  161. citations: bool = False,
  162. **kwargs: Optional[dict[str, any]],
  163. ) -> Union[list[tuple[str, dict]], list[str]]:
  164. """
  165. Query contents from vector database based on vector similarity
  166. :param input_query: list of query string
  167. :type input_query: list[str]
  168. :param n_results: no of similar documents to fetch from database
  169. :type n_results: int
  170. :param where: to filter data
  171. :type where: dict[str, Any]
  172. :param raw_filter: Raw filter to apply
  173. :type raw_filter: dict[str, Any]
  174. :param citations: we use citations boolean param to return context along with the answer.
  175. :type citations: bool, default is False.
  176. :raises InvalidDimensionException: Dimensions do not match.
  177. :return: The content of the document that matched your query,
  178. along with url of the source and doc_id (if citations flag is true)
  179. :rtype: list[str], if citations=False, otherwise list[tuple[str, str, str]]
  180. """
  181. if where and raw_filter:
  182. raise ValueError("Both `where` and `raw_filter` cannot be used together.")
  183. where_clause = {}
  184. if raw_filter:
  185. where_clause = raw_filter
  186. if where:
  187. where_clause = self._generate_where_clause(where)
  188. try:
  189. result = self.collection.query(
  190. query_texts=[
  191. input_query,
  192. ],
  193. n_results=n_results,
  194. where=where_clause,
  195. )
  196. except InvalidDimensionException as e:
  197. raise InvalidDimensionException(
  198. e.message()
  199. + ". This is commonly a side-effect when an embedding function, different from the one used to add the"
  200. " embeddings, is used to retrieve an embedding from the database."
  201. ) from None
  202. results_formatted = self._format_result(result)
  203. contexts = []
  204. for result in results_formatted:
  205. context = result[0].page_content
  206. if citations:
  207. metadata = result[0].metadata
  208. metadata["score"] = result[1]
  209. contexts.append((context, metadata))
  210. else:
  211. contexts.append(context)
  212. return contexts
  213. def set_collection_name(self, name: str):
  214. """
  215. Set the name of the collection. A collection is an isolated space for vectors.
  216. :param name: Name of the collection.
  217. :type name: str
  218. """
  219. if not isinstance(name, str):
  220. raise TypeError("Collection name must be a string")
  221. self.config.collection_name = name
  222. self._get_or_create_collection(self.config.collection_name)
  223. def count(self) -> int:
  224. """
  225. Count number of documents/chunks embedded in the database.
  226. :return: number of documents
  227. :rtype: int
  228. """
  229. return self.collection.count()
  230. def delete(self, where):
  231. return self.collection.delete(where=self._generate_where_clause(where))
  232. def reset(self):
  233. """
  234. Resets the database. Deletes all embeddings irreversibly.
  235. """
  236. # Delete all data from the collection
  237. try:
  238. self.client.delete_collection(self.config.collection_name)
  239. except ValueError:
  240. raise ValueError(
  241. "For safety reasons, resetting is disabled. "
  242. "Please enable it by setting `allow_reset=True` in your ChromaDbConfig"
  243. ) from None
  244. # Recreate
  245. self._get_or_create_collection(self.config.collection_name)
  246. # Todo: Automatically recreating a collection with the same name cannot be the best way to handle a reset.
  247. # A downside of this implementation is, if you have two instances,
  248. # the other instance will not get the updated `self.collection` attribute.
  249. # A better way would be to create the collection if it is called again after being reset.
  250. # That means, checking if collection exists in the db-consuming methods, and creating it if it doesn't.
  251. # That's an extra steps for all uses, just to satisfy a niche use case in a niche method. For now, this will do.