from fastapi import FastAPI, Request, Depends, Query from fastapi.templating import Jinja2Templates import databases import sqlalchemy from sqlalchemy.sql import text from sqlalchemy.sql import select, func # Import select and func from sqlalchemy.sql from typing import Optional import datetime DATABASE_URL = "sqlite:///database.db" database = databases.Database(DATABASE_URL) metadata = sqlalchemy.MetaData() your_table = sqlalchemy.Table( "your_table", metadata, sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True), sqlalchemy.Column("item_name", sqlalchemy.String), sqlalchemy.Column("cust_id", sqlalchemy.Integer), sqlalchemy.Column("cust_name", sqlalchemy.String), sqlalchemy.Column("start_date", sqlalchemy.Date), ) engine = sqlalchemy.create_engine(DATABASE_URL) metadata.create_all(engine) app = FastAPI() # Initialize Jinja2 templates templates = Jinja2Templates(directory="templates") @app.on_event("startup") async def startup_database(): await database.connect() @app.on_event("shutdown") async def shutdown_database(): await database.disconnect() @app.get("/") async def home(request: Request): return templates.TemplateResponse("base.html", {"request": request}) @app.get("/search/") async def search_results( request: Request, item_name: Optional[str] = None, cust_id: Optional[str] = Query(None, alias="cust_id"), cust_name: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = Query(None, alias="end_date"), page: int = Query(1, alias="page"), page_size: int = Query(50, alias="page_size"), ): # Construct the base query base_query = select([your_table]) if item_name: base_query = base_query.where(your_table.c.item_name.ilike(f"%{item_name}%")) if cust_id is not None and cust_id.isdigit(): base_query = base_query.where(your_table.c.cust_id == int(cust_id)) if cust_name: base_query = base_query.where(your_table.c.cust_name.ilike(f"%{cust_name}%")) if start_date and end_date: start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date() end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date() base_query = base_query.where(your_table.c.start_date.between(start_date, end_date)) elif start_date: start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date() base_query = base_query.where(your_table.c.start_date == start_date) # if start_date: # base_query = base_query.where(your_table.c.start_date == start_date) # Fetch all results into memory all_results = await database.fetch_all(base_query) # Calculate pagination total_results = len(all_results) total_pages = (total_results + page_size - 1) // page_size # Apply pagination to the in-memory results offset = (page - 1) * page_size results = all_results[offset : offset + page_size] return templates.TemplateResponse( "search_results.html", { "request": request, "results": results, "total_pages": total_pages, "current_page": page, "page_size": page_size, "input_values": { "item_name": item_name, "cust_id": cust_id, "cust_name": cust_name, "start_date": start_date, "end_date": end_date, }, }, )