# Mirror of https://github.com/bcomsugi/dasaproject.git
# Synced 2026-01-10 16:52:38 +07:00
# 144 lines, 6.2 KiB, Python
import xml.etree.ElementTree as ET
|
|
import win32com.client
|
|
import xmltodict
|
|
import pprint
|
|
import datetime
|
|
import pandas as pd
|
|
from datetime import date
|
|
import timeit
|
|
import os
|
|
|
|
class CustomerQuery:
    """Query QuickBooks customers via QBXML and export their price levels.

    Typical use (mirrors the script section of this file)::

        query = CustomerQuery()
        response = query.create_customerquery_QBXML()
        query.get_customer_pricelevel_list(response)
    """

    def __init__(self, **kwargs) -> None:
        """Store the optional query settings and reset the result columns.

        Recognised keyword arguments are ``IncludeRetElement`` (default
        ``[]``), ``TxnDateRangeFilter`` (default ``None``) and ``FullName``
        (default ``None``).  They are only stored here; the request built by
        this class does not read them.
        """
        # Column accumulators filled by get_customer_pricelevel_list().  They
        # are instance state, so repeated calls keep appending to them.
        self.CustomerPriceLevelNames = []
        self.SPPriceLevelNames = []
        self.CustomerFullNames = []
        # The export goes to <cwd>/<item_inventory_path>/<_filename>.
        self.item_inventory_path = "ItemInventory"
        self._filename = "CustomerList.xlsx"
        self.IncludeRetElement = kwargs.get('IncludeRetElement', [])
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')
        self.FullName = kwargs.get('FullName')

    @staticmethod
    def _child_text(element, path):
        """Return the text of the first ``path`` match under ``element``, or None."""
        child = element.find(path)
        return None if child is None else child.text

    def _collect_customer_rows(self, response_string):
        """Parse a response into ``(FullName, PriceLevelName, SPName)`` tuples.

        One tuple is produced per ``CustomerRet`` element.  ``PriceLevelName``
        is the ``FullName`` text inside ``PriceLevelRef``; ``SPName`` is the
        upper-cased ``DataExtValue`` of the first ``DataExtRet`` whose
        ``DataExtName`` equals ``'special cust'`` ignoring case.  Anything
        missing is reported as ``None``.
        """
        rows = []
        for customer_ret in ET.fromstring(response_string).findall('.//CustomerRet'):
            full_name = self._child_text(customer_ret, 'FullName')

            # Compare against None explicitly: an Element without children is
            # falsy, so a plain truth test would leave the Element object
            # itself in the exported column.
            price_level_ref = customer_ret.find('.//PriceLevelRef')
            price_level = None
            if price_level_ref is not None:
                price_level = self._child_text(price_level_ref, 'FullName')

            special = None
            for data_ext in customer_ret.findall('.//DataExtRet'):
                ext_name = self._child_text(data_ext, 'DataExtName')
                if ext_name is not None and ext_name.lower() == 'special cust':
                    ext_value = self._child_text(data_ext, 'DataExtValue')
                    special = None if ext_value is None else ext_value.upper()
                    break  # only the first matching custom field counts

            rows.append((full_name, price_level, special))
        return rows

    def get_customer_pricelevel_list(self, response_string):
        """Append the customers in ``response_string`` and write the Excel file.

        The parsed values are appended to ``CustomerFullNames``,
        ``CustomerPriceLevelNames`` and ``SPPriceLevelNames``.  The accumulated
        columns are then printed as a DataFrame and written to
        ``<cwd>/ItemInventory/CustomerList.xlsx`` with columns ``FullName``,
        ``PriceLevelName`` and ``SPName``.  The target directory is created
        when it does not exist.

        Returns:
            ``(PriceLevelName, SP_PriceLevelName)`` of the last customer in the
            response, or ``(None, None)`` when the response has no customers.
        """
        rows = self._collect_customer_rows(response_string)
        for full_name, price_level, special in rows:
            self.CustomerPriceLevelNames.append(price_level)
            self.SPPriceLevelNames.append(special)
            self.CustomerFullNames.append(full_name)

        Customer = {
            'FullName': self.CustomerFullNames,
            'PriceLevelName': self.CustomerPriceLevelNames,
            'SPName': self.SPPriceLevelNames,
        }
        _df = pd.DataFrame.from_dict(Customer)
        print(_df)

        target_dir = os.path.join(os.getcwd(), self.item_inventory_path)
        os.makedirs(target_dir, exist_ok=True)
        _df.to_excel(os.path.join(target_dir, self._filename), index=False)

        # Without customers there is no "last" row; return explicit Nones
        # instead of failing on unbound locals.
        if not rows:
            return None, None
        _, PriceLevelName, SP_PriceLevelName = rows[-1]
        return PriceLevelName, SP_PriceLevelName

    def _build_customerquery_qbxml(self):
        """Return the CustomerQueryRq document as a string (no I/O)."""
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        QBXMLMsgsRq = ET.SubElement(root, "QBXMLMsgsRq")
        QBXMLMsgsRq.set("onError", "stopOnError")
        QBXMLMsgsRq.tail = "\n"
        QBXMLMsgsRq.text = "\n "
        CustomerQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "CustomerQueryRq", "\n ")
        # Restrict the response to the elements the price-level export reads.
        for element_name in ('FullName', 'PriceLevelRef', 'DataExtRet'):
            self.create_sub_element(ET, CustomerQueryRq, "IncludeRetElement", element_name, 4)
        # NOTE(review): OwnerID "0" is presumably what makes QuickBooks include
        # the DataExtRet custom fields - confirm against the QBXML reference.
        self.create_sub_element(ET, CustomerQueryRq, "OwnerID", "0", 6)
        mydata = ET.tostring(root, encoding="unicode")

        qbxml_query = """<?xml version="1.0" encoding="utf-8"?>\n"""
        qbxml_query = qbxml_query + """<?qbxml version="13.0"?>"""
        qbxml_query = qbxml_query + "\n" + mydata
        return qbxml_query

    def create_customerquery_QBXML(self):
        """Build the customer query, send it to QuickBooks, return the response.

        The request asks for ``FullName``, ``PriceLevelRef`` and ``DataExtRet``
        with ``OwnerID`` ``0``.  The request text is printed before it is sent
        through ``connect_to_quickbooks``.
        """
        qbxml_query = self._build_customerquery_qbxml()
        print(f'create_customer_QBXML->qbxml_query: {qbxml_query}')
        response_string = self.connect_to_quickbooks(qbxml_query)
        return response_string

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element to ``parentNode`` and return it.

        Args:
            ET: the ElementTree module (or any object with ``SubElement``).
            parentNode: element that receives the new child.
            thisNode: tag name of the child.
            text: text content of the child.
            whiteSpace: number of spaces placed after the newline in the
                child's tail (simple pretty-printing).
            attrib: mapping of attributes to set; any non-dict value is
                treated as "no attributes".
        """
        if not isinstance(attrib, dict):
            attrib = {}
        ele = ET.SubElement(parentNode, thisNode)
        for key, value in attrib.items():
            ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def connect_to_quickbooks(self, qbxml_query):
        """Send ``qbxml_query`` to QuickBooks and return the response string.

        Uses the ``QBXMLRP2.RequestProcessor`` COM object.  The session and the
        connection are released even when the request raises, so a failed
        query does not leave them open.
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        # NOTE(review): 'DASA2' looks like the application name registered
        # with QuickBooks - confirm.
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # NOTE(review): the empty path presumably selects the currently
            # open company file and 2 is the file-open mode - confirm.
            ticket = sessionManager.BeginSession("", 2)
            try:
                return sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)  # Close the company file
        finally:
            sessionManager.CloseConnection()  # Close the connection

    def status_ok(self, QBXML):  # for CustomerRs
        """Report whether the customer response in ``QBXML`` has an OK status.

        The first ``CustomerRs`` element is used.  When there is none, the
        first ``CustomerQueryRs`` element (the response to the
        ``CustomerQueryRq`` this class sends) is used instead.

        Returns:
            ``(ok, attributes)``.  ``ok`` is True when the element's
            ``statusMessage`` contains ``'OK'``.  ``attributes`` is the
            element's attribute dict, or ``{}`` when no response element was
            found (``ok`` is False in that case).
        """
        tree = ET.fromstring(QBXML)

        GSRQRs = tree.find(".//CustomerRs")
        if GSRQRs is None:
            GSRQRs = tree.find(".//CustomerQueryRs")
        if GSRQRs is None:
            print('status=None')
            return False, {}

        status_code = GSRQRs.attrib
        status = status_code.get('statusMessage')

        print(f'status={status}')
        return (status is not None and 'OK' in status), status_code
|
|
|
|
if __name__ == '__main__':
    # Time one full round trip: build and send the customer query, then
    # parse the response and export it.
    started_at = timeit.default_timer()

    customer_query = CustomerQuery()
    response = customer_query.create_customerquery_QBXML()
    customer_query.get_customer_pricelevel_list(response)

    elapsed = timeit.default_timer() - started_at
    print("The time difference is :", elapsed)