dasaproject/main2.py
2023-09-27 15:49:36 +07:00

106 lines
3.4 KiB
Python

import datetime
from typing import Optional

import databases
import sqlalchemy
from fastapi import FastAPI, Request, Depends, Query, HTTPException
from fastapi.templating import Jinja2Templates
from sqlalchemy.sql import text
from sqlalchemy.sql import select, func # Import select and func from sqlalchemy.sql

# SQLite database file, resolved relative to the process working directory.
DATABASE_URL = "sqlite:///database.db"

# Two handles onto the same database: `database` is the async client used by
# the request handlers, `engine` is only used below to create missing tables.
database = databases.Database(DATABASE_URL)
engine = sqlalchemy.create_engine(DATABASE_URL)

# Schema registry that owns the table definition.
metadata = sqlalchemy.MetaData()

your_table = sqlalchemy.Table(
    "your_table",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("item_name", sqlalchemy.String),
    sqlalchemy.Column("cust_id", sqlalchemy.Integer),
    sqlalchemy.Column("cust_name", sqlalchemy.String),
    sqlalchemy.Column("start_date", sqlalchemy.Date),
)

# Create any table registered on `metadata` that does not exist yet.
# NOTE: this runs at import time.
metadata.create_all(engine)

app = FastAPI()

# Jinja2 templates are loaded from the "templates" directory.
templates = Jinja2Templates(directory="templates")


@app.on_event("startup")
async def startup_database():
    """Connect the module-level ``database`` handle when the app starts."""
    await database.connect()


@app.on_event("shutdown")
async def shutdown_database():
    """Disconnect the module-level ``database`` handle when the app stops."""
    await database.disconnect()


@app.get("/")
async def home(request: Request):
    """Render ``base.html`` for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("base.html", context)


def _parse_iso_date(raw: str, field: str) -> datetime.date:
    """Parse a ``YYYY-MM-DD`` query value, answering HTTP 422 on bad input."""
    try:
        return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail=f"{field} must be a date in YYYY-MM-DD format",
        ) from None


@app.get("/search/")
async def search_results(
    request: Request,
    item_name: Optional[str] = None,
    cust_id: Optional[str] = Query(None, alias="cust_id"),
    cust_name: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = Query(None, alias="end_date"),
    page: int = Query(1, alias="page", ge=1),
    page_size: int = Query(50, alias="page_size", ge=1),
):
    """Search ``your_table`` and render one page of matches.

    Every filter is optional; empty or missing values are skipped.

    Args:
        request: Incoming request, forwarded to the template.
        item_name: Case-insensitive substring to match against ``item_name``.
        cust_id: Exact customer id. Kept as a string so a blank or
            non-numeric form value simply disables the filter.
        cust_name: Case-insensitive substring to match against ``cust_name``.
        start_date: ``YYYY-MM-DD``. On its own it matches that exact day;
            combined with ``end_date`` it is the inclusive lower bound.
        end_date: ``YYYY-MM-DD``. Inclusive upper bound; on its own it
            matches every row dated on or before it.
        page: 1-based page number (validated ``>= 1``).
        page_size: Rows per page (validated ``>= 1``, which also guards the
            page-count division below).

    Returns:
        The rendered ``search_results.html`` template response.

    Raises:
        HTTPException: 422 when ``start_date`` or ``end_date`` is not a
            valid ``YYYY-MM-DD`` date.
    """
    columns = your_table.c

    # Collect the filters once so the COUNT query and the page query agree.
    conditions = []
    if item_name:
        conditions.append(columns.item_name.ilike(f"%{item_name}%"))
    # isdecimal() rather than isdigit(): isdigit() accepts characters such as
    # "²" that int() rejects, which would surface as a server error.
    if cust_id is not None and cust_id.isdecimal():
        conditions.append(columns.cust_id == int(cust_id))
    if cust_name:
        conditions.append(columns.cust_name.ilike(f"%{cust_name}%"))

    parsed_start = _parse_iso_date(start_date, "start_date") if start_date else None
    parsed_end = _parse_iso_date(end_date, "end_date") if end_date else None
    if parsed_start and parsed_end:
        conditions.append(columns.start_date.between(parsed_start, parsed_end))
    elif parsed_start:
        conditions.append(columns.start_date == parsed_start)
    elif parsed_end:
        # An end date without a start date used to be silently ignored.
        conditions.append(columns.start_date <= parsed_end)

    # Table.select() / FunctionElement.select() are used instead of
    # select([...]) because the list form is rejected by SQLAlchemy 2.0.
    count_query = func.count().select().select_from(your_table)
    page_query = your_table.select()
    for condition in conditions:
        count_query = count_query.where(condition)
        page_query = page_query.where(condition)

    # Let the database count and slice instead of loading every match.
    total_results = await database.fetch_val(count_query)
    total_pages = (total_results + page_size - 1) // page_size

    offset = (page - 1) * page_size
    # A fixed ordering keeps LIMIT/OFFSET pages stable between requests.
    page_query = page_query.order_by(columns.id).limit(page_size).offset(offset)
    results = await database.fetch_all(page_query)

    return templates.TemplateResponse(
        "search_results.html",
        {
            "request": request,
            "results": results,
            "total_pages": total_pages,
            "current_page": page,
            "page_size": page_size,
            "input_values": {
                "item_name": item_name,
                "cust_id": cust_id,
                "cust_name": cust_name,
                # Echo the parsed date when there is one, else the raw value.
                "start_date": parsed_start or start_date,
                "end_date": parsed_end or end_date,
            },
        },
    )