import pprint
import xmltodict
import win32com.client
# import xml.etree.ElementTree as ET
import xml.dom.minidom as minidom
from functools import wraps
from time import time
from utils import cleanIncludeRetElements
import json


def timing(f):
    """Decorator that prints the wall-clock duration of every call to *f*.

    The wrapped function's return value is passed through unchanged.
    """
    @wraps(f)  # keep f.__name__ / f.__doc__ visible on the wrapper
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r args:[%r, %r] took: %2.6f sec' %
              (f.__name__, args, kw, te - ts))
        return result
    return wrap


class baseQBQuery:
    """Base class for building a qbXML request, sending it and reading the reply.

    Subclasses are expected to overwrite the attributes marked in ``__init__``
    (``retName``, ``includeRetElements_allowed``, ``defaultFilterKey`` ...) and
    to fill ``QBDict`` with a single ``{"<Something>Rq": {...}}`` entry.
    """

    def __init__(self, *args, **kwargs, ) -> None:
        """Initialise empty request/response state; positional/keyword args are unused here."""
        self.QBXML = None             # request XML string built by create_QBXML()
        self.QBDict = {}              # request payload; only its first key is sent
        self.response_string = None   # pretty-printed response XML
        self.Rs = None
        self.varDict = {}             # response parsed by xmltodict

        ### start ### variables meant to be replaced by the child class __init__
        self.onError = "continueOnError"
        self.includeRetElements_allowed = None
        self.retName = None
        self.defaultFilterKey = None
        self.class_debug = False
        ### end ###

        # Scratch lookup object used by isDataOK(). Note the 4th positional
        # argument is includeRetElements_allowed, so statusOk keeps its default.
        self.listOfDict = self.ListOfDict(None, self.varDict, self.retName, False)

        self.requestID = None
        self.statusCode = -1
        self.statusMessage = ""
        self.statusSeverity = ""
        self.statusOk = False
        self.retCount = -1

        if self.__class__.__name__ == "baseQBQuery":
            print("baseqbquey same with classname")
        else:
            print("accessed from child class")
        print("basequery is accessed from ", self.__class__.__name__ + "Rq")

    def create_QBXML(self):
        """Build the qbXML request string from the first entry of ``QBDict``.

        The result is stored in ``self.QBXML`` and returned.

        Shape of ``QBDict`` (examples):
            {"GeneralSummaryReportQueryRq": {"GeneralSummaryReportType": ...}}
            {"ItemInventoryQueryRq": {"FullName": ["TACO:AA:TH-003AA",
                                                   "TACO:AA:TH-010AA"]}}

        Raises:
            IndexError: if ``QBDict`` is empty.
        """
        version = "13.0"
        firstKey = list(self.QBDict)[0]
        dataDict = {
            # The "?qbxml" key is rendered as an element by xmltodict and is
            # turned into the <?qbxml version="..."?> processing instruction below.
            "?qbxml": {
                "@version": version,
                "QBXML": {
                    "QBXMLMsgsRq": {
                        "@onError": self.onError,
                        str(firstKey): self.QBDict[firstKey],
                    }
                },
            }
        }
        self.QBXML = (
            xmltodict.unparse(dataDict, pretty=True)
            # A processing instruction has no closing tag: drop the one
            # xmltodict emits for the "?qbxml" pseudo-element ...
            .replace("</?qbxml>", "")
            # ... and terminate the opening one with "?>".
            .replace(f'version="{version}"', f'version="{version}"?')
        )
        print(self.QBXML, type(self.QBXML))
        return self.QBXML

    def connect_to_quickbooks(self, qbxml_query=None):
        """Send ``self.QBXML`` to QuickBooks and store the pretty-printed response.

        Args:
            qbxml_query: accepted for backward compatibility; it is not used,
                the request sent is always ``self.QBXML``.

        Returns:
            bool: the result of ``isDataOK()`` (also stored in ``self.statusOk``).
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            ticket = sessionManager.BeginSession("", 2)
            try:
                self.response_string = sessionManager.ProcessRequest(ticket, self.QBXML)
            finally:
                # Always release the session, even when the request fails.
                sessionManager.EndSession(ticket)
        finally:
            sessionManager.CloseConnection()

        # Beautify the response.
        dom = minidom.parseString(self.response_string.replace("\n", ""))
        self.response_string = dom.toprettyxml()

        self.statusOk = self.isDataOK()
        return self.statusOk

    def isDataOK(self):
        """Parse ``response_string`` and extract the response status attributes.

        Fills ``varDict``, ``requestID``, ``statusCode``, ``statusMessage``,
        ``statusSeverity`` and ``retCount``.

        Returns:
            bool: True when an element named ``retName`` is present in the response.
        """
        self.varDict = xmltodict.parse(self.response_string)
        if self.class_debug:
            print("isDataOK")
            pprint.pprint(self.varDict)

        lookup = self.listOfDict
        lookup.varDict = self.varDict

        def first_attr(name):
            # Point the shared lookup object at one key and read its first hit.
            lookup.filterKey = [name]
            return lookup.firstValue().get(name)

        self.requestID = first_attr('@requestID')
        self.statusCode = first_attr('@statusCode')
        self.statusMessage = first_attr('@statusMessage')
        self.statusSeverity = first_attr('@statusSeverity')
        self.retCount = first_attr('@retCount')

        self.listOfDict.filterKey = [self.retName]
        if self.class_debug:
            print(f'isDataOK -> {self.listOfDict.firstValue() = }')
        if self.listOfDict.firstValue().get(self.retName) is None:
            return False
        print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = } {self.retCount = }')
        return True

    def returnRet(self, varDict: dict = None):
        """Return the value of the first response key that contains ``retName``.

        Looks under ``['QBXML']['QBXMLMsgsRs']['<ClassName>Rs']`` of *varDict*
        (``self.varDict`` when omitted). Returns None when no key matches.
        """
        if varDict is None:
            varDict = self.varDict
        response = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__ + "Rs"]
        for key in response:
            if self.retName in key:
                return response[key]
        return None

    def runCheck(self):
        """Return True when response data is available, querying QuickBooks if needed.

        Cached data (``varDict``, ``response_string`` or ``Rs``) short-circuits;
        otherwise a request is built and sent when ``QBDict`` is non-empty.
        """
        if self.varDict or self.response_string or self.Rs:
            return True
        if self.QBDict:
            self.create_QBXML()
            if self.connect_to_quickbooks():
                return True
        return False

    def __repr__(self) -> str:
        """Return the response XML (triggers the query on first use)."""
        self.all()
        if self.response_string is None:
            # __repr__ must return a str even when nothing could be fetched.
            return f"<{self.__class__.__name__}: no response>"
        return self.response_string

    def count(self) -> int:
        """Return the number of matches for ``defaultFilterKey``."""
        matches = self.filter(self.defaultFilterKey)
        if isinstance(matches, self.ListOfDict):
            return len(matches.all())
        return len(matches)  # filter() returned a plain (empty) list

    def _make_list_of_dict(self, key):
        """Create a fresh ListOfDict bound to the current response state."""
        return self.ListOfDict(key, self.varDict, self.retName,
                               self.includeRetElements_allowed, self.statusOk)

    def filter(self, key=None):
        """Return a new ``ListOfDict`` that searches the response for *key*.

        Args:
            key: a key name, a list of key names, a dict (its keys are used)
                or None (search for ``retName``).

        Returns:
            ListOfDict, or ``[]`` when *key* has an unsupported type. When no
            data is available or no requested key survives
            ``cleanIncludeRetElements``, the placeholder key ``["abc"]`` is
            used instead.

        A new object is returned on every call so that results assigned to
        different variables do not share (and overwrite) one filter key.
        """
        print(f'filter {key = }')
        if not self.runCheck():
            print("not runcheck")
            return self._make_list_of_dict(["abc"])

        if key is None:
            print(f"key is none. {self.retName = }")
            return self._make_list_of_dict(self.retName)
        if isinstance(key, str):
            key = [key]
        elif isinstance(key, dict):
            key = list(key)
        elif not isinstance(key, list):
            return []

        key = cleanIncludeRetElements(self.includeRetElements_allowed, key)
        return self._make_list_of_dict(key if key else ["abc"])

    def all(self) -> dict:
        """Return the payload stored under ``retName`` in the response.

        Returns:
            * None when no data could be obtained (``runCheck()`` failed);
            * an error dict (status fields) when ``retName`` is not found;
            * otherwise the value found under ``retName``. When that value is
              a dict and a requestID is known, a shallow copy with an extra
              ``'requestID'`` entry is returned so the parsed response itself
              is left untouched. Non-dict values (e.g. a list of records) are
              returned as they are.
        """
        if not self.runCheck():
            return None
        found = self._make_list_of_dict(None).firstValue()
        if not found:
            return {'status': "Error",
                    'statusCode': self.statusCode,
                    'statusMessage': self.statusMessage,
                    'statusSeverity': self.statusSeverity,
                    'retCount': self.retCount}
        payload = found[self.retName]
        if self.requestID and isinstance(payload, dict):
            payload = payload.copy()
            payload['requestID'] = self.requestID
        return payload

    def to_json(self) -> str:
        """Return ``all()`` serialised as indented JSON."""
        return json.dumps(self.all(), indent=2)

    class ListOfDict:
        """Lazy search for one or more keys inside a nested dict/list structure."""

        def __init__(self, key, var, retName, includeRetElements_allowed: list, statusOk: bool = True) -> None:
            """Store the search keys (*key*, or ``[retName]`` when falsy) and the data *var*."""
            print(f'{retName = }')
            if key:
                self.filterKey = [key] if isinstance(key, str) else key
            else:
                self.filterKey = [retName]
            print(f"ListOfDict {self.filterKey = }")
            self.varDict = var
            self.statusOk = statusOk
            self._includeRetElements_allowed = includeRetElements_allowed

        def __repr__(self) -> str:
            return str(self.all())

        def getValuesOf(self, key: str = None, var: dict = None, dataRetList: list = None):
            """Return one row of values per match, ordered like *key*.

            *key* may be a name or a list of names; it defaults to the filter
            keys and falls back to them when ``cleanIncludeRetElements``
            leaves nothing. Values are read from the dicts produced by
            ``findKeyInDict`` (which only contain the filter keys); missing
            entries become ``""``.

            Raises:
                TypeError: if *key* is neither None, a str nor a list.
            """
            if key is None:
                key = self.filterKey
            elif isinstance(key, str):
                key = [key]
            elif not isinstance(key, list):
                raise TypeError(f'{key=} should be string not {type(key)}')
            print(key)
            key = cleanIncludeRetElements(self._includeRetElements_allowed, key)
            print(key)
            if not key:
                key = self.filterKey

            lstresult = []
            for match in self.findKeyInDict(var, dataRetList):
                lstresult.append([match.get(name, "") for name in key])
                print(f'{lstresult[-1] =}')
            return lstresult

        def all(self, var: dict = None, dataRetList: list = None) -> list:
            """Return every match as a list (empty when ``statusOk`` is False)."""
            if not self.statusOk:
                return []
            return list(self.findKeyInDict(var, dataRetList))

        def allOnlyValue(self, var: dict = None, dataRetList: list = None):
            """Currently identical to ``all()``."""
            if not self.statusOk:
                return []
            return list(self.findKeyInDict(var, dataRetList))

        def first(self, var: dict = None, dataRetList: list = None) -> dict:
            """Return the first match, or ``{}`` when there is none."""
            if not self.statusOk:
                return {}
            return next(self.findKeyInDict(var, dataRetList), {})

        def firstValue(self, var: dict = None, dataRetList: list = None) -> dict:
            """Return the first match, or ``{}`` when there is none."""
            if not self.statusOk:
                print("firstValue statusOk is False")
                return {}
            return self.first(var, dataRetList) or {}

        def last(self, var: dict = None, dataRetList: list = None) -> dict:
            """Return the last match, or ``{}`` when there is none."""
            if not self.statusOk:
                return {}
            matches = self.all(var, dataRetList)
            return matches[-1] if matches else {}

        def lastValue(self, var: dict = None, dataRetList: list = None) -> dict:
            """Return the last match, or ``{}`` when there is none."""
            if not self.statusOk:
                return {}
            return self.last(var, dataRetList) or {}

        def count(self, var: dict = None, dataRetList: list = None) -> int:
            """Return the number of matches (0 when ``statusOk`` is False)."""
            if not self.statusOk:
                return 0
            return len(self.all(var, dataRetList))

        def findKeyInDict(self, var: dict = None, dataRetList: list = None, ):
            """Yield ``{key: value}`` dicts for every dict holding a filter key.

            The structure is walked depth-first through dicts and lists. A dict
            that contains at least one of ``filterKey`` yields the subset of
            those keys and is not searched any deeper.

            Args:
                var: structure to search; ``self.varDict`` when None.
                dataRetList: optional list that additionally collects every
                    yielded match.
            """
            if var is None:
                var = self.varDict
            if dataRetList is None:
                dataRetList = []
            yield from self._walk(var, dataRetList)
            return dataRetList

        def _walk(self, node, collected):
            """Recursive worker of findKeyInDict.

            Kept separate so that a nested None value (an empty XML element)
            is simply skipped instead of restarting the search from the root.
            """
            if isinstance(node, list):
                for item in node:
                    yield from self._walk(item, collected)
            elif isinstance(node, dict):
                match = {name: node[name] for name in self.filterKey if name in node}
                if match:
                    collected.append(match)
                    yield match
                else:
                    for child in node.values():
                        yield from self._walk(child, collected)

    #### dont delete. ### Example of extracting dictionary value by key
    def gen_dict_extract(self, key, var: dict = None):
        """Yield every value stored under *key* anywhere inside *var*.

        *var* defaults to the parsed response ``self.varDict``. Unlike
        ``ListOfDict.findKeyInDict`` the search continues below a match.
        """
        if var is None:
            var = self.varDict
        yield from self._extract(key, var)

    @classmethod
    def _extract(cls, key, node):
        """Recursive worker of gen_dict_extract; ignores anything that is not a mapping."""
        if not hasattr(node, 'items'):
            return
        for k, v in node.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                yield from cls._extract(key, v)
            elif isinstance(v, list):
                for item in v:
                    yield from cls._extract(key, item)

    def __str__(self, *args, **kwargs) -> str:
        """Return ``str(self.all())``."""
        return str(self.all())

    def status_ok(self, QBXML):
        """Check the status message of a GeneralSummaryReportQueryRs element.

        Args:
            QBXML: object providing ``find(path)`` that returns an element
                with an ``attrib`` mapping, or None
                (presumably an ElementTree element - TODO confirm).

        Returns:
            tuple: ``(ok, attrib)`` where *ok* is True when the
            ``statusMessage`` attribute contains ``'OK'``. When the element
            is missing, ``(False, {})`` is returned.
        """
        GSRQRs = QBXML.find('.//GeneralSummaryReportQueryRs')
        if GSRQRs is None:
            return False, {}
        status_code = GSRQRs.attrib
        status = status_code.get('statusMessage')
        print(f'status={status}')
        return (status is not None and 'OK' in status), status_code


if __name__ == '__main__':
    pass