add exim for export import

This commit is contained in:
bcomsugi 2025-02-19 06:58:18 +07:00
parent a110b9ae99
commit 86ff995133
4 changed files with 1882 additions and 1742 deletions

0
Exim/__init__.py Normal file
View File

132
Exim/exim.py Normal file
View File

@ -0,0 +1,132 @@
# from . import QBClasses
from pprint import pprint
from QBClass.QBClasses import InvoiceQuery, SalesOrderQuery
# import timeit
import time
print('succes Loading modules')
def timer(func):
def wrapper(*args, **kwargs):
nonlocal total
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
total += duration
print(f"Execution time: {duration} Total: {total}")
return result
total = 0
return wrapper
@timer
def get_all_so_from_invoice():
# iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true')
start = time.time()
print('Get Invoice Query List. Processing..... wait for at minute(1 month=90secs)')
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-09-1', TxnDateRangeFilter_ToTxnDate='2024-09-31', IncludeLinkedTxns='true', IncludeLineItems='true')
# pprint(iq.all(), sort_dicts=False)
# print(iq.all())
print(f"Execution time InvoiceQuery: {time.time()-start} {len(iq.all()) = }")
so_list = []
iq_list = []
dup_so_list = []
for idx, txn in enumerate(iq.all()):
# iq_list.append(txn)
# print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
if linkedtxn['TxnType']=='SalesOrder':
if linkedtxn['RefNumber'] not in so_list:
so_list.append(linkedtxn['RefNumber'])
else:
dup_so_list.append(linkedtxn['RefNumber'])
print(f'{dup_so_list = }')
print()
so_dict = {}
print(f"Execution time before SO: {time.time()-start}")
print('Get Sales Order Query List. Processing..... wait for at minute(1 month=130 secs)')
so = SalesOrderQuery(RefNumber = so_list, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
print(f"Execution time SalesOrderQuery: {time.time()-start}")
for idx, txn in enumerate(so.all()):
so_dict[txn['RefNumber']] = txn
# pprint(so.all(), sort_dicts=False)
res = next(iter(so_dict))
print(f'{so_dict[res] = }')
print(f'{len(iq.all()) = } {len(so.all()) = } {len(so_list) = } {len(dup_so_list) = } {len(so_dict) = }')
@timer
def process():
# iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true')
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-02-01', TxnDateRangeFilter_ToTxnDate='2024-02-29', IncludeLinkedTxns='true', IncludeLineItems='true')
# pprint(iq.all(), sort_dicts=False)
# print(iq.all())
for idx, txn in enumerate(iq.all()):
print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
# print(f"{txn['Subtotal'] = }")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
try:
if linkedtxn['TxnType']=='SalesOrder':
so = SalesOrderQuery(RefNumber = linkedtxn['RefNumber'], IncludeLinkedTxns='true', debug=False)
is_soLinkedToOneInvoice = False
if 'LinkedTxn' in so.all():
if not isinstance(so.all()['LinkedTxn'], list):
# print(so.all())
so_linkedTxn = [so.all()['LinkedTxn']]
else:
so_linkedTxn = so.all()['LinkedTxn']
# print(so.all())
for solinkedtxn in so_linkedTxn:
# print(len(so_linkedTxn))
if solinkedtxn['TxnType']=='Invoice' and len(so_linkedTxn)==1:
# print(so.all()['RefNumber'], 'the only one SO')
is_soLinkedToOneInvoice=True
# pass
else:
is_soLinkedToOneInvoice=False
print(so.all()['RefNumber'], 'NOT the only One, this SO have other Invoice number')
if float(linkedtxn['Amount'])<0:
if so.all()['TotalAmount']!=linkedtxn['Amount'][1:]:
if is_soLinkedToOneInvoice: #maybe the SO is manually closed, check it item by item, find which item is not in invoice
if so.all()['IsManuallyClosed'] == 'true':
pass
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
else:
print('SO TotalAmount<>Amount in Invoice. not Manually closed and not fully Invoiced')
pprint(f'{linkedtxn = }', sort_dicts=False)
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
print(so.all())
else:
pass # this is one SO is fully invoiced
else:
print('Linkedtxn amount is positif(should be negatif')
except Exception as e:
print('ERROR')
pprint(linkedtxn, sort_dicts=False)
print(f"{so.all()['TxnID'] = }")
print(so.all())
print(e)
break
# print(timeit.repeat(process, repeat=1))
# process()
get_all_so_from_invoice()

File diff suppressed because it is too large Load Diff

View File

@ -8,508 +8,511 @@ from .utils import cleanIncludeRetElements
import json import json
def timing(f): def timing(f):
# @wraps(f) # @wraps(f)
def wrap(*args, **kw): def wrap(*args, **kw):
ts = time() ts = time()
result = f(*args, **kw) result = f(*args, **kw)
te = time() te = time()
print('func:%r args:[%r, %r] took: %2.6f sec' % \ print('func:%r args:[%r, %r] took: %2.6f sec' % \
(f.__name__, args, kw, te-ts)) (f.__name__, args, kw, te-ts))
return result return result
return wrap return wrap
class baseQBQuery: class baseQBQuery:
def __init__(self, *args, **kwargs, ) -> None: def __init__(self, *args, **kwargs, ) -> None:
# print(f'{kwargs = }') # print(f'{kwargs = }')
# print(f'{args = }') # print(f'{args = }')
self.QBXML = None self.QBXML = None
self.QBDict = {} self.QBDict = {}
self.response_string = None self.response_string = None
self.Rs = None self.Rs = None
self.varDict = {} self.varDict = {}
### start ### variable to be replace with other class init value ### start ### variable to be replace with other class init value
self.onError = "continueOnError" self.onError = "continueOnError"
# self.cleanIncludeRetElements = None # self.cleanIncludeRetElements = None
self.includeRetElements_allowed = None self.includeRetElements_allowed = None
self.retName = None self.retName = None
self.defaultFilterKey = None self.defaultFilterKey = None
self.class_debug = False self.class_debug = False
### end ### variable to be replace with other class init value ### end ### variable to be replace with other class init value
self.listOfDict = self.ListOfDict(None, self.varDict, self.retName, False) self.listOfDict = self.ListOfDict(None, self.varDict, self.retName, False)
self.requestID = None self.requestID = None
self.statusCode = -1 self.statusCode = -1
self.statusMessage = "" self.statusMessage = ""
self.statusSeverity = "" self.statusSeverity = ""
self.statusOk = False self.statusOk = False
if self.__class__.__name__=="baseQBQuery": if self.class_debug:
print("baseqbquey same with classname") if self.__class__.__name__=="baseQBQuery":
else: print("baseqbquey same with classname")
print("accessed from child class") else:
print("basequery is accessed from ", self.__class__.__name__ + "Rq") print("accessed from child class")
print("basequery is accessed from ", self.__class__.__name__ + "Rq")
# @timing # @timing
def create_QBXML(self): def create_QBXML(self):
version = "13.0" version = "13.0"
dataDict = { ### Header for qmxml with version attribute dataDict = { ### Header for qmxml with version attribute
"?qbxml": { "?qbxml": {
"@version": version, "@version": version,
} }
} }
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Simple Example ### # dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Simple Example ###
# "@onError": "continueOnError", # "@onError": "continueOnError",
# "GeneralSummaryReportQueryRq": { # "GeneralSummaryReportQueryRq": {
# "GeneralSummaryReportType": self.GeneralSummaryReportType, # "GeneralSummaryReportType": self.GeneralSummaryReportType,
# } # }
# } # }
# } # }
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ### # dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
# "@onError": "continueOnError", # "@onError": "continueOnError",
# "ItemInventoryQueryRq": { # "ItemInventoryQueryRq": {
# "FullName": ["TACO:AA:TH-003AA", # "FullName": ["TACO:AA:TH-003AA",
# "TACO:AA:TH-010AA"] # "TACO:AA:TH-010AA"]
# }, # },
# } # }
# } # }
firstKey = str(list(self.QBDict.keys())[0]) firstKey = str(list(self.QBDict.keys())[0])
dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ### dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
"@onError": self.onError, "@onError": self.onError,
firstKey: self.QBDict[firstKey]}} firstKey: self.QBDict[firstKey]}}
# print(f'{dataDict = }') # print(f'{dataDict = }')
# # QBXML = '<?qbxml version="13.0"?>' + xmltodict.unparse(dataDict, pretty=True) # # QBXML = '<?qbxml version="13.0"?>' + xmltodict.unparse(dataDict, pretty=True)
self.QBXML = xmltodict.unparse(dataDict, pretty=True).replace("</?qbxml>", "").replace(f'version="{version}"', f'version="{version}"?') self.QBXML = xmltodict.unparse(dataDict, pretty=True).replace("</?qbxml>", "").replace(f'version="{version}"', f'version="{version}"?')
print(self.QBXML, type(self.QBXML)) if self.class_debug:
return self.QBXML print(self.QBXML, type(self.QBXML))
return self.QBXML
# @timing # @timing
def connect_to_quickbooks(self, qbxml_query=None): def connect_to_quickbooks(self, qbxml_query=None):
# Connect to Quickbooks # Connect to Quickbooks
sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor") sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
sessionManager.OpenConnection('', 'DASA2') sessionManager.OpenConnection('', 'DASA2')
# ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2) # ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2)
ticket = sessionManager.BeginSession("", 2) ticket = sessionManager.BeginSession("", 2)
# Send query and receive response # Send query and receive response
self.response_string = sessionManager.ProcessRequest(ticket, self.QBXML) self.response_string = sessionManager.ProcessRequest(ticket, self.QBXML)
# Disconnect from Quickbooks # Disconnect from Quickbooks
sessionManager.EndSession(ticket) # Close the company file sessionManager.EndSession(ticket) # Close the company file
sessionManager.CloseConnection() # Close the connection sessionManager.CloseConnection() # Close the connection
# Beautify response_string # Beautify response_string
# print(f'{self.response_string = }') # print(f'{self.response_string = }')
xml = minidom.parseString(self.response_string.replace("\n", "")) xml = minidom.parseString(self.response_string.replace("\n", ""))
self.response_string = xml.toprettyxml() self.response_string = xml.toprettyxml()
# print(f'{self.response_string = }') # print(f'{self.response_string = }')
self.statusOk = self.isDataOK() self.statusOk = self.isDataOK()
return self.statusOk return self.statusOk
return self.response_string return self.response_string
def isDataOK(self): def isDataOK(self):
# print("isdataok") # print("isdataok")
# QBXML = ET.fromstring(self.response_string) # QBXML = ET.fromstring(self.response_string)
# print(xmltodict.parse(self.response_string)) # print(xmltodict.parse(self.response_string))
self.varDict = xmltodict.parse(self.response_string) self.varDict = xmltodict.parse(self.response_string)
if self.class_debug: if self.class_debug:
pprint.pprint("isDataOK", self.varDict) pprint.pprint("isDataOK", self.varDict)
self.listOfDict.varDict = self.varDict self.listOfDict.varDict = self.varDict
self.listOfDict.filterKey = ["@requestID"] self.listOfDict.filterKey = ["@requestID"]
self.requestID = self.listOfDict.firstValue().get('@requestID',None) self.requestID = self.listOfDict.firstValue().get('@requestID',None)
self.listOfDict.filterKey = ["@statusCode"] self.listOfDict.filterKey = ["@statusCode"]
self.statusCode = self.listOfDict.firstValue().get('@statusCode',None) self.statusCode = self.listOfDict.firstValue().get('@statusCode',None)
self.listOfDict.filterKey = ["@statusMessage"] self.listOfDict.filterKey = ["@statusMessage"]
self.statusMessage = self.listOfDict.firstValue().get('@statusMessage',None) self.statusMessage = self.listOfDict.firstValue().get('@statusMessage',None)
self.listOfDict.filterKey = ["@statusSeverity"] self.listOfDict.filterKey = ["@statusSeverity"]
self.statusSeverity = self.listOfDict.firstValue().get('@statusSeverity') self.statusSeverity = self.listOfDict.firstValue().get('@statusSeverity')
self.listOfDict.filterKey = [self.retName] self.listOfDict.filterKey = [self.retName]
if self.class_debug: if self.class_debug:
print(f'isDataOK -> {self.listOfDict.firstValue() = }') print(f'isDataOK -> {self.listOfDict.firstValue() = }')
if self.listOfDict.firstValue().get(self.retName,None)==None: if self.listOfDict.firstValue().get(self.retName,None)==None:
return False return False
if self.class_debug:
print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = }') print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = }')
varDict = self.varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"] varDict = self.varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
return True return True
# isStatusOK=None # isStatusOK=None
# for _ in self.findKeyInDict("FullName", ): ###berhasil # for _ in self.findKeyInDict("FullName", ): ###berhasil
# print(f'{_ = }') # print(f'{_ = }')
# for _ in self.gen_dict_extract("@statusMessage", self.varDict): # for _ in self.gen_dict_extract("@statusMessage", self.varDict):
# print(_) # print(_)
# if 'Status OK'.lower()==_.lower(): # if 'Status OK'.lower()==_.lower():
# print(_) # print(_)
# isStatusOK = True # isStatusOK = True
# break # break
# else: # else:
# isStatusOK=False # isStatusOK=False
# if self.ListOfDict.find_firstListOfDict("@statusMessage")['@statusMessage'].lower()=="status OK".lower(): # if self.ListOfDict.find_firstListOfDict("@statusMessage")['@statusMessage'].lower()=="status OK".lower():
# # print(f'{self.retName = }') # # print(f'{self.retName = }')
# self.Rs = self.find_firstListOfDict(self.retName)[self.retName] # self.Rs = self.find_firstListOfDict(self.retName)[self.retName]
# # self.Rs=self.returnRet(self.varDict) # # self.Rs=self.returnRet(self.varDict)
# # # print(self.findKeyInDict("FullName", )) ###test # # # print(self.findKeyInDict("FullName", )) ###test
# # print(self.findKeyInDict("FullName", self.findKeyInDict("QBXMLMsgsRs1", ))) ###test # # print(self.findKeyInDict("FullName", self.findKeyInDict("QBXMLMsgsRs1", ))) ###test
# # # print(self.findKeyInDict("@statusMessage", )) ###test # # # print(self.findKeyInDict("@statusMessage", )) ###test
# # for _ in self.findKeyInDict("QBXMLMsgsRs",): ###trial blm berhasil # # for _ in self.findKeyInDict("QBXMLMsgsRs",): ###trial blm berhasil
# # print(f'2{_ = }') # # print(f'2{_ = }')
# # print(f'{self.Rs = }') # # print(f'{self.Rs = }')
# # print(type(self.Rs)) # # print(type(self.Rs))
# # print(self.find_firstListOfDict("FullName")['FullName']) # # print(self.find_firstListOfDict("FullName")['FullName'])
# # print(self.find_firstListOfDict("FullName")) # # print(self.find_firstListOfDict("FullName"))
# # print(self.find_allListOfDict("FullName")) # # print(self.find_allListOfDict("FullName"))
def returnRet(self, varDict:dict = None): def returnRet(self, varDict:dict = None):
if varDict== None: if varDict== None:
varDict=self.varDict varDict=self.varDict
# pprint.pprint(self.varDict) # pprint.pprint(self.varDict)
# print(f'{varDict = }') # print(f'{varDict = }')
varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"] varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
# print(f'{varDict = }') # print(f'{varDict = }')
for idx, key in enumerate(varDict): for idx, key in enumerate(varDict):
# print(idx, key, len(varDict)) # print(idx, key, len(varDict))
if self.retName in key: if self.retName in key:
return varDict[key] return varDict[key]
return None return None
def runCheck(self): def runCheck(self):
# print("runCheck") # print("runCheck")
if self.varDict: if self.varDict:
return True return True
if self.response_string: if self.response_string:
return True return True
if self.Rs: if self.Rs:
return True return True
if self.QBDict: if self.QBDict:
self.create_QBXML() self.create_QBXML()
if self.connect_to_quickbooks(): if self.connect_to_quickbooks():
return True return True
return False return False
def __repr__(self) -> str: def __repr__(self) -> str:
self.all() self.all()
# print(f'{self.returnRet() = }') # print(f'{self.returnRet() = }')
return self.response_string return self.response_string
def count(self) -> int: def count(self) -> int:
# objs = self.filter(self.defaultFilterKey).all() # objs = self.filter(self.defaultFilterKey).all()
# print(f"{objs = }", type(objs)) # print(f"{objs = }", type(objs))
return len(self.filter(self.defaultFilterKey).all()) return len(self.filter(self.defaultFilterKey).all())
def filter(self, key=None): def filter(self, key=None):
print(f'{key = }') print(f'{key = }')
# print(f'{self.statusOk = }') # print(f'{self.statusOk = }')
if not self.runCheck(): if not self.runCheck():
print("not runcheck") print("not runcheck")
return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk) return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
return [] return []
if isinstance(key, str): if isinstance(key, str):
key = [key] key = [key]
elif isinstance(key, list): elif isinstance(key, list):
pass pass
elif isinstance(key, dict): elif isinstance(key, dict):
key = [x for x,y in key.items()] key = [x for x,y in key.items()]
elif key is None: elif key is None:
# print(f"key is none. {self.retName = }") # print(f"key is none. {self.retName = }")
return self.ListOfDict(self.retName, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)#.firstValue()#[self.retName] return self.ListOfDict(self.retName, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)#.firstValue()#[self.retName]
else: else:
return [] return []
key = cleanIncludeRetElements(self.includeRetElements_allowed, key) key = cleanIncludeRetElements(self.includeRetElements_allowed, key)
# print(f'f {key = }') # print(f'f {key = }')
if key: if key:
return self.ListOfDict(key, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk) return self.ListOfDict(key, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
else: else:
return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk) return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
### dont use this way, better returning class because the value if you assign to variable, the valu will be the last filterKey inputed ### dont use this way, better returning class because the value if you assign to variable, the valu will be the last filterKey inputed
### if return class, every filterKey is an object, different from other filterKey ### if return class, every filterKey is an object, different from other filterKey
self.listOfDict.varDict = self.varDict self.listOfDict.varDict = self.varDict
self.listOfDict.filterKey = key self.listOfDict.filterKey = key
return self.listOfDict return self.listOfDict
### ###
def all(self) -> dict: def all(self) -> dict:
if not self.runCheck(): if not self.runCheck():
return None return None
# return self.ListOfDict(None, self.varDict, self.retName).firstValue() # return self.ListOfDict(None, self.varDict, self.retName).firstValue()
temp = self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue() temp = self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue()
if temp: if temp:
temp = temp[self.retName] temp = temp[self.retName]
else: else:
return {'status':"Error", 'statusCode': self.statusCode, 'statusMessage':self.statusMessage, 'statusSeverity': self.statusSeverity} return {'status':"Error", 'statusCode': self.statusCode, 'statusMessage':self.statusMessage, 'statusSeverity': self.statusSeverity}
if self.requestID: if self.requestID:
temp['requestID']=self.requestID temp['requestID']=self.requestID
# print(f'{temp = }') # print(f'{temp = }')
return temp return temp
# return self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue()[self.retName] # return self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue()[self.retName]
### dont use this way ### dont use this way
self.listOfDict.varDict = self.varDict self.listOfDict.varDict = self.varDict
self.listOfDict.filterKey = self.retName self.listOfDict.filterKey = self.retName
return self.listOfDict return self.listOfDict
### ###
def to_json(self) -> str: def to_json(self) -> str:
return json.dumps(self.all()) return json.dumps(self.all())
class ListOfDict:
def __init__(self, key, var, retName, includeRetElements_allowed:list ,statusOk:bool = True) -> None:
# print(f'{key =}, {var =}')
# self.first = self.find_firstListOfDict(key)
if key:
if isinstance(key, str):
self.filterKey = [key]
else:
self.filterKey = key
else:
self.filterKey = [retName]
# print(f"{self.filterKey = }")
self.varDict = var
self.statusOk = statusOk
self._includeRetElements_allowed = includeRetElements_allowed
# print("listofDict")
def __repr__(self) -> str:
return str(self.all())
# def filter(self, filterKey):
# self.filterKey=filterKey
def getValuesOf(self, key:str=None, var:dict=None, dataRetList:list=None) :
if key==None:
key = self.filterKey
elif isinstance(key, str):
key=[key]
elif isinstance(key, list):
pass
else:
raise TypeError(f'{key=} should be string not {type(key)}')
print(key)
key = cleanIncludeRetElements(self._includeRetElements_allowed, key)
print(key)
if len(key)==0:
key = self.filterKey
else:
key = key
# print(f'getvaluesof {key = }')
# for xdct in self.findKeyInDict(var, dataRetList):
# print(f'{xdct = }', type(xdct), self.filterKey[0], key)
lstresult = []
for x in self.findKeyInDict(var, dataRetList):
templstresult = []
for y in key:
templstresult.append(x.get(y, ""))
lstresult.append(templstresult)
print(f'{lstresult[-1] =}')
return lstresult
_lst = [x[key] for x in self.findKeyInDict(var, dataRetList)]
# print(_dct)
return _lst
def all(self, var:dict=None, dataRetList:list=None) -> list:
# print(f'{self.statusOk = }')
if not self.statusOk:
return []
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
# _lst = [x[self.filterKey] for x in self.findKeyInDict(var, dataRetList)]
# if _lst:
return _lst
# else:
# return []
def allOnlyValue(self, var:dict=None, dataRetList:list=None):
if not self.statusOk:
return []
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
return _lst
def first(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
return next(self.findKeyInDict( var, dataRetList), {})
def firstValue(self, var:dict=None, dataRetList:list=None) ->dict:
if not self.statusOk:
print("firstValue statusOk is False")
return {}
# return self.first(var, dataRetList)[self.filterKey]
_val=self.first(var, dataRetList)
# print(f'{_val = }')
if _val:
# return _val[self.filterKey]
return _val
else:
return {}
def last(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
# *_, last = self.findKeyInDict( var, dataRetList)
_val= self.all(var, dataRetList)
if _val:return _val[-1]
else: return {}
def lastValue(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
_val=self.last(var, dataRetList)
# print(f"lastValue {_val =}")
if _val:
# return _val[self.filterKey]
return _val
else:
return {}
def count(self, var:dict=None, dataRetList:list=None) -> int:
if not self.statusOk:
return 0
# print(len(self.all()))
return len(self.all())
# def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
# # print("genfinekeys")
# if var==None:
# var=self.varDict
# # print(f"{var = }")
# if dataRetList is None:
# dataRetList = []
# if isinstance(var, list):
# # print("list var")
# for _ in var:
# yield from self.findKeyInDict( _, )
# elif isinstance(var, dict):
# # print("dict var")
# if self.filterKey in var:
# dataRetList.append({self.filterKey: var[self.filterKey]})
# print(f"{dataRetList = }")
# yield {self.filterKey: var[self.filterKey]}
# else:
# # print(f'dict else var={var}')
# for _ in var:
# # print(_)
# yield from self.findKeyInDict(var[_], )
# return dataRetList
def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
# print("genfinekeys")
if var==None:
var=self.varDict
# print(f"{var = }")
if dataRetList is None:
dataRetList = []
if isinstance(var, list):
# print("list var")
for _ in var:
yield from self.findKeyInDict( _, )
elif isinstance(var, dict):
# print("dict var")
found = False
tempDct = {}
for fKey in self.filterKey:
# if self.filterKey in var:
if fKey in var:
found = True
tempDct[fKey]=var[fKey]
# print(f'{tempDct = }')
if found:
# dataRetList.append({self.filterKey: var[self.filterKey]})
dataRetList.append(tempDct)
# print(f"{dataRetList = }")
yield tempDct #{self.filterKey: var[self.filterKey]}
else:
# print(f'dict else var={var}')
for _ in var:
# print(_)
yield from self.findKeyInDict(var[_], )
return dataRetList
# def find_allListOfDict(self, key, var:dict=None, dataRetList:list=None):
# return [x for x in self.findKeyInDict(key, var, dataRetList)]
# def find_firstListOfDictValue(self, key, var:dict=None, dataRetList:list=None):
# return self.find_firstListOfDict(key, var, dataRetList)[key]
# def find_firstListOfDict(self, key, var:dict=None, dataRetList:list=None):
# return next(self.findKeyInDict(key, var, dataRetList), None)
# def findKeyInDict(self, key, var:dict=None, dataRetList:list=None, ):
# # print("genfinekeys")
# if var==None:
# var=self.varDict
# # print(f"{var = }")
# if dataRetList is None:
# dataRetList = []
# if isinstance(var, list):
# # print("list var")
# for _ in var:
# yield from self.findKeyInDict(key, _, )
# elif isinstance(var, dict):
# # print("dict var")
# if key in var:
# dataRetList.append({key: var[key]})
# # print(f"{dataRetList = }")
# yield {key: var[key]}
# else:
# # print(f'dict else var={var}')
# for _ in var:
# # print(_)
# yield from self.findKeyInDict(key, var[_], )
# return dataRetList
#### dont delete.
### Example of extracting dictionary value by key
def gen_dict_extract(self, key, var:dict=None): ### Utils
if var==None:
var=self.response_string
# print("var")
if hasattr(var,'items'): # hasattr(var,'items') for python 3, hasattr(var,'iteritems') for python 2
# print("hassattr")
for k, v in var.items(): # var.items() for python 3, var.iteritems() for python 2
# print(k,v)
if k == key:
yield v
if isinstance(v, dict):
for result in self.gen_dict_extract(key, v):
yield result
elif isinstance(v, list):
for d in v:
for result in self.gen_dict_extract(key, d):
yield result
def __str__(self, *args, **kwargs) -> str:
# return str(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
# print("__str__")
return str(self.all())
return self.__class__.__name__
return str(self.get_datarow())
# def get_datarow(self, *args):
# return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
# def get_dict(self, *args):
# return pd.DataFrame(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
def status_ok(self, QBXML): class ListOfDict:
GSRQRs=QBXML.find('.//GeneralSummaryReportQueryRs') def __init__(self, key, var, retName, includeRetElements_allowed:list ,statusOk:bool = True) -> None:
status_code = GSRQRs.attrib #.get('statusCode') # print(f'{key =}, {var =}')
# print(GSRQRs.attrib) # self.first = self.find_firstListOfDict(key)
# print(GSRQRs.attrib['statusCode']) if key:
status=GSRQRs.attrib.get('statusMessage') if isinstance(key, str):
self.filterKey = [key]
print(f'status={status}') else:
if 'OK' in status: self.filterKey = key
return True, status_code else:
else: self.filterKey = [retName]
return False, status_code # print(f"{self.filterKey = }")
self.varDict = var
self.statusOk = statusOk
self._includeRetElements_allowed = includeRetElements_allowed
# print("listofDict")
def __repr__(self) -> str:
return str(self.all())
# def filter(self, filterKey):
# self.filterKey=filterKey
def getValuesOf(self, key:str=None, var:dict=None, dataRetList:list=None) :
if key==None:
key = self.filterKey
elif isinstance(key, str):
key=[key]
elif isinstance(key, list):
pass
else:
raise TypeError(f'{key=} should be string not {type(key)}')
print(key)
key = cleanIncludeRetElements(self._includeRetElements_allowed, key)
print(key)
if len(key)==0:
key = self.filterKey
else:
key = key
# print(f'getvaluesof {key = }')
# for xdct in self.findKeyInDict(var, dataRetList):
# print(f'{xdct = }', type(xdct), self.filterKey[0], key)
lstresult = []
for x in self.findKeyInDict(var, dataRetList):
templstresult = []
for y in key:
templstresult.append(x.get(y, ""))
lstresult.append(templstresult)
print(f'{lstresult[-1] =}')
return lstresult
_lst = [x[key] for x in self.findKeyInDict(var, dataRetList)]
# print(_dct)
return _lst
def all(self, var:dict=None, dataRetList:list=None) -> list:
# print(f'{self.statusOk = }')
if not self.statusOk:
return []
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
# _lst = [x[self.filterKey] for x in self.findKeyInDict(var, dataRetList)]
# if _lst:
return _lst
# else:
# return []
def allOnlyValue(self, var:dict=None, dataRetList:list=None):
if not self.statusOk:
return []
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
return _lst
def first(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
return next(self.findKeyInDict( var, dataRetList), {})
def firstValue(self, var:dict=None, dataRetList:list=None) ->dict:
if not self.statusOk:
print("firstValue statusOk is False")
return {}
# return self.first(var, dataRetList)[self.filterKey]
_val=self.first(var, dataRetList)
# print(f'{_val = }')
if _val:
# return _val[self.filterKey]
return _val
else:
return {}
def last(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
# *_, last = self.findKeyInDict( var, dataRetList)
_val= self.all(var, dataRetList)
if _val:return _val[-1]
else: return {}
def lastValue(self, var:dict=None, dataRetList:list=None) -> dict:
if not self.statusOk:
return {}
_val=self.last(var, dataRetList)
# print(f"lastValue {_val =}")
if _val:
# return _val[self.filterKey]
return _val
else:
return {}
def count(self, var:dict=None, dataRetList:list=None) -> int:
if not self.statusOk:
return 0
# print(len(self.all()))
return len(self.all())
# def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
# # print("genfinekeys")
# if var==None:
# var=self.varDict
# # print(f"{var = }")
# if dataRetList is None:
# dataRetList = []
# if isinstance(var, list):
# # print("list var")
# for _ in var:
# yield from self.findKeyInDict( _, )
# elif isinstance(var, dict):
# # print("dict var")
# if self.filterKey in var:
# dataRetList.append({self.filterKey: var[self.filterKey]})
# print(f"{dataRetList = }")
# yield {self.filterKey: var[self.filterKey]}
# else:
# # print(f'dict else var={var}')
# for _ in var:
# # print(_)
# yield from self.findKeyInDict(var[_], )
# return dataRetList
def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
# print("genfinekeys")
if var==None:
var=self.varDict
# print(f"{var = }")
if dataRetList is None:
dataRetList = []
if isinstance(var, list):
# print("list var")
for _ in var:
yield from self.findKeyInDict( _, )
elif isinstance(var, dict):
# print("dict var")
found = False
tempDct = {}
for fKey in self.filterKey:
# if self.filterKey in var:
if fKey in var:
found = True
tempDct[fKey]=var[fKey]
# print(f'{tempDct = }')
if found:
# dataRetList.append({self.filterKey: var[self.filterKey]})
dataRetList.append(tempDct)
# print(f"{dataRetList = }")
yield tempDct #{self.filterKey: var[self.filterKey]}
else:
# print(f'dict else var={var}')
for _ in var:
# print(_)
yield from self.findKeyInDict(var[_], )
return dataRetList
# def find_allListOfDict(self, key, var:dict=None, dataRetList:list=None):
# return [x for x in self.findKeyInDict(key, var, dataRetList)]
# def find_firstListOfDictValue(self, key, var:dict=None, dataRetList:list=None):
# return self.find_firstListOfDict(key, var, dataRetList)[key]
# def find_firstListOfDict(self, key, var:dict=None, dataRetList:list=None):
# return next(self.findKeyInDict(key, var, dataRetList), None)
# def findKeyInDict(self, key, var:dict=None, dataRetList:list=None, ):
# # print("genfinekeys")
# if var==None:
# var=self.varDict
# # print(f"{var = }")
# if dataRetList is None:
# dataRetList = []
# if isinstance(var, list):
# # print("list var")
# for _ in var:
# yield from self.findKeyInDict(key, _, )
# elif isinstance(var, dict):
# # print("dict var")
# if key in var:
# dataRetList.append({key: var[key]})
# # print(f"{dataRetList = }")
# yield {key: var[key]}
# else:
# # print(f'dict else var={var}')
# for _ in var:
# # print(_)
# yield from self.findKeyInDict(key, var[_], )
# return dataRetList
#### dont delete.
### Example of extracting dictionary value by key
def gen_dict_extract(self, key, var:dict=None): ### Utils
if var==None:
var=self.response_string
# print("var")
if hasattr(var,'items'): # hasattr(var,'items') for python 3, hasattr(var,'iteritems') for python 2
# print("hassattr")
for k, v in var.items(): # var.items() for python 3, var.iteritems() for python 2
# print(k,v)
if k == key:
yield v
if isinstance(v, dict):
for result in self.gen_dict_extract(key, v):
yield result
elif isinstance(v, list):
for d in v:
for result in self.gen_dict_extract(key, d):
yield result
def __str__(self, *args, **kwargs) -> str:
# return str(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
# print("__str__")
return str(self.all())
return self.__class__.__name__
return str(self.get_datarow())
# def get_datarow(self, *args):
# return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
# def get_dict(self, *args):
# return pd.DataFrame(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
def status_ok(self, QBXML):
GSRQRs=QBXML.find('.//GeneralSummaryReportQueryRs')
status_code = GSRQRs.attrib #.get('statusCode')
# print(GSRQRs.attrib)
# print(GSRQRs.attrib['statusCode'])
status=GSRQRs.attrib.get('statusMessage')
print(f'status={status}')
if 'OK' in status:
return True, status_code
else:
return False, status_code
if __name__ == '__main__': if __name__ == '__main__':
pass pass