dasaproject/qbsalesorderquery.py
2024-07-24 07:35:25 +07:00

254 lines
11 KiB
Python

import datetime
import logging
import pprint
import xml.etree.ElementTree as ET

import pandas as pd
import win32com.client
import xmltodict
class SalesOrderQuery:
    """Build a QBXML request, send it to QuickBooks and parse the tabular reply.

    Keyword arguments accepted by the constructor:

    * ``SalesOrderType`` -- text of the ``SalesOrderType`` element
      (default ``'SalesByCustomerSummary'``).
    * ``ReportDateMacro`` -- one of the names in ``_REPORT_DATE_MACROS``;
      any other value is silently ignored.
    * ``FromReportDate`` / ``ToReportDate`` -- ``'YYYY-MM-DD'`` strings (or
      ``date`` objects); unparsable strings are silently treated as absent.
    * ``ReportPeriod``, ``ReportEntityFilter``, ``FullName`` -- stored on the
      instance only.

    NOTE(review): ``ReportEntityFilter`` and ``FullName`` are never written
    into the generated request, so they currently have no effect on the query.
    NOTE(review): the request wraps report-style children (``SalesOrderType``,
    ``ReportDateMacro``, ``ReportPeriod``) in ``SalesOrderQueryRq``; confirm
    this combination against the QBXML specification.
    """

    # Values accepted for the ``ReportDateMacro`` keyword argument.
    _REPORT_DATE_MACROS = (
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    )

    # Diagnostics go through logging instead of being printed to stdout.
    _logger = logging.getLogger(__name__)

    def __init__(self, **kwargs) -> None:
        """Store the query options; see the class docstring for the keys."""
        self.SalesOrderType = kwargs.get('SalesOrderType', 'SalesByCustomerSummary')
        self.ReportPeriod = kwargs.get('ReportPeriod')
        macro = kwargs.get('ReportDateMacro')
        # Unknown macros are dropped rather than rejected (best-effort input).
        self.ReportDateMacro = macro if macro in self._REPORT_DATE_MACROS else None
        self.FromReportDate = self.validate_date(kwargs.get('FromReportDate'))
        self.ToReportDate = self.validate_date(kwargs.get('ToReportDate'))
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')
        self.FullName = kwargs.get('FullName')

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element to ``parentNode`` and return it.

        Args:
            ET: The ElementTree module (or compatible object) providing
                ``SubElement``.
            parentNode: Element that receives the new child.
            thisNode: Tag name of the new child.
            text: Text content of the new child.
            whiteSpace: Number of spaces placed after the newline in the
                child's tail (used only for pretty-printing).
            attrib: Optional dict of attributes; non-dict values are ignored.

        Returns:
            The newly created element.
        """
        if not isinstance(attrib, dict):
            attrib = {}
        element = ET.SubElement(parentNode, thisNode)
        for key, value in attrib.items():
            element.set(key, value)
        element.text = text
        element.tail = "\n" + " " * whiteSpace
        return element

    def create_QBXML(self):
        """Return the complete QBXML request document as a string.

        ``ReportDateMacro`` takes precedence; the explicit ``ReportPeriod``
        element is only emitted when no macro is set and at least one of
        the from/to dates is available.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        messages = ET.SubElement(root, "QBXMLMsgsRq")
        messages.set("onError", "continueOnError")
        messages.tail = "\n"
        messages.text = "\n "
        request = self.create_sub_element(ET, messages, "SalesOrderQueryRq", "\n ")
        self.create_sub_element(ET, request, 'SalesOrderType', self.SalesOrderType)

        has_from = isinstance(self.FromReportDate, datetime.date)
        has_to = isinstance(self.ToReportDate, datetime.date)
        if self.ReportDateMacro:
            self.create_sub_element(ET, request, "ReportDateMacro", self.ReportDateMacro)
        elif has_from or has_to:
            period = self.create_sub_element(ET, request, "ReportPeriod", "\n ")
            if has_from:
                self.create_sub_element(
                    ET, period, "FromReportDate",
                    self.FromReportDate.strftime('%Y-%m-%d'), 4)
            if has_to:
                self.create_sub_element(
                    ET, period, "ToReportDate",
                    self.ToReportDate.strftime('%Y-%m-%d'))

        body = ET.tostring(root, encoding="unicode")
        qbxml_query = (
            '<?xml version="1.0" encoding="utf-8"?>\n'
            '<?qbxml version="13.0"?>'
            "\n" + body
        )
        self._logger.debug("QBXML request:\n%s", qbxml_query)
        return qbxml_query

    def connect_to_quickbooks(self, qbxml_query):
        """Send ``qbxml_query`` to QuickBooks and return the raw response string.

        Opens a connection through the ``QBXMLRP2.RequestProcessor`` COM
        object, runs the request in a session and always tears down the
        session and the connection, even when the request raises.
        """
        session_manager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        session_manager.OpenConnection('', 'DASA2')
        try:
            # Empty file name: use whichever company file is available.
            # NOTE(review): mode 2 is presumably qbFileOpenDoNotCare -- confirm.
            ticket = session_manager.BeginSession("", 2)
            try:
                return session_manager.ProcessRequest(ticket, qbxml_query)
            finally:
                session_manager.EndSession(ticket)  # Close the company file
        finally:
            session_manager.CloseConnection()  # Close the connection

    def __str__(self, *args) -> str:
        """Return ``str()`` of ``get_datarow()``; this performs a live query."""
        return str(self.get_datarow())

    def get_datarow(self, *args):
        """Run the query and return ``(column_dict, report_subtitle)``."""
        return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))

    def get_dict(self, *args):
        """Run the query and return the parsed rows as a ``pandas.DataFrame``."""
        # _get_datarow returns (dict, subtitle); only the dict is tabular data.
        datadict, _subtitle = self.get_datarow()
        return pd.DataFrame(datadict)

    def get_total(self):
        """Run the query and return ``{'Total': value}`` (or ``()`` on failure)."""
        return self._get_total(self.connect_to_quickbooks(self.create_QBXML()))

    def status_ok(self, QBXML):
        """Inspect the ``SalesOrderQueryRs`` element of a parsed response.

        Returns:
            ``(ok, attributes)`` where ``ok`` is True when the element's
            ``statusMessage`` attribute contains ``'OK'`` and ``attributes``
            is the element's attribute dict (empty when the element is
            missing).
        """
        response = QBXML.find('.//SalesOrderQueryRs')
        if response is None:
            return False, {}
        attributes = response.attrib
        status = attributes.get('statusMessage') or ''
        self._logger.debug("status=%s", status)
        return 'OK' in status, attributes

    def get_coldesc(self, QBXML):
        """Return, for every ``ColDesc`` element, the list of its ``ColTitle`` values."""
        all_titles = [
            [title.attrib.get('value') for title in col_desc.findall('ColTitle')]
            for col_desc in QBXML.findall('.//ColDesc')
        ]
        self._logger.debug("column titles: %s", all_titles)
        return all_titles

    @staticmethod
    def _total_element(QBXML):
        """Return the second child of the first ``TotalRow``, or None if absent."""
        total_row = QBXML.find('.//TotalRow')
        if total_row is None or len(total_row) < 2:
            return None
        return total_row[1]

    def _get_total(self, response_string):
        """Parse ``response_string`` and return ``{'Total': value}``.

        Returns an empty tuple when the response status is not OK or the
        response carries no usable ``TotalRow``.
        """
        QBXML = ET.fromstring(response_string)
        # status_ok returns a tuple, so it must be unpacked before testing.
        status, _attributes = self.status_ok(QBXML)
        total_element = self._total_element(QBXML)
        if not status or total_element is None:
            return ()
        return {'Total': total_element.attrib.get('value')}

    def get_reportsubtitle(self, QBXML):
        """Return the text of the first ``ReportSubtitle`` element, or None."""
        subtitle = QBXML.find('.//ReportSubtitle')
        return None if subtitle is None else subtitle.text

    def _get_datarow(self, response_string):
        """Parse a response into ``(column_dict, report_subtitle)``.

        ``column_dict`` maps each column title (the last ``ColTitle`` value of
        every ``ColDesc``) to the list of values in that column.  The first
        column holds the ``RowData`` value of each row; the remaining columns
        hold the ``ColData`` values.  Rows without a ``RowData`` value or
        without any ``ColData`` are skipped.

        Returns ``({}, None)`` when the status is not OK, the total is
        missing, or the total is not greater than zero.

        Raises:
            ValueError: if the total is present but not numeric, or if the
                number of columns in the rows does not match the number of
                ``ColDesc`` elements.
        """
        QBXML = ET.fromstring(response_string)
        status, attributes = self.status_ok(QBXML)
        total_element = self._total_element(QBXML)
        raw_total = None if total_element is None else total_element.attrib.get('value')
        self._logger.debug("status=%s attributes=%s total=%s", status, attributes, raw_total)
        # Check status and presence before converting, so error responses
        # yield the empty result instead of raising.
        if not status or raw_total is None or float(raw_total) <= 0:
            return {}, None

        reportsubtitle = self.get_reportsubtitle(QBXML)
        rows = []
        for index, data_row in enumerate(QBXML.iter("DataRow")):
            row_data = data_row.find('RowData')
            if row_data is None or 'value' not in row_data.attrib:
                self._logger.debug("skipping DataRow %d: no RowData value", index)
                continue
            col_datas = data_row.findall("ColData")
            if not col_datas:
                self._logger.debug("skipping DataRow %d: no ColData", index)
                continue
            # The RowData value replaces the first ColData value.
            row = [row_data.attrib['value']]
            row.extend(col.attrib.get('value') for col in col_datas[1:])
            rows.append(row)

        column_names = [
            titles[-1] if titles else None for titles in self.get_coldesc(QBXML)
        ]
        self._logger.debug("columns=%s rows=%d", column_names, len(rows))
        # Passing the names to the constructor also handles the no-rows case.
        frame = pd.DataFrame(rows, columns=column_names)
        return frame.to_dict('list'), reportsubtitle

    def validate_date(self, date_text):
        """Convert ``date_text`` to a ``datetime.date``.

        Accepts ``'YYYY-MM-DD'`` strings as well as ``date``/``datetime``
        objects.  Returns None for ``None`` and for strings that do not match
        the expected format.
        """
        if date_text is None:
            return None
        if isinstance(date_text, datetime.datetime):
            return date_text.date()
        if isinstance(date_text, datetime.date):
            return date_text
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except ValueError:
            # Malformed dates are treated as "not supplied".
            return None
# Banner emitted whenever this module is loaded (on import as well as on run).
print('### SalesOrder ###')

if __name__ == '__main__':
    # Manual smoke test against a running QuickBooks instance: query last
    # year's figures and show the result as a two-column table.
    # Other SalesOrderType values that have been tried with this script:
    # 'SalesByItemSummary', 'SalesByRepSummary', 'PurchaseByVendorSummary',
    # 'ProfitAndLossStandard', 'PhysicalInventoryWorksheet',
    # 'InventoryStockStatusByItem'; an explicit FromReportDate/ToReportDate
    # pair can be passed instead of ReportDateMacro.
    ini = SalesOrderQuery(ReportDateMacro='LastYear', FullName='Abadi Serpong')

    request_text = ini.create_QBXML()
    print(request_text)
    print(f'print ini:{ini}')

    column_data = ini.get_datarow()[0]
    df = pd.DataFrame(column_data)
    headers = list(df.columns)  # titles as reported, before renaming
    print(df.tail(10))

    df.columns = ['CustomerFullName', 'TotalSales']
    # Values arrive as strings; make them numeric so they can be compared.
    df['TotalSales'] = pd.to_numeric(df['TotalSales'])
    has_sales = df['TotalSales'] > 0
    print(df.loc[has_sales])
    print(df.tail(10))
    print(headers)
    print(list(df.keys().values))