dasaproject/SO_to_Inv/CustomerQuery.py
2024-10-10 04:41:59 +07:00

206 lines
9.1 KiB
Python

import xml.etree.ElementTree as ET
import win32com.client
import xmltodict
import pprint
import datetime
import pandas as pd
from datetime import date
import timeit
import os
import json
class CustomerQuery:
def __init__(self, **kwargs) -> None:
print(f'kwargs:{kwargs}')
# print(args)
self.CustomerPriceLevelNames = []
self.SPPriceLevelNames = []
self.CustomerFullNames = []
self.item_inventory_path = "ItemInventory"
self._filename = "CustomerList.xlsx"
# self._df_customer = pd.read_excel(os.path.join(os.getcwd(), self.item_inventory_path, self._filename), usecols=['FullName', 'PriceLevelName', 'Special Cust'],)
# print(self._df_customer)
# print(type(self._df_customer.loc[(self._df_customer['FullName']=="ECO:0:ECO-002") & (self._df_customer['PriceLevelName']=="T 202202")].values.tolist()[0][2]))
self.MaxReturned = kwargs['MaxReturned'] if 'MaxReturned' in kwargs else None
if isinstance(self.MaxReturned, int):
self.MaxReturned = str(self.MaxReturned)
print(f'{self.MaxReturned=}')
self.IncludeRetElement = kwargs['IncludeRetElement'] if 'IncludeRetElement' in kwargs else []
self.TxnDateRangeFilter = kwargs['TxnDateRangeFilter'] if 'TxnDateRangeFilter' in kwargs else None
self.FullName = kwargs['FullName'] if 'FullName' in kwargs else None
self.response_string = None
def get_customer_pricelevel_list(self):
QBXML = ET.fromstring(self.response_string)
CustomerRets = QBXML.findall('.//CustomerRet')
# print(f'CustomerRets:{CustomerRets}')
PriceLevelName = None
SP_PriceLevelName = None
Addr1, Addr2, Addr3, Addr4, Addr5 = None, None, None, None, None
Addr1s, Addr2s, Addr3s, Addr4s, Addr5s = [], [], [], [], []
for CustomerRet in CustomerRets:
CustomerFullName = None
PriceLevelName = None
SP_PriceLevelName = None
Addr1, Addr2, Addr3, Addr4, Addr5 = None, None, None, None, None
billaddressRet=CustomerRet.find('BillAddress')
# print(CustomerRet.find('.//Addr1')) #can
# print(CustomerRet.find('./Addr1')) #None
# print(CustomerRet.find('BillAddress/Addr1')) #can
if billaddressRet:
if billaddressRet.find('Addr1')!=None: Addr1 = billaddressRet.find('Addr1').text
if billaddressRet.find('Addr2')!=None: Addr2 = billaddressRet.find('Addr2').text
if billaddressRet.find('Addr3')!=None: Addr3 = billaddressRet.find('Addr3').text
if billaddressRet.find('Addr4')!=None: Addr4 = billaddressRet.find('Addr4').text
if billaddressRet.find('Addr5')!=None: Addr5 = billaddressRet.find('Addr5').text
# print(Addr1, Addr2,)
Addr1s.append(Addr1)
Addr2s.append(Addr2)
Addr3s.append(Addr3)
Addr4s.append(Addr4)
Addr5s.append(Addr5)
CustomerFullName = CustomerRet.find('FullName').text
PriceLevelName = CustomerRet.find('.//PriceLevelRef')
if PriceLevelName:
PriceLevelName = PriceLevelName.find("FullName").text
# print(f'PriceLevelName:{PriceLevelName}')
DataExtRets = CustomerRet.findall('.//DataExtRet')
SP_PriceLevelName = None
if len(DataExtRets)>0:
for DataExtRet in DataExtRets:
DataExtName = DataExtRet.find('DataExtName').text
DataExtValue = DataExtRet.find('DataExtValue').text
if DataExtName.lower() == 'special cust'.lower():
SP_PriceLevelName = DataExtValue.upper()
break
self.CustomerPriceLevelNames.append(PriceLevelName)
self.SPPriceLevelNames.append(SP_PriceLevelName)
self.CustomerFullNames.append(CustomerFullName)
# print(self.CustomerFullNames, self.CustomerPriceLevelNames, self.SPPriceLevelNames)
Customer = {}
Customer['FullName']=self.CustomerFullNames
Customer['PriceLevelName']=self.CustomerPriceLevelNames
Customer['SPName']= self.SPPriceLevelNames
Customer['Addr1']=Addr1s
Customer['Addr2']=Addr2s
Customer['Addr3']=Addr3s
Customer['Addr4']=Addr4s
Customer['Addr5']=Addr5s
# print(Customer)
_df = pd.DataFrame.from_dict(Customer)
# print(_df)
_df.to_excel(os.path.join(os.getcwd(), self.item_inventory_path, self._filename), index=False)
CustomerDict = _df.to_dict('records')
return CustomerDict
def create_QBXML(self):
return self.create_customerquery_QBXML()
def create_customerquery_QBXML(self ):
root = ET.Element("QBXML")
root.tail = "\n"
root.text = "\n "
QBXMLMsgsRq = ET.SubElement(root, "QBXMLMsgsRq")
# QBXMLMsgsRq.set("onError", "continueOnError")
QBXMLMsgsRq.set("onError", "stopOnError")
QBXMLMsgsRq.tail = "\n"
QBXMLMsgsRq.text = "\n "
CustomerQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "CustomerQueryRq","\n " )
# FullName = self.create_sub_element(ET, CustomerQueryRq, "FullName", self.FullName, 6)
if self.MaxReturned is not None:
print("masuk maxreturn")
MaxReturned = self.create_sub_element(ET, CustomerQueryRq, "MaxReturned", self.MaxReturned, 4)
IncludeRetElement = ['FullName', 'PriceLevelRef', 'BillAddress', 'DataExtRet']
# IncludeRetElement += [, 'BillAddress1', 'BillAddress2', 'BillAddress3', 'BillAddres4', 'BillAddress5',]
for x in IncludeRetElement:
IncludeRetElement = self.create_sub_element(ET, CustomerQueryRq, "IncludeRetElement", x, 4)
OwnerID = self.create_sub_element(ET, CustomerQueryRq, "OwnerID", "0", 6)
mydata = ET.tostring(root, encoding = "unicode")
qbxml_query = """<?xml version="1.0" encoding="utf-8"?>\n"""
qbxml_query = qbxml_query + """<?qbxml version="13.0"?>"""
qbxml_query = qbxml_query + "\n" + mydata
print(f'create_customer_QBXML->qbxml_query: {qbxml_query}')
self.response_string=self.connect_to_quickbooks(qbxml_query)
return self.response_string
def to_json(self):
start = timeit.default_timer()
js_data=None
# print(self.create_QBXML())
# self.response_string = self.connect_to_quickbooks(self.create_QBXML())
# ret, msg = self.status_ok(self.response_string)
ret = True
if ret:
df = pd.DataFrame.from_dict(self.get_customer_pricelevel_list())
print(df)
if len(df)>0:
js_data = json.dumps(df.to_dict("records"))
else:
False, 'There is no data(df==0)'
else:
return False, msg
print("The difference of time is :", timeit.default_timer() - start)
return True, js_data
def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace = 0, attrib =None):
if type(attrib) is not dict:
attrib = {}
ele = ET.SubElement(parentNode, thisNode)
for x in attrib:
ele.set(x, attrib[x])
ele.text = text
tail = "\n"
for x in range(whiteSpace):
tail = tail + " "
ele.tail = tail
return ele
def connect_to_quickbooks(self, qbxml_query):
# Connect to Quickbooks
sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
sessionManager.OpenConnection('', 'DASA2')
# ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2)
ticket = sessionManager.BeginSession("", 2)
# Send query and receive response
self.response_string = sessionManager.ProcessRequest(ticket, qbxml_query)
# Disconnect from Quickbooks
sessionManager.EndSession(ticket) # Close the company file
sessionManager.CloseConnection() # Close the connection
# print (f'self.response_string:{self.response_string}')
return self.response_string
def status_ok(self, QBXML): #for CustomerRs
tree = ET.fromstring(QBXML)
GSRQRs = tree.find(".//CustomerRs")
# print(f"GSRQRs:{GSRQRs}")
status_code = GSRQRs.attrib #.get('statusCode')
# print(GSRQRs.attrib)
# print(GSRQRs.attrib['statusCode'])
status=GSRQRs.attrib.get('statusMessage')
print(f'status={status}')
if 'OK' in status:
return True, status_code
else:
return False, status_code
if __name__ == '__main__':
starttime = timeit.default_timer()
# ini=SalesOrderQuery(FullName= '999 HPL', IncludeRetElement = ['TxnID', 'TimeCreated', 'TimeModified','TxnNumber', 'CustomerRef', 'IsManuallyClosed', 'IsFullyInvoiced'])
ini=CustomerQuery()
iya = ini.create_customerquery_QBXML()
# print(iya)
Customer= ini.get_customer_pricelevel_list()
print(f'{Customer = }')
res, jsondt = ini.to_json()
print(f'{jsondt=}')
print("The time difference is :", timeit.default_timer() - starttime)