add qbclasses.count()

This commit is contained in:
bcomsugi 2024-07-23 06:04:13 +07:00
parent e2121465e1
commit 0f769f33ce
2 changed files with 138 additions and 101 deletions

View File

@ -1,8 +1,20 @@
from server import baseQBQuery, timing
import pprint
import timeit
from time import time
def timing(f):
# @wraps(f)
def wrap(*args, **kw):
ts = time()
result = f(*args, **kw)
te = time()
print('func:%r args:[%r, %r] took: %2.6f sec' % \
(f.__name__, args, kw, te-ts))
return result
return wrap
# from functools import wraps
def cleanIncludeRetElements(includeRetElements_allowed:list, includeRetElements:list):
@ -23,14 +35,19 @@ class ItemInventoryQuery(baseQBQuery):
print(f'{args = }')
print(f'{kwargs = }')
super().__init__(*args, **kwargs)
self.includeRetElements_allowed = ["FullName", "TimeCreated", "TimeModified", "EditSequence", "Name", "IsActive", "ClassRef", "ParentRef", "Sublevel", "BarCodeValue",
self.includeRetElements_allowed = ["ListID", "FullName", "TimeCreated", "TimeModified", "EditSequence", "Name", "IsActive", "ClassRef", "ParentRef", "Sublevel", "BarCodeValue",
"ManufacturerPartNumber", "UnitOfMeasureSetRef", "IsTaxIncluded", "SalesTaxCodeRef", "SalesDesc,", "SalesPrice", "IncomeAccountRef",
"PurchaseDesc", "PurchaseCost", "PurchaseTaxCodeRef", "COGSAccountRef", "PrefVendorRef", "AssetAccountRef", "ReforderPoint", "Max", "QuantityOnHand",
"AcerageCost", "QuantityOnOrder", "QuantityOnSalesOrder",
"ExternalGUID", "DataExtRet",
]
self.cleanIncludeRetElements = cleanIncludeRetElements
self.onError = "stopOnError"
self.retName = 'ItemInventoryRet'
self.defaultFilterKey = "ListID"
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
self.class_debug=kwargs["debug"]
self.QBDict[self.__class__.__name__ + "Rq"]={}
if 'ListID' in kwargs:
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
@ -53,6 +70,9 @@ class ItemInventoryQuery(baseQBQuery):
IRE = self.cleanIncludeRetElements(self.includeRetElements_allowed, kwargs["IncludeRetElement"])
# self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=kwargs['IncludeRetElement']
print(f"{IRE = }")
if len(IRE)>0:
if self.defaultFilterKey not in IRE:
IRE.append(self.defaultFilterKey)
self.QBDict[self.__class__.__name__ + "Rq"]["IncludeRetElement"]=IRE
if 'OwnerID' in kwargs:
self.QBDict[self.__class__.__name__ + "Rq"]["OwnerID"]=kwargs['OwnerID']
@ -69,7 +89,8 @@ class GeneralSummaryReportQuery(baseQBQuery):
self.cleanIncludeRetElements = cleanIncludeRetElements
self.retName = 'ReportRet'
self.QBDict[self.__class__.__name__ + "Rq"]={}
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
self.class_debug=kwargs["debug"]
self.QBDict[self.__class__.__name__ + "Rq"]["GeneralSummaryReportType"]="InventoryStockStatusByItem"
if 'GeneralSummaryReportType' in kwargs:
self.QBDict[self.__class__.__name__ + "Rq"]["GeneralSummaryReportType"]=kwargs['GeneralSummaryReportType']
@ -193,7 +214,11 @@ class CustomerQuery(baseQBQuery):
self.cleanIncludeRetElements = cleanIncludeRetElements
self.onError = "stopOnError"
self.retName = 'CustomerRet'
if 'debug' in kwargs and isinstance(kwargs['debug'], bool):
self.class_debug=kwargs["debug"]
self.QBDict[self.__class__.__name__ + "Rq"]={}
if 'ListID' in kwargs:
self.QBDict[self.__class__.__name__ + "Rq"]["ListID"]=kwargs['ListID']
elif 'FullName' in kwargs:
@ -227,97 +252,105 @@ class CustomerQuery(baseQBQuery):
@timing
def main():
g= GeneralSummaryReportQuery(GeneralSummaryReportType="ProfitAndLossStandard", ReportDateMacro="ThisYear")
print(g, type(g))
print(type(g.all()))
print(g.all())
pprint.pprint(g.filter("ColData").all())
def iteminventoryquery():
# g=ItemInventoryQuery(MaxReturned=2)
g=ItemInventoryQuery(MaxReturned = 3, IncludeRetElement=["DataExtRet", "Name"]) #put OwnerID=0 to get DataExtRet
# print("before g.all")
# print(f'{g.all() = }')
# print("after g.all")
# print("")
# pprint.pprint(g.filter(["FullName", "Name", "sublevel"]).all())
# print(f'{g.filter("fullname") = }')
print(f'{g = }')
print("before")
print(g.filter([ "Name", "QuantityOnHand", "QUantityOnSalesOrder", "QuantityonOrder"]).first())
pprint.pprint(g.returnRet())
# print(g.filter(["DataExtRet"]).all())
def customerquery():
g= CustomerQuery(MaxReturned=3, IncludeRetElement=["fullname", "name", "CompanyName", "BillAddressBlock", "ShipAddressBlock"])
# g= CustomerQuery(MaxReturned=20, ActiveStatus="ActiveOnly", MatchCriterion="StartsWith", Name="to", IncludeRetElement=["fullname", "name", "billaddressblock", "currencyfilter"])
# print(g.IncludeRetElements_allowed)
print("init finish")
# print(f'{type(g.all()) = }')
print("before g.all")
print(f'{g.all() = }')
print("after g.all")
pprint.pprint(f'{g.filter(["FullName", "Name"]).all() = }')
print(f'{g.filter() = }')
# pprint.pprint(g.filter(["FullName", "abc", "BillAddressBlock"]).all())
print(f'{g.filter(["FullName", "abc", "BillAddressBlock"]).all() = }')
print("")
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]).all() = }')
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]) = }')
print(f'{g.filter(["fullname", "name"]).lastValue() = }')
def readxmltodict():
import xmltodict
filename="ItemInventoryQuery.xml"
filename="InvoiceAdd.xml"
with open(filename, "r") as f:
xml = f.read()
print(xml)
print("")
print("")
with open(filename, "r") as f:
xmllines = f.readlines()
print(xmllines)
varDict = xmltodict.parse(xml)
# print(f"{varDict = }")
print()
f = open(filename)
enumDict ={}
def recursiveDict(varDict, f, enumDict):
# print(varDict)
for dKey in varDict:
print(dKey)
if not isinstance(varDict[dKey], (list, dict)):
if dKey[0]=='@' or dKey[0]=="#":
continue
_ = f.readline()
dKey = dKey.replace("#", "")
print(dKey)
# print(f'{dKey = }, {varDict[dKey]}, {_}')
_strOptional = ""
while not dKey in _:
_ = f.readline()
# print(f'{dKey = }, {varDict[dKey]}, {_}')
if "values:" in _:
enumDict[dKey]=_.split(":")[-1].replace("[DEFAULT]","").replace("-->","").replace(" ","").replace("\n","").split(",")
_ = f.readline()
_strOptional = _.split("--")[1].strip()
varDict[dKey]+=";"+_strOptional
print(f'{varDict[dKey] = }')
elif isinstance(varDict[dKey], dict):
varDict[dKey], enumDict=recursiveDict(varDict[dKey], f, enumDict)##### istirahat dulu ah
return varDict, enumDict
print(f'{varDict = }')
print()
print(recursiveDict(varDict, f, enumDict))
f.close
if __name__ == "__main__":
@timing
def main():
g= GeneralSummaryReportQuery(GeneralSummaryReportType="ProfitAndLossStandard", ReportDateMacro="ThisYear")
print(g, type(g))
print(type(g.all()))
print(g.all())
pprint.pprint(g.filter("ColData").all())
@timing
def iteminventoryquery():
# g=ItemInventoryQuery()
g=ItemInventoryQuery(debug = False, MaxReturned = None, IncludeRetElement=["FullName", "DataExtRet", "Name", "QuantityOnHand", "QuantityOnSalesOrder", "QuantityOnOrder", ]) #put OwnerID=0 to get DataExtRet
# print("before g.all")
# print(f'{g.all() = }')
# print("after g.all")
# print("")
# pprint.pprint(g.filter(["FullName", "Name", "sublevel"]).all())
# print(f'{g.filter("fullname") = }')
# print(f'{g = }')
print("before")
# print(g.filter([ "Name", "QuantityOnHand", "QUantityOnSalesOrder", "QuantityonOrder"]).first())
c = g.filter([ "Name", "QuantityOnHand", "QUantityOnSalesOrder", "QuantityonOrder"]).all()
print(c)
# pprint.pprint(g.returnRet())
# print(g.filter("name").firstValue())
print(g.count())
# print(g.filter(["DataExtRet"]).all())
@timing
def customerquery():
g= CustomerQuery(MaxReturned=3, IncludeRetElement=["fullname", "name", "CompanyName", "BillAddressBlock", "ShipAddressBlock"])
# g= CustomerQuery(MaxReturned=20, ActiveStatus="ActiveOnly", MatchCriterion="StartsWith", Name="to", IncludeRetElement=["fullname", "name", "billaddressblock", "currencyfilter"])
# print(g.IncludeRetElements_allowed)
print("init finish")
# print(f'{type(g.all()) = }')
print("before g.all")
print(f'{g.all() = }')
print("after g.all")
pprint.pprint(f'{g.filter(["FullName", "Name"]).all() = }')
print(f'{g.filter() = }')
# pprint.pprint(g.filter(["FullName", "abc", "BillAddressBlock"]).all())
print(f'{g.filter(["FullName", "abc", "BillAddressBlock"]).all() = }')
print("")
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]).all() = }')
print(f'{g.filter([ "Addr1", "Addr2", "Addr3", "Addr4", "Addr5"]) = }')
print(f'{g.filter(["fullname", "name"]).lastValue() = }')
def readxmltodict():
import xmltodict
filename="ItemInventoryQuery.xml"
filename="InvoiceAdd.xml"
with open(filename, "r") as f:
xml = f.read()
print(xml)
print("")
print("")
with open(filename, "r") as f:
xmllines = f.readlines()
print(xmllines)
varDict = xmltodict.parse(xml)
# print(f"{varDict = }")
print()
f = open(filename)
enumDict ={}
def recursiveDict(varDict, f, enumDict):
# print(varDict)
for dKey in varDict:
print(dKey)
if not isinstance(varDict[dKey], (list, dict)):
if dKey[0]=='@' or dKey[0]=="#":
continue
_ = f.readline()
dKey = dKey.replace("#", "")
print(dKey)
# print(f'{dKey = }, {varDict[dKey]}, {_}')
_strOptional = ""
while not dKey in _:
_ = f.readline()
# print(f'{dKey = }, {varDict[dKey]}, {_}')
if "values:" in _:
enumDict[dKey]=_.split(":")[-1].replace("[DEFAULT]","").replace("-->","").replace(" ","").replace("\n","").split(",")
_ = f.readline()
_strOptional = _.split("--")[1].strip()
varDict[dKey]+=";"+_strOptional
print(f'{varDict[dKey] = }')
elif isinstance(varDict[dKey], dict):
varDict[dKey], enumDict=recursiveDict(varDict[dKey], f, enumDict)##### istirahat dulu ah
return varDict, enumDict
print(f'{varDict = }')
print()
print(recursiveDict(varDict, f, enumDict))
f.close
# main()
iteminventoryquery()
# customerquery()

View File

@ -31,6 +31,8 @@ class baseQBQuery:
self.cleanIncludeRetElements = None
self.includeRetElements_allowed = None
self.retName = None
self.defaultFilterKey = None
self.class_debug = False
### end ### variable to be replace with other class init value
self.listOfDict = self.ListOfDict(None, self.varDict, self.retName)
self.statusCode = -1
@ -96,7 +98,8 @@ class baseQBQuery:
# QBXML = ET.fromstring(self.response_string)
# print(xmltodict.parse(self.response_string))
self.varDict = xmltodict.parse(self.response_string)
pprint.pprint(self.varDict)
if self.class_debug:
pprint.pprint(self.varDict)
self.listOfDict.varDict = self.varDict
self.listOfDict.filterKey = ["@statusCode"]
self.statusCode = self.listOfDict.firstValue().get('@statusCode',None)
@ -172,15 +175,15 @@ class baseQBQuery:
# print(f'{self.returnRet() = }')
return self.response_string
# def __str__(self) -> str:
# print("repr")
# print(self.returnRet())
# return "abc"
def count(self) -> int:
# objs = self.filter(self.defaultFilterKey).all()
# print(f"{objs = }", type(objs))
return len(self.filter(self.defaultFilterKey).all())
def filter(self, key=None):
print(f'{key = }')
print(f'{self.statusOk = }')
# print(f'{key = }')
# print(f'{self.statusOk = }')
if not self.runCheck():
print("not runcheck")
return self.ListOfDict(["abc"], self.varDict, self.retName, self.statusOk)
@ -194,7 +197,7 @@ class baseQBQuery:
else:
return []
key = self.cleanIncludeRetElements(self.includeRetElements_allowed, key)
print(f'f {key = }')
# print(f'f {key = }')
if key:
return self.ListOfDict(key, self.varDict, self.retName)
else:
@ -246,7 +249,7 @@ class baseQBQuery:
return _dct
def all(self, var:dict=None, dataRetList:list=None):
print(f'{self.statusOk = }')
# print(f'{self.statusOk = }')
if not self.statusOk:
return []
_lst = [x for x in self.find_listOfDict(var, dataRetList)]
@ -269,6 +272,7 @@ class baseQBQuery:
def firstValue(self, var:dict=None, dataRetList:list=None):
if not self.statusOk:
print("firstValue statusOk is False")
return []
# return self.first(var, dataRetList)[self.filterKey]
_val=self.first(var, dataRetList)
@ -277,7 +281,7 @@ class baseQBQuery:
# return _val[self.filterKey]
return _val
else:
return []
return []
def last(self, var:dict=None, dataRetList:list=None):
if not self.statusOk: