Building an Image Compression Web App using FastAPI, Celery, and NextJS, Part 1: Backend.

This is my first article on @hashnode, and I'm glad they make it easy to write and let me back up my articles to GitHub. The purpose of this article is to show the tech stack I often use. I don't want to dive deep into the technical aspects of this web app; I prefer to keep it dead simple. If you find this article difficult, I really recommend reading Bjoern Stiel's post about FastAPI, Celery, and HTMX.

This article is not finished yet! As of this writing, I am done with the backend part only. The code is available at https://github.com/faruqsandi/image_compression_webapp.

In this part, we will create a Python backend using FastAPI and Celery to manage image compression tasks. The idea is to give the frontend a way to upload images, check the progress, and then get the result back.

First, clone the code from my GitHub repo. Take a moment to get familiar with the project structure:

bit@KAUST2024:~/Playground/tensoru/image_compression_webapp$ tree -L 1
.
├── __pycache__
├── celery_app.py
├── freeze.txt
├── main.py
├── requirements.txt
├── uploads
└── venv
💡
Wondering why the snippet above has a venv directory while your cloned code doesn't? See "What is a virtualenv, and why should I use one?"

I put the FastAPI instance inside main.py and the Celery instance inside celery_app.py.

FastAPI backend

We currently need three endpoints for things to run smoothly:

  1. Upload endpoint. This endpoint saves the uploaded file to the uploads folder under a random file name, then immediately sends it to the compress_image task in Celery.

     @app.post("/upload")
     async def upload(
         quality: int = Form(default=..., le=100, ge=20), file: UploadFile = File(...)
     ):
         random_uuid = uuid.uuid4().hex
    
         temporary_file_path = f"{TEMP_DIR}/{random_uuid}_{file.filename}"
         os.makedirs(os.path.dirname(temporary_file_path), exist_ok=True)
         async with aiofiles.open(temporary_file_path, "wb") as buffer:
             while content := await file.read(1024):
                 await buffer.write(content)
    
         if not filetype.helpers.is_image(temporary_file_path):
             return ErrorResponseSchema(message="File is not an image", code=400)
    
         task = compress_image.delay(temporary_file_path, quality)  # type: ignore
         return ResponseSchema(
             data={"task_id": task.id}, message="File uploaded successfully"
         )
    

    Inside the upload function above, we do four things:

    1. Create a random temporary file name. This is a must: if hundreds of people upload hundreds of images, even an original file name like "Jakarta_20240801_0001.JPEG" may not be unique. Hence it is better to prepend a unique UUID to it.

    2. Save the uploaded file to disk. Instead of waiting for the whole file to be loaded into memory and then saving it, reading and writing in small chunks (1024 bytes at a time in the code above) is preferred.

      💡
      You may have noticed that I am using aiofiles to handle file I/O. That is because I insisted on making the upload endpoint an async function. With regular blocking file I/O, I would have to make the endpoint a synchronous function instead (FastAPI runs those in a thread pool) in order not to block the main thread.

    3. Check the file type. The filetype module provides a way to infer a file's type from its magic number. We can't rely on the file extension, otherwise some kid will upload a zipped Roblox game and expect us to compress it.

    4. Pass the image to the compress_image function, which is later executed by a Celery worker.
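The first three steps can be tried out without FastAPI at all. The following is a minimal, standard-library-only sketch; the helper names (unique_path, save_in_chunks, looks_like_image) and the short signature table are my own illustration, not code from the repository, and the real endpoint uses aiofiles and the filetype module instead.

```python
import io
import os
import tempfile
import uuid

# Leading bytes ("magic numbers") of a few common image formats.
# Assumption: this short table is only for illustration; the filetype
# module used by the endpoint knows many more formats.
IMAGE_SIGNATURES = (
    b"\xff\xd8\xff",       # JPEG
    b"\x89PNG\r\n\x1a\n",  # PNG
    b"GIF87a",             # GIF
    b"GIF89a",             # GIF
)


def unique_path(directory: str, original_name: str) -> str:
    """Prefix the original name with a random UUID so uploads do not collide."""
    return os.path.join(directory, f"{uuid.uuid4().hex}_{original_name}")


def save_in_chunks(source, destination_path: str, chunk_size: int = 1024) -> int:
    """Copy a file-like object to disk chunk by chunk; return the bytes written."""
    written = 0
    with open(destination_path, "wb") as buffer:
        while chunk := source.read(chunk_size):
            buffer.write(chunk)
            written += len(chunk)
    return written


def looks_like_image(path: str) -> bool:
    """Inspect the file's first bytes instead of trusting its extension."""
    with open(path, "rb") as f:
        header = f.read(8)
    return header.startswith(IMAGE_SIGNATURES)


# A ZIP archive (it starts with b"PK\x03\x04") disguised with a .jpeg name.
upload_dir = tempfile.mkdtemp()
fake_image = io.BytesIO(b"PK\x03\x04" + b"\x00" * 3000)
path = unique_path(upload_dir, "holiday.jpeg")
save_in_chunks(fake_image, path)
print(looks_like_image(path))  # False
```

The disguised ZIP file is rejected even though its name ends in .jpeg, which is exactly why the endpoint checks the content rather than the extension.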

  2. Get status endpoint. A task queue system like Celery can take care of thousands of requests using a limited amount of resources. In this article I prefer a dead simple way for the client to get a task's status: it just asks this endpoint. On another occasion we will implement SSE or webhooks to notify the client about its tasks.

     @app.get("/task/{task_id}")
     async def get_status(task_id: str):
         try:
             r = celery_app.AsyncResult(task_id)
    
             result = r.result
             if isinstance(result, CeleryError):
                 result = repr(result)
    
             task_response = TaskOut(id=r.task_id, status=r.status, result=result).dict()
             return ResponseSchema(
                 data=task_response,
                 message="Task status retrieved successfully",
             )
         except Exception:
             return ErrorResponseSchema(
                 code=500, message="Something went wrong while getting Task status"
             )
    

    The get_status function above has only one purpose: fetch the Celery result, then check whether the result is an error. Checking whether the result is an instance of CeleryError may not be the proper way to handle it, but it is the simplest.
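On the client side, this endpoint is meant to be called repeatedly until the task is done. Below is a minimal polling sketch. The wait_for_task helper and the fetch_status callable are hypothetical names of mine; fetch_status stands in for whatever HTTP call your client makes to /task/{task_id}, and I assume it returns the id, status, and result fields built by TaskOut above.

```python
import time
from typing import Callable

# Celery states after which a task will not change any more.
FINISHED_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(
    fetch_status: Callable[[], dict],
    interval: float = 1.0,
    timeout: float = 60.0,
) -> dict:
    """Call fetch_status repeatedly until the task finishes or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        task = fetch_status()
        if task["status"] in FINISHED_STATES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {task['status']} after {timeout}s")
        time.sleep(interval)


# Stand-in for the HTTP call: two unfinished answers, then SUCCESS.
responses = iter([
    {"id": "abc", "status": "PENDING", "result": None},
    {"id": "abc", "status": "STARTED", "result": None},
    {"id": "abc", "status": "SUCCESS", "result": {"file_path": "uploads/x.jpg"}},
])
final = wait_for_task(lambda: next(responses), interval=0.01)
print(final["status"])  # SUCCESS
```

Passing the HTTP call in as a function keeps the loop easy to test and leaves the choice of HTTP library to the client.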

  3. Download endpoint. After a task has executed successfully, this endpoint provides a way to retrieve the compressed file from the server.

     @app.get("/download/{task_id}")
     def download(task_id: str):
         task = celery_app.AsyncResult(task_id)
         if task.status == "SUCCESS":
             result = task.result
             if result is None:
                 return ErrorResponseSchema(
                     message="File is already deleted, please try to upload again", code=500
                 )
    
             file_path = result["file_path"]
    
             if not os.path.isfile(file_path):
                 return ErrorResponseSchema(
                     message="File is not exist, please try to upload again.", code=400
                 )
    
             kind = filetype.guess(file_path)
             media_type = "application/octet-stream"
             if kind:
                 media_type = kind.mime
             file_name = os.path.basename(file_path)
             # Drop the "<uuid>_" prefix so the user gets the original file name back.
             file_name = file_name.split("_", 1)[1]
             return FileResponse(
                 file_path, media_type=media_type, filename=file_name
             )
         return ErrorResponseSchema(
             message="File is not ready for download yet. Please try again later", code=400
         )
    

    In the download function above, there are again four operations:

    1. Check whether the task result has a file_path.

    2. Check whether the file at file_path exists.

    3. Check the file's MIME type, or simply its media type. Why? Because we accept a range of image formats: JPEG, PNG, etc. Hence it is important to tell the browser what kind of image it is getting.

    4. Return the file, of course without the random UUID prefix.
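Removing the prefix is safe even when the original file name itself contains underscores, because uuid4().hex is 32 hexadecimal characters and never contains one. A small sketch of that step follows; strip_uuid_prefix is a hypothetical helper of mine, not a function from the repository.

```python
import uuid


def strip_uuid_prefix(stored_name: str) -> str:
    """Return the original file name from '<uuid hex>_<original name>'."""
    prefix, separator, original = stored_name.partition("_")
    # Fall back to the stored name when there is no prefix to remove.
    return original if separator else stored_name


stored = f"{uuid.uuid4().hex}_Jakarta_20240801_0001.JPEG"
print(strip_uuid_prefix(stored))  # Jakarta_20240801_0001.JPEG
```

Only the first underscore is used as the separator, so the rest of the name comes back untouched.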

Celery tasks

The functions listed below are registered for the Celery worker to run. There are two Celery task functions:

  1. compress_image: This function takes care of image compression. It is the reason we use Celery, since it consumes the most resources.

  2. delete_task_result: This function deletes the task result, including the actual file. This one is important because we need to clean up old, unused images before they pile up.

As you will notice in the code below, there are two more functions, which are signal handlers. Depending on the task outcome, they invoke delete_task_result immediately in case of failure, or after a delay on success so the user has time to download the file.

@app.task(queue="tnr_imgcmprs_queue", name="compress_image")
def compress_image(file_path: str, quality: int = 50):
    img = Image.open(file_path)
    old_filesize = os.path.getsize(file_path)
    img.save(file_path, quality=quality)
    new_filesize = os.path.getsize(file_path)
    return {
        "file_path": file_path,
        "old_filesize": old_filesize,
        "new_filesize": new_filesize,
    }


@app.task(queue="tnr_imgcmprs_queue", name="delete_task_result")
def delete_task_result(task_id):
    result = app.AsyncResult(task_id)
    # Note: as far as I know, result.args is only populated when Celery stores
    # extended result metadata (the result_extended setting).
    file_path, *_ = result.args
    if file_path and os.path.isfile(file_path):
        os.remove(file_path)
    # Remove the stored task result from the result backend as well.
    result.forget()


@signals.task_success.connect(sender=compress_image)
def task_success_handler(sender, result, **kwargs):
    task_id = sender.request.correlation_id
    # Keep the file for 30 minutes so the user can download it, then clean up.
    delete_task_result.apply_async((task_id,), countdown=60 * 30)  # type: ignore


@signals.task_failure.connect(sender=compress_image)
def task_failure_handler(sender, task_id, **kwargs):
    # Nothing to download after a failure, so clean up right away.
    delete_task_result.apply_async((task_id,), countdown=0)  # type: ignore
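The snippet above relies on an app object that this article does not show. A minimal sketch of how such a Celery instance could be configured is given here. Everything in it is an assumption on my side: the Redis URLs are placeholders for whatever broker and result backend you run, and the repository's celery_app.py may be set up differently.

```python
from celery import Celery

# Assumption: a local Redis server acts as both broker and result backend.
# These URLs are placeholders, not values taken from the repository.
app = Celery(
    "celery_app",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# Store the task name, args, and kwargs alongside each result, so that
# AsyncResult(task_id).args has a value when delete_task_result reads it.
app.conf.update(result_extended=True)
```

A result backend is needed in any case, because both the status and download endpoints read task results through AsyncResult.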

Run the backend

If you are using Visual Studio Code, I have included a launch.json for you to run the backend. Otherwise, both the FastAPI and Celery instances can be launched by executing these commands separately:

uvicorn main:app
celery -A celery_app:app worker -l info -n tnr_imgcmprs -Q tnr_imgcmprs_queue

In the next article, coming next week, we will build the frontend for this backend.