dasaproject/qbtransactionquery.py
2024-03-14 03:30:41 +07:00

333 lines
16 KiB
Python

import xml.etree.ElementTree as ET
import win32com.client
import xmltodict
import pprint
import datetime
import pandas as pd
import xml.dom.minidom
from lxml import etree
from tools import pprintXml
import timeit
class TransactionQuery:
def __init__(self, **kwargs) -> None:
# print(f'kwargs:{kwargs}')
# print(args)
# self.TransactionTypeFilter = kwargs['TransactionTypeFilter'] if 'TransactionTypeFilter' in kwargs else 'SalesOrder'
self.debug = kwargs['Debug'] if 'Debug' in kwargs else False
if self.debug != True and self.debug != False: self.debug=False
self.EntityTypeFilter = kwargs['EntityTypeFilter'] if 'EntityTypeFilter' in kwargs else None
self.RefNumber = kwargs['RefNumber'] if 'RefNumber' in kwargs else None
self.FullName = kwargs['FullName'] if 'FullName' in kwargs else None
self.TxnTypeFilter = kwargs['TxnTypeFilter'] if 'TxnTypeFilter' in kwargs else None
# self.TransactionPaidStatusFilter = kwargs['TransactionPaidStatusFilter'] if 'TransactionPaidStatusFilter' in kwargs else 'Open'
self.TransactionPaidStatusFilter = kwargs['TransactionPaidStatusFilter'] if 'TransactionPaidStatusFilter' in kwargs else 'Open'
self.TransactionDetailLevelFilter = kwargs['TransactionDetailLevelFilter'] if 'TransactionDetailLevelFilter' in kwargs else None
self.IncludeRetElement = kwargs['IncludeRetElement'] if 'IncludeRetElement' in kwargs else None
self.IncludeRetElement = self.IncludeRetElement if isinstance(self.IncludeRetElement, list) else None
self.ReportPeriod = kwargs['ReportPeriod'] if 'ReportPeriod' in kwargs else None
self.DateMacro = None
if 'DateMacro' in kwargs:
if kwargs['DateMacro'] in ['All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth', 'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear', 'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate', 'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate', 'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks', 'NextMonth', 'NextQuarter', 'NextYear']:
self.DateMacro = kwargs['DateMacro']
self.FromTxnDate = self.validate_date(kwargs['FromTxnDate']) if 'FromTxnDate' in kwargs else None
self.ToTxnDate = self.validate_date(kwargs['ToTxnDate']) if 'ToTxnDate' in kwargs else None
self.ReportEntityFilter = kwargs['ReportEntityFilter'] if 'ReportEntityFilter' in kwargs else None
# print(self.DateMacro, self.ReportPeriod, self.FromTxnDate, self.ToTxnDate)
def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace = 0, attrib =None):
if type(attrib) is not dict:
attrib = {}
ele = ET.SubElement(parentNode, thisNode)
for x in attrib:
ele.set(x, attrib[x])
ele.text = text
tail = "\n"
for x in range(whiteSpace):
tail = tail + " "
ele.tail = tail
return ele
def create_QBXML(self):
root = ET.Element("QBXML")
root.tail = "\n"
root.text = "\n"
# root.text = "\n "
QBXMLMsgsRq = ET.SubElement(root, "QBXMLMsgsRq")
QBXMLMsgsRq.set("onError", "stopOnError")
QBXMLMsgsRq.tail = "\n"
QBXMLMsgsRq.text = "\n "
# TransactionQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "TransactionQueryRq","\n", attrib={"metaData":"MetaDataAndResponseData", "iteratorID":"UUIDTYPE" })
TransactionQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "TransactionQueryRq","\n")
if self.RefNumber:
if isinstance(self.RefNumber, list):
for _ in self.RefNumber:
RefNumber = self.create_sub_element(ET, TransactionQueryRq, "RefNumber", _)
else:
RefNumber = self.create_sub_element(ET, TransactionQueryRq, "RefNumber", self.RefNumber)
if self.DateMacro:
TransactionDateRangeFilter = self.create_sub_element(ET, TransactionQueryRq, "TransactionDateRangeFilter","\n")
# ReportPeriod = self.create_sub_element(ET, TransactionQueryRq, "TransactionDateRangeFilter", "\n ",)
DateMacro = self.create_sub_element(ET, TransactionDateRangeFilter, "DateMacro", self.DateMacro)
elif type(self.FromTxnDate) is datetime.date or type(self.ToTxnDate) is datetime.date:
TransactionDateRangeFilter = self.create_sub_element(ET, TransactionQueryRq, "TransactionDateRangeFilter", "\n",)
# ReportPeriod = self.create_sub_element(ET, TransactionQueryRq, "TransactionDateRangeFilter", "\n ",)
if type(self.FromTxnDate) is datetime.date:
FromTxnDate = self.create_sub_element(ET, TransactionDateRangeFilter, "FromTxnDate", self.FromTxnDate.strftime('%Y-%m-%d'))
if type(self.ToTxnDate) is datetime.date:
ToTxnDate = self.create_sub_element(ET, TransactionDateRangeFilter, "ToTxnDate", self.ToTxnDate.strftime('%Y-%m-%d'))
if self.EntityTypeFilter or self.FullName:
TransactionEntityFilter = self.create_sub_element(ET, TransactionQueryRq, "TransactionEntityFilter","\n")
if self.EntityTypeFilter:
EntityTypeFilter = self.create_sub_element(ET, TransactionEntityFilter, "EntityTypeFilter", self.EntityTypeFilter, )
if self.FullName:
FullName = self.create_sub_element(ET, TransactionEntityFilter, "FullName", self.FullName,)
# TransactionQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "TransactionQueryRq","\n " )
if self.TxnTypeFilter:
TransactionTypeFilter = self.create_sub_element(ET, TransactionQueryRq, 'TransactionTypeFilter', "\n")
TxnTypeFilter = self.create_sub_element(ET, TransactionTypeFilter, "TxnTypeFilter", self.TxnTypeFilter)
if self.TransactionDetailLevelFilter:
TransactionDetailLevelFilter = self.create_sub_element(ET, TransactionQueryRq, 'TransactionDetailLevelFilter', self.TransactionDetailLevelFilter)
TransactionPaidStatusFilter = self.create_sub_element(ET, TransactionQueryRq, 'TransactionPaidStatusFilter', self.TransactionPaidStatusFilter)
if self.IncludeRetElement:
for ire in self.IncludeRetElement:
IncludeRetElement = self.create_sub_element(ET, TransactionQueryRq, 'IncludeRetElement', ire)
mydata = ET.tostring(root, encoding="unicode")
# mydata = ET.tostring(root, encoding = "unicode")
# print(mydata)
qbxml_query = """<?xml version="1.0" encoding="utf-8"?>\n"""
qbxml_query = qbxml_query + """<?qbxml version="13.0"?>"""
qbxml_query = qbxml_query + "\n" + mydata
# def to_string():
# qbxml_query=str(qbxml_query)
return pprintXml(qbxml_query)
def connect_to_quickbooks(self, qbxml_query):
# Connect to Quickbooks
# sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
# sessionManager.OpenConnection('', 'DASA')
# enumfodnc= win32com.client.Dispatch('QBXMLRP2.RequestProcessor')
# print(enumfodnc)
# print(enumfodnc.qbFileOpenDoNotCare)
sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
sessionManager.OpenConnection('', 'DASA2')
# ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2)
ticket = sessionManager.BeginSession("", 2)
# Send query and receive response
response_string = sessionManager.ProcessRequest(ticket, qbxml_query)
# Disconnect from Quickbooks
sessionManager.EndSession(ticket) # Close the company file
sessionManager.CloseConnection() # Close the connection
# print(f"response: {response_string}")
# print (pprintXml(response_string))
return pprintXml(response_string)
def __str__(self, *args) -> str:
# return str(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
# print("__str__")
return str(self.get_datarow())
# return "hello"
def get_datarow(self, *args):
return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
def get_dict(self, *args):
return pd.DataFrame(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
def get_total(self):
return self._get_total(self.connect_to_quickbooks(self.create_QBXML()))
def status_ok(self, QBXML):
print(QBXML)
tree = ET.fromstring(QBXML)
GSRQRs=tree.find('.//TransactionQueryRs')
print(f"GSRQRs:{GSRQRs}")
status_code = GSRQRs.attrib #.get('statusCode')
# print(GSRQRs.attrib)
# print(GSRQRs.attrib['statusCode'])
status=GSRQRs.attrib.get('statusMessage')
print(f'status={status}')
if 'OK' in status:
return True, status_code
else:
return False, status_code
def get_coldesc(self, QBXML):
coldescs = QBXML.findall('.//ColDesc')
coldesclist=[]
firstcol=[]
allcols=[]
for idx, coldesc in enumerate(coldescs):
coltitles = coldesc.findall('ColTitle')
# coldesclist.append(coltitles[1].attrib.get('value'))
firstcol.append(coltitles[0].attrib.get('value'))
cols=[]
for idy, coltitle in enumerate(coltitles):
cols.append(coltitle.attrib.get('value'))
# allcols[idy].append(coltitle.attrib.get('value'))
# print(idx, coltitle.tag)
# print(idx, coltitle.attrib.get('value'))
allcols.append(cols)
print(f'getcoldesc:{firstcol}; coldesclist:{coldesclist}; allcols:{allcols}')
# print(allcols)
return allcols
def _get_total(self, response_string):
# print(response_string)
# Parse the response into an Element Tree and peel away the layers of response
QBXML = ET.fromstring(response_string)
if self.status_ok(QBXML):
print("")
# print(QBXML)
# QBXMLMsgsRs = QBXML.find('QBXMLMsgsRs') #this is a must have. dont CHANGE. this is the root
total = QBXML.findall('.//TotalRow')
return {'Total':total[0][1].attrib.get('value')}
else:
return ()
def get_reportsubtitle(self, QBXML):
reportsubtitle=QBXML.find('.//ReportSubtitle').text
return reportsubtitle
def _get_datarow(self, response_string):
print(response_string)
print('get_datarow')
# Parse the response into an Element Tree and peel away the layers of response
QBXML = ET.fromstring(response_string)
datadict={}
reportsubtitle=None
total = float(QBXML.find('.//TotalRow')[1].attrib.get('value'))
status, statusdict= self.status_ok(QBXML)
print(status, statusdict, total)
if status and total > 0:
# print("get_datarow status ok")
reportsubtitle=self.get_reportsubtitle(QBXML)
DataRowRs = QBXML.iter("DataRow")
col_desc=self.get_coldesc(QBXML)
# print(f'getdatarow coldesc:{col_desc}')
# print(f'Datarow length:{len(list(DataRowRs))}')
temp=[]
rowvalue=[]
amount=[]
rowType=[]
idxerrorcount=0
# print(DataRowRs)
for idx_datarow, DataRow in enumerate(DataRowRs):
# print(DataRow.attrib.get('rowNumber'))
RowData=DataRow.find('RowData')
if RowData is None:
print(f'{idx_datarow} none continue')
idxerrorcount+=1
continue
if 'value' not in DataRow.find('RowData').attrib:
print(f'{idx_datarow} value continue')
idxerrorcount+=1
continue
RowData_value=DataRow.find('RowData').attrib.get('value')
rowType.append(RowData_value)
ColDatas = DataRow.findall("ColData")
cols=[]
for idx_coldata, coldata in enumerate(ColDatas):
if idx_coldata==0:
cols.append(rowType[idx_datarow-idxerrorcount])
else:
cols.append(coldata.attrib.get('value'))
rowvalue.append(cols)
temp.append(ColDatas[0].attrib.get('value'))
# datadict[ColDatas[0].attrib.get('colID')]=temp
amount.append(ColDatas[1].attrib.get('value'))
# datadict[ColDatas[1].attrib.get('colID')]=amount
# print(rowvalue)
datadict['rowvalue']=rowvalue
datadict['RowType']=rowType
datadict['CustomerFullName']=temp #ganti dgn rowType
datadict['Sales']=amount
# print(f"lendatadict {len(datadict['CustomerFullName'])}, {len(datadict['Sales'])}, {len(datadict['RowType'])}, {len(datadict['rowvalue'])}")
df=pd.DataFrame(rowvalue)
col_desc= [cdesc[-1] for cdesc in col_desc]
print(f'coldesc getdatarow:{col_desc}')
# print(df)
df.columns=col_desc
# print(df)
# print('lendatadict')
print(f"lendatadict {len(datadict['CustomerFullName'])}, {len(datadict['Sales'])}, {len(datadict['RowType'])}, {len(datadict['rowvalue'])}")
datadict=df.to_dict('list')
# print(datadict, type(datadict))
return datadict, reportsubtitle
else:
return datadict, reportsubtitle
def validate_date(self, date_text):
if date_text is None:
return None
try:
return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
except ValueError:
return None
# raise ValueError("Incorrect data format, should be YYYY-MM-DD")
print('### Transaction ###')
if __name__ == '__main__':
starttime = timeit.default_timer()
# ini=TransactionQuery(DateMacro='ThisMonth')
# ini=TransactionQuery()
# temp = xml.dom.minidom.parseString(ini.create_QBXML())
# print(temp.toprettyxml())
print("hello")
# ini=TransactionQuery(FromTxnDate='2023-01-11', ToTxnDate='2023-01-12', TransactionPaidStatusFilter='Closed', TransactionDetailLevelFilter='All')
# ini=TransactionQuery(FromTxnDate='2023-01-11', ToTxnDate='2023-01-12', TransactionPaidStatusFilter='Closed', TransactionDetailLevelFilter=None,
# IncludeRetElement=['EntityRef', 'TxnDate', 'Amount', 'Memo'])
ini=TransactionQuery(
TransactionPaidStatusFilter='Either',
TransactionDetailLevelFilter=None,
# FullName='Abadi Serpong',
# EntityTypeFilter='Customer',
# RefNumber = ['24010005', '24010001', '24010002'],
# RefNumber = ['24010002'],
# RefNumber = ['B24010066'],
# RefNumber = ['B23122529'],
RefNumber = ['33185'],
TxnTypeFilter = "Invoice",
IncludeRetElement=['EntityRef', 'TxnDate', 'Amount', 'Memo', 'RefNumber', 'TxnID', 'TimeCreated', 'TimeModified', 'Amount']
# IncludeRetElement=[ 'TxnID', 'EntityRef', 'TxnDate', 'RefNumber', 'Memo']
)
# ini=TransactionQuery(TransactionTypeFilter='SalesByItemSummary')
# ini=TransactionQuery(TransactionTypeFilter='SalesByRepSummary')
# ini=TransactionQuery(TransactionTypeFilter='PurchaseByVendorSummary')
# ini=TransactionQuery(TransactionTypeFilter='ProfitAndLossStandard')
# ini=TransactionQuery(TransactionTypeFilter='PhysicalInventoryWorksheet')
# ini=TransactionQuery(TransactionTypeFilter='InventoryStockStatusByItem')
print(type(ini.create_QBXML()))
print(ini.create_QBXML())
print(f"ini result connect:{ini.connect_to_quickbooks(ini.create_QBXML())}")
# print(f'print ini:{ini}')
# print(type(ini.get_datarow()))
# print(ini.get_total())
# print(f'ini.getdatarow:{ini.get_datarow()}')
# df=pd.DataFrame(ini.get_datarow()[0])
# headers=list(df.columns)
# print(df.tail(10))
# df.columns=['CustomerFullName', 'TotalSales']
# # df['TotalSales']=df['TotalSales'].astype(float)
# df['TotalSales']=pd.to_numeric(df['TotalSales'])
# # df['TotalSales']=df['TotalSales'].astype('Int64')
# print(df.loc[df['TotalSales']>0])
# print(df.tail(10))
# print(headers)
# print(list(df.keys().values))
print("The time difference is :", timeit.default_timer() - starttime)