chroma.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import logging
  2. from typing import Any, Dict, List, Optional, Tuple, Union
  3. from chromadb import Collection, QueryResult
  4. from langchain.docstore.document import Document
  5. from embedchain.config import ChromaDbConfig
  6. from embedchain.helper.json_serializable import register_deserializable
  7. from embedchain.vectordb.base import BaseVectorDB
  8. try:
  9. import chromadb
  10. from chromadb.config import Settings
  11. from chromadb.errors import InvalidDimensionException
  12. except RuntimeError:
  13. from embedchain.utils import use_pysqlite3
  14. use_pysqlite3()
  15. import chromadb
  16. from chromadb.config import Settings
  17. from chromadb.errors import InvalidDimensionException
  18. @register_deserializable
  19. class ChromaDB(BaseVectorDB):
  20. """Vector database using ChromaDB."""
  21. BATCH_SIZE = 100
  22. def __init__(self, config: Optional[ChromaDbConfig] = None):
  23. """Initialize a new ChromaDB instance
  24. :param config: Configuration options for Chroma, defaults to None
  25. :type config: Optional[ChromaDbConfig], optional
  26. """
  27. if config:
  28. self.config = config
  29. else:
  30. self.config = ChromaDbConfig()
  31. self.settings = Settings(anonymized_telemetry=False)
  32. self.settings.allow_reset = self.config.allow_reset if hasattr(self.config, "allow_reset") else False
  33. if self.config.chroma_settings:
  34. for key, value in self.config.chroma_settings.items():
  35. if hasattr(self.settings, key):
  36. setattr(self.settings, key, value)
  37. if self.config.host and self.config.port:
  38. logging.info(f"Connecting to ChromaDB server: {self.config.host}:{self.config.port}")
  39. self.settings.chroma_server_host = self.config.host
  40. self.settings.chroma_server_http_port = self.config.port
  41. self.settings.chroma_api_impl = "chromadb.api.fastapi.FastAPI"
  42. else:
  43. if self.config.dir is None:
  44. self.config.dir = "db"
  45. self.settings.persist_directory = self.config.dir
  46. self.settings.is_persistent = True
  47. self.client = chromadb.Client(self.settings)
  48. super().__init__(config=self.config)
  49. def _initialize(self):
  50. """
  51. This method is needed because `embedder` attribute needs to be set externally before it can be initialized.
  52. """
  53. if not self.embedder:
  54. raise ValueError(
  55. "Embedder not set. Please set an embedder with `_set_embedder()` function before initialization."
  56. )
  57. self._get_or_create_collection(self.config.collection_name)
  58. def _get_or_create_db(self):
  59. """Called during initialization"""
  60. return self.client
  61. def _generate_where_clause(self, where: Dict[str, any]) -> str:
  62. # If only one filter is supplied, return it as is
  63. # (no need to wrap in $and based on chroma docs)
  64. if len(where.keys()) == 1:
  65. return where
  66. where_filters = []
  67. for k, v in where.items():
  68. if isinstance(v, str):
  69. where_filters.append({k: v})
  70. return {"$and": where_filters}
  71. def _get_or_create_collection(self, name: str) -> Collection:
  72. """
  73. Get or create a named collection.
  74. :param name: Name of the collection
  75. :type name: str
  76. :raises ValueError: No embedder configured.
  77. :return: Created collection
  78. :rtype: Collection
  79. """
  80. if not hasattr(self, "embedder") or not self.embedder:
  81. raise ValueError("Cannot create a Chroma database collection without an embedder.")
  82. self.collection = self.client.get_or_create_collection(
  83. name=name,
  84. embedding_function=self.embedder.embedding_fn,
  85. )
  86. return self.collection
  87. def get(self, ids: Optional[List[str]] = None, where: Optional[Dict[str, any]] = None, limit: Optional[int] = None):
  88. """
  89. Get existing doc ids present in vector database
  90. :param ids: list of doc ids to check for existence
  91. :type ids: List[str]
  92. :param where: Optional. to filter data
  93. :type where: Dict[str, Any]
  94. :param limit: Optional. maximum number of documents
  95. :type limit: Optional[int]
  96. :return: Existing documents.
  97. :rtype: List[str]
  98. """
  99. args = {}
  100. if ids:
  101. args["ids"] = ids
  102. if where:
  103. args["where"] = self._generate_where_clause(where)
  104. if limit:
  105. args["limit"] = limit
  106. return self.collection.get(**args)
  107. def add(
  108. self,
  109. embeddings: List[List[float]],
  110. documents: List[str],
  111. metadatas: List[object],
  112. ids: List[str],
  113. skip_embedding: bool,
  114. ) -> Any:
  115. """
  116. Add vectors to chroma database
  117. :param embeddings: list of embeddings to add
  118. :type embeddings: List[List[str]]
  119. :param documents: Documents
  120. :type documents: List[str]
  121. :param metadatas: Metadatas
  122. :type metadatas: List[object]
  123. :param ids: ids
  124. :type ids: List[str]
  125. :param skip_embedding: Optional. If True, then the embeddings are assumed to be already generated.
  126. :type skip_embedding: bool
  127. """
  128. size = len(documents)
  129. if skip_embedding and (embeddings is None or len(embeddings) != len(documents)):
  130. raise ValueError("Cannot add documents to chromadb with inconsistent embeddings")
  131. if len(documents) != size or len(metadatas) != size or len(ids) != size:
  132. raise ValueError(
  133. "Cannot add documents to chromadb with inconsistent sizes. Documents size: {}, Metadata size: {},"
  134. " Ids size: {}".format(len(documents), len(metadatas), len(ids))
  135. )
  136. for i in range(0, len(documents), self.BATCH_SIZE):
  137. print("Inserting batches from {} to {} in chromadb".format(i, min(len(documents), i + self.BATCH_SIZE)))
  138. if skip_embedding:
  139. self.collection.add(
  140. embeddings=embeddings[i : i + self.BATCH_SIZE],
  141. documents=documents[i : i + self.BATCH_SIZE],
  142. metadatas=metadatas[i : i + self.BATCH_SIZE],
  143. ids=ids[i : i + self.BATCH_SIZE],
  144. )
  145. else:
  146. self.collection.add(
  147. documents=documents[i : i + self.BATCH_SIZE],
  148. metadatas=metadatas[i : i + self.BATCH_SIZE],
  149. ids=ids[i : i + self.BATCH_SIZE],
  150. )
  151. def _format_result(self, results: QueryResult) -> list[tuple[Document, float]]:
  152. """
  153. Format Chroma results
  154. :param results: ChromaDB query results to format.
  155. :type results: QueryResult
  156. :return: Formatted results
  157. :rtype: list[tuple[Document, float]]
  158. """
  159. return [
  160. (Document(page_content=result[0], metadata=result[1] or {}), result[2])
  161. for result in zip(
  162. results["documents"][0],
  163. results["metadatas"][0],
  164. results["distances"][0],
  165. )
  166. ]
  167. def query(
  168. self,
  169. input_query: List[str],
  170. n_results: int,
  171. where: Dict[str, any],
  172. skip_embedding: bool,
  173. citations: bool = False,
  174. ) -> Union[List[Tuple[str, str, str]], List[str]]:
  175. """
  176. Query contents from vector database based on vector similarity
  177. :param input_query: list of query string
  178. :type input_query: List[str]
  179. :param n_results: no of similar documents to fetch from database
  180. :type n_results: int
  181. :param where: to filter data
  182. :type where: Dict[str, Any]
  183. :param skip_embedding: Optional. If True, then the input_query is assumed to be already embedded.
  184. :type skip_embedding: bool
  185. :param citations: we use citations boolean param to return context along with the answer.
  186. :type citations: bool, default is False.
  187. :raises InvalidDimensionException: Dimensions do not match.
  188. :return: The content of the document that matched your query,
  189. along with url of the source and doc_id (if citations flag is true)
  190. :rtype: List[str], if citations=False, otherwise List[Tuple[str, str, str]]
  191. """
  192. try:
  193. if skip_embedding:
  194. result = self.collection.query(
  195. query_embeddings=[
  196. input_query,
  197. ],
  198. n_results=n_results,
  199. where=where,
  200. )
  201. else:
  202. result = self.collection.query(
  203. query_texts=[
  204. input_query,
  205. ],
  206. n_results=n_results,
  207. where=where,
  208. )
  209. except InvalidDimensionException as e:
  210. raise InvalidDimensionException(
  211. e.message()
  212. + ". This is commonly a side-effect when an embedding function, different from the one used to add the"
  213. " embeddings, is used to retrieve an embedding from the database."
  214. ) from None
  215. results_formatted = self._format_result(result)
  216. contexts = []
  217. for result in results_formatted:
  218. context = result[0].page_content
  219. if citations:
  220. metadata = result[0].metadata
  221. source = metadata["url"]
  222. doc_id = metadata["doc_id"]
  223. contexts.append((context, source, doc_id))
  224. else:
  225. contexts.append(context)
  226. return contexts
  227. def set_collection_name(self, name: str):
  228. """
  229. Set the name of the collection. A collection is an isolated space for vectors.
  230. :param name: Name of the collection.
  231. :type name: str
  232. """
  233. if not isinstance(name, str):
  234. raise TypeError("Collection name must be a string")
  235. self.config.collection_name = name
  236. self._get_or_create_collection(self.config.collection_name)
  237. def count(self) -> int:
  238. """
  239. Count number of documents/chunks embedded in the database.
  240. :return: number of documents
  241. :rtype: int
  242. """
  243. return self.collection.count()
  244. def delete(self, where):
  245. return self.collection.delete(where=where)
  246. def reset(self):
  247. """
  248. Resets the database. Deletes all embeddings irreversibly.
  249. """
  250. # Delete all data from the collection
  251. try:
  252. self.client.delete_collection(self.config.collection_name)
  253. except ValueError:
  254. raise ValueError(
  255. "For safety reasons, resetting is disabled. "
  256. "Please enable it by setting `allow_reset=True` in your ChromaDbConfig"
  257. ) from None
  258. # Recreate
  259. self._get_or_create_collection(self.config.collection_name)
  260. # Todo: Automatically recreating a collection with the same name cannot be the best way to handle a reset.
  261. # A downside of this implementation is, if you have two instances,
  262. # the other instance will not get the updated `self.collection` attribute.
  263. # A better way would be to create the collection if it is called again after being reset.
  264. # That means, checking if collection exists in the db-consuming methods, and creating it if it doesn't.
  265. # That's an extra steps for all uses, just to satisfy a niche use case in a niche method. For now, this will do.