# Source listing: dasaproject/ItemInventoryQuery.py
# (repository viewer metadata: 252 lines, 12 KiB, Python)

import xml.etree.ElementTree as ET
import win32com.client
import pandas as pd
from datetime import date, datetime
import timeit
import os
import json
class ItemInventoryQuery:
    """Build, send and parse a QuickBooks ``ItemInventoryQueryRq`` request.

    The optional first positional argument is the default ``.xlsx`` file
    name used by :meth:`to_excel`.  Everything else is passed by keyword:

    * ``IncludeRetElement`` - one element name or a list of names.
    * ``OwnerID`` - defaults to ``"0"``; pass ``None`` to omit the element.
    * ``MaxReturned`` - ``int`` or ``str``; omitted when ``None``.
    * ``NameRangeFilter`` - ``[from_name, to_name]``; only emitted when it
      holds exactly two entries.
    * ``DN``, ``TxnDateRangeFilter``, ``DateMacro``, ``FromTxnDate``,
      ``ToTxnDate``, ``ReportEntityFilter``, ``FullName`` - stored on the
      instance; :meth:`create_QBXML` does not currently emit them.
    """

    # Values accepted for the ``DateMacro`` keyword; anything else is ignored.
    _DATE_MACROS = frozenset({
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    })

    def __init__(self, *args, **kwargs) -> None:
        """Store the query options (see the class docstring for the keys)."""
        self.filename = args[0] if args else None

        # Normalise to a list.  A missing value becomes an empty list so
        # that no empty <IncludeRetElement/> element is sent.
        include = kwargs.get('IncludeRetElement')
        if include is None:
            include = []
        elif isinstance(include, (tuple, set, frozenset)):
            include = list(include)
        elif not isinstance(include, list):
            include = [include]
        self.IncludeRetElement = include

        self.OwnerID = kwargs.get('OwnerID', "0")

        self.MaxReturned = kwargs.get('MaxReturned')
        if isinstance(self.MaxReturned, int):
            # ElementTree can only serialise string text.
            self.MaxReturned = str(self.MaxReturned)

        self.NameRangeFilter = kwargs.get('NameRangeFilter') or []
        self.DN = kwargs.get('DN', {})
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')

        macro = kwargs.get('DateMacro')
        self.DateMacro = macro if macro in self._DATE_MACROS else None

        self.FromTxnDate = self.validate_date(kwargs.get('FromTxnDate'))
        self.ToTxnDate = self.validate_date(kwargs.get('ToTxnDate'))
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')
        self.FullName = kwargs.get('FullName')

    def validate_date(self, value):
        """Return *value* as a ``YYYY-MM-DD`` string, or ``None`` if invalid.

        Accepts ``date``/``datetime`` objects and ``YYYY-MM-DD`` strings.

        NOTE(review): the constructor always called this method but no
        definition was present in the class; the accepted formats here are
        an assumption - confirm against the rest of the project.
        """
        if isinstance(value, datetime):
            return value.date().isoformat()
        if isinstance(value, date):
            return value.isoformat()
        if isinstance(value, str):
            try:
                return datetime.strptime(value.strip(), '%Y-%m-%d').date().isoformat()
            except ValueError:
                return None
        return None

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element to *parentNode* and return it.

        ``ET`` is the ElementTree module (or a compatible object providing
        ``SubElement``).  ``text`` becomes the element text, ``attrib`` (a
        dict, otherwise ignored) its attributes, and the tail is a newline
        followed by ``whiteSpace`` spaces.
        """
        element = ET.SubElement(parentNode, thisNode)
        if isinstance(attrib, dict):
            for key, value in attrib.items():
                element.set(key, value)
        element.text = text
        element.tail = "\n" + " " * whiteSpace
        return element

    def create_QBXML(self):
        """Return the qbXML request document as a string.

        Emits, in this order and only when set: ``MaxReturned``,
        ``NameRangeFilter``, one ``IncludeRetElement`` per requested name
        and ``OwnerID``.  The generated document is also printed.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "

        messages = ET.SubElement(root, "QBXMLMsgsRq")
        messages.set("onError", "stopOnError")
        messages.tail = "\n"
        messages.text = "\n "

        request = self.create_sub_element(ET, messages, "ItemInventoryQueryRq", "\n ")

        if self.MaxReturned is not None:
            self.create_sub_element(ET, request, "MaxReturned", self.MaxReturned, 4)

        if len(self.NameRangeFilter) == 2:
            print(self.NameRangeFilter)
            name_range = self.create_sub_element(ET, request, "NameRangeFilter", "\n ")
            self.create_sub_element(ET, name_range, "FromName", self.NameRangeFilter[0])
            self.create_sub_element(ET, name_range, "ToName", self.NameRangeFilter[1])

        for element_name in self.IncludeRetElement:
            self.create_sub_element(ET, request, "IncludeRetElement", element_name, 4)

        if self.OwnerID is not None:
            # str() so that an integer OwnerID does not break serialisation.
            self.create_sub_element(ET, request, "OwnerID", str(self.OwnerID), 4)

        body = ET.tostring(root, encoding="unicode")
        qbxml_query = (
            '<?xml version="1.0" encoding="utf-8"?>\n'
            '<?qbxml version="13.0"?>'
            "\n" + body
        )
        print(f'create_QBXML->qbxml_query: {qbxml_query}')
        return qbxml_query

    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* to QuickBooks and return the raw response string.

        Opens a ``QBXMLRP2.RequestProcessor`` connection under the
        application name ``'DASA2'``, begins a session on the currently
        open company file (empty path), processes the request, and always
        ends the session and closes the connection - even when the request
        raises.
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # NOTE(review): mode 2 is presumably qbFileOpenDoNotCare -
            # confirm against the QBXMLRP2 type library.
            ticket = sessionManager.BeginSession("", 2)
            try:
                return sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)  # Close the company file
        finally:
            sessionManager.CloseConnection()  # Close the connection

    def status_ok(self, QBXML):  # for ItemInventoryQueryRs
        """Return ``(ok, attributes)`` for the ``ItemInventoryQueryRs`` element.

        ``ok`` is ``True`` when the element's ``statusMessage`` attribute
        contains ``'OK'``.  ``attributes`` is the element's attribute dict,
        or ``{}`` when the response holds no ``ItemInventoryQueryRs``.
        """
        tree = ET.fromstring(QBXML)
        response = tree.find(".//ItemInventoryQueryRs")
        if response is None:
            print('ItemInventoryQuery->status=None')
            return False, {}

        attributes = response.attrib
        status = attributes.get('statusMessage')
        print(f'ItemInventoryQuery->status={status}')
        return (status is not None and 'OK' in status), attributes

    @staticmethod
    def _unit_of_measure_name(item):
        """Return the unit-of-measure set name of *item*, or ``None``."""
        ref = item.find("UnitOfMeasureSetRef")
        if ref is None:
            return None
        name = ref.findtext("FullName")
        if name is not None:
            return name
        # Fall back to the second child, which is what was read previously.
        return ref[1].text if len(ref) > 1 else None

    def _collect_name_from_taco(self, items):
        """Pair each item's FullName with its ``NameFromTaco`` data extension."""
        full_names = []
        taco_names = []
        for item in items:
            full_name = item.findtext('FullName')
            if full_name is None:
                continue
            for ext in item.findall('DataExtRet'):
                if ext.findtext('DataExtName') != 'NameFromTaco':
                    continue
                value = ext.findtext('DataExtValue')
                if value is None:
                    continue
                full_names.append(full_name.strip())
                taco_names.append(value.strip())

        if full_names and len(full_names) == len(taco_names):
            return {'FullName': full_names, 'NameFromTaco': taco_names}
        print(f'ItemInventoryQuery->Not Exact Len:Fullname={len(full_names)}; NameFromTaco={len(taco_names)}')
        return {}

    def _collect_units_of_measure(self, items):
        """Collect FullName and unit-of-measure name for three-level items."""
        full_names = []
        units = []
        for item in items:
            full_name = item.findtext("FullName")
            # Only names made of exactly three ':'-separated parts are kept.
            if full_name is None or len(full_name.split(":")) != 3:
                continue
            full_names.append(full_name)
            # None keeps both columns the same length when the item has no
            # UnitOfMeasureSetRef (e.g. it was not requested).
            units.append(self._unit_of_measure_name(item))

        if full_names:
            return {'FullName': full_names, 'UnitOfMeasureSetRef': units}
        return {}

    def get_data(self, response_string):
        """Parse a response into a column dict suitable for a DataFrame.

        When ``'DataExtRet'`` was requested the result has the keys
        ``'FullName'`` and ``'NameFromTaco'``; otherwise ``'FullName'`` and
        ``'UnitOfMeasureSetRef'``.  Returns ``{}`` when nothing matched.
        The raw response is printed.
        """
        tree = ET.fromstring(response_string)
        items = tree.findall(".//ItemInventoryRet")
        print(f'get_data->response_string:{response_string}')
        if 'DataExtRet' in self.IncludeRetElement:
            return self._collect_name_from_taco(items)
        return self._collect_units_of_measure(items)

    def to_excel(self, filename: str = ''):
        """Run the query and write the result to an ``.xlsx`` file.

        Falls back to the file name given to the constructor when
        *filename* is empty.  Returns ``(True, message)`` on success, or
        ``(False, reason)`` when no usable file name is available, the
        query status is not OK, or the query produced no rows.
        """
        start = timeit.default_timer()
        if not filename:
            print("filename is none")
            if self.filename is None:
                return False, 'Please fill excel filename.'
            print("self.filename not none")
            filename = self.filename
        if not filename.endswith('.xlsx'):
            return False, 'filename has to be excel file(.xlsx)'

        response_string = self.connect_to_quickbooks(self.create_QBXML())
        ret, msg = self.status_ok(response_string)
        if not ret:
            return False, msg

        df = pd.DataFrame.from_dict(self.get_data(response_string))
        print(df)
        if len(df) == 0:
            return False, 'There is no data(df==0)'

        df.to_excel(filename, index=False)
        modtime = datetime.fromtimestamp(os.path.getmtime(filename))
        print(f"modified Time:{modtime}")
        print("The difference of time is :", timeit.default_timer() - start)
        return True, f"It takes {format(timeit.default_timer() - start, '.2f')} seconds to update {filename}"

    def to_json(self):
        """Run the query and return the result as a JSON string.

        Returns ``(True, json_text)`` on success, where ``json_text`` maps
        each column name to a list of values, or ``(False, reason)`` when
        the query status is not OK or the query produced no rows.
        """
        start = timeit.default_timer()
        response_string = self.connect_to_quickbooks(self.create_QBXML())
        print(f'{response_string = }')
        ret, msg = self.status_ok(response_string)
        if not ret:
            return False, msg

        print(f'{ret = }')
        df = pd.DataFrame.from_dict(self.get_data(response_string))
        print(df)
        if len(df) == 0:
            return False, 'There is no data(df==0)'

        js_data = json.dumps(df.to_dict("list"))
        print("The difference of time is :", timeit.default_timer() - start)
        return True, js_data
# Reference point for the elapsed-time report printed by main().
start = timeit.default_timer()


def main():
    """Smoke-test the query against the running QuickBooks instance.

    Requests only the ``FullName`` element of every inventory item,
    prints the JSON payload (or the failure reason) returned by
    ``to_json`` and the time elapsed since the module was loaded.

    Other constructor keywords that can be tried here include
    ``OwnerID``, ``MaxReturned`` and ``NameRangeFilter=[from, to]``.
    """
    print('start ItemIventoryQuery')
    ini = ItemInventoryQuery(IncludeRetElement=['FullName'])

    # to_json() already builds the request, talks to QuickBooks and checks
    # the response status, so no second round-trip is needed.
    status, msg = ini.to_json()
    print(f'{msg = }')
    if status:
        print("YEAH")

    print("The difference of time is :",
          timeit.default_timer() - start)


if __name__ == "__main__":
    main()