Quickbooks-API/server.py
2024-06-10 08:46:51 +07:00

343 lines
16 KiB
Python

import pprint
import xmltodict
import win32com.client
import xml.etree.ElementTree as ET
class baseQBQuery:
def __init__(self, *args, **kwargs) -> None:
# print(f'kwargs:{kwargs}')
# print(args)
self.onError = "continueOnError"
self.GeneralSummaryReportType = kwargs['GeneralSummaryReportType'] if 'GeneralSummaryReportType' in kwargs else 'SalesByCustomerSummary'
self.ReportPeriod = kwargs['ReportPeriod'] if 'ReportPeriod' in kwargs else None
self.ReportDateMacro = None
if 'ReportDateMacro' in kwargs:
if kwargs['ReportDateMacro'] in ['All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth', 'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear', 'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate', 'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate', 'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks', 'NextMonth', 'NextQuarter', 'NextYear']:
self.ReportDateMacro = kwargs['ReportDateMacro']
self.FromReportDate = self.validate_date(kwargs['FromReportDate']) if 'FromReportDate' in kwargs else None
self.ToReportDate = self.validate_date(kwargs['ToReportDate']) if 'ToReportDate' in kwargs else None
self.ReportEntityFilter = kwargs['ReportEntityFilter'] if 'ReportEntityFilter' in kwargs else None
self.FullName = kwargs['FullName'] if 'FullName' in kwargs else None
# print(self.ReportDateMacro, self.ReportPeriod, self.FromReportDate, self.ToReportDate)
self.QBXML = None
self.QBDict = {}
self.response_string = None
# def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace = 0, attrib =None):
# if type(attrib) is not dict:
# attrib = {}
# ele = ET.SubElement(parentNode, thisNode)
# for x in attrib:
# ele.set(x, attrib[x])
# ele.text = text
# tail = "\n"
# for x in range(whiteSpace):
# tail = tail + " "
# ele.tail = tail
# return ele
def create_QBXML(self):
dataDict = { ### Header for qmxml with version attribute
"?qbxml": {
"@version": "13.0",
}
}
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Simple Example ###
# "@onError": "continueOnError",
# "GeneralSummaryReportQueryRq": {
# "GeneralSummaryReportType": self.GeneralSummaryReportType,
# }
# }
# }
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
# "@onError": "continueOnError",
# "ItemInventoryQueryRq": {
# "FullName": ["TACO:AA:TH-003AA",
# "TACO:AA:TH-010AA"]
# },
# }
# }
firstKey = str(list(self.QBDict.keys())[0])
dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
"@onError": self.onError,
# self.__class__.__name__+"Rq": self.QBDict[self.__class__.__name__+"Rq"]}}
firstKey: self.QBDict[firstKey]}}
print(dataDict)
# print(xmltodict.unparse(dataDict, pretty=True))
# # QBXML = '<?qbxml version="13.0"?>' + xmltodict.unparse(dataDict, pretty=True)
self.QBXML = xmltodict.unparse(dataDict, pretty=True).replace("</?qbxml>", "")
print(self.QBXML)
return self.QBXML
# def create_QBXML(self):
# root = ET.Element("QBXML")
# root.tail = "\n"
# root.text = "\n "
# QBXMLMsgsRq = ET.SubElement(root, "QBXMLMsgsRq")
# QBXMLMsgsRq.set("onError", "continueOnError")
# QBXMLMsgsRq.tail = "\n"
# QBXMLMsgsRq.text = "\n "
# GeneralSummaryReportQueryRq = self.create_sub_element(ET, QBXMLMsgsRq, "GeneralSummaryReportQueryRq","\n " )
# GeneralSummaryReportType = self.create_sub_element(ET, GeneralSummaryReportQueryRq, 'GeneralSummaryReportType', self.GeneralSummaryReportType)
# if self.ReportDateMacro:
# GeneralSummaryReportType = self.create_sub_element(ET, GeneralSummaryReportQueryRq, "ReportDateMacro", self.ReportDateMacro)
# elif type(self.FromReportDate) is datetime.date or type(self.ToReportDate) is datetime.date:
# ReportPeriod = self.create_sub_element(ET, GeneralSummaryReportQueryRq, "ReportPeriod", "\n ",)
# if type(self.FromReportDate) is datetime.date:
# FromReportDate = self.create_sub_element(ET, ReportPeriod, "FromReportDate", self.FromReportDate.strftime('%Y-%m-%d'),4)
# if type(self.ToReportDate) is datetime.date:
# ToReportDate = self.create_sub_element(ET, ReportPeriod, "ToReportDate", self.ToReportDate.strftime('%Y-%m-%d'))
# mydata = ET.tostring(root, encoding = "unicode")
# qbxml_query = """<?xml version="1.0" encoding="utf-8"?>\n"""
# qbxml_query = qbxml_query + """<?qbxml version="13.0"?>"""
# qbxml_query = qbxml_query + "\n" + mydata
# return qbxml_query
def connect_to_quickbooks(self, qbxml_query=None):
# Connect to Quickbooks
# sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
# sessionManager.OpenConnection('', 'DASA')
# enumfodnc= win32com.client.Dispatch('QBXMLRP2.RequestProcessor')
# print(enumfodnc)
# print(enumfodnc.qbFileOpenDoNotCare)
sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
sessionManager.OpenConnection('', 'DASA2')
# ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2)
ticket = sessionManager.BeginSession("", 2)
# Send query and receive response
# response_string = sessionManager.ProcessRequest(ticket, qbxml_query)
self.response_string = sessionManager.ProcessRequest(ticket, self.QBXML)
# Disconnect from Quickbooks
sessionManager.EndSession(ticket) # Close the company file
sessionManager.CloseConnection() # Close the connection
print(self.response_string)
self.isDataOK()
return self.response_string
def isDataOK(self):
print("isdataok")
# QBXML = ET.fromstring(self.response_string)
# print(xmltodict.parse(self.response_string))
varDict = xmltodict.parse(self.response_string)
pprint.pprint(varDict)
for _ in self.gen_dict_extract("@statusMessage", varDict):
print(_)
if 'Status OK'.lower()==_.lower():
print(_)
isStatusOK = True
break
else:
isStatusOK=False
if isStatusOK:
print(self.returnRet(varDict))
def returnRet(self, varDict):
print("returnRet")
pprint.pprint(varDict)
stillNoRetFound = True
counter = 0
print(f'{varDict = }')
varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
print(f'{varDict = }')
for idx, key in enumerate(varDict):
# print(idx, key, len(varDict))
if "Ret" in key:
return varDict[key]
return None
def gen_dict_extract(self, key, var:dict=None): ### Utils
if var==None:
var=self.response_string
# print("var")
if hasattr(var,'items'): # hasattr(var,'items') for python 3, hasattr(var,'iteritems') for python 2
# print("hassattr")
for k, v in var.items(): # var.items() for python 3, var.iteritems() for python 2
# print(k,v)
if k == key:
yield v
if isinstance(v, dict):
for result in self.gen_dict_extract(key, v):
yield result
elif isinstance(v, list):
for d in v:
for result in self.gen_dict_extract(key, d):
yield result
def __str__(self, *args) -> str:
# return str(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
# print("__str__")
return str(self.get_datarow())
# return "hello"
# def get_datarow(self, *args):
# return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
# def get_dict(self, *args):
# return pd.DataFrame(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
# def get_total(self):
# return self._get_total(self.connect_to_quickbooks(self.create_QBXML()))
def status_ok(self, QBXML):
GSRQRs=QBXML.find('.//GeneralSummaryReportQueryRs')
status_code = GSRQRs.attrib #.get('statusCode')
# print(GSRQRs.attrib)
# print(GSRQRs.attrib['statusCode'])
status=GSRQRs.attrib.get('statusMessage')
print(f'status={status}')
if 'OK' in status:
return True, status_code
else:
return False, status_code
# def get_coldesc(self, QBXML):
# coldescs = QBXML.findall('.//ColDesc')
# coldesclist=[]
# firstcol=[]
# allcols=[]
# for idx, coldesc in enumerate(coldescs):
# coltitles = coldesc.findall('ColTitle')
# # coldesclist.append(coltitles[1].attrib.get('value'))
# firstcol.append(coltitles[0].attrib.get('value'))
# cols=[]
# for idy, coltitle in enumerate(coltitles):
# cols.append(coltitle.attrib.get('value'))
# # allcols[idy].append(coltitle.attrib.get('value'))
# # print(idx, coltitle.tag)
# # print(idx, coltitle.attrib.get('value'))
# allcols.append(cols)
# print(f'getcoldesc:{firstcol}; coldesclist:{coldesclist}; allcols:{allcols}')
# # print(allcols)
# return allcols
# def _get_total(self, response_string):
# # print(response_string)
# # Parse the response into an Element Tree and peel away the layers of response
# QBXML = ET.fromstring(response_string)
# if self.status_ok(QBXML):
# print("")
# # print(QBXML)
# # QBXMLMsgsRs = QBXML.find('QBXMLMsgsRs') #this is a must have. dont CHANGE. this is the root
# total = QBXML.findall('.//TotalRow')
# return {'Total':total[0][1].attrib.get('value')}
# else:
# return ()
# def get_reportsubtitle(self, QBXML):
# reportsubtitle=QBXML.find('.//ReportSubtitle').text
# return reportsubtitle
# def _get_datarow(self, response_string):
# # print(response_string)
# print('get_datarow')
# # Parse the response into an Element Tree and peel away the layers of response
# QBXML = ET.fromstring(response_string)
# datadict={}
# reportsubtitle=None
# total = float(QBXML.find('.//TotalRow')[1].attrib.get('value'))
# status, statusdict= self.status_ok(QBXML)
# print(status, statusdict, total)
# if status and total > 0:
# # print("get_datarow status ok")
# reportsubtitle=self.get_reportsubtitle(QBXML)
# DataRowRs = QBXML.iter("DataRow")
# col_desc=self.get_coldesc(QBXML)
# # print(f'getdatarow coldesc:{col_desc}')
# # print(f'Datarow length:{len(list(DataRowRs))}')
# temp=[]
# rowvalue=[]
# amount=[]
# rowType=[]
# idxerrorcount=0
# # print(DataRowRs)
# for idx_datarow, DataRow in enumerate(DataRowRs):
# # print(DataRow.attrib.get('rowNumber'))
# RowData=DataRow.find('RowData')
# if RowData is None:
# print(f'{idx_datarow} none continue')
# idxerrorcount+=1
# continue
# if 'value' not in DataRow.find('RowData').attrib:
# print(f'{idx_datarow} value continue')
# idxerrorcount+=1
# continue
# RowData_value=DataRow.find('RowData').attrib.get('value')
# rowType.append(RowData_value)
# ColDatas = DataRow.findall("ColData")
# cols=[]
# for idx_coldata, coldata in enumerate(ColDatas):
# if idx_coldata==0:
# cols.append(rowType[idx_datarow-idxerrorcount])
# else:
# cols.append(coldata.attrib.get('value'))
# rowvalue.append(cols)
# temp.append(ColDatas[0].attrib.get('value'))
# # datadict[ColDatas[0].attrib.get('colID')]=temp
# amount.append(ColDatas[1].attrib.get('value'))
# # datadict[ColDatas[1].attrib.get('colID')]=amount
# # print(rowvalue)
# datadict['rowvalue']=rowvalue
# datadict['RowType']=rowType
# datadict['CustomerFullName']=temp #ganti dgn rowType
# datadict['Sales']=amount
# # print(f"lendatadict {len(datadict['CustomerFullName'])}, {len(datadict['Sales'])}, {len(datadict['RowType'])}, {len(datadict['rowvalue'])}")
# df=pd.DataFrame(rowvalue)
# col_desc= [cdesc[-1] for cdesc in col_desc]
# print(f'coldesc getdatarow:{col_desc}')
# # print(df)
# df.columns=col_desc
# # print(df)
# # print('lendatadict')
# print(f"lendatadict {len(datadict['CustomerFullName'])}, {len(datadict['Sales'])}, {len(datadict['RowType'])}, {len(datadict['rowvalue'])}")
# datadict=df.to_dict('list')
# # print(datadict, type(datadict))
# return datadict, reportsubtitle
# else:
# return datadict, reportsubtitle
# def validate_date(self, date_text):
# if date_text is None:
# return None
# try:
# return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
# except ValueError:
# return None
# # raise ValueError("Incorrect data format, should be YYYY-MM-DD")
print('### GeneralSummaryReport ###')
if __name__ == '__main__':
ini=GeneralSummaryReportQuery(ReportDateMacro='LastYear')
# ini=GeneralSummaryReportQuery(FromReportDate='2023-01-11', ToReportDate='2023-01-12')
ini=GeneralSummaryReportQuery(GeneralSummaryReportType='SalesByItemSummary')
# ini=GeneralSummaryReportQuery(GeneralSummaryReportType='SalesByRepSummary')
# ini=GeneralSummaryReportQuery(GeneralSummaryReportType='PurchaseByVendorSummary')
# ini=GeneralSummaryReportQuery(GeneralSummaryReportType='ProfitAndLossStandard')
# ini=GeneralSummaryReportQuery(GeneralSummaryReportType='PhysicalInventoryWorksheet')
# ini=GeneralSummaryReportQuery(GeneralSummaryReportType='InventoryStockStatusByItem')
print(ini.create_QBXML())
# print(f'print ini:{ini}')
# print(type(ini.get_datarow()))
# print(ini.get_total())
# print(f'ini.getdatarow:{ini.get_datarow()}')
ini.connect_to_quickbooks()
df=pd.DataFrame(ini.get_datarow()[0])
headers=list(df.columns)
print(df.tail(10))
df.columns=['CustomerFullName', 'TotalSales']
# df['TotalSales']=df['TotalSales'].astype(float)
df['TotalSales']=pd.to_numeric(df['TotalSales'])
# df['TotalSales']=df['TotalSales'].astype('Int64')
print(df.loc[df['TotalSales']>0])
print(df.tail(10))
print(headers)
print(list(df.keys().values))