Building an Image Compression Web App using FastAPI, Celery, and NextJS, Part 1: Backend.
This is my first article on @hashnode, and I'm glad they make it easy to write and let me back up my articles to GitHub. The purpose of this article is to show the tech stack I often use. I don't want to dive deep into the technical aspects of this web app; I prefer to keep it dead simple. If you find this article difficult, I highly recommend reading Bjoern Stiel's post about FastAPI, Celery, and HTMX first.
In this part, we will create a Python backend using FastAPI and Celery to manage image compression tasks. The idea is to give the frontend a way to upload images, track their progress, and then get the result back.
First, clone the code from my GitHub repo and get familiar with the project structure:
```
bit@KAUST2024:~/Playground/tensoru/image_compression_webapp$ tree -L 1
.
├── __pycache__
├── celery_app.py
├── freeze.txt
├── main.py
├── requirements.txt
├── uploads
└── venv
```
Wondering why there is a venv directory when your cloned code doesn't have one? See "What is a virtualenv, and why should I use one?".

I put the FastAPI instance inside main.py and the Celery instance inside celery_app.py.
FastAPI backend
There are three endpoints we currently need for smooth operation:
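Upload endpoint. This endpoint saves the uploaded file to the uploads folder under a random file name, then immediately hands the compress_image task over to Celery.

```python
import os
import uuid

import aiofiles
import filetype
from fastapi import File, Form, UploadFile

# `app`, `TEMP_DIR`, `ResponseSchema`, `ErrorResponseSchema` and the Celery task
# `compress_image` are defined or imported elsewhere in the project.


@app.post("/upload")
async def upload(
    quality: int = Form(default=..., le=100, ge=20),
    file: UploadFile = File(...),
):
    # Prefix the original name with a random UUID so uploads never collide
    random_uuid = uuid.uuid4().hex
    temporary_file_path = f"{TEMP_DIR}/{random_uuid}_{file.filename}"
    os.makedirs(os.path.dirname(temporary_file_path), exist_ok=True)

    # Stream the upload to disk in 1 MB chunks instead of loading it all into memory
    async with aiofiles.open(temporary_file_path, "wb") as buffer:
        while content := await file.read(1024 * 1024):
            await buffer.write(content)

    # Inspect the file's magic number, not its extension
    if not filetype.helpers.is_image(temporary_file_path):
        os.remove(temporary_file_path)  # don't leave rejected uploads on disk
        return ErrorResponseSchema(message="File is not an image", code=400)

    # Hand the heavy lifting to a Celery worker and return right away
    task = compress_image.delay(temporary_file_path, quality)  # type: ignore
    return ResponseSchema(
        data={"task_id": task.id}, message="File uploaded successfully"
    )
```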
Inside the upload function above, we do four things:

1. Create a random temporary file name. This is a must: if hundreds of people upload hundreds of images, even an original file name like "Jakarta_20240801_0001.JPEG" is not guaranteed to be unique. Hence it is better to prefix it with a unique UUID.
2. Save the uploaded file to disk. Instead of waiting for the whole file to be loaded into memory before saving it, reading and writing in 1 MB chunks is preferred.
   💡 You may have noticed that I am using aiofiles to handle file I/O. That is because I insist on using an async function for the upload endpoint. Plain open()/write() calls are blocking, so inside an async endpoint they would stall the event loop; the alternative would be a regular synchronous function, which FastAPI runs in a thread pool.
3. Check the file type. The filetype module infers a file's type from its magic number. We can't rely on the file extension; otherwise some kids will upload zipped Roblox games to compress.
4. Pass the image path to the compress_image function, which is later executed by a Celery worker.
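To make the first three steps concrete outside FastAPI, here is a minimal, standard-library-only sketch. It is not the repo's code: `unique_upload_path`, `save_in_chunks` and `sniff_image_type` are hypothetical helpers, the chunked copy is synchronous (the endpoint uses aiofiles instead), and the signature table covers only a few formats, whereas the filetype module recognizes many more.

```python
import io
import os
import tempfile
import uuid
from typing import BinaryIO, Optional

CHUNK_SIZE = 1024 * 1024  # 1 MB, matching the chunk size described above

# A few well-known image signatures ("magic numbers"); filetype knows many more.
IMAGE_SIGNATURES = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
}


def unique_upload_path(upload_dir: str, original_name: str) -> str:
    """Prefix the original name with a random 32-char hex UUID to avoid collisions."""
    return os.path.join(upload_dir, f"{uuid.uuid4().hex}_{original_name}")


def save_in_chunks(src: BinaryIO, dest_path: str, chunk_size: int = CHUNK_SIZE) -> int:
    """Copy src to dest_path one chunk at a time; return the number of bytes written."""
    written = 0
    with open(dest_path, "wb") as dst:
        while chunk := src.read(chunk_size):
            dst.write(chunk)
            written += len(chunk)
    return written


def sniff_image_type(path: str) -> Optional[str]:
    """Return a MIME type based on the file's leading bytes, ignoring its extension."""
    with open(path, "rb") as f:
        header = f.read(12)
    for signature, mime in IMAGE_SIGNATURES.items():
        if header.startswith(signature):
            return mime
    # WebP: "RIFF" + 4-byte size + "WEBP"
    if header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        return "image/webp"
    return None


if __name__ == "__main__":
    upload_dir = tempfile.mkdtemp()
    # A fake "photo.jpg" that is really a ZIP archive (ZIP files start with b"PK\x03\x04")
    path = unique_upload_path(upload_dir, "photo.jpg")
    save_in_chunks(io.BytesIO(b"PK\x03\x04" + b"\x00" * 2048), path)
    # The extension says JPEG, but the bytes do not match any image signature
    print(os.path.basename(path), sniff_image_type(path))
```

The extension is never consulted: only the first 12 bytes decide whether the upload is treated as an image, which is the same idea the endpoint relies on via filetype.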
Get status endpoint. A task queue system like Celery can handle thousands of requests with a limited amount of resources, so the client has to come back later for its result. In this article I chose the dead simple way: the client asks for the task status itself. On another occasion we will implement SSE or webhooks to notify the client about its tasks.
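```python
from celery.exceptions import CeleryError

# `app`, `celery_app`, `TaskOut`, `ResponseSchema` and `ErrorResponseSchema`
# are defined or imported elsewhere in main.py.


@app.get("/task/{task_id}")
async def get_status(task_id: str):
    try:
        # Note: Celery reports unknown task IDs as PENDING
        r = celery_app.AsyncResult(task_id)
        result = r.result
        # Exceptions can't be serialized as-is, so send their repr instead
        if isinstance(result, CeleryError):
            result = repr(result)
        task_response = TaskOut(id=r.task_id, status=r.status, result=result).dict()
        return ResponseSchema(
            data=task_response,
            message="Task status retrieved successfully",
        )
    except Exception:
        return ErrorResponseSchema(
            code=500, message="Something went wrong while getting Task status"
        )
```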
The get_status function above has only one purpose: get the Celery result, then check whether the result is an error. Checking whether the result is an instance of CeleryError may not be the proper way to handle it (a failed task's result is whatever exception the task raised, which is not necessarily a CeleryError), but it is the simplest way to do it.

Download endpoint. After a task has executed successfully, this endpoint lets the client retrieve the compressed file from the server.
```python
import os

import filetype
from fastapi.responses import FileResponse

# `app`, `celery_app` and `ErrorResponseSchema` are defined or imported elsewhere in main.py.


@app.get("/download/{task_id}")
def download(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.status == "SUCCESS":
        result = task.result
        # Defensive check: no result is stored for this task
        if result is None:
            return ErrorResponseSchema(
                message="File is already deleted, please try to upload again", code=500
            )
        file_path = result["file_path"]
        if not os.path.isfile(file_path):
            return ErrorResponseSchema(
                message="File does not exist, please try to upload again.", code=400
            )
        # Detect the MIME type from the file content, falling back to a generic binary type
        kind = filetype.guess(file_path)
        media_type = "application/octet-stream"
        if kind:
            media_type = kind.mime
        # Strip the "<uuid>_" prefix so the user gets the original file name back
        file_name = os.path.basename(file_path)
        file_name = file_name.split("_", 1)[1]
        return FileResponse(file_path, media_type=media_type, filename=file_name)
    return ErrorResponseSchema(
        message="File is not ready for download yet. Please try again later", code=400
    )
```
The download function above performs four operations:

1. Check whether the task result has a file_path.
2. Check whether file_path exists.
3. Check the file's MIME type, or simply its media type. Why? Because we accept a range of image formats (JPEG, PNG, etc.), so it is important to tell the browser which kind of image it is getting.
4. Return the file, of course without the random UUID prefix.
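From the client's side, the dead simple status approach means polling: ask GET /task/{task_id} until the task reaches a final state, then call GET /download/{task_id}. The sketch below assumes the JSON body mirrors ResponseSchema and TaskOut, i.e. {"data": {"id": ..., "status": ..., "result": ...}, ...}. `wait_for_task`, `savings_percent` and the injected `fetch_status` callable are hypothetical names, so any HTTP client can be plugged in; the fake responses stand in for a running backend.

```python
import time
from typing import Any, Callable, Dict

# Celery's final states; PENDING, STARTED and RETRY mean "ask again later".
# Note: Celery also reports unknown task IDs as PENDING.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(
    task_id: str,
    fetch_status: Callable[[str], Dict[str, Any]],
    interval: float = 1.0,
    timeout: float = 60.0,
) -> Dict[str, Any]:
    """Poll fetch_status(task_id) until the task is ready; return its TaskOut-like dict.

    fetch_status is assumed to return the parsed JSON of GET /task/{task_id}.
    """
    deadline = time.monotonic() + timeout
    while True:
        task = fetch_status(task_id)["data"]
        if task["status"] in READY_STATES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {task['status']} after {timeout}s")
        time.sleep(interval)


def savings_percent(result: Dict[str, int]) -> float:
    """Percentage of bytes saved, from compress_image's old/new file sizes."""
    return 100 * (1 - result["new_filesize"] / result["old_filesize"])


if __name__ == "__main__":
    # Fake server responses: not ready twice, then done
    responses = iter([
        {"data": {"id": "abc", "status": "PENDING", "result": None}},
        {"data": {"id": "abc", "status": "STARTED", "result": None}},
        {"data": {"id": "abc", "status": "SUCCESS",
                  "result": {"file_path": "uploads/x_photo.jpg",
                             "old_filesize": 1000, "new_filesize": 250}}},
    ])
    task = wait_for_task("abc", lambda _id: next(responses), interval=0.01)
    print(task["status"], savings_percent(task["result"]))  # SUCCESS 75.0
```

Once the status is SUCCESS, the client can request the download endpoint; on FAILURE there is nothing to download, because the failure handler deletes the file right away.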
Celery tasks
The functions listed below are registered for the Celery worker to run. There are two Celery task functions:

- compress_image: takes care of the image compression. This function is the reason we use Celery, since it consumes the most resources.
- delete_task_result: deletes the task result, including the actual file. This one is important because we need to clean up old, unused images before they pile up.

As you may notice, there are two more functions, which are signal handlers. Depending on the task result, they either invoke delete_task_result immediately (on failure) or keep the result for 30 minutes so the user can download it (on success).
```python
import os

from celery import signals
from PIL import Image

# `app` is the Celery instance created in celery_app.py.


@app.task(queue="tnr_imgcmprs_queue", name="compress_image")
def compress_image(file_path: str, quality: int = 50):
    img = Image.open(file_path)
    old_filesize = os.path.getsize(file_path)
    # Overwrite the upload in place. `quality` applies to lossy formats such as
    # JPEG and WebP; Pillow's PNG encoder ignores it.
    img.save(file_path, quality=quality)
    new_filesize = os.path.getsize(file_path)
    return {
        "file_path": file_path,
        "old_filesize": old_filesize,
        "new_filesize": new_filesize,
    }


@app.task(queue="tnr_imgcmprs_queue", name="delete_task_result")
def delete_task_result(task_id):
    result = app.AsyncResult(task_id)
    # `result.args` is only stored when the `result_extended` setting is enabled
    file_path, *_ = result.args
    if file_path and os.path.isfile(file_path):
        os.remove(file_path)
    # Drop the stored result from the result backend
    result.forget()


@signals.task_success.connect(sender=compress_image)
def task_success_handler(sender, result, **kwargs):
    task_id = sender.request.id
    # Keep the compressed file for 30 minutes so the user can download it
    delete_task_result.apply_async((task_id,), countdown=60 * 30)  # type: ignore


@signals.task_failure.connect(sender=compress_image)
def task_failure_handler(sender, task_id, **kwargs):
    # Nothing to download, so clean up right away
    delete_task_result.apply_async((task_id,), countdown=0)  # type: ignore
```
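The retention policy in the two signal handlers boils down to "delete after N seconds", where N is 30 minutes on success and 0 on failure. To see that logic in isolation without a broker, here is a standard-library sketch that swaps Celery's apply_async(countdown=...) for threading.Timer. It is only an in-process illustration: a Timer disappears when the process exits, whereas a Celery countdown task is sent as a message and executed by a worker, which is why the backend uses Celery for this. `schedule_cleanup`, `delete_file` and `RETENTION_SECONDS` are hypothetical names.

```python
import os
import tempfile
import threading

# Mirrors the countdowns used by the signal handlers above
RETENTION_SECONDS = {"SUCCESS": 60 * 30, "FAILURE": 0}


def delete_file(file_path: str) -> None:
    """Counterpart of delete_task_result's file removal (without result.forget())."""
    if file_path and os.path.isfile(file_path):
        os.remove(file_path)


def schedule_cleanup(file_path: str, status: str) -> threading.Timer:
    """Delete file_path after the retention period for this task status."""
    timer = threading.Timer(RETENTION_SECONDS[status], delete_file, args=(file_path,))
    timer.daemon = True  # don't keep the process alive just for cleanup
    timer.start()
    return timer


if __name__ == "__main__":
    fd, failed_upload = tempfile.mkstemp(suffix="_photo.jpg")
    os.close(fd)
    schedule_cleanup(failed_upload, "FAILURE").join()  # countdown 0: removed right away
    print(os.path.exists(failed_upload))  # False
```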
Run the backend
If you are using Visual Studio Code, I included a launch.json for you to run the backend. Otherwise, launch the FastAPI and Celery instances by running these commands separately (the message broker configured in celery_app.py must be running as well):
```shell
uvicorn main:app
celery -A celery_app:app worker -l info -n tnr_imgcmprs -Q tnr_imgcmprs_queue
```
In the next article, coming next week, we will build the frontend for this backend.