123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- import logging
- import os
- import aiofiles
- import yaml
- from database import Base, SessionLocal, engine
- from fastapi import Depends, FastAPI, HTTPException, UploadFile
- from models import DefaultResponse, DeployAppRequest, QueryApp, SourceApp
- from services import get_app, get_apps, remove_app, save_app
- from sqlalchemy.orm import Session
- from utils import generate_error_message_for_api_keys
- from embedchain import App
- from embedchain.client import Client
- Base.metadata.create_all(bind=engine)
- def get_db():
- db = SessionLocal()
- try:
- yield db
- finally:
- db.close()
- app = FastAPI(
- title="Embedchain REST API",
- description="This is the REST API for Embedchain.",
- version="0.0.1",
- license_info={
- "name": "Apache 2.0",
- "url": "https://github.com/embedchain/embedchain/blob/main/LICENSE",
- },
- )
- @app.get("/ping", tags=["Utility"])
- def check_status():
- """
- Endpoint to check the status of the API
- """
- return {"ping": "pong"}
- @app.get("/apps", tags=["Apps"])
- async def get_all_apps(db: Session = Depends(get_db)):
- """
- Get all apps.
- """
- apps = get_apps(db)
- return {"results": apps}
- @app.post("/create", tags=["Apps"], response_model=DefaultResponse)
- async def create_app_using_default_config(app_id: str, config: UploadFile = None, db: Session = Depends(get_db)):
- """
- Create a new app using App ID.
- If you don't provide a config file, Embedchain will use the default config file\n
- which uses opensource GPT4ALL model.\n
- app_id: The ID of the app.\n
- config: The YAML config file to create an App.\n
- """
- try:
- if app_id is None:
- raise HTTPException(detail="App ID not provided.", status_code=400)
- if get_app(db, app_id) is not None:
- raise HTTPException(detail=f"App with id '{app_id}' already exists.", status_code=400)
- yaml_path = "default.yaml"
- if config is not None:
- contents = await config.read()
- try:
- yaml.safe_load(contents)
- # TODO: validate the config yaml file here
- yaml_path = f"configs/{app_id}.yaml"
- async with aiofiles.open(yaml_path, mode="w") as file_out:
- await file_out.write(str(contents, "utf-8"))
- except yaml.YAMLError as exc:
- raise HTTPException(detail=f"Error parsing YAML: {exc}", status_code=400)
- save_app(db, app_id, yaml_path)
- return DefaultResponse(response=f"App created successfully. App ID: {app_id}")
- except Exception as e:
- logging.warning(str(e))
- raise HTTPException(detail=f"Error creating app: {str(e)}", status_code=400)
- @app.get(
- "/{app_id}/data",
- tags=["Apps"],
- )
- async def get_datasources_associated_with_app_id(app_id: str, db: Session = Depends(get_db)):
- """
- Get all data sources for an app.\n
- app_id: The ID of the app. Use "default" for the default app.\n
- """
- try:
- if app_id is None:
- raise HTTPException(
- detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- status_code=400,
- )
- db_app = get_app(db, app_id)
- if db_app is None:
- raise HTTPException(detail=f"App with id {app_id} does not exist, please create it first.", status_code=400)
- app = App.from_config(config_path=db_app.config)
- response = app.get_data_sources()
- return {"results": response}
- except ValueError as ve:
- logging.warning(str(ve))
- raise HTTPException(
- detail=generate_error_message_for_api_keys(ve),
- status_code=400,
- )
- except Exception as e:
- logging.warning(str(e))
- raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- @app.post(
- "/{app_id}/add",
- tags=["Apps"],
- response_model=DefaultResponse,
- )
- async def add_datasource_to_an_app(body: SourceApp, app_id: str, db: Session = Depends(get_db)):
- """
- Add a source to an existing app.\n
- app_id: The ID of the app. Use "default" for the default app.\n
- source: The source to add.\n
- data_type: The data type of the source. Remove it if you want Embedchain to detect it automatically.\n
- """
- try:
- if app_id is None:
- raise HTTPException(
- detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- status_code=400,
- )
- db_app = get_app(db, app_id)
- if db_app is None:
- raise HTTPException(detail=f"App with id {app_id} does not exist, please create it first.", status_code=400)
- app = App.from_config(config_path=db_app.config)
- response = app.add(source=body.source, data_type=body.data_type)
- return DefaultResponse(response=response)
- except ValueError as ve:
- logging.warning(str(ve))
- raise HTTPException(
- detail=generate_error_message_for_api_keys(ve),
- status_code=400,
- )
- except Exception as e:
- logging.warning(str(e))
- raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- @app.post(
- "/{app_id}/query",
- tags=["Apps"],
- response_model=DefaultResponse,
- )
- async def query_an_app(body: QueryApp, app_id: str, db: Session = Depends(get_db)):
- """
- Query an existing app.\n
- app_id: The ID of the app. Use "default" for the default app.\n
- query: The query that you want to ask the App.\n
- """
- try:
- if app_id is None:
- raise HTTPException(
- detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- status_code=400,
- )
- db_app = get_app(db, app_id)
- if db_app is None:
- raise HTTPException(detail=f"App with id {app_id} does not exist, please create it first.", status_code=400)
- app = App.from_config(config_path=db_app.config)
- response = app.query(body.query)
- return DefaultResponse(response=response)
- except ValueError as ve:
- logging.warning(str(ve))
- raise HTTPException(
- detail=generate_error_message_for_api_keys(ve),
- status_code=400,
- )
- except Exception as e:
- logging.warning(str(e))
- raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- # FIXME: The chat implementation of Embedchain needs to be modified to work with the REST API.
- # @app.post(
- # "/{app_id}/chat",
- # tags=["Apps"],
- # response_model=DefaultResponse,
- # )
- # async def chat_with_an_app(body: MessageApp, app_id: str, db: Session = Depends(get_db)):
- # """
- # Query an existing app.\n
- # app_id: The ID of the app. Use "default" for the default app.\n
- # message: The message that you want to send to the App.\n
- # """
- # try:
- # if app_id is None:
- # raise HTTPException(
- # detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- # status_code=400,
- # )
- # db_app = get_app(db, app_id)
- # if db_app is None:
- # raise HTTPException(
- # detail=f"App with id {app_id} does not exist, please create it first.",
- # status_code=400
- # )
- # app = App.from_config(config_path=db_app.config)
- # response = app.chat(body.message)
- # return DefaultResponse(response=response)
- # except ValueError as ve:
- # raise HTTPException(
- # detail=generate_error_message_for_api_keys(ve),
- # status_code=400,
- # )
- # except Exception as e:
- # raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- @app.post(
- "/{app_id}/deploy",
- tags=["Apps"],
- response_model=DefaultResponse,
- )
- async def deploy_app(body: DeployAppRequest, app_id: str, db: Session = Depends(get_db)):
- """
- Query an existing app.\n
- app_id: The ID of the app. Use "default" for the default app.\n
- api_key: The API key to use for deployment. If not provided,
- Embedchain will use the API key previously used (if any).\n
- """
- try:
- if app_id is None:
- raise HTTPException(
- detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- status_code=400,
- )
- db_app = get_app(db, app_id)
- if db_app is None:
- raise HTTPException(detail=f"App with id {app_id} does not exist, please create it first.", status_code=400)
- app = App.from_config(config_path=db_app.config)
- api_key = body.api_key
- # this will save the api key in the embedchain.db
- Client(api_key=api_key)
- app.deploy()
- return DefaultResponse(response="App deployed successfully.")
- except ValueError as ve:
- logging.warning(str(ve))
- raise HTTPException(
- detail=generate_error_message_for_api_keys(ve),
- status_code=400,
- )
- except Exception as e:
- logging.warning(str(e))
- raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- @app.delete(
- "/{app_id}/delete",
- tags=["Apps"],
- response_model=DefaultResponse,
- )
- async def delete_app(app_id: str, db: Session = Depends(get_db)):
- """
- Delete an existing app.\n
- app_id: The ID of the app to be deleted.
- """
- try:
- if app_id is None:
- raise HTTPException(
- detail="App ID not provided. If you want to use the default app, use 'default' as the app_id.",
- status_code=400,
- )
- db_app = get_app(db, app_id)
- if db_app is None:
- raise HTTPException(detail=f"App with id {app_id} does not exist, please create it first.", status_code=400)
- app = App.from_config(config_path=db_app.config)
- # reset app.db
- app.db.reset()
- remove_app(db, app_id)
- return DefaultResponse(response=f"App with id {app_id} deleted successfully.")
- except Exception as e:
- raise HTTPException(detail=f"Error occurred: {str(e)}", status_code=400)
- if __name__ == "__main__":
- import uvicorn
- is_dev = os.getenv("DEVELOPMENT", "False")
- uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=bool(is_dev))
|