mirror of
https://github.com/bcomsugi/Quickbooks-API.git
synced 2026-01-10 02:02:38 +07:00
321 lines
17 KiB
Python
321 lines
17 KiB
Python
from server import baseQBQuery, timing
|
|
import pprint
|
|
import timeit
|
|
import xml.dom.minidom
|
|
from utils import timing, cleanIncludeRetElements
|
|
|
|
|
|
# from functools import wraps
|
|
|
|
# def cleanIncludeRetElements(includeRetElements_allowed:list, includeRetElements:list):
|
|
# iREs = []
|
|
# # print(f'{includeRetElements_allowed = }\n{includeRetElements = }')
|
|
# if isinstance(includeRetElements, str): #if user put 1 str argument in IncludeRetElements, change it to list
|
|
# includeRetElements = [includeRetElements]
|
|
# for iRE in includeRetElements:
|
|
# for iRE_a in includeRetElements_allowed:
|
|
# if iRE.lower() == iRE_a.lower():
|
|
# iREs.append(iRE_a)
|
|
# break
|
|
# return iREs
|
|
|
|
|
|
class ItemInventoryQuery(baseQBQuery):
|
|
def __init__(self, *args, **kwargs):
|
|
print(f'{args = }')
|
|
print(f'{kwargs = }')
|
|
super().__init__(*args, **kwargs)
|
|
self.includeRetElements_allowed = ["ListID", "FullName", "TimeCreated", "TimeModified", "EditSequence", "Name", "IsActive", "ClassRef", "ParentRef", "Sublevel", "BarCodeValue",
|
|
"ManufacturerPartNumber", "UnitOfMeasureSetRef", "IsTaxIncluded", "SalesTaxCodeRef", "SalesDesc,", "SalesPrice", "IncomeAccountRef",
|
|
"PurchaseDesc", "PurchaseCost", "PurchaseTaxCodeRef", "COGSAccountRef", "PrefVendorRef", "AssetAccountRef", "ReforderPoint", "Max", "QuantityOnHand",
|
|
"AcerageCost", "QuantityOnOrder", "QuantityOnSalesOrder",
|
|
"ExternalGUID", "DataExtRet",
|
|
]
|
|
self.cleanIncludeRetElements = cleanIncludeRetElements
|
|
self.onError = "stopOnError"
|
|
self.retName = 'ItemInventoryRet'
|
|
self.defaultFilterKey = "ListID"
|
|
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
|
|
self.class_debug=kwargs["debug"]
|
|
|
|
self.QBDict[self.__class__.__name__ + "Rq"]={}
|
|
if 'ListID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
|
|
elif 'FullName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FullName"]=kwargs['FullName']
|
|
else:
|
|
if 'MaxReturned' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["MaxReturned"]=kwargs['MaxReturned']
|
|
if 'ActiveStatus' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ActiveStatus"]=kwargs['ActiveStatus']
|
|
if 'FromModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FromModifiedDate"]=kwargs['FromModifiedDate']
|
|
if 'ToModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ToModifiedDate"]=kwargs['ToModifiedDate']
|
|
if 'MatchCriterion' in kwargs and 'Name' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameFilter"]={'MatchCriterion':kwargs['MatchCriterion', 'Name':kwargs['Name']]}
|
|
if 'FromName' in kwargs or 'ToName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameRangeFilter"]={'FromName':kwargs.get('FromName', ""), 'ToName':kwargs.get('ToName', "")}
|
|
if 'IncludeRetElement' in kwargs:
|
|
IRE = self.cleanIncludeRetElements(self.includeRetElements_allowed, kwargs["IncludeRetElement"])
|
|
# self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=kwargs['IncludeRetElement']
|
|
print(f"{IRE = }")
|
|
if len(IRE)>0:
|
|
if self.defaultFilterKey not in IRE:
|
|
IRE.append(self.defaultFilterKey)
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=IRE
|
|
if 'OwnerID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["OwnerID"]=kwargs['OwnerID']
|
|
|
|
# print(self.__class__.__name__ + "Rq")
|
|
# print(self.QBDict)
|
|
self.runCheck() ### running the qbxml connection to get data ###
|
|
|
|
class GeneralSummaryReportQuery(baseQBQuery):
|
|
def __init__(self, *args, **kwargs):
|
|
print(f'{args = }')
|
|
print(f'{kwargs = }')
|
|
super().__init__( )
|
|
self.includeRetElements_allowed = ["ReportTitle", "ReportSubtitle", "ReportBasis", "NumRows", "NumColumns", "NumColTitleRows", "ColDesc", "ReportData", ]
|
|
self.cleanIncludeRetElements = cleanIncludeRetElements
|
|
self.retName = 'ReportRet'
|
|
self.QBDict[self.__class__.__name__ + "Rq"]={}
|
|
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
|
|
self.class_debug=kwargs["debug"]
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["GeneralSummaryReportType"]="InventoryStockStatusByItem"
|
|
if 'GeneralSummaryReportType' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["GeneralSummaryReportType"]=kwargs['GeneralSummaryReportType']
|
|
else:
|
|
return None
|
|
if 'FromReportDate' in kwargs or 'ToReportDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ReportPeriod"]={'FromReportDate':kwargs.get('FromReportDate', ""), 'ToReportDate':kwargs.get('ToReportDate', "")}
|
|
elif 'ReportDateMacro' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ReportDateMacro"]=kwargs.get('ReportDateMacro', "")
|
|
|
|
elif 'FullName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FullName"]=kwargs['FullName']
|
|
else:
|
|
if 'MaxReturned' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["MaxReturned"]=kwargs['MaxReturned']
|
|
if 'ActiveStatus' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ActiveStatus"]=kwargs['ActiveStatus']
|
|
if 'FromModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FromModifiedDate"]=kwargs['FromModifiedDate']
|
|
if 'ToModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ToModifiedDate"]=kwargs['ToModifiedDate']
|
|
if 'MatchCriterion' in kwargs and 'Name' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameFilter"]={'MatchCriterion':kwargs['MatchCriterion', 'Name':kwargs['Name']]}
|
|
if 'FromName' in kwargs or 'ToName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameRangeFilter"]={'FromName':kwargs.get('FromName', ""), 'ToName':kwargs.get('ToName', "")}
|
|
if 'IncludeRetElement' in kwargs:
|
|
IRE = self.cleanIncludeRetElements(self.includeRetElements_allowed, kwargs["IncludeRetElement"])
|
|
print(f"{IRE = }")
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=IRE
|
|
# self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=kwargs['IncludeRetElement']
|
|
if 'OwnerID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["OwnerID"]=kwargs['OwnerID']
|
|
|
|
print(self.__class__.__name__ + "Rq")
|
|
print(f'{self.QBDict = }')
|
|
self.runCheck() ### running the qbxml connection to get data ###
|
|
|
|
class InvoiceAdd(baseQBQuery):
|
|
def __init__(self, *args, **kwargs):
|
|
print(f'{args = }')
|
|
print(f'{kwargs = }')
|
|
super().__init__(*args, **kwargs)
|
|
self.retName = 'InvoiceAddRet'
|
|
self.reqSubName = self.retName.replace('Ret', '')
|
|
self.QBDict[self.__class__.__name__ + "Rq"]={}
|
|
if 'CustomerFullName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
|
|
if 'ListID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
|
|
elif 'FullName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FullName"]=kwargs['FullName']
|
|
else:
|
|
if 'MaxReturned' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["MaxReturned"]=kwargs['MaxReturned']
|
|
if 'ActiveStatus' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ActiveStatus"]=kwargs['ActiveStatus']
|
|
if 'FromModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FromModifiedDate"]=kwargs['FromModifiedDate']
|
|
if 'ToModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ToModifiedDate"]=kwargs['ToModifiedDate']
|
|
if 'MatchCriterion' in kwargs and 'Name' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameFilter"]={'MatchCriterion':kwargs['MatchCriterion', 'Name':kwargs['Name']]}
|
|
if 'FromName' in kwargs or 'ToName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameRangeFilter"]={'FromName':kwargs.get('FromName', ""), 'ToName':kwargs.get('ToName', "")}
|
|
if 'IncludeRetElement' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=kwargs['IncludeRetElement']
|
|
if 'OwnerID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["OwnerID"]=kwargs['OwnerID']
|
|
|
|
# print(self.__class__.__name__ + "Rq")
|
|
# print(self.QBDict)
|
|
|
|
|
|
|
|
class CustomerQuery(baseQBQuery):
|
|
def __init__(self, *args, **kwargs):
|
|
print(f'{args = }')
|
|
print(f'{kwargs = }')
|
|
super().__init__(*args, **kwargs)
|
|
self.includeRetElements_allowed = ["FullName", "TimeCreated", "TimeModified", "EditSequence", "Name", "IsActive", "ClassRef", "ParentRef", "Sublevel", "CompanyName", "Salutation",
|
|
"FirstName", "MiddleName", "LastName", "JobTitle", "BillAddress", "BillAddressBlock", "ShipAddress", "ShipAddressBlock", "ShipToAddress", "Phone",
|
|
"AltPhone", "Fax", "Email", "Cc", "Contact", "AltContact", "AdditionalContactRef", "ContactsRet", "CustomerTypeRef", "TermsRef", "SalesRepRef",
|
|
"Balance", "TotalBalance", "SalesTaxCodeRef", "ItemSalesTaxRef", "SalesTaxCountry", "ResaleNumber", "AccountNumber", "CreditLimit",
|
|
"PreferredPaymentMethodRef", "CreditCardInfo", "JobStatus", "JobStartDate", "JobProjectedEndDate", "JobEndDate", "JobDesc", "JobTypeRef", "Notes",
|
|
"AdditionalNotesRet", "PreferredDeliveryMethod", "PriceLevelRef", "ExternalGUID", "TaxRegistrationNumber", "CurrencyRef", "DataExtRet"]
|
|
self.cleanIncludeRetElements = cleanIncludeRetElements
|
|
self.onError = "stopOnError"
|
|
self.retName = 'CustomerRet'
|
|
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
|
|
self.class_debug=kwargs["debug"]
|
|
|
|
self.QBDict[self.__class__.__name__ + "Rq"]={}
|
|
|
|
if 'ListID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
|
|
elif 'FullName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FullName"]=kwargs['FullName']
|
|
else:
|
|
if 'MaxReturned' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["MaxReturned"]=kwargs['MaxReturned']
|
|
if 'ActiveStatus' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ActiveStatus"]=kwargs['ActiveStatus']
|
|
if 'FromModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["FromModifiedDate"]=kwargs['FromModifiedDate']
|
|
if 'ToModifiedDate' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["ToModifiedDate"]=kwargs['ToModifiedDate']
|
|
if 'MatchCriterion' in kwargs and 'Name' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameFilter"]={'MatchCriterion':kwargs['MatchCriterion'], 'Name':kwargs['Name']}
|
|
elif 'FromName' in kwargs or 'ToName' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["NameRangeFilter"]={'FromName':kwargs.get('FromName', ""), 'ToName':kwargs.get('ToName', "")}
|
|
|
|
if 'Operator' in kwargs and 'Amount' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["TotalBalanceFilter"]={'Operator':kwargs['Operator'], 'Amount':kwargs['Amount']}
|
|
if 'IncludeRetElement' in kwargs:
|
|
IRE = cleanIncludeRetElements(self.includeRetElements_allowed, kwargs['IncludeRetElement'])
|
|
print("after IRE")
|
|
print(f'{IRE = }')
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=IRE
|
|
if 'OwnerID' in kwargs:
|
|
self.QBDict[self.__class__.__name__ + "Rq"]["OwnerID"]=kwargs['OwnerID']
|
|
|
|
print(self.__class__.__name__ + "Rq")
|
|
print(f'{self.QBDict = }' )
|
|
# print(f'{self.includeRetElements_allowed =}')
|
|
self.runCheck() ### running the qbxml connection to get data ###
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
@timing
|
|
def main():
|
|
g= GeneralSummaryReportQuery(GeneralSummaryReportType="ProfitAndLossStandard", ReportDateMacro="ThisYear")
|
|
print(g, type(g))
|
|
print(type(g.all()))
|
|
print(g.all())
|
|
pprint.pprint(g.filter("ColData").all())
|
|
|
|
@timing
|
|
def iteminventoryquery():
|
|
# g=ItemInventoryQuery()
|
|
g=ItemInventoryQuery(debug = True, MaxReturned = 3, IncludeRetElement=["FullName", "DataExtRet", "Name", "QuantityOnHand", "QuantityOnSalesOrder", "QuantityOnOrder", ]) #put OwnerID=0 to get DataExtRet
|
|
# print("before g.all")
|
|
# print(f'{g.all() = }')
|
|
# print("after g.all")
|
|
# print("")
|
|
# pprint.pprint(g.filter(["FullName", "Name", "sublevel"]).all())
|
|
# print(f'{g.filter("fullname") = }')
|
|
|
|
# print(f'{g = }')
|
|
# pprint.pprint(g.response_string,indent=4 )
|
|
print(g.response_string, type(g.response_string))
|
|
print("before")
|
|
# print(g.filter([ "Name", "QuantityOnHand", "QUantityOnSalesOrder", "QuantityonOrder"]).first())
|
|
c = g.filter([ "Name", "QuantityOnHand", "QUantityOnSalesOrder", "QuantityonOrder"]).first()
|
|
print("c:",type(c), c)
|
|
# pprint.pprint(g.returnRet())
|
|
# print(g.filter("name").firstValue())
|
|
print(g.count())
|
|
print(g.statusOk)
|
|
print(g.filter(["name", "quantityonhand"]).getValuesOf("quantityonhand"))
|
|
|
|
|
|
# print(g.filter(["DataExtRet"]).all())
|
|
|
|
@timing
|
|
def customerquery():
|
|
g= CustomerQuery(MaxReturned=3, IncludeRetElement=["fullname", "name", "CompanyName", "BillAddressBlock", "ShipAddressBlock"])
|
|
# g= CustomerQuery(MaxReturned=20, ActiveStatus="ActiveOnly", MatchCriterion="StartsWith", Name="to", IncludeRetElement=["fullname", "name", "billaddressblock", "currencyfilter"])
|
|
# print(g.IncludeRetElements_allowed)
|
|
print("init finish")
|
|
# print(f'{type(g.all()) = }')
|
|
print("before g.all")
|
|
print(f'{g.all() = }')
|
|
print("after g.all")
|
|
pprint.pprint(f'{g.filter(["FullName", "Name"]).all() = }')
|
|
print(f'{g.filter() = }')
|
|
# pprint.pprint(g.filter(["FullName", "abc", "BillAddressBlock"]).all())
|
|
print(f'{g.filter(["FullName", "abc", "BillAddressBlock"]).all() = }')
|
|
print("")
|
|
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]).all() = }')
|
|
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]) = }')
|
|
print(f'{g.filter(["fullname", "name"]).lastValue() = }')
|
|
|
|
def readxmltodict():
|
|
import xmltodict
|
|
filename="ItemInventoryQuery.xml"
|
|
filename="InvoiceAdd.xml"
|
|
with open(filename, "r") as f:
|
|
xml = f.read()
|
|
print(xml)
|
|
print("")
|
|
print("")
|
|
with open(filename, "r") as f:
|
|
xmllines = f.readlines()
|
|
print(xmllines)
|
|
varDict = xmltodict.parse(xml)
|
|
# print(f"{varDict = }")
|
|
print()
|
|
f = open(filename)
|
|
enumDict ={}
|
|
def recursiveDict(varDict, f, enumDict):
|
|
# print(varDict)
|
|
for dKey in varDict:
|
|
print(dKey)
|
|
if not isinstance(varDict[dKey], (list, dict)):
|
|
if dKey[0]=='@' or dKey[0]=="#":
|
|
continue
|
|
_ = f.readline()
|
|
dKey = dKey.replace("#", "")
|
|
print(dKey)
|
|
# print(f'{dKey = }, {varDict[dKey]}, {_}')
|
|
_strOptional = ""
|
|
while not dKey in _:
|
|
_ = f.readline()
|
|
# print(f'{dKey = }, {varDict[dKey]}, {_}')
|
|
if "values:" in _:
|
|
enumDict[dKey]=_.split(":")[-1].replace("[DEFAULT]","").replace("-->","").replace(" ","").replace("\n","").split(",")
|
|
_ = f.readline()
|
|
_strOptional = _.split("--")[1].strip()
|
|
varDict[dKey]+=";"+_strOptional
|
|
print(f'{varDict[dKey] = }')
|
|
elif isinstance(varDict[dKey], dict):
|
|
varDict[dKey], enumDict=recursiveDict(varDict[dKey], f, enumDict)##### istirahat dulu ah
|
|
return varDict, enumDict
|
|
print(f'{varDict = }')
|
|
print()
|
|
print(recursiveDict(varDict, f, enumDict))
|
|
f.close
|
|
|
|
# main()
|
|
iteminventoryquery()
|
|
# customerquery()
|
|
# readxmltodict()
|
|
|
|
|