"""Query QuickBooks Desktop for customers and export their price levels.

The module builds a qbXML ``CustomerQueryRq``, sends it through the
``QBXMLRP2.RequestProcessor`` COM object, parses the ``CustomerRet`` records
in the response, writes them to an Excel sheet and can return them as JSON.
"""
import datetime
import json
import os
import pprint
import timeit
import xml.etree.ElementTree as ET
from datetime import date

import pandas as pd

# Third-party modules that are only needed for (or not needed by) the COM
# round trip.  They are guarded so the XML building / parsing helpers stay
# importable on machines without pywin32 or xmltodict installed.
try:
    import win32com.client
except ImportError:
    win32com = None

try:
    import xmltodict
except ImportError:
    xmltodict = None


class CustomerQuery:
    """Build, send and parse a QuickBooks ``CustomerQueryRq`` request.

    Keyword arguments (all optional):
        MaxReturned: limit sent as ``<MaxReturned>``; ints are stored as str.
        IncludeRetElement: stored on the instance; the request itself always
            asks for the fixed element list the parser depends on.
        TxnDateRangeFilter: stored on the instance, not used by this class.
        FullName: stored on the instance, not used by this class.
        QBXMLVersion: value for the ``<?qbxml version="..."?>`` instruction
            (defaults to ``"13.0"``).
    """

    # Elements requested from QuickBooks; get_customer_pricelevel_list()
    # reads exactly these, so the list is fixed rather than configurable.
    _RET_ELEMENTS = ('FullName', 'PriceLevelRef', 'BillAddress', 'DataExtRet')
    # BillAddress children exported as separate columns.
    _ADDR_FIELDS = ('Addr1', 'Addr2', 'Addr3', 'Addr4', 'Addr5')

    def __init__(self, **kwargs) -> None:
        print(f'kwargs:{kwargs}')
        self.CustomerPriceLevelNames = []
        self.SPPriceLevelNames = []
        self.CustomerFullNames = []
        self.item_inventory_path = "ItemInventory"
        self._filename = "CustomerList.xlsx"
        self.MaxReturned = kwargs.get('MaxReturned')
        if isinstance(self.MaxReturned, int):
            # ElementTree can only serialise string text.
            self.MaxReturned = str(self.MaxReturned)
        print(f'{self.MaxReturned=}')
        self.IncludeRetElement = kwargs.get('IncludeRetElement', [])
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')
        self.FullName = kwargs.get('FullName')
        self.qbxml_version = str(kwargs.get('QBXMLVersion', '13.0'))
        self.response_string = None

    @staticmethod
    def _child_text(parent, path):
        """Return the text of the first element matching *path*, or None."""
        node = parent.find(path)
        return None if node is None else node.text

    def _special_price_level(self, customer_ret):
        """Return the upper-cased 'Special Cust' custom field value, or None."""
        for data_ext in customer_ret.findall('.//DataExtRet'):
            name = self._child_text(data_ext, 'DataExtName')
            if name is not None and name.lower() == 'special cust':
                value = self._child_text(data_ext, 'DataExtValue')
                return None if value is None else value.upper()
        return None

    def _collect_customer_columns(self):
        """Parse ``self.response_string`` into a column-name -> list mapping.

        The per-customer lists kept on the instance are rebuilt from scratch
        on every call so that repeated calls do not accumulate rows (which
        previously produced columns of different lengths).
        """
        qbxml = ET.fromstring(self.response_string)

        self.CustomerFullNames = []
        self.CustomerPriceLevelNames = []
        self.SPPriceLevelNames = []
        addr_columns = {field: [] for field in self._ADDR_FIELDS}

        for customer_ret in qbxml.findall('.//CustomerRet'):
            # Compare with None explicitly: the truth value of an Element
            # reflects whether it has children, not whether it was found.
            bill_address = customer_ret.find('BillAddress')
            for field, column in addr_columns.items():
                if bill_address is None:
                    column.append(None)
                else:
                    column.append(self._child_text(bill_address, field))

            price_level_ref = customer_ret.find('.//PriceLevelRef')
            if price_level_ref is None:
                price_level_name = None
            else:
                price_level_name = self._child_text(price_level_ref, 'FullName')

            self.CustomerFullNames.append(
                self._child_text(customer_ret, 'FullName'))
            self.CustomerPriceLevelNames.append(price_level_name)
            self.SPPriceLevelNames.append(
                self._special_price_level(customer_ret))

        columns = {
            'FullName': self.CustomerFullNames,
            'PriceLevelName': self.CustomerPriceLevelNames,
            'SPName': self.SPPriceLevelNames,
        }
        columns.update(addr_columns)
        return columns

    def get_customer_pricelevel_list(self):
        """Parse the stored response, save it to Excel and return the rows.

        Writes ``<cwd>/ItemInventory/CustomerList.xlsx`` (the folder is
        created when missing) and returns a list with one dict per customer
        holding FullName, PriceLevelName, SPName and Addr1..Addr5.
        """
        _df = pd.DataFrame.from_dict(self._collect_customer_columns())
        out_dir = os.path.join(os.getcwd(), self.item_inventory_path)
        os.makedirs(out_dir, exist_ok=True)
        _df.to_excel(os.path.join(out_dir, self._filename), index=False)
        return _df.to_dict('records')

    def create_QBXML(self):
        """Alias for :meth:`create_customerquery_QBXML`."""
        return self.create_customerquery_QBXML()

    def _build_customerquery_request(self):
        """Return the complete qbXML text of the customer query request."""
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs_rq = ET.SubElement(root, "QBXMLMsgsRq")
        msgs_rq.set("onError", "stopOnError")
        msgs_rq.tail = "\n"
        msgs_rq.text = "\n "
        query_rq = self.create_sub_element(
            ET, msgs_rq, "CustomerQueryRq", "\n ")
        if self.MaxReturned is not None:
            print("masuk maxreturn")
            self.create_sub_element(
                ET, query_rq, "MaxReturned", self.MaxReturned, 4)
        for element_name in self._RET_ELEMENTS:
            self.create_sub_element(
                ET, query_rq, "IncludeRetElement", element_name, 4)
        # OwnerID 0 is sent so that DataExtRet custom fields are returned.
        self.create_sub_element(ET, query_rq, "OwnerID", "0", 6)

        body = ET.tostring(root, encoding="unicode")
        # The XML declaration and the qbXML version instruction must precede
        # the <QBXML> root; the declaration has to be the very first text.
        prolog = (
            '<?xml version="1.0" encoding="utf-8"?>\n'
            f'<?qbxml version="{self.qbxml_version}"?>'
        )
        return prolog + "\n" + body

    def create_customerquery_QBXML(self):
        """Build the request, send it to QuickBooks and return the response.

        The raw response is also stored in ``self.response_string``.
        """
        qbxml_query = self._build_customerquery_request()
        print(f'create_customer_QBXML->qbxml_query: {qbxml_query}')
        self.response_string = self.connect_to_quickbooks(qbxml_query)
        return self.response_string

    def to_json(self):
        """Return ``(True, json_text)`` for the parsed customers.

        Returns ``(False, 'There is no data(df==0)')`` when the response
        contains no customer records.  ``self.response_string`` must already
        hold a response (see :meth:`create_customerquery_QBXML`).
        """
        start = timeit.default_timer()
        df = pd.DataFrame.from_dict(self.get_customer_pricelevel_list())
        print(df)
        if len(df) == 0:
            return False, 'There is no data(df==0)'
        js_data = json.dumps(df.to_dict("records"))
        print("The difference of time is :", timeit.default_timer() - start)
        return True, js_data

    def create_sub_element(self, ET, parentNode, thisNode, text="\n",
                           whiteSpace=0, attrib=None):
        """Append a child element to *parentNode* and return it.

        Args:
            ET: the ElementTree module (or a compatible object) to use.
            parentNode: element that receives the new child.
            thisNode: tag name of the new child.
            text: text content of the child.
            whiteSpace: number of spaces placed after the newline in the
                child's tail (controls indentation of what follows).
            attrib: optional dict of attributes; non-dict values are ignored.
        """
        if not isinstance(attrib, dict):
            attrib = {}
        ele = ET.SubElement(parentNode, thisNode)
        for key, value in attrib.items():
            ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* to QuickBooks and return the response text.

        The response is also stored in ``self.response_string``.  The session
        and the connection are always closed, even when the request fails.

        Raises:
            RuntimeError: if pywin32 (``win32com``) is not installed.
        """
        if win32com is None:
            raise RuntimeError(
                "win32com (pywin32) is required to talk to QuickBooks")
        session_manager = win32com.client.Dispatch(
            "QBXMLRP2.RequestProcessor")
        session_manager.OpenConnection('', 'DASA2')
        try:
            # Empty company-file path; 2 is the file-open mode argument
            # (presumably "do not care" in the QuickBooks SDK - confirm).
            ticket = session_manager.BeginSession("", 2)
            try:
                self.response_string = session_manager.ProcessRequest(
                    ticket, qbxml_query)
            finally:
                session_manager.EndSession(ticket)
        finally:
            session_manager.CloseConnection()
        return self.response_string

    def status_ok(self, QBXML):
        """Return ``(ok, attributes)`` for the customer response in *QBXML*.

        ``ok`` is True when the response's ``statusMessage`` contains 'OK';
        ``attributes`` is the attribute mapping of the response element, or
        an empty dict when no response element is present.
        """
        tree = ET.fromstring(QBXML)
        # A CustomerQueryRq is answered with CustomerQueryRs; the older
        # 'CustomerRs' lookup is kept as a fallback.
        response = tree.find(".//CustomerQueryRs")
        if response is None:
            response = tree.find(".//CustomerRs")
        if response is None:
            return False, {}
        status = response.attrib.get('statusMessage')
        print(f'status={status}')
        return (status is not None and 'OK' in status), response.attrib


if __name__ == '__main__':
    starttime = timeit.default_timer()
    ini = CustomerQuery()
    iya = ini.create_customerquery_QBXML()
    Customer = ini.get_customer_pricelevel_list()
    print(f'{Customer = }')
    res, jsondt = ini.to_json()
    print(f'{jsondt=}')
    print("The time difference is :", timeit.default_timer() - starttime)