# Mirror of https://github.com/bcomsugi/dasaproject.git
# Synced 2026-01-08 18:42:37 +07:00 (252 lines, 12 KiB, Python)
import xml.etree.ElementTree as ET
|
|
import win32com.client
|
|
import pandas as pd
|
|
from datetime import date, datetime
|
|
import timeit
|
|
import os
|
|
import json
|
|
|
|
class ItemInventoryQuery:
    """Build, send and parse a QuickBooks qbXML ``ItemInventoryQueryRq``.

    Typical use is ``ItemInventoryQuery(...).to_json()`` or
    ``ItemInventoryQuery('out.xlsx', ...).to_excel()``; both build the
    request with :meth:`create_QBXML`, send it with
    :meth:`connect_to_quickbooks` and parse the reply with :meth:`get_data`.

    Only ``MaxReturned``, ``NameRangeFilter``, ``IncludeRetElement`` and
    ``OwnerID`` are written into the request.  The remaining keyword
    arguments are stored on the instance but not used by this class.
    """

    # Values accepted for the ``DateMacro`` keyword; anything else is ignored.
    _DATE_MACROS = (
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    )

    def __init__(self, *args, **kwargs) -> None:
        """Store the query options.

        Args:
            *args: Optional; the first positional argument is the default
                Excel filename used by :meth:`to_excel`.
            **kwargs: ``IncludeRetElement`` (str or list of str),
                ``OwnerID`` (default ``"0"``), ``MaxReturned`` (int or str),
                ``NameRangeFilter`` (two-item list ``[from, to]``), ``DN``,
                ``TxnDateRangeFilter``, ``DateMacro``, ``FromTxnDate``,
                ``ToTxnDate``, ``ReportEntityFilter`` and ``FullName``.
        """
        self.filename = args[0] if args else None

        # Normalise to a list.  A missing value must become an EMPTY list,
        # otherwise create_QBXML would emit an empty <IncludeRetElement />.
        include = kwargs.get('IncludeRetElement')
        if include is None:
            include = []
        elif isinstance(include, tuple):
            include = list(include)
        elif not isinstance(include, list):
            include = [include]
        self.IncludeRetElement = include

        # Element text must be a string for ElementTree serialisation.
        self.OwnerID = kwargs.get('OwnerID', "0")
        if isinstance(self.OwnerID, int):
            self.OwnerID = str(self.OwnerID)

        self.MaxReturned = kwargs.get('MaxReturned')
        if isinstance(self.MaxReturned, int):
            self.MaxReturned = str(self.MaxReturned)

        self.NameRangeFilter = kwargs.get('NameRangeFilter', [])
        self.DN = kwargs.get('DN', {})
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')

        macro = kwargs.get('DateMacro')
        self.DateMacro = macro if macro in self._DATE_MACROS else None

        self.FromTxnDate = (
            self.validate_date(kwargs['FromTxnDate'])
            if 'FromTxnDate' in kwargs else None
        )
        self.ToTxnDate = (
            self.validate_date(kwargs['ToTxnDate'])
            if 'ToTxnDate' in kwargs else None
        )
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')
        self.FullName = kwargs.get('FullName')

    def validate_date(self, value):
        """Return *value* as a ``YYYY-MM-DD`` string, or ``None`` if invalid.

        Accepts ``date``/``datetime`` objects and ``YYYY-MM-DD`` strings.
        NOTE(review): the original class called this method without defining
        it; the accepted formats here are an assumption - confirm against
        the rest of the project.
        """
        if isinstance(value, datetime):
            return value.date().isoformat()
        if isinstance(value, date):
            return value.isoformat()
        if isinstance(value, str):
            try:
                parsed = datetime.strptime(value.strip(), "%Y-%m-%d")
            except ValueError:
                return None
            return parsed.date().isoformat()
        return None

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace = 0, attrib =None):
        """Append a child element to *parentNode* and return it.

        Args:
            ET: The ElementTree module (or compatible object) providing
                ``SubElement``.
            parentNode: Element that receives the new child.
            thisNode: Tag name of the new child.
            text: Text content of the new child.
            whiteSpace: Number of spaces placed after the newline in the
                child's tail (pretty-printing only).
            attrib: Optional dict of attributes; non-dict values are ignored.
        """
        if not isinstance(attrib, dict):
            attrib = {}
        ele = ET.SubElement(parentNode, thisNode)
        for key, value in attrib.items():
            ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def create_QBXML(self) -> str:
        """Return the complete qbXML request document as a string.

        The request contains, in this order and only when set:
        ``MaxReturned``, ``NameRangeFilter`` (exactly two entries required),
        one ``IncludeRetElement`` per requested element, and ``OwnerID``.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs_rq = ET.SubElement(root, "QBXMLMsgsRq")
        msgs_rq.set("onError", "stopOnError")
        msgs_rq.tail = "\n"
        msgs_rq.text = "\n "
        query_rq = self.create_sub_element(ET, msgs_rq, "ItemInventoryQueryRq", "\n ")

        if self.MaxReturned is not None:
            self.create_sub_element(ET, query_rq, "MaxReturned", self.MaxReturned, 4)

        if len(self.NameRangeFilter) == 2:
            name_range = self.create_sub_element(ET, query_rq, "NameRangeFilter", "\n ")
            self.create_sub_element(ET, name_range, "FromName", self.NameRangeFilter[0])
            self.create_sub_element(ET, name_range, "ToName", self.NameRangeFilter[1])

        for element_name in self.IncludeRetElement:
            self.create_sub_element(ET, query_rq, "IncludeRetElement", element_name, 4)

        if self.OwnerID is not None:
            self.create_sub_element(ET, query_rq, "OwnerID", self.OwnerID, 4)

        body = ET.tostring(root, encoding="unicode")
        qbxml_query = (
            """<?xml version="1.0" encoding="utf-8"?>\n"""
            + """<?qbxml version="13.0"?>"""
            + "\n" + body
        )
        print(f'create_QBXML->qbxml_query: {qbxml_query}')
        return qbxml_query

    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* to QuickBooks and return the raw response string.

        Opens a ``QBXMLRP2.RequestProcessor`` connection under the
        application name ``DASA2`` and begins a session with an empty company
        file path and file mode ``2``.
        NOTE(review): an empty path presumably targets the currently open
        company file and mode 2 presumably is ``qbFileOpenDoNotCare`` -
        confirm against the QuickBooks SDK.

        The session and the connection are always closed, even when the
        request raises.
        """
        session_manager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        session_manager.OpenConnection('', 'DASA2')
        try:
            ticket = session_manager.BeginSession("", 2)
            try:
                return session_manager.ProcessRequest(ticket, qbxml_query)
            finally:
                session_manager.EndSession(ticket)  # Close the company file
        finally:
            session_manager.CloseConnection()  # Close the connection

    def status_ok(self, QBXML):  # for ItemInventoryQueryRs
        """Check the status of an ``ItemInventoryQueryRs`` response.

        Returns:
            ``(ok, attributes)`` where *ok* is True when the response's
            ``statusMessage`` contains ``"OK"`` and *attributes* is the
            attribute dict of the ``ItemInventoryQueryRs`` element.  When
            the element is missing, *ok* is False and *attributes* holds a
            ``statusMessage`` describing the problem.
        """
        tree = ET.fromstring(QBXML)
        response_node = tree.find(".//ItemInventoryQueryRs")
        if response_node is None:
            return False, {'statusMessage': 'ItemInventoryQueryRs not found in response'}

        attributes = response_node.attrib
        status = attributes.get('statusMessage')
        print(f'ItemInventoryQuery->status={status}')
        # A missing statusMessage is treated as "not OK" rather than crashing.
        return (status is not None and 'OK' in status), attributes

    def get_data(self, response_string) -> dict:
        """Extract column data from a qbXML response.

        When ``'DataExtRet'`` was requested, returns
        ``{'FullName': [...], 'NameFromTaco': [...]}`` with one row per
        ``DataExtRet`` whose ``DataExtName`` is ``NameFromTaco``.

        Otherwise returns ``{'FullName': [...], 'UnitOfMeasureSetRef': [...]}``
        for items whose full name has exactly three ``:``-separated levels;
        the unit of measure is ``None`` when the response does not carry one.

        Returns an empty dict when nothing matched.
        """
        tree = ET.fromstring(response_string)
        items = tree.findall(".//ItemInventoryRet")
        print(f'get_data->response_string:{response_string}')

        full_names = []

        if 'DataExtRet' in self.IncludeRetElement:
            taco_names = []
            for item in items:
                item_name = item.findtext('FullName')
                if item_name is None:
                    continue  # cannot build a row without the item name
                for data_ext in item.findall('DataExtRet'):
                    if data_ext.findtext('DataExtName') == 'NameFromTaco':
                        full_names.append(item_name.strip())
                        taco_names.append((data_ext.findtext('DataExtValue') or '').strip())
            if full_names:
                return {'FullName': full_names, 'NameFromTaco': taco_names}
            print(f'ItemInventoryQuery->Not Exact Len:Fullname={len(full_names)}; NameFromTaco={len(taco_names)}')
            return {}

        units = []
        for item in items:
            item_name = item.findtext("FullName")
            if item_name is None or len(item_name.split(":")) != 3:
                continue
            full_names.append(item_name)
            # UnitOfMeasureSetRef is absent when it was not requested via
            # IncludeRetElement or the item has no unit set.
            units.append(item.findtext("UnitOfMeasureSetRef/FullName"))

        if full_names:
            return {'FullName': full_names, 'UnitOfMeasureSetRef': units}
        return {}

    def to_excel(self, filename: str = ''):
        """Query QuickBooks and write the result to an ``.xlsx`` file.

        Args:
            filename: Target file; falls back to the filename given to the
                constructor when empty.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, reason)``
            where *reason* is a message string or, for a failed QuickBooks
            status, the response attribute dict from :meth:`status_ok`.
        """
        start = timeit.default_timer()
        if filename == '':
            print("filename is none")
            if self.filename is None:
                return False, 'Please fill excel filename.'
            print("self.filename not none")
            filename = self.filename
        if not filename.endswith('.xlsx'):
            return False, 'filename has to be excel file(.xlsx)'

        response_string = self.connect_to_quickbooks(self.create_QBXML())
        ret, msg = self.status_ok(response_string)
        if not ret:
            return False, msg

        df = pd.DataFrame.from_dict(self.get_data(response_string))
        print(df)
        if len(df) == 0:
            return False, 'There is no data(df==0)'

        df.to_excel(filename, index=False)
        modtime = datetime.fromtimestamp(os.path.getmtime(filename))
        print(f"modified Time:{modtime}")
        print("The difference of time is :", timeit.default_timer() - start)
        return True, f"It takes {format(timeit.default_timer() - start, '.2f')} seconds to update {filename}"

    def to_json(self):
        """Query QuickBooks and return the result as a JSON string.

        Returns:
            ``(True, json_text)`` on success, where *json_text* encodes a
            dict of column name to list of values; otherwise
            ``(False, reason)`` as described for :meth:`to_excel`.
        """
        start = timeit.default_timer()
        response_string = self.connect_to_quickbooks(self.create_QBXML())
        print(f'{response_string = }')
        ret, msg = self.status_ok(response_string)
        if not ret:
            return False, msg

        print(f'{ret = }')
        df = pd.DataFrame.from_dict(self.get_data(response_string))
        print(df)
        if len(df) == 0:
            return False, 'There is no data(df==0)'

        js_data = json.dumps(df.to_dict("list"))
        print("The difference of time is :", timeit.default_timer() - start)
        return True, js_data
|
|
|
|
# Module-level timer; the script below reports the elapsed time against it.
start = timeit.default_timer()

if __name__ == "__main__":
    # Manual smoke test: run one inventory query against QuickBooks and print
    # the JSON result.  Other constructor options that can be tried here
    # include OwnerID="0", MaxReturned="10",
    # IncludeRetElement=['FullName', 'DataExtRet'] and
    # NameRangeFilter=[from_name, to_name].
    print('start ItemIventoryQuery')

    ini = ItemInventoryQuery(IncludeRetElement=['FullName'])

    # A single round-trip is enough: to_json() builds the request, sends it
    # and checks the response status itself.
    status, msg = ini.to_json()
    print(f'{msg = }')
    if status:
        print("YEAH")

    print("The difference of time is :", timeit.default_timer() - start)