mirror of
https://github.com/bcomsugi/dasaproject.git
synced 2026-01-08 18:42:37 +07:00
254 lines
11 KiB
Python
254 lines
11 KiB
Python
import xml.etree.ElementTree as ET
|
|
import win32com.client
|
|
import xmltodict
|
|
import pprint
|
|
import datetime
|
|
import pandas as pd
|
|
|
|
class SalesOrderQuery:
    """Build a qbXML request, send it to QuickBooks and flatten the reply.

    The request is a ``SalesOrderQueryRq`` element carrying a
    ``SalesOrderType`` child plus either a ``ReportDateMacro`` or a
    ``ReportPeriod`` (from/to dates).  The reply is searched for
    ``SalesOrderQueryRs``, ``ColDesc``, ``DataRow`` and ``TotalRow`` elements.

    NOTE(review): the date-macro / report-period children and the row-based
    reply parsing look like they were adapted from a report query; confirm
    the element names against the qbXML schema in use.

    Keyword arguments (all optional):
        SalesOrderType: text of the ``SalesOrderType`` element
            (default ``'SalesByCustomerSummary'``).
        ReportDateMacro: one of ``_REPORT_DATE_MACROS``; any other value is
            silently ignored.
        FromReportDate, ToReportDate: ``'YYYY-MM-DD'`` strings (or date
            objects); invalid strings are silently ignored.
        ReportPeriod, ReportEntityFilter, FullName: stored on the instance
            but not written into the request by ``create_QBXML``.
    """

    # Values accepted for the ``ReportDateMacro`` keyword argument.
    _REPORT_DATE_MACROS = (
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    )

    def __init__(self, **kwargs) -> None:
        """Store the query options; see the class docstring for the keys."""
        self.SalesOrderType = kwargs.get('SalesOrderType', 'SalesByCustomerSummary')
        self.ReportPeriod = kwargs.get('ReportPeriod')
        macro = kwargs.get('ReportDateMacro')
        # Unknown macros are dropped rather than rejected.
        self.ReportDateMacro = macro if macro in self._REPORT_DATE_MACROS else None
        self.FromReportDate = self.validate_date(kwargs.get('FromReportDate'))
        self.ToReportDate = self.validate_date(kwargs.get('ToReportDate'))
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')
        self.FullName = kwargs.get('FullName')

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element to ``parentNode`` and return it.

        Args:
            ET: module/object providing ``SubElement`` (callers pass the
                ``xml.etree.ElementTree`` module; the parameter shadows the
                module-level name and is kept for interface compatibility).
            parentNode: element that receives the new child.
            thisNode: tag name of the new child.
            text: text content of the new child.
            whiteSpace: number of spaces appended after the newline in the
                child's tail.
            attrib: optional dict of attributes; non-dict values are ignored.
        """
        ele = ET.SubElement(parentNode, thisNode)
        if isinstance(attrib, dict):
            for key, value in attrib.items():
                ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def create_QBXML(self):
        """Return the complete qbXML request document as a string.

        ``ReportDateMacro`` takes precedence; the ``ReportPeriod`` element is
        only emitted when no macro is set and at least one of the two dates
        is available.  The generated document is also printed.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs_rq = ET.SubElement(root, "QBXMLMsgsRq")
        msgs_rq.set("onError", "continueOnError")
        msgs_rq.tail = "\n"
        msgs_rq.text = "\n "
        query_rq = self.create_sub_element(ET, msgs_rq, "SalesOrderQueryRq", "\n ")
        self.create_sub_element(ET, query_rq, 'SalesOrderType', self.SalesOrderType)

        has_from = isinstance(self.FromReportDate, datetime.date)
        has_to = isinstance(self.ToReportDate, datetime.date)
        if self.ReportDateMacro:
            self.create_sub_element(ET, query_rq, "ReportDateMacro", self.ReportDateMacro)
        elif has_from or has_to:
            period = self.create_sub_element(ET, query_rq, "ReportPeriod", "\n ")
            if has_from:
                self.create_sub_element(
                    ET, period, "FromReportDate",
                    self.FromReportDate.strftime('%Y-%m-%d'), 4)
            if has_to:
                self.create_sub_element(
                    ET, period, "ToReportDate",
                    self.ToReportDate.strftime('%Y-%m-%d'))

        body = ET.tostring(root, encoding="unicode")
        qbxml_query = (
            """<?xml version="1.0" encoding="utf-8"?>\n"""
            + """<?qbxml version="13.0"?>"""
            + "\n" + body
        )
        print(qbxml_query)
        return qbxml_query

    def connect_to_quickbooks(self, qbxml_query):
        """Send ``qbxml_query`` through the QBXMLRP2 COM request processor.

        Opens a connection under the application name ``'DASA2'``, begins a
        session, processes the request and returns the raw response string.
        The session and the connection are always released, even when the
        request itself raises.
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # Empty file name and mode 2: presumably "currently open company
            # file" and qbFileOpenDoNotCare -- confirm against the SDK docs.
            ticket = sessionManager.BeginSession("", 2)
            try:
                return sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)  # Close the company file
        finally:
            sessionManager.CloseConnection()  # Close the connection

    def __str__(self, *args) -> str:
        """Run the query and return ``str()`` of ``get_datarow()``'s result."""
        print("__str__")
        return str(self.get_datarow())

    def get_datarow(self, *args):
        """Run the query and return ``(data_dict, report_subtitle)``."""
        return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))

    def get_dict(self, *args):
        """Run the query and return the row data as a ``pandas.DataFrame``.

        Only the data dictionary (first item of the ``_get_datarow`` result)
        is handed to ``DataFrame``; the report subtitle is dropped.
        """
        datadict, _ = self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
        return pd.DataFrame(datadict)

    def get_total(self):
        """Run the query and return ``{'Total': value}``, or ``()`` on failure."""
        return self._get_total(self.connect_to_quickbooks(self.create_QBXML()))

    def status_ok(self, QBXML):
        """Return ``(ok, attributes)`` for the ``SalesOrderQueryRs`` element.

        ``ok`` is True when the element's ``statusMessage`` attribute
        contains ``'OK'``.  ``attributes`` is the element's attribute dict,
        or an empty dict when the element is absent.
        """
        response = QBXML.find('.//SalesOrderQueryRs')
        attributes = {} if response is None else response.attrib
        status = attributes.get('statusMessage')
        print(f'status={status}')
        return (status is not None and 'OK' in status), attributes

    def get_coldesc(self, QBXML):
        """Return, for every ``ColDesc`` element, the list of its ``ColTitle`` values."""
        firstcol = []
        allcols = []
        for coldesc in QBXML.findall('.//ColDesc'):
            titles = [title.attrib.get('value') for title in coldesc.findall('ColTitle')]
            firstcol.append(titles[0] if titles else None)
            allcols.append(titles)
        # Always empty; retained so the diagnostic line below stays unchanged.
        coldesclist = []
        print(f'getcoldesc:{firstcol}; coldesclist:{coldesclist}; allcols:{allcols}')
        return allcols

    def _get_total(self, response_string):
        """Parse a response and return ``{'Total': value}``.

        The value is the ``value`` attribute of the second child of the first
        ``TotalRow`` element.  Returns ``()`` when the status is not OK or no
        such element exists.
        """
        QBXML = ET.fromstring(response_string)
        ok, _ = self.status_ok(QBXML)  # status_ok returns a tuple; unpack it
        if not ok:
            return ()
        print("")
        total_row = QBXML.find('.//TotalRow')
        if total_row is None or len(total_row) < 2:
            return ()
        return {'Total': total_row[1].attrib.get('value')}

    def get_reportsubtitle(self, QBXML):
        """Return the text of the first ``ReportSubtitle`` element, or None."""
        subtitle = QBXML.find('.//ReportSubtitle')
        return None if subtitle is None else subtitle.text

    def _report_total(self, QBXML):
        """Return the first ``TotalRow``'s second-child value as a float (0.0 if absent)."""
        total_row = QBXML.find('.//TotalRow')
        if total_row is None or len(total_row) < 2:
            return 0.0
        value = total_row[1].attrib.get('value')
        # A present but non-numeric value still raises ValueError.
        return 0.0 if value is None else float(value)

    def _get_datarow(self, response_string):
        """Parse a response into ``(columns_dict, report_subtitle)``.

        ``columns_dict`` maps each column title (the last ``ColTitle`` of
        every ``ColDesc``) to the list of that column's values.  The first
        column holds the ``RowData`` ``value`` attribute instead of the first
        ``ColData`` value.  ``DataRow`` elements without a ``RowData`` child,
        or whose ``RowData`` has no ``value`` attribute, are skipped.

        Returns ``({}, None)`` when the status is not OK or the report total
        is not greater than zero.
        """
        print('get_datarow')
        QBXML = ET.fromstring(response_string)
        status, statusdict = self.status_ok(QBXML)
        total = self._report_total(QBXML)
        print(status, statusdict, total)
        if not (status and total > 0):
            return {}, None

        reportsubtitle = self.get_reportsubtitle(QBXML)
        all_titles = self.get_coldesc(QBXML)

        rowvalue = []
        for idx_datarow, DataRow in enumerate(QBXML.iter("DataRow")):
            RowData = DataRow.find('RowData')
            if RowData is None:
                print(f'{idx_datarow} none continue')
                continue
            if 'value' not in RowData.attrib:
                print(f'{idx_datarow} value continue')
                continue
            # First cell is the row label; the rest come from ColData 2..n.
            cells = [RowData.attrib['value']]
            cells.extend(col.attrib.get('value') for col in DataRow.findall("ColData")[1:])
            rowvalue.append(cells)

        col_desc = [titles[-1] if titles else None for titles in all_titles]
        print(f'coldesc getdatarow:{col_desc}')
        # Passing the columns up front also works when no row qualified.
        df = pd.DataFrame(rowvalue, columns=col_desc)
        row_count = len(rowvalue)
        print(f"lendatadict {row_count}, {row_count}, {row_count}, {row_count}")
        return df.to_dict('list'), reportsubtitle

    def validate_date(self, date_text):
        """Convert ``date_text`` to a ``datetime.date``.

        Accepts ``None`` (returned unchanged), ``date``/``datetime`` objects,
        or a ``'YYYY-MM-DD'`` string.  A string in any other format yields
        ``None`` instead of raising.
        """
        if date_text is None:
            return None
        if isinstance(date_text, datetime.datetime):
            return date_text.date()
        if isinstance(date_text, datetime.date):
            return date_text
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except ValueError:
            return None
|
|
# Banner emitted whenever the module is loaded (import or direct run).
print('### SalesOrder ###')

if __name__ == '__main__':
    # Manual smoke test; needs a reachable QuickBooks installation because
    # both the f-string below and get_datarow() send the request.
    query = SalesOrderQuery(ReportDateMacro='LastYear', FullName='Abadi Serpong')
    # Alternative invocations used during development:
    #   SalesOrderQuery(FromReportDate='2023-01-11', ToReportDate='2023-01-12')
    #   SalesOrderQuery(SalesOrderType='SalesByItemSummary')
    #   SalesOrderQuery(SalesOrderType='SalesByRepSummary')
    #   SalesOrderQuery(SalesOrderType='PurchaseByVendorSummary')
    #   SalesOrderQuery(SalesOrderType='ProfitAndLossStandard')
    #   SalesOrderQuery(SalesOrderType='PhysicalInventoryWorksheet')
    #   SalesOrderQuery(SalesOrderType='InventoryStockStatusByItem')
    print(query.create_QBXML())
    print(f'print ini:{query}')

    # First item of the result is the column dictionary.
    rows = query.get_datarow()[0]
    frame = pd.DataFrame(rows)
    headers = list(frame.columns)
    print(frame.tail(10))

    # Rename the two report columns and make the sales column numeric.
    frame.columns = ['CustomerFullName', 'TotalSales']
    frame['TotalSales'] = pd.to_numeric(frame['TotalSales'])

    has_sales = frame['TotalSales'] > 0
    print(frame.loc[has_sales])
    print(frame.tail(10))
    print(headers)
    print(list(frame.keys().values))