mirror of
https://github.com/bcomsugi/dasaproject.git
synced 2026-01-08 18:42:37 +07:00
394 lines
16 KiB
Python
394 lines
16 KiB
Python
from fastapi.responses import HTMLResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi import FastAPI, Request, UploadFile, Body
|
|
from typing import Union
|
|
# from icecream import install
|
|
# import icecream
|
|
# install()
|
|
# from icecream import ic
|
|
# ic.configureOutput(includeContext=True, )
|
|
import json
|
|
from iteminventorydasa import QBStock
|
|
import datetime
|
|
from pyQBstockserverV1 import searchStockQB
|
|
from qbgeneralsummaryreport import GeneralSummaryReportQuery as GSRQ
|
|
import shutil
|
|
from qbpurchaseorderquery import PurchaseOrderQuery
|
|
import pdfexcel4DNwithxlrd
|
|
from ItemInventoryQuery import ItemInventoryQuery
|
|
from SO_to_Inv import readSO, CustomerQuery
|
|
import os
|
|
from QBClass.QBClasses import SalesOrderAdd, InventoryStockStatusByVendor, PriceLevelQuery, TransactionQuery
|
|
from QBClass.QBClasses import CustomerQuery as CQ
|
|
from QBClass.QBClasses import ItemInventoryQuery as IIQ
|
|
from QBClass.QBClasses import GeneralSummaryReportQuery as GSRQuery
|
|
|
|
import pprint
|
|
|
|
# app = FastAPI()
|
|
app = FastAPI(docs_url="/itemreceipt", redoc_url=None)
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
@app.get('/', response_class=HTMLResponse)
|
|
async def hello(request:Request, name:str=None):
|
|
return templates.TemplateResponse("hello.html", {"request":request, "name":name})
|
|
# return "Hello Word"
|
|
|
|
@app.get('/hello/{name}', response_class=HTMLResponse)
|
|
async def hello(request:Request, name:str=None):
|
|
return templates.TemplateResponse("hello.html", {"request":request, "name":name})
|
|
|
|
@app.get('/podistri/', response_class=HTMLResponse)
|
|
async def hello(request:Request, item_name:str=None, cust_id:str=None, cust_name:str=None, start_date:str=None):
|
|
print(request.query_params)
|
|
q_dict = {'item_name':item_name, 'cust_id':cust_id, 'cust_name':cust_name, 'start_date':start_date}
|
|
if not item_name:
|
|
return templates.TemplateResponse("po_index.html", {"request":request, "name":item_name})
|
|
|
|
else:
|
|
return templates.TemplateResponse("po_index.html", {"request":request, "name":item_name})
|
|
|
|
|
|
@app.get('/renew-iteminventory')
|
|
async def renew_iteminventory():
|
|
filename = "ItemInventory\ItemInventory_FromQB.xlsx"
|
|
item_inv = ItemInventoryQuery( IncludeRetElement=['FullName', 'DataExtRet'], MaxReturned=None) #MaxReturned=None means get ALL items
|
|
QBXML = item_inv.create_QBXML()
|
|
status, msg = item_inv.to_excel(filename)
|
|
if status:
|
|
return {'Info': f"Saved to {filename}.", 'Status':'DONE', 'msg':f"{msg}"}
|
|
else:
|
|
return {'Info': f"NOT Saved", 'Status':'ERROR', 'msg':msg}
|
|
|
|
|
|
@app.get('/get-iteminventory-fullname')
|
|
async def get_iteminventory_fullname(maxreturned:Union[int, None] = None):
|
|
item_inv = ItemInventoryQuery( IncludeRetElement=['FullName','UnitOfMeasureSetRef'], MaxReturned=maxreturned) #MaxReturned=None means get ALL items
|
|
# item_inv = ItemInventoryQuery( IncludeRetElement=['FullName',], MaxReturned=maxreturned) #MaxReturned=None means get ALL items
|
|
QBXML = item_inv.create_QBXML()
|
|
itu = item_inv.connect_to_quickbooks(item_inv.create_QBXML())
|
|
status, data = item_inv.to_json()
|
|
if status:
|
|
return data
|
|
else:
|
|
return {'Info': f"NOT Saved", 'Status':'ERROR', 'msg':data}
|
|
|
|
@app.get('/get-iteminventory')
|
|
# async def get_customer_fullname(MaxReturned:Union[int, None] = None, request:Request):
|
|
async def get_iteminventory(request:Request):
|
|
print(f'{request.query_params = }')
|
|
params = await request.json()
|
|
print(f'{params = }')
|
|
# print(type(params), params)
|
|
# params = request.query_params._dict
|
|
print(type(params), f'{params = }')
|
|
# iteminventory = CustomerQuery.CustomerQuery(**params)
|
|
iteminventory = IIQ(**params, debug=False)
|
|
# print(iteminventory.all())
|
|
# status, data = iteminventory.to_json()
|
|
# print(f'{MaxReturned = }')
|
|
# print(f'{request.query_params = }')
|
|
if len(iteminventory.all())>0:
|
|
return iteminventory.all()
|
|
else:
|
|
return {'Info': f"CANNOT Get", 'Status':'ERROR', 'msg':f'Cannot Get Customer with params: {params}'}
|
|
|
|
@app.get('/get-customer-fullname')
|
|
# async def get_customer_fullname(MaxReturned:Union[int, None] = None, request:Request):
|
|
async def get_customer_fullname(request:Request):
|
|
# print(f'{MaxReturned = }')
|
|
print(f'{request = }')
|
|
print(f'{request.query_params = }')
|
|
MaxReturned = request.query_params.get('MaxReturned')
|
|
# item_inv = CustomerQuery( IncludeRetElement=['FullName'], MaxReturned=maxreturned) #MaxReturned=None means get ALL items
|
|
customer = CustomerQuery.CustomerQuery(MaxReturned=MaxReturned)
|
|
# QBXML = customer.create_QBXML()
|
|
itu = customer.create_customerquery_QBXML()
|
|
# print(itu)
|
|
status, data = customer.to_json()
|
|
print(f'{MaxReturned = }')
|
|
print(f'{request.query_params = }')
|
|
if status:
|
|
return data
|
|
else:
|
|
return {'Info': f"NOT Saved", 'Status':'ERROR', 'msg':data}
|
|
|
|
|
|
@app.get('/get-customer')
|
|
# async def get_customer_fullname(MaxReturned:Union[int, None] = None, request:Request):
|
|
async def get_customer(request:Request):
|
|
print(f'{request.query_params = }')
|
|
params = await request.json()
|
|
print(f'{params = }')
|
|
# print(type(params), params)
|
|
# params = request.query_params._dict
|
|
print(type(params), f'{params = }')
|
|
# customer = CustomerQuery.CustomerQuery(**params)
|
|
customer = CQ(**params)
|
|
# print(customer.all())
|
|
# status, data = customer.to_json()
|
|
# print(f'{MaxReturned = }')
|
|
# print(f'{request.query_params = }')
|
|
if len(customer.all())>0:
|
|
return customer.all()
|
|
else:
|
|
return {'Info': f"CANNOT Get", 'Status':'ERROR', 'msg':f'Cannot Get Customer with params: {params}'}
|
|
|
|
|
|
@app.get('/get-pricelevel')
|
|
async def get_pricelevel(name:Union[str, None] = None):
|
|
print(f'{name = }')
|
|
# item_inv = CustomerQuery( IncludeRetElement=['FullName'], MaxReturned=maxreturned) #MaxReturned=None means get ALL items
|
|
pricelevel = PriceLevelQuery( NameFilter_MatchCriterion='Contains', NameFilter_Name=name, )
|
|
# print(pricelevel.all())
|
|
print(f'{len(pricelevel.all()) = }')
|
|
# status, data = customer.to_json()
|
|
if len(pricelevel.all())>0:
|
|
return pricelevel.all()#len(pricelevel.all()) #pricelevel.all()
|
|
else:
|
|
return {'Info': f"CANNOT Get PriceLevel", 'Status':'ERROR', 'msg':f'Cannot get Pricelevel with {name}'}
|
|
|
|
|
|
@app.get('/get-inventorystockstatusbyvendor')
|
|
async def get_inventorystockstatusbyvendor():
|
|
items = InventoryStockStatusByVendor()
|
|
|
|
if items:
|
|
# data = json.dumps(items)
|
|
return items
|
|
else:
|
|
return {'Info': f"CANNOT Get InventoryStockStatusByVendor TACO", 'Status':'ERROR', 'msg':items}
|
|
|
|
@app.post("/upload-file/")
|
|
async def create_upload_file(uploaded_file: UploadFile):
|
|
base_file_location = f"DN_Excel_files"
|
|
status = "ERROR"
|
|
if uploaded_file.filename.endswith(".xls") and uploaded_file.filename.startswith("TCO-DN"):
|
|
folder_yearmonth = uploaded_file.filename.split('-')
|
|
if len(folder_yearmonth)>3:
|
|
folder_yearmonth = folder_yearmonth[2].strip()
|
|
folder_location = f"{base_file_location}/{folder_yearmonth}"
|
|
file_location = f"{base_file_location}/{folder_yearmonth}/{uploaded_file.filename}"
|
|
if not os.path.exists(folder_location):
|
|
print('no folder exist. create one now')
|
|
os.makedirs(folder_location)
|
|
print(f'Main->{file_location}')
|
|
else:
|
|
status='Error Filename format'
|
|
{"info": f"file '{uploaded_file.filename}' is not in the correct filename pattern. example:'TCO-DNxx-yymm-xxxxx.xls','TCO-DNKR-2308-08888.xls' please upload the correct file", "status": status}
|
|
with open(file_location, "wb+") as file_object:
|
|
shutil.copyfileobj(uploaded_file.file, file_object)
|
|
status, retdict = pdfexcel4DNwithxlrd.read_DN_excel(file_location)
|
|
if status == False:
|
|
msg = f'Error. Cannot Find Item FullName: {retdict}'
|
|
status = 'Error'
|
|
return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'", "status": status, "msg": msg}
|
|
ini=PurchaseOrderQuery(DN=retdict)
|
|
bol_prepareItemReceiptOk, notinthelist = ini.prepareItemReceipt()
|
|
print(f'create_upload_file->added DNQty:{ini.PurchaseOrderList}')
|
|
ret = ini.create_itemreceiptadd_QBXML()
|
|
# print(ret)
|
|
if bol_prepareItemReceiptOk:
|
|
ret = ini.connect_to_quickbooks(ret)
|
|
# print(ret)
|
|
status, status_msg = ini.status_ok(ret)
|
|
# print(f"create_upload_file->replyfrom qbxml:{ret}")
|
|
if status:
|
|
print("OK", status_msg)
|
|
status = "OK"
|
|
else:
|
|
print("ERROR", status_msg)
|
|
print(f"ret:{ret}", type(ret))
|
|
status = "ERROR"
|
|
msg = f"ERROR. replyfromqbxml:{ret}"
|
|
msg = f"{status}. replyfrom qbxml:{ret}"
|
|
else:
|
|
print("create_upload_file->not all DN are added")
|
|
# ic(notinthelist)
|
|
# errmsg = ic(notinthelist)
|
|
# errmsg = pprint.pprint(notinthelist)
|
|
# errmsg = str(errmsg)
|
|
# print(errmsg, type(errmsg))
|
|
errmsg = json.dumps(notinthelist, indent=2, sort_keys=True)
|
|
errmsg = json.loads(errmsg)
|
|
|
|
# print(errmsg, type(errmsg))
|
|
print(type(errmsg))
|
|
# msg = f"Error. not all DN are added. \n these items not added in the list :\n{notinthelist}"
|
|
msg = f"Error. not all DN are added. \n these items not added in the list :\n{errmsg}"
|
|
|
|
status = 'Error'
|
|
msgdict={}
|
|
msgdict["info"]=f'file {uploaded_file.filename} saved at {file_location}'
|
|
msgdict["status"]=status
|
|
msgdict["msg"]=errmsg
|
|
print(msgdict, type(msg))
|
|
# jsonmsg = json.dumps(msgdict)
|
|
# jsonmsg = json.loads(jsonmsg)
|
|
|
|
return msgdict
|
|
|
|
|
|
return [{"info": f"file '{uploaded_file.filename}' saved at '{file_location}'", "status": status, "msg": msg}]
|
|
return {"info": f"file '{uploaded_file.filename}' is not Excel(.xls) file or Filename is not start with 'TCO-DN'. Please upload the correct file", "status": status}
|
|
|
|
@app.post('/getopenso')
|
|
async def getopenso(request: Request, CustomerName: str = Body(...)):
|
|
print("getopenso")
|
|
# CustomerName = await request.body()
|
|
CustomerName = json.loads(CustomerName)
|
|
print(f'CustomerName: {CustomerName}', type(CustomerName))
|
|
FullName = CustomerName['CustomerName']
|
|
print(FullName)
|
|
ini=readSO.SalesOrderQuery(FullName= FullName, IncludeRetElement = ['TxnID', 'TimeCreated', 'TimeModified','TxnNumber', 'CustomerRef', 'TxnDate', 'RefNumber', 'IsManuallyClosed', 'IsFullyInvoiced','TotalAmount'])
|
|
print("selesai salesorder")
|
|
open_sales_orders = ini.get_open_so()
|
|
print(f'return opensalesorder:{open_sales_orders}')
|
|
return open_sales_orders
|
|
|
|
@app.post('/getopensodetail')
|
|
async def getopensodetail(request: Request, open_sales_orders = Body(...)):
|
|
# open_sales_orders = await request.body()
|
|
print(f'opensalesorders: {open_sales_orders}', type(open_sales_orders))
|
|
open_sales_orders=json.loads(open_sales_orders)
|
|
open_sales_orders=open_sales_orders['solist']
|
|
print(f'openSOstr:{open_sales_orders}', type(open_sales_orders))
|
|
open_sales_orders=json.loads(open_sales_orders)
|
|
print(f'openSOlist:{open_sales_orders}', type(open_sales_orders))
|
|
ini=readSO.SalesOrderQuery(FullName= 'abadi serpong', IncludeRetElement = ['TxnID', 'TimeCreated', 'TimeModified','TxnNumber', 'CustomerRef', 'TxnDate', 'RefNumber', 'IsManuallyClosed', 'IsFullyInvoiced','TotalAmount'])
|
|
open_sales_orders_detail = ini.get_open_sales_order(open_sales_orders)
|
|
open_sales_orders_detail = json.dumps(open_sales_orders_detail)
|
|
print(f'opensalesordersdetail:{open_sales_orders_detail}', type (open_sales_orders_detail))
|
|
return open_sales_orders_detail
|
|
|
|
@app.post('/get_stock')
|
|
async def get_stock(request: Request):
|
|
getstockdict = await request.body()
|
|
print("")
|
|
print(f'{datetime.datetime.now()} get_stock start -> {type(getstockdict)}, {getstockdict}')
|
|
try:
|
|
getstockdict = json.loads(getstockdict)
|
|
except:
|
|
print('error get_stock()')
|
|
return {'message':'error getting stock'}
|
|
print(f'get_stock -> {type(getstockdict)}, {getstockdict}')
|
|
responseRt = searchStockQB(searchitems=getstockdict)
|
|
print(f'{datetime.datetime.now()} get_stock finish -> {type(responseRt)}, {responseRt}')
|
|
print("")
|
|
return responseRt
|
|
|
|
@app.post('/get-iteminventory_report')
|
|
async def getiteminventory_report(request:Request):
|
|
print('masuk getiteminventoryreport')
|
|
params = await request.body()
|
|
print('getiteminventoryrepot')
|
|
try:
|
|
params = json.loads(params)
|
|
except:
|
|
print('error get-iteminventory_report')
|
|
return {'message':'error get-iteminventory_report'}
|
|
# print(f'{request.query_params = }')
|
|
# params = await request.json()
|
|
print(f'{params = }')
|
|
# print(type(params), params)
|
|
# params = request.query_params._dict
|
|
print(type(params), f'{params = }')
|
|
# iteminventory = CustomerQuery.CustomerQuery(**params)
|
|
iteminventory_report = GSRQuery(**params, debug=False)
|
|
# print(iteminventory.all())
|
|
# status, data = iteminventory.to_json()
|
|
# print(f'{MaxReturned = }')
|
|
# print(f'{request.query_params = }')
|
|
# print(iteminventory_report.filter('datarow').getItemInventory_Report())
|
|
ret = iteminventory_report.filter('DataRow').getItemInventory_Report()
|
|
# print(f'{ret = } {type(ret) = } {len(ret) = }')
|
|
if len(ret)>0:
|
|
return ret
|
|
else:
|
|
return {'Info': f"CANNOT Get", 'Status':'ERROR', 'msg':f'Cannot Get /getiteminventory_report with params: {params}'}
|
|
|
|
@app.post('/dasa2/get_generalsalesreport')
|
|
async def get_generalsalesreport(request: Request):
|
|
getdict = await request.body()
|
|
print("")
|
|
print(f'{datetime.datetime.now()} get_gsr start -> {type(getdict)}, {getdict}')
|
|
try:
|
|
getdict = json.loads(getdict)
|
|
except:
|
|
print('error get_gsr()')
|
|
if not getdict:
|
|
getdict = {'ReportDateMacro' : 'ThisYear'}
|
|
else:
|
|
return {'message':'error getting GeneralSalesReport'}
|
|
print(f'get_gsr 1-> {type(getdict)}, {getdict}')
|
|
ReportDateMacro = getdict['ReportDateMacro'] if 'ReportDateMacro' in getdict else None
|
|
FromReportDate = getdict['FromReportDate'] if 'FromReportDate' in getdict else None
|
|
ToReportDate = getdict['ToReportDate'] if 'ToReportDate' in getdict else None
|
|
|
|
responseRt = GSRQ(GeneralSummaryReportType='SalesByCustomerSummary', ReportDateMacro=ReportDateMacro, FromReportDate=FromReportDate, ToReportDate=ToReportDate)
|
|
# print(f'get_gsr 2-> {type(responseRt)}, {responseRt}')
|
|
datas1=responseRt.get_datarow()
|
|
print(f'get_gsr 3-> {type(datas1)}, {datas1}')
|
|
print(f'{datetime.datetime.now()} get_gsr 4 finish -> {type(getdict)}, {getdict}')
|
|
print("")
|
|
|
|
return datas1[0], responseRt.get_total(), datas1[1]
|
|
|
|
|
|
@app.post('/dasa2/salesorderadd')
|
|
async def salesorderadd(request: Request):
|
|
salesorderdict = await request.body()
|
|
print("")
|
|
try:
|
|
salesorderdict = json.loads(salesorderdict)
|
|
except:
|
|
print('error salesorderadd()')
|
|
return {'message':'error getting SalesOrder Data. it is not json'}
|
|
print(f'{type(salesorderdict)}, {salesorderdict = }')
|
|
|
|
so = SalesOrderAdd(**salesorderdict)
|
|
print(f'{len(so.to_json()) = }')
|
|
return so.all()
|
|
|
|
|
|
@app.post('/dasa2/transactionquery')
|
|
async def transactionquery(request: Request):
|
|
transactiondict = await request.body()
|
|
print("")
|
|
try:
|
|
transactiondict = json.loads(transactiondict)
|
|
except:
|
|
print('error TransactionQuery()')
|
|
return {'message':'error getting TransactionQuery Data. it is not json'}
|
|
print(f'{type(transactiondict)}, {transactiondict = }')
|
|
print(f'{transactiondict = }')
|
|
tq = TransactionQuery(**transactiondict)
|
|
print(tq.all())
|
|
print(f'{len(tq.to_json()) = }')
|
|
return tq.all()
|
|
|
|
|
|
@app.post('/dasa2/rawxml')
|
|
async def rawxml(request: Request):
|
|
raw = await request.body()
|
|
from sendItem import send_new_item
|
|
rawstr = raw.decode(encoding="utf-8")
|
|
# print(f'{raw = } {type(raw) = } {rawstr = } {type(rawstr) = }')
|
|
# print(f'{send_new_item(rawstr) = }')
|
|
result = send_new_item(rawstr)
|
|
# print(result, type(result))
|
|
return {'result': result}
|
|
# return {'msg':'ok'}
|
|
# return "ok world"
|
|
# # try:
|
|
# # transactiondict = json.loads(transactiondict)
|
|
# # except:
|
|
# # print('error TransactionQuery()')
|
|
# # return {'message':'error getting TransactionQuery Data. it is not json'}
|
|
# print(f'{type(transactiondict)}, {transactiondict = }')
|
|
# print(f'{transactiondict = }')
|
|
# tq = TransactionQuery(**transactiondict)
|
|
# print(tq.all())
|
|
# print(f'{len(tq.to_json()) = }')
|
|
# return tq.all()
|