"""Helpers for sending QBXML requests to QuickBooks and searching the reply.

The module exposes the :func:`timing` decorator and :class:`baseQBQuery`,
whose nested :class:`baseQBQuery.ListOfDict` searches nested dict/list
structures (as produced by ``xmltodict.parse``) for a given key.
"""
import pprint
import xml.etree.ElementTree as ET
from functools import wraps
from time import time

# The third-party packages are only needed by the methods that build QBXML or
# talk to QuickBooks.  Guarding the imports keeps the pure-Python helpers
# (``timing``, ``ListOfDict``) importable when they are not installed; the
# methods that need them raise ImportError via ``_require``.
try:
    import xmltodict
except ImportError:
    xmltodict = None

try:
    import win32com.client
except ImportError:
    win32com = None


def _require(module, name):
    """Return *module*, or raise ImportError if its guarded import failed."""
    if module is None:
        raise ImportError(f"{name} is required for this operation but is not installed")
    return module


def timing(f):
    """Decorate *f* so that every call prints its arguments and duration.

    The wrapped function's return value is passed through unchanged and its
    metadata (``__name__``, ``__doc__`` ...) is preserved.
    """
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print(f'func:{f.__name__!r} args:[{args!r}, {kw!r}] took: {te - ts:2.6f} sec')
        return result
    return wrap


class baseQBQuery:
    """Base class for a single QBXML request/response round trip.

    ``QBDict`` holds the request as a nested dict and ``retName`` the key
    looked up by :meth:`all`; neither is filled in by this class itself.
    NOTE(review): presumably subclasses set them (``returnRet`` derives a
    response key from the subclass name) -- confirm.
    """

    def __init__(self, *args, **kwargs, ) -> None:
        """Initialise an empty query; positional and keyword arguments are ignored."""
        self.onError = "continueOnError"   # value of the @onError request attribute
        self.QBXML = None                  # request text built by create_QBXML()
        self.QBDict = {}                   # request body as a nested dict
        self.response_string = None        # raw reply from ProcessRequest
        self.Rs = None
        self.varDict = {}                  # reply parsed by xmltodict
        self.retName = None                # default key for ListOfDict lookups
        self.listOfDict = self.ListOfDict(None, self.varDict, self.retName)
        self.statusCode = -1
        self.statusMessage = ""
        self.statusSeverity = ""

    @timing
    def create_QBXML(self):
        """Serialise the first entry of ``QBDict`` into ``self.QBXML`` and return it.

        Only the first key of ``QBDict`` is sent.  Example request bodies::

            {"GeneralSummaryReportQueryRq": {"GeneralSummaryReportType": ...}}
            {"ItemInventoryQueryRq": {"FullName": ["TACO:AA:TH-003AA",
                                                   "TACO:AA:TH-010AA"]}}

        Raises:
            IndexError: if ``QBDict`` is empty.
            ImportError: if ``xmltodict`` is not installed.
        """
        if not self.QBDict:
            raise IndexError("QBDict is empty: there is no request to serialise")
        first_key = next(iter(self.QBDict))
        dataDict = {
            # Header for qbxml with version attribute
            "?qbxml": {
                "@version": "13.0",
                "QBXML": {
                    "QBXMLMsgsRq": {
                        "@onError": self.onError,
                        str(first_key): self.QBDict[first_key],
                    },
                },
            },
        }
        # NOTE(review): replace("", "") is a no-op as written.  It presumably
        # was meant to strip or rewrite part of the "?qbxml" pseudo-element
        # that unparse() emits -- confirm the intended target string.
        self.QBXML = _require(xmltodict, "xmltodict").unparse(dataDict, pretty=True).replace("", "")
        print(self.QBXML)
        return self.QBXML

    @timing
    def connect_to_quickbooks(self, qbxml_query=None):
        """Send a request to QuickBooks and return the raw response.

        Args:
            qbxml_query: request text to send; when ``None`` the text stored
                in ``self.QBXML`` is sent.

        Returns:
            The response, also stored in ``self.response_string``.  The reply
            is parsed by :meth:`isDataOK` before returning.

        Raises:
            ImportError: if ``win32com`` (pywin32) is not installed.
        """
        request = self.QBXML if qbxml_query is None else qbxml_query
        sessionManager = _require(win32com, "pywin32").client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # NOTE(review): "" presumably selects the company file that is
            # currently open and 2 the "do not care" open mode -- confirm
            # against the QuickBooks SDK documentation.
            ticket = sessionManager.BeginSession("", 2)
            try:
                self.response_string = sessionManager.ProcessRequest(ticket, request)
            finally:
                # Always release the session, even when the request fails.
                sessionManager.EndSession(ticket)
        finally:
            sessionManager.CloseConnection()
        self.isDataOK()
        return self.response_string

    def isDataOK(self):
        """Parse ``response_string`` and record the first status attributes found.

        Fills ``varDict`` with the parsed reply and sets ``statusCode``,
        ``statusMessage`` and ``statusSeverity`` from the first
        ``@statusCode`` / ``@statusMessage`` / ``@statusSeverity`` keys in it
        (``None`` when a key is absent).  Returns ``None``.
        """
        self.varDict = _require(xmltodict, "xmltodict").parse(self.response_string)
        pprint.pprint(self.varDict)
        # The shared ListOfDict is retargeted three times; afterwards its
        # filterKey is left at "@statusSeverity".
        self.listOfDict.varDict = self.varDict
        self.listOfDict.filterKey = "@statusCode"
        self.statusCode = self.listOfDict.firstValue()
        self.listOfDict.filterKey = "@statusMessage"
        self.statusMessage = self.listOfDict.firstValue()
        self.listOfDict.filterKey = "@statusSeverity"
        self.statusSeverity = self.listOfDict.firstValue()
        print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = }')

    def returnRet(self, varDict):
        """Return the first ``*Ret*`` entry of this class's response element.

        Looks up ``varDict['QBXML']['QBXMLMsgsRs'][<ClassName> + "Rs"]`` and
        returns the value of the first key containing ``"Ret"``, or ``None``
        when there is no such key.
        """
        print(f'{varDict = }')
        varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
        print(f'{varDict = }')
        for key in varDict:
            if "Ret" in key:
                return varDict[key]
        return None

    def runCheck(self):
        """Return True when response data is available, querying if necessary.

        Existing data (``varDict``, ``response_string`` or ``Rs``) is reused.
        Otherwise, if ``QBDict`` holds a request, it is built and sent.
        Returns False when there is neither data nor a request.
        """
        if self.varDict:
            return True
        if self.response_string:
            return True
        if self.Rs:
            return True
        if self.QBDict:
            self.create_QBXML()
            self.connect_to_quickbooks()
            return True
        return False

    def filter(self, key):
        """Return a new :class:`ListOfDict` searching the response for *key*.

        Returns ``None`` when :meth:`runCheck` reports that no data is
        available.
        """
        if not self.runCheck():
            return None
        # A fresh object per call: reusing self.listOfDict would make every
        # previously returned result follow the most recent filter key.
        return self.ListOfDict(key, self.varDict, self.retName)

    def all(self):
        """Return the first value stored under ``retName`` in the response.

        Returns ``None`` when no data is available or the key is absent.
        """
        if not self.runCheck():
            return None
        return self.ListOfDict(None, self.varDict, self.retName).firstValue()

    class ListOfDict:
        """Search a nested dict/list structure for every occurrence of a key.

        A dict that contains the key produces one match and is not searched
        any deeper; other dicts and lists are searched recursively.
        """

        def __init__(self, key, var, retName) -> None:
            """Search *var* for *key*, falling back to *retName* if *key* is falsy."""
            self.filterKey = key if key else retName
            self.varDict = var

        def __repr__(self) -> str:
            return str(self.all())

        def filter(self, filterKey):
            """Change the key this object searches for (returns ``None``)."""
            self.filterKey = filterKey

        def all(self, var: dict = None, dataRetList: list = None):
            """Return the list of every value found under ``filterKey``."""
            return [match[self.filterKey] for match in self.find_listOfDict(var, dataRetList)]

        def allOnlyValue(self, var: dict = None, dataRetList: list = None):
            """Return the list of every value found under ``filterKey``.

            NOTE(review): this previously built a list and returned nothing;
            it now returns the bare values, as the name suggests, which makes
            it equivalent to :meth:`all` -- confirm that is the intent.
            """
            return self.all(var, dataRetList)

        def first(self, var: dict = None, dataRetList: list = None):
            """Return the first match as ``{filterKey: value}``, or ``None``."""
            return next(self.find_listOfDict(var, dataRetList), None)

        def firstValue(self, var: dict = None, dataRetList: list = None):
            """Return the first value found under ``filterKey``, or ``None``."""
            match = self.first(var, dataRetList)
            if match is None:
                return None
            return match[self.filterKey]

        def last(self, var: dict = None, dataRetList: list = None):
            """Return the last value found under ``filterKey``, or ``None``.

            Unlike :meth:`first`, this returns the bare value rather than a
            ``{filterKey: value}`` dict; kept that way for existing callers.
            """
            values = self.all(var, dataRetList)
            return values[-1] if values else None

        def lastValue(self, var: dict = None, dataRetList: list = None):
            """Return the last value found under ``filterKey``, or ``None``."""
            # last() already yields the bare value; indexing it again with
            # filterKey (as was done before) fails for ordinary values.
            return self.last(var, dataRetList)

        def count(self, var: dict = None, dataRetList: list = None):
            """Return the number of matches for ``filterKey``."""
            return len(self.all(var, dataRetList))

        def find_listOfDict(self, var: dict = None, dataRetList: list = None, ):
            """Lazily yield ``{filterKey: value}`` for each match in *var*.

            Args:
                var: structure to search; ``self.varDict`` when ``None``.
                dataRetList: optional list that receives every match as it is
                    yielded.

            The generator's return value (``StopIteration.value``) is the
            list of matches.
            """
            if var is None:
                var = self.varDict
            if dataRetList is None:
                dataRetList = []
            for match in self._walk(var):
                dataRetList.append(match)
                yield match
            return dataRetList

        def _walk(self, node):
            """Yield matches below *node*; anything but a list or dict is a leaf."""
            # Kept separate from find_listOfDict so that a ``None`` leaf is
            # not mistaken for "use the default" (which recursed forever).
            if isinstance(node, list):
                for item in node:
                    yield from self._walk(item)
            elif isinstance(node, dict):
                if self.filterKey in node:
                    yield {self.filterKey: node[self.filterKey]}
                else:
                    for child in node.values():
                        yield from self._walk(child)

    def gen_dict_extract(self, key, var: dict = None):
        """Yield every value stored under *key* anywhere inside *var*.

        Unlike :class:`ListOfDict`, a matching value that is itself a dict or
        list is searched further.  Lists nested directly inside lists are not
        descended into.

        Args:
            key: key to look for.
            var: mapping to search; the parsed response ``self.varDict`` when
                ``None`` (the raw ``response_string`` used before has no
                ``items`` and therefore never produced a result).
        """
        if var is None:
            var = self.varDict
        yield from self._extract(key, var)

    def _extract(self, key, node):
        """Recursive worker for :meth:`gen_dict_extract`."""
        if not hasattr(node, 'items'):
            return
        for k, v in node.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                yield from self._extract(key, v)
            elif isinstance(v, list):
                for item in v:
                    yield from self._extract(key, item)

    def __str__(self, *args) -> str:
        # NOTE(review): get_datarow() is not defined in this class, so this
        # raises AttributeError unless a subclass provides it -- confirm.
        return str(self.get_datarow())

    def status_ok(self, QBXML):
        """Report whether the GeneralSummaryReportQueryRs element signals OK.

        Args:
            QBXML: ElementTree element to search.

        Returns:
            ``(ok, attributes)`` where *ok* is True when the element's
            ``statusMessage`` attribute contains ``"OK"`` and *attributes* is
            the element's attribute dict.  ``(False, {})`` is returned when
            the element is missing.
        """
        GSRQRs = QBXML.find('.//GeneralSummaryReportQueryRs')
        if GSRQRs is None:
            print('status=None')
            return False, {}
        status_code = GSRQRs.attrib
        status = status_code.get('statusMessage')
        print(f'status={status}')
        if status is not None and 'OK' in status:
            return True, status_code
        return False, status_code


if __name__ == '__main__':
    pass