"""FastAPI front end for the QuickBooks helper modules.

The endpoints in this module render a couple of Jinja2 templates and forward
requests to the project's QuickBooks query/add classes (item inventory,
customers, purchase orders / item receipts, sales orders, stock and the
general summary report).  Interactive API docs are served at ``/itemreceipt``.
"""
import datetime
import json
import os
import pprint
import shutil
from typing import Union

from fastapi import FastAPI, Request, UploadFile, Body
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

# Project-local modules (kept in their original relative import order).
from iteminventorydasa import QBStock
from pyQBstockserverV1 import searchStockQB
from qbgeneralsummaryreport import GeneralSummaryReportQuery as GSRQ
from qbpurchaseorderquery import PurchaseOrderQuery
import pdfexcel4DNwithxlrd
from ItemInventoryQuery import ItemInventoryQuery
from SO_to_Inv import readSO, CustomerQuery
from QBClass.QBClasses import SalesOrderAdd

app = FastAPI(docs_url="/itemreceipt", redoc_url=None)
templates = Jinja2Templates(directory="templates")

# Directory (relative to the working directory) where uploaded DN files go.
_DN_UPLOAD_DIR = "DN_Excel_files"

# Sales-order fields requested from QuickBooks by the open-SO endpoints.
_SO_RET_ELEMENTS = (
    'TxnID', 'TimeCreated', 'TimeModified', 'TxnNumber', 'CustomerRef',
    'TxnDate', 'RefNumber', 'IsManuallyClosed', 'IsFullyInvoiced',
    'TotalAmount',
)


# NOTE: the three template handlers below intentionally keep their original
# shared name ``hello``.  Each route is registered by its decorator when the
# function is defined, so all three stay reachable; the module attribute
# ``hello`` ends up bound to the last one.
@app.get('/', response_class=HTMLResponse)
async def hello(request: Request, name: str = None):
    """Render ``hello.html`` with the optional ``name`` query parameter."""
    return templates.TemplateResponse(
        "hello.html", {"request": request, "name": name})


@app.get('/hello/{name}', response_class=HTMLResponse)
async def hello(request: Request, name: str = None):
    """Render ``hello.html`` with ``name`` taken from the URL path."""
    return templates.TemplateResponse(
        "hello.html", {"request": request, "name": name})


@app.get('/podistri/', response_class=HTMLResponse)
async def hello(request: Request, item_name: str = None, cust_id: str = None,
                cust_name: str = None, start_date: str = None):
    """Render ``po_index.html``, passing ``item_name`` as template ``name``.

    ``cust_id``, ``cust_name`` and ``start_date`` are accepted as query
    parameters but are not used by this handler; the raw query string is
    only printed for debugging.
    """
    print(request.query_params)
    # The original branched on ``item_name`` but both branches rendered the
    # same template with the same context, so a single return is equivalent.
    return templates.TemplateResponse(
        "po_index.html", {"request": request, "name": item_name})


@app.get('/renew-iteminventory')
async def renew_iteminventory():
    """Query item inventory from QuickBooks and save it to an Excel file.

    Returns a dict with ``Info``, ``Status`` (``'DONE'`` or ``'ERROR'``) and
    ``msg`` (whatever ``ItemInventoryQuery.to_excel`` reported).
    """
    # Raw string: same value as before, without the invalid "\I" escape.
    filename = r"ItemInventory\ItemInventory_FromQB.xlsx"
    # MaxReturned=None means get ALL items (per the original author's note).
    item_inv = ItemInventoryQuery(
        IncludeRetElement=['FullName', 'DataExtRet'], MaxReturned=None)
    # Result unused here; the call is kept in case to_excel() relies on it.
    item_inv.create_QBXML()
    status, msg = item_inv.to_excel(filename)
    if status:
        return {'Info': f"Saved to {filename}.", 'Status': 'DONE',
                'msg': f"{msg}"}
    return {'Info': "NOT Saved", 'Status': 'ERROR', 'msg': msg}


@app.get('/get-iteminventory-fullname')
async def get_iteminventory_fullname(maxreturned: Union[int, None] = None):
    """Return item ``FullName`` data from QuickBooks as JSON.

    ``maxreturned`` is passed through as ``MaxReturned``.  On failure an
    ``{'Info', 'Status', 'msg'}`` error dict is returned instead.
    """
    item_inv = ItemInventoryQuery(
        IncludeRetElement=['FullName'], MaxReturned=maxreturned)
    # NOTE(review): create_QBXML() was called twice in the original (once
    # with the result discarded).  Both calls are kept because it is not
    # visible here whether the method has side effects - confirm and dedupe.
    item_inv.create_QBXML()
    item_inv.connect_to_quickbooks(item_inv.create_QBXML())
    status, data = item_inv.to_json()
    if status:
        return data
    return {'Info': "NOT Saved", 'Status': 'ERROR', 'msg': data}


@app.get('/get-customer-fullname')
async def get_customer_fullname(maxreturned: Union[int, None] = None):
    """Return customer data from QuickBooks as JSON.

    ``maxreturned`` is passed through as ``MaxReturned``.  On failure an
    ``{'Info', 'Status', 'msg'}`` error dict is returned instead.
    """
    print(f'{maxreturned = }')
    customer = CustomerQuery.CustomerQuery(MaxReturned=maxreturned)
    # Result unused here; the call is kept in case to_json() relies on it.
    customer.create_customerquery_QBXML()
    status, data = customer.to_json()
    if status:
        return data
    return {'Info': "NOT Saved", 'Status': 'ERROR', 'msg': data}


def _dn_upload_paths(filename):
    """Map a DN file name to ``(folder, file_path)`` under the upload dir.

    The third ``-``-separated field of the name (``yymm`` in
    ``TCO-DNxx-yymm-xxxxx.xls``) selects the sub-folder.  Returns ``None``
    when the name has too few fields, or when it contains a path separator
    or a ``.``/``..`` folder field that would let the client-supplied name
    point outside the upload directory.
    """
    if "/" in filename or "\\" in filename:
        return None
    fields = filename.split('-')
    if len(fields) <= 3:
        return None
    year_month = fields[2].strip()
    if year_month in (".", ".."):
        return None
    folder = f"{_DN_UPLOAD_DIR}/{year_month}"
    return folder, f"{folder}/{filename}"


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile):
    """Store an uploaded DN Excel file and create its item receipt.

    Steps: validate the file name, save the upload under
    ``DN_Excel_files/<yymm>/``, parse it with
    ``pdfexcel4DNwithxlrd.read_DN_excel``, prepare the item receipt from the
    matching purchase orders and, if every DN line could be matched, send the
    request to QuickBooks.

    Returns:
        * a dict with ``info``/``status`` when the file name is rejected;
        * a dict with ``info``/``status``/``msg`` when the Excel file cannot
          be resolved or not all DN lines were matched (``msg`` is then the
          unmatched items);
        * a one-element list holding such a dict once the request has been
          sent to QuickBooks (``status`` is ``"OK"`` or ``"ERROR"``).  The
          list wrapper is kept as-is for existing clients.
    """
    # The filename comes from the client and may be missing.
    filename = uploaded_file.filename or ""
    if not (filename.endswith(".xls") and filename.startswith("TCO-DN")):
        return {
            "info": (f"file '{uploaded_file.filename}' is not Excel(.xls) "
                     "file or Filename is not start with 'TCO-DN'. "
                     "Please upload the correct file"),
            "status": "ERROR",
        }

    paths = _dn_upload_paths(filename)
    if paths is None:
        # The original built this dict but never returned it, then crashed
        # on the unbound ``file_location`` below.
        return {
            "info": (f"file '{filename}' is not in the correct filename "
                     "pattern. example:'TCO-DNxx-yymm-xxxxx.xls',"
                     "'TCO-DNKR-2308-08888.xls' "
                     "please upload the correct file"),
            "status": 'Error Filename format',
        }

    folder_location, file_location = paths
    if not os.path.exists(folder_location):
        print('no folder exist. create one now')
        os.makedirs(folder_location)
    print(f'Main->{file_location}')

    with open(file_location, "wb+") as file_object:
        shutil.copyfileobj(uploaded_file.file, file_object)
    saved_info = f"file '{filename}' saved at '{file_location}'"

    status, retdict = pdfexcel4DNwithxlrd.read_DN_excel(file_location)
    if not status:
        return {
            "info": saved_info,
            "status": 'Error',
            "msg": f'Error. Cannot Find Item FullName: {retdict}',
        }

    ini = PurchaseOrderQuery(DN=retdict)
    receipt_ok, notinthelist = ini.prepareItemReceipt()
    print(f'create_upload_file->added DNQty:{ini.PurchaseOrderList}')
    # Built before the check below, exactly as in the original flow.
    ret = ini.create_itemreceiptadd_QBXML()

    if not receipt_ok:
        print("create_upload_file->not all DN are added")
        # Round-trip through JSON so the payload holds only JSON-native
        # types (with keys sorted) before it is returned.
        errmsg = json.loads(
            json.dumps(notinthelist, indent=2, sort_keys=True))
        msgdict = {
            "info": f'file {filename} saved at {file_location}',
            "status": 'Error',
            "msg": errmsg,
        }
        print(msgdict)
        return msgdict

    ret = ini.connect_to_quickbooks(ret)
    reply_ok, status_msg = ini.status_ok(ret)
    if reply_ok:
        print("OK", status_msg)
        status = "OK"
    else:
        print("ERROR", status_msg)
        print(f"ret:{ret}", type(ret))
        status = "ERROR"
    msg = f"{status}. replyfrom qbxml:{ret}"
    return [{"info": saved_info, "status": status, "msg": msg}]


@app.post('/getopenso')
async def getopenso(request: Request, CustomerName: str = Body(...)):
    """Return the open sales orders of one customer.

    The request body is a JSON string that itself contains a JSON object
    with a ``CustomerName`` key; that value is used as the QuickBooks
    ``FullName`` filter.
    """
    print("getopenso")
    payload = json.loads(CustomerName)
    print(f'CustomerName: {payload}', type(payload))
    FullName = payload['CustomerName']
    print(FullName)
    ini = readSO.SalesOrderQuery(
        FullName=FullName, IncludeRetElement=list(_SO_RET_ELEMENTS))
    print("selesai salesorder")
    open_sales_orders = ini.get_open_so()
    print(f'return opensalesorder:{open_sales_orders}')
    return open_sales_orders


@app.post('/getopensodetail')
async def getopensodetail(request: Request, open_sales_orders = Body(...)):
    """Return details for a list of open sales orders as a JSON string.

    The request body is a JSON string containing an object whose ``solist``
    value is again a JSON-encoded list; that list is passed to
    ``SalesOrderQuery.get_open_sales_order``.  The result is returned as a
    ``json.dumps`` string (kept as-is for existing clients).
    """
    print(f'opensalesorders: {open_sales_orders}', type(open_sales_orders))
    so_list_text = json.loads(open_sales_orders)['solist']
    print(f'openSOstr:{so_list_text}', type(so_list_text))
    so_list = json.loads(so_list_text)
    print(f'openSOlist:{so_list}', type(so_list))
    # NOTE(review): the customer name is hard-coded here in the original;
    # confirm whether it should come from the request instead.
    ini = readSO.SalesOrderQuery(
        FullName='abadi serpong', IncludeRetElement=list(_SO_RET_ELEMENTS))
    open_sales_orders_detail = json.dumps(ini.get_open_sales_order(so_list))
    print(f'opensalesordersdetail:{open_sales_orders_detail}',
          type(open_sales_orders_detail))
    return open_sales_orders_detail


@app.post('/get_stock')
async def get_stock(request: Request):
    """Look up stock for the items given in the JSON request body.

    The decoded body is passed to ``searchStockQB(searchitems=...)`` and its
    result is returned.  A body that is not valid JSON yields
    ``{'message': 'error getting stock'}``.
    """
    raw_body = await request.body()
    print("")
    print(f'{datetime.datetime.now()} get_stock start -> '
          f'{type(raw_body)}, {raw_body}')
    try:
        getstockdict = json.loads(raw_body)
    except ValueError:  # JSONDecodeError / UnicodeDecodeError
        print('error get_stock()')
        return {'message': 'error getting stock'}
    print(f'get_stock -> {type(getstockdict)}, {getstockdict}')
    responseRt = searchStockQB(searchitems=getstockdict)
    print(f'{datetime.datetime.now()} get_stock finish -> '
          f'{type(responseRt)}, {responseRt}')
    print("")
    return responseRt


@app.post('/dasa2/get_generalsalesreport')
async def get_generalsalesreport(request: Request):
    """Run the ``SalesByCustomerSummary`` report and return its rows.

    The JSON body may contain ``ReportDateMacro``, ``FromReportDate`` and
    ``ToReportDate``; missing keys are passed on as ``None``.  Returns the
    sequence ``(datarow[0], total, datarow[1])``.  A body that is not a JSON
    object yields ``{'message': 'error getting GeneralSalesReport'}``.
    """
    raw_body = await request.body()
    print("")
    print(f'{datetime.datetime.now()} get_gsr start -> '
          f'{type(raw_body)}, {raw_body}')
    try:
        getdict = json.loads(raw_body)
    except ValueError:  # JSONDecodeError / UnicodeDecodeError
        getdict = None
    if not isinstance(getdict, dict):
        print('error get_gsr()')
        return {'message': 'error getting GeneralSalesReport'}
    print(f'get_gsr 1-> {type(getdict)}, {getdict}')
    responseRt = GSRQ(
        GeneralSummaryReportType='SalesByCustomerSummary',
        ReportDateMacro=getdict.get('ReportDateMacro'),
        FromReportDate=getdict.get('FromReportDate'),
        ToReportDate=getdict.get('ToReportDate'))
    datas1 = responseRt.get_datarow()
    print(f'{datetime.datetime.now()} get_gsr 4 finish -> '
          f'{type(getdict)}, {getdict}')
    print("")
    return datas1[0], responseRt.get_total(), datas1[1]


@app.post('/dasa2/salesorder')
async def salesorderadd(request: Request):
    """Build a ``SalesOrderAdd`` from the JSON body and return ``so.all()``.

    The decoded body is expanded as keyword arguments to ``SalesOrderAdd``.
    A body that is not valid JSON yields an error ``message`` dict.
    """
    raw_body = await request.body()
    print("")
    try:
        salesorderdict = json.loads(raw_body)
    except ValueError:  # JSONDecodeError / UnicodeDecodeError
        print('error salesorderadd()')
        return {'message': 'error getting SalesOrder Data. it is not json'}
    print(f'{type(salesorderdict)}, {salesorderdict = }')
    so = SalesOrderAdd(**salesorderdict)
    # Computed once so the logged value is exactly what is returned.
    result = so.all()
    print(result)
    return result