mirror of
https://github.com/bcomsugi/dasaproject.git
synced 2026-01-08 18:42:37 +07:00
106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
from fastapi import FastAPI, Request, Depends, Query
|
|
from fastapi.templating import Jinja2Templates
|
|
import databases
|
|
import sqlalchemy
|
|
from sqlalchemy.sql import text
|
|
from sqlalchemy.sql import select, func # Import select and func from sqlalchemy.sql
|
|
from typing import Optional
|
|
import datetime
|
|
|
|
# Connection string used by both the async `databases` client and the
# synchronous SQLAlchemy engine created further down in this module.
DATABASE_URL = "sqlite:///database.db"

# Async database handle; it is connected and disconnected by the
# application's startup/shutdown hooks.
database = databases.Database(DATABASE_URL)

# Registry that the table definition below attaches itself to.
metadata = sqlalchemy.MetaData()
|
|
|
|
# Table searched by the /search/ endpoint: one row per item, carrying the
# customer id/name it belongs to and a start date.
your_table = sqlalchemy.Table(
    "your_table",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("item_name", sqlalchemy.String),
    sqlalchemy.Column("cust_id", sqlalchemy.Integer),
    sqlalchemy.Column("cust_name", sqlalchemy.String),
    sqlalchemy.Column("start_date", sqlalchemy.Date),
)
|
|
|
|
# Synchronous engine, used here only to emit CREATE TABLE for every table
# registered on `metadata` when the module is imported.
engine = sqlalchemy.create_engine(DATABASE_URL)
metadata.create_all(engine)
|
|
|
|
# ASGI application object that the route decorators below register on.
app = FastAPI()

# Jinja2 environment rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")
|
|
|
|
@app.on_event("startup")
async def startup_database() -> None:
    """Connect the shared async database handle when the app starts."""
    await database.connect()
|
|
|
|
@app.on_event("shutdown")
async def shutdown_database() -> None:
    """Disconnect the shared async database handle when the app stops."""
    await database.disconnect()
|
|
|
|
@app.get("/")
async def home(request: Request):
    """Render the landing page from the ``base.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("base.html", context)
|
|
|
|
def _parse_date_param(raw: Optional[str], field: str) -> Optional[datetime.date]:
    """Parse a ``YYYY-MM-DD`` query-string value into a ``date``.

    Args:
        raw: The raw query value; ``None`` or ``""`` means "not supplied".
        field: Parameter name, used only in the error message.

    Returns:
        The parsed date, or ``None`` when *raw* is missing or empty.

    Raises:
        HTTPException: 400 when *raw* is present but not ``YYYY-MM-DD``.
    """
    # Local import: the module-level fastapi import does not bring in
    # HTTPException.
    from fastapi import HTTPException

    if not raw:
        return None
    try:
        return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
    except ValueError:
        # A malformed date is a client error, not a server crash.
        raise HTTPException(
            status_code=400,
            detail=f"{field} must be a date in YYYY-MM-DD format",
        ) from None


@app.get("/search/")
async def search_results(
    request: Request,
    item_name: Optional[str] = None,
    cust_id: Optional[str] = Query(None, alias="cust_id"),
    cust_name: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = Query(None, alias="end_date"),
    page: int = Query(1, alias="page"),
    page_size: int = Query(50, alias="page_size"),
):
    """Search ``your_table`` and render one page of matches.

    Filters (all optional, combined with AND):
        item_name: Case-insensitive substring match on ``item_name``.
        cust_id: Exact match on ``cust_id``; ignored unless it consists
            solely of decimal digits.
        cust_name: Case-insensitive substring match on ``cust_name``.
        start_date: ``YYYY-MM-DD``. Alone it matches that exact date; with
            ``end_date`` it is the inclusive lower bound of a range.
        end_date: ``YYYY-MM-DD`` inclusive upper bound. Only used together
            with ``start_date``; on its own it is ignored.

    Pagination:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        The rendered ``search_results.html`` template response.

    Raises:
        HTTPException: 400 when a date that would be used is malformed.
    """
    # Clamp instead of failing: page_size == 0 would divide by zero below and
    # a non-positive page would yield a negative offset.
    page = max(page, 1)
    page_size = max(page_size, 1)

    filters = []
    if item_name:
        filters.append(your_table.c.item_name.ilike(f"%{item_name}%"))
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot convert.
    if cust_id is not None and cust_id.isdecimal():
        filters.append(your_table.c.cust_id == int(cust_id))
    if cust_name:
        filters.append(your_table.c.cust_name.ilike(f"%{cust_name}%"))

    range_start = _parse_date_param(start_date, "start_date")
    # end_date is only consulted when start_date is supplied.
    range_end = _parse_date_param(end_date, "end_date") if range_start else None
    if range_start and range_end:
        filters.append(your_table.c.start_date.between(range_start, range_end))
    elif range_start:
        filters.append(your_table.c.start_date == range_start)

    # Table.select() and FunctionElement.select() are used because their
    # spelling is the same in SQLAlchemy 1.x and 2.x, unlike select([...]).
    rows_query = your_table.select()
    count_query = func.count().select().select_from(your_table)
    for condition in filters:
        rows_query = rows_query.where(condition)
        count_query = count_query.where(condition)

    # Let the database count and slice instead of loading every match.
    total_results = (await database.fetch_val(count_query)) or 0
    total_pages = (total_results + page_size - 1) // page_size

    offset = (page - 1) * page_size
    # A stable ORDER BY keeps page contents deterministic across requests.
    page_query = rows_query.order_by(your_table.c.id).limit(page_size).offset(offset)
    results = await database.fetch_all(page_query)

    return templates.TemplateResponse(
        "search_results.html",
        {
            "request": request,
            "results": results,
            "total_pages": total_pages,
            "current_page": page,
            "page_size": page_size,
            "input_values": {
                "item_name": item_name,
                "cust_id": cust_id,
                "cust_name": cust_name,
                # Echo the parsed date when one was used, else the raw input.
                "start_date": range_start or start_date,
                "end_date": range_end or end_date,
            },
        },
    )
|
|
|