# Mirror of https://github.com/bcomsugi/dasaproject.git
# (synced 2026-01-08 18:42:37 +07:00; 333 lines, 16 KiB, Python)
import xml.etree.ElementTree as ET
|
|
import win32com.client
|
|
import xmltodict
|
|
import pprint
|
|
import datetime
|
|
import pandas as pd
|
|
import xml.dom.minidom
|
|
from lxml import etree
|
|
from tools import pprintXml
|
|
import timeit
|
|
class TransactionQuery:
    """Build a QBXML ``TransactionQueryRq``, send it to QuickBooks and parse the reply.

    All options are passed as keyword arguments; unknown keywords are ignored:

    * ``Debug`` (bool): enable diagnostic printing. Non-bool values mean False.
    * ``RefNumber``: one reference number or a list/tuple of them.
    * ``DateMacro``: one of ``_DATE_MACROS``; any other value is ignored.
    * ``FromTxnDate`` / ``ToTxnDate``: ``'YYYY-MM-DD'`` strings or date objects;
      values that cannot be interpreted are stored as ``None``.
    * ``EntityTypeFilter``, ``FullName``, ``TxnTypeFilter``,
      ``TransactionDetailLevelFilter``: emitted as-is when truthy.
    * ``TransactionPaidStatusFilter``: defaults to ``'Open'``.
    * ``IncludeRetElement``: a list/tuple of element names (a single string is
      treated as a one-element list).
    * ``ReportPeriod``, ``ReportEntityFilter``: stored but not used when the
      request is built.
    """

    # DateMacro values accepted by __init__ (kept as a tuple so that membership
    # tests also work for unhashable input).
    _DATE_MACROS = (
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    )

    def __init__(self, **kwargs) -> None:
        debug = kwargs.get('Debug', False)
        self.debug = debug if isinstance(debug, bool) else False

        self.EntityTypeFilter = kwargs.get('EntityTypeFilter')
        self.RefNumber = kwargs.get('RefNumber')
        self.FullName = kwargs.get('FullName')
        self.TxnTypeFilter = kwargs.get('TxnTypeFilter')
        self.TransactionPaidStatusFilter = kwargs.get('TransactionPaidStatusFilter', 'Open')
        self.TransactionDetailLevelFilter = kwargs.get('TransactionDetailLevelFilter')

        # Normalise IncludeRetElement to a list (or None when unusable).
        include = kwargs.get('IncludeRetElement')
        if isinstance(include, str):
            include = [include]
        elif isinstance(include, tuple):
            include = list(include)
        elif not isinstance(include, list):
            include = None
        self.IncludeRetElement = include

        self.ReportPeriod = kwargs.get('ReportPeriod')

        macro = kwargs.get('DateMacro')
        self.DateMacro = macro if macro in self._DATE_MACROS else None

        self.FromTxnDate = self.validate_date(kwargs.get('FromTxnDate'))
        self.ToTxnDate = self.validate_date(kwargs.get('ToTxnDate'))
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')

    def _debug(self, *parts) -> None:
        """Print *parts* only when the instance was created with ``Debug=True``."""
        if getattr(self, 'debug', False):
            print(*parts)

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element named *thisNode* to *parentNode* and return it.

        *ET* is the ElementTree module to use. *text* becomes the element text,
        the tail is a newline followed by *whiteSpace* spaces, and *attrib*
        (when it is a dict) supplies the XML attributes.
        """
        ele = ET.SubElement(parentNode, thisNode)
        if isinstance(attrib, dict):
            for key, value in attrib.items():
                ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def _build_request(self):
        """Return the ``<QBXML>`` request as an Element built from the stored filters.

        Children of ``TransactionQueryRq`` are emitted in a fixed order:
        RefNumber, TransactionDateRangeFilter, TransactionEntityFilter,
        TransactionTypeFilter, TransactionDetailLevelFilter,
        TransactionPaidStatusFilter, IncludeRetElement.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n"
        msgs = ET.SubElement(root, "QBXMLMsgsRq")
        msgs.set("onError", "stopOnError")
        msgs.tail = "\n"
        msgs.text = "\n "
        query = self.create_sub_element(ET, msgs, "TransactionQueryRq", "\n")

        def leaf(parent, tag, value):
            # ElementTree can only serialise string text, so coerce here.
            return self.create_sub_element(ET, parent, tag, str(value))

        if self.RefNumber:
            if isinstance(self.RefNumber, (list, tuple)):
                ref_numbers = self.RefNumber
            else:
                ref_numbers = [self.RefNumber]
            for ref_number in ref_numbers:
                leaf(query, "RefNumber", ref_number)

        from_date = self.FromTxnDate if isinstance(self.FromTxnDate, datetime.date) else None
        to_date = self.ToTxnDate if isinstance(self.ToTxnDate, datetime.date) else None
        if self.DateMacro:
            # A date macro takes precedence over explicit from/to dates.
            date_filter = self.create_sub_element(ET, query, "TransactionDateRangeFilter", "\n")
            leaf(date_filter, "DateMacro", self.DateMacro)
        elif from_date or to_date:
            date_filter = self.create_sub_element(ET, query, "TransactionDateRangeFilter", "\n")
            if from_date:
                leaf(date_filter, "FromTxnDate", from_date.strftime('%Y-%m-%d'))
            if to_date:
                leaf(date_filter, "ToTxnDate", to_date.strftime('%Y-%m-%d'))

        if self.EntityTypeFilter or self.FullName:
            entity_filter = self.create_sub_element(ET, query, "TransactionEntityFilter", "\n")
            if self.EntityTypeFilter:
                leaf(entity_filter, "EntityTypeFilter", self.EntityTypeFilter)
            if self.FullName:
                leaf(entity_filter, "FullName", self.FullName)

        if self.TxnTypeFilter:
            type_filter = self.create_sub_element(ET, query, 'TransactionTypeFilter', "\n")
            leaf(type_filter, "TxnTypeFilter", self.TxnTypeFilter)

        if self.TransactionDetailLevelFilter:
            leaf(query, 'TransactionDetailLevelFilter', self.TransactionDetailLevelFilter)

        # Skip the element entirely instead of emitting an empty one when the
        # caller explicitly passed None.
        if self.TransactionPaidStatusFilter:
            leaf(query, 'TransactionPaidStatusFilter', self.TransactionPaidStatusFilter)

        for name in self.IncludeRetElement or ():
            leaf(query, 'IncludeRetElement', name)

        return root

    def create_QBXML(self):
        """Return the request document: XML and qbxml declarations plus the request.

        The assembled string is passed through the project helper ``pprintXml``
        and its result is returned.
        """
        body = ET.tostring(self._build_request(), encoding="unicode")
        qbxml_query = (
            '<?xml version="1.0" encoding="utf-8"?>\n'
            '<?qbxml version="13.0"?>\n'
        ) + body
        return pprintXml(qbxml_query)

    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* through the QBXMLRP2 COM request processor.

        Returns the response after it has been passed through ``pprintXml``.
        The session and the connection are closed even if the request fails.
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # Empty file name; mode 2 is presumably qbFileOpenDoNotCare - TODO confirm.
            ticket = sessionManager.BeginSession("", 2)
            try:
                response_string = sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)
        finally:
            sessionManager.CloseConnection()
        return pprintXml(response_string)

    def __str__(self, *args) -> str:
        """Run the query against QuickBooks and return ``str()`` of the parsed result."""
        return str(self.get_datarow())

    def get_datarow(self, *args):
        """Run the query and return ``(column dict, report subtitle)``."""
        return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))

    def get_dict(self, *args):
        """Run the query and return the parsed rows as a pandas DataFrame."""
        rows, _subtitle = self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
        return pd.DataFrame(rows)

    def get_total(self):
        """Run the query and return ``{'Total': value}``, or ``()`` on failure."""
        return self._get_total(self.connect_to_quickbooks(self.create_QBXML()))

    @staticmethod
    def _as_tree(xml_or_element):
        """Parse XML text into an Element; pass an already-parsed Element through."""
        if isinstance(xml_or_element, (str, bytes)):
            return ET.fromstring(xml_or_element)
        return xml_or_element

    @staticmethod
    def _total_cell(tree):
        """Return the second child of the first ``TotalRow``, or None if absent."""
        total_row = tree.find('.//TotalRow')
        if total_row is None or len(total_row) < 2:
            return None
        return total_row[1]

    def status_ok(self, QBXML):
        """Report whether the response's ``statusMessage`` contains ``'OK'``.

        *QBXML* may be XML text or a parsed Element. Returns a tuple
        ``(ok, attributes)`` where *attributes* is the attribute mapping of the
        ``TransactionQueryRs`` element (``{}`` when that element is missing).
        """
        tree = self._as_tree(QBXML)
        if tree.tag == 'TransactionQueryRs':
            response = tree
        else:
            response = tree.find('.//TransactionQueryRs')
        if response is None:
            self._debug('status: no TransactionQueryRs element in response')
            return False, {}

        status = response.attrib.get('statusMessage') or ''
        self._debug(f'status={status}')
        return 'OK' in status, response.attrib

    def get_coldesc(self, QBXML):
        """Return, for every ``ColDesc`` element, the list of its ``ColTitle`` values."""
        allcols = [
            [coltitle.attrib.get('value') for coltitle in coldesc.findall('ColTitle')]
            for coldesc in QBXML.findall('.//ColDesc')
        ]
        self._debug(f'getcoldesc allcols:{allcols}')
        return allcols

    def _get_total(self, response_string):
        """Return ``{'Total': value}`` read from the first ``TotalRow``.

        Returns ``()`` when the response status is not OK or no usable
        ``TotalRow`` is present.
        """
        tree = self._as_tree(response_string)
        ok, _attrib = self.status_ok(tree)
        cell = self._total_cell(tree)
        if not ok or cell is None:
            return ()
        return {'Total': cell.attrib.get('value')}

    def get_reportsubtitle(self, QBXML):
        """Return the text of the first ``ReportSubtitle`` element, or None."""
        subtitle = QBXML.find('.//ReportSubtitle')
        return None if subtitle is None else subtitle.text

    def _get_datarow(self, response_string):
        """Parse a response into ``(column dict, report subtitle)``.

        Rows are read only when the status is OK and the ``TotalRow`` value is
        a number greater than zero; otherwise ``({}, None)`` is returned.
        Every ``DataRow`` that has a ``RowData`` child with a ``value``
        attribute contributes one row: the first cell is that ``RowData``
        value, the remaining cells are the values of the second and later
        ``ColData`` children. Column names are the last ``ColTitle`` of each
        ``ColDesc``.

        Raises:
            ValueError: (from pandas) when the number of column titles does
                not match the width of the collected rows.
        """
        self._debug(response_string)
        tree = self._as_tree(response_string)

        cell = self._total_cell(tree)
        try:
            total = float(cell.attrib.get('value')) if cell is not None else 0.0
        except (TypeError, ValueError):
            # Missing or non-numeric total: treat as "nothing to report".
            total = 0.0

        status, statusdict = self.status_ok(tree)
        self._debug(status, statusdict, total)
        if not (status and total > 0):
            return {}, None

        reportsubtitle = self.get_reportsubtitle(tree)
        titles = [cdesc[-1] if cdesc else None for cdesc in self.get_coldesc(tree)]
        self._debug(f'coldesc getdatarow:{titles}')

        rows = []
        for position, data_row in enumerate(tree.iter("DataRow")):
            row_data = data_row.find('RowData')
            if row_data is None or 'value' not in row_data.attrib:
                self._debug(f'{position} skipped: no RowData value')
                continue
            cells = data_row.findall("ColData")
            if not cells:
                continue
            # The first column carries the RowData value instead of ColData[0].
            rows.append(
                [row_data.attrib['value']]
                + [col.attrib.get('value') for col in cells[1:]]
            )

        if not rows:
            return {title: [] for title in titles}, reportsubtitle

        df = pd.DataFrame(rows)
        df.columns = titles
        return df.to_dict('list'), reportsubtitle

    def validate_date(self, date_text):
        """Convert *date_text* to a ``datetime.date``.

        Accepts a ``'YYYY-MM-DD'`` string, a ``datetime.datetime`` (its date
        part is used) or a ``datetime.date``. Returns None for None and for
        anything that cannot be interpreted.
        """
        if date_text is None:
            return None
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(date_text, datetime.datetime):
            return date_text.date()
        if isinstance(date_text, datetime.date):
            return date_text
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            return None
|
|
# Banner printed whenever the module is loaded (on import as well as when run).
print('### Transaction ###')

if __name__ == '__main__':
    # Manual smoke test: build one invoice query, show the request XML, send it
    # to QuickBooks and report the elapsed wall-clock time.
    started_at = timeit.default_timer()
    print("hello")

    query_options = {
        'TransactionPaidStatusFilter': 'Either',
        'TransactionDetailLevelFilter': None,
        'RefNumber': ['33185'],
        'TxnTypeFilter': "Invoice",
        'IncludeRetElement': [
            'EntityRef', 'TxnDate', 'Amount', 'Memo', 'RefNumber',
            'TxnID', 'TimeCreated', 'TimeModified', 'Amount',
        ],
    }
    ini = TransactionQuery(**query_options)

    # The request is rebuilt for every use, exactly as many times as before.
    print(type(ini.create_QBXML()))
    print(ini.create_QBXML())
    reply = ini.connect_to_quickbooks(ini.create_QBXML())
    print(f"ini result connect:{reply}")

    elapsed = timeit.default_timer() - started_at
    print("The time difference is :", elapsed)