Jelajahi Sumber

Add simple app functionality

This commit enables anyone to create a app and add 3 types of data
sources:

* pdf file
* youtube video
* website

It exposes a function called query which first gets similar docs from
vector db and then passes it to LLM to get the final answer.
Taranjeet Singh 2 tahun lalu
induk
melakukan
468db83337

+ 0 - 0
embedchain/__init__.py


+ 0 - 0
embedchain/chunkers/__init__.py


+ 36 - 0
embedchain/chunkers/pdf_file.py

@@ -0,0 +1,36 @@
+import hashlib
+
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+
+TEXT_SPLITTER_CHUNK_PARAMS = {
+    "chunk_size": 1000,
+    "chunk_overlap": 0,
+    "length_function": len,
+}
+
+TEXT_SPLITTER = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
+
+
+class PdfFileChunker:
+
+    def create_chunks(self, loader, url):
+        documents = []
+        ids = []
+        datas = loader.load_data(url)
+        metadatas = []
+        for data in datas:
+            content = data["content"]
+            meta_data = data["meta_data"]
+            chunks = TEXT_SPLITTER.split_text(content)
+            url = meta_data["url"]
+            for chunk in chunks:
+                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
+                ids.append(chunk_id)
+                documents.append(chunk)
+                metadatas.append(meta_data)
+        return {
+            "documents": documents,
+            "ids": ids,
+            "metadatas": metadatas,
+        }

+ 36 - 0
embedchain/chunkers/website.py

@@ -0,0 +1,36 @@
+import hashlib
+
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+
+TEXT_SPLITTER_CHUNK_PARAMS = {
+    "chunk_size": 500,
+    "chunk_overlap": 0,
+    "length_function": len,
+}
+
+TEXT_SPLITTER = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
+
+
+class WebsiteChunker:
+
+    def create_chunks(self, loader, url):
+        documents = []
+        ids = []
+        datas = loader.load_data(url)
+        metadatas = []
+        for data in datas:
+            content = data["content"]
+            meta_data = data["meta_data"]
+            chunks = TEXT_SPLITTER.split_text(content)
+            url = meta_data["url"]
+            for chunk in chunks:
+                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
+                ids.append(chunk_id)
+                documents.append(chunk)
+                metadatas.append(meta_data)
+        return {
+            "documents": documents,
+            "ids": ids,
+            "metadatas": metadatas,
+        }

+ 36 - 0
embedchain/chunkers/youtube_video.py

@@ -0,0 +1,36 @@
+import hashlib
+
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+
+TEXT_SPLITTER_CHUNK_PARAMS = {
+    "chunk_size": 2000,
+    "chunk_overlap": 0,
+    "length_function": len,
+}
+
+TEXT_SPLITTER = RecursiveCharacterTextSplitter(**TEXT_SPLITTER_CHUNK_PARAMS)
+
+
+class YoutubeVideoChunker:
+
+    def create_chunks(self, loader, url):
+        documents = []
+        ids = []
+        datas = loader.load_data(url)
+        metadatas = []
+        for data in datas:
+            content = data["content"]
+            meta_data = data["meta_data"]
+            chunks = TEXT_SPLITTER.split_text(content)
+            url = meta_data["url"]
+            for chunk in chunks:
+                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
+                ids.append(chunk_id)
+                documents.append(chunk)
+                metadatas.append(meta_data)
+        return {
+            "documents": documents,
+            "ids": ids,
+            "metadatas": metadatas,
+        }

+ 136 - 0
embedchain/embedchain.py

@@ -0,0 +1,136 @@
+import chromadb
+import openai
+import os
+
+from chromadb.utils import embedding_functions
+from dotenv import load_dotenv
+from langchain.docstore.document import Document
+from langchain.embeddings.openai import OpenAIEmbeddings
+
+from embedchain.loaders.youtube_video import YoutubeVideoLoader
+from embedchain.loaders.pdf_file import PdfFileLoader
+from embedchain.loaders.website import WebsiteLoader
+from embedchain.chunkers.youtube_video import YoutubeVideoChunker
+from embedchain.chunkers.pdf_file import PdfFileChunker
+from embedchain.chunkers.website import WebsiteChunker
+
+load_dotenv()
+
+embeddings = OpenAIEmbeddings()
+
+ABS_PATH = os.getcwd()
+DB_DIR = os.path.join(ABS_PATH, "db")
+
+openai_ef = embedding_functions.OpenAIEmbeddingFunction(
+    api_key=os.getenv("OPENAI_API_KEY"),
+    model_name="text-embedding-ada-002"
+)
+
+
+class EmbedChain:
+    def __init__(self):
+        self.chromadb_client = self._get_or_create_db()
+        self.collection = self._get_or_create_collection()
+        self.user_asks = []
+
+    def _get_loader(self, data_type):
+        loaders = {
+            'youtube_video': YoutubeVideoLoader(),
+            'pdf_file': PdfFileLoader(),
+            'website': WebsiteLoader()
+        }
+        if data_type in loaders:
+            return loaders[data_type]
+        else:
+            raise ValueError(f"Unsupported data type: {data_type}")
+
+    def _get_chunker(self, data_type):
+        chunkers = {
+            'youtube_video': YoutubeVideoChunker(),
+            'pdf_file': PdfFileChunker(),
+            'website': WebsiteChunker()
+        }
+        if data_type in chunkers:
+            return chunkers[data_type]
+        else:
+            raise ValueError(f"Unsupported data type: {data_type}")
+
+    def add(self, data_type, url):
+        loader = self._get_loader(data_type)
+        chunker = self._get_chunker(data_type)
+        self.user_asks.append([data_type, url])
+        self.load_and_embed(loader, chunker, url)
+
+    def _get_or_create_db(self):
+        client_settings = chromadb.config.Settings(
+            chroma_db_impl="duckdb+parquet",
+            persist_directory=DB_DIR,
+            anonymized_telemetry=False
+        )
+        return chromadb.Client(client_settings)
+
+    def _get_or_create_collection(self):
+        return self.chromadb_client.get_or_create_collection(
+            'embedchain_store', embedding_function=openai_ef,
+        )
+
+    def load_embeddings_to_db(self, loader, chunker, url):
+        embeddings_data = chunker.create_chunks(loader, url)
+        documents = embeddings_data["documents"]
+        metadatas = embeddings_data["metadatas"]
+        ids = embeddings_data["ids"]
+        self.collection.add(
+            documents=documents,
+            metadatas=metadatas,
+            ids=ids
+        )
+        print(f"Docs count: {self.collection.count()}")
+
+    def load_and_embed(self, loader, chunker, url):
+        return self.load_embeddings_to_db(loader, chunker, url)
+
+    def _format_result(self, results):
+        return [
+            (Document(page_content=result[0], metadata=result[1] or {}), result[2])
+            for result in zip(
+                results["documents"][0],
+                results["metadatas"][0],
+                results["distances"][0],
+            )
+        ]
+
+    def get_openai_answer(self, prompt):
+        messages = []
+        messages.append({
+            "role": "user", "content": prompt
+        })
+        response = openai.ChatCompletion.create(
+            model="gpt-3.5-turbo-0613",
+            messages=messages,
+            temperature=0,
+            max_tokens=1000,
+            top_p=1,
+        )
+        return response["choices"][0]["message"]["content"]
+
+    def get_answer_from_llm(self, query, context):
+        prompt = f"""Use the following pieces of context to answer the query at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
+        {context}
+        Query: {query}
+        Helpful Answer:
+        """
+        answer = self.get_openai_answer(prompt)
+        return answer
+
+    def query(self, input_query):
+        result = self.collection.query(
+            query_texts=[input_query,],
+            n_results=1,
+        )
+        result_formatted = self._format_result(result)
+        answer = self.get_answer_from_llm(input_query, result_formatted[0][0].page_content)
+        return answer
+
+
+class App(EmbedChain):
+    pass

+ 0 - 0
embedchain/loaders/__init__.py


+ 23 - 0
embedchain/loaders/pdf_file.py

@@ -0,0 +1,23 @@
+from langchain.document_loaders import PyPDFLoader
+
+from embedchain.utils import clean_string
+
+
+class PdfFileLoader:
+
+    def load_data(self, url):
+        loader = PyPDFLoader(url)
+        output = []
+        pages = loader.load_and_split()
+        if not len(pages):
+            raise ValueError("No data found")
+        for page in pages:
+            content = page.page_content
+            content = clean_string(content)
+            meta_data = page.metadata
+            meta_data["url"] = url
+            output.append({
+                "content": content,
+                "meta_data": meta_data,
+            })
+        return output

+ 30 - 0
embedchain/loaders/website.py

@@ -0,0 +1,30 @@
+import requests
+
+from bs4 import BeautifulSoup
+
+from embedchain.utils import clean_string
+
+
+class WebsiteLoader:
+
+    def load_data(self, url):
+        response = requests.get(url)
+        data = response.content
+        soup = BeautifulSoup(data, 'html.parser')
+        for tag in soup([
+            "nav", "aside", "form", "header",
+            "noscript", "svg", "canvas",
+            "footer", "script", "style"
+        ]):
+            tag.string = " "
+        output = []
+        content = soup.get_text()
+        content = clean_string(content)
+        meta_data = {
+            "url": url,
+        }
+        output.append({
+            "content": content,
+            "meta_data": meta_data,
+        })
+        return output

+ 22 - 0
embedchain/loaders/youtube_video.py

@@ -0,0 +1,22 @@
+from langchain.document_loaders import YoutubeLoader
+
+from embedchain.utils import clean_string
+
+
+class YoutubeVideoLoader:
+
+    def load_data(self, url):
+        loader = YoutubeLoader.from_youtube_url(url, add_video_info=True)
+        doc = loader.load()
+        output = []
+        if not len(doc):
+            raise ValueError("No data found")
+        content = doc[0].page_content
+        content = clean_string(content)
+        meta_data = doc[0].metadata
+        meta_data["url"] = url
+        output.append({
+            "content": content,
+            "meta_data": meta_data,
+        })
+        return output

+ 10 - 0
embedchain/utils.py

@@ -0,0 +1,10 @@
+import re
+
+
+def clean_string(text):
+    text = text.replace('\n', ' ')
+    cleaned_text = re.sub(r'\s+', ' ', text.strip())
+    cleaned_text = cleaned_text.replace('\\', '')
+    cleaned_text = cleaned_text.replace('#', ' ')
+    cleaned_text = re.sub(r'([^\w\s])\1*', r'\1', cleaned_text)
+    return cleaned_text