Quickbooks-API/server.py

406 lines
16 KiB
Python

import pprint
import xmltodict
import win32com.client
import xml.etree.ElementTree as ET
from time import time
def timing(f):
    """Wrap *f* so that every call prints its name, arguments and elapsed time.

    The wrapper forwards all positional and keyword arguments to *f* and
    returns *f*'s result unchanged.  ``functools.wraps`` copies ``__name__``,
    ``__doc__`` and friends onto the wrapper so a decorated function keeps
    its identity (useful for debugging and introspection).

    Args:
        f: The callable to time.

    Returns:
        A callable with the same call signature as *f*.
    """
    # Imported locally so the decorator does not rely on a module-level import.
    from functools import wraps

    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        # Nothing is printed when f raises: the exception propagates first.
        print('func:%r args:[%r, %r] took: %2.6f sec' % \
            (f.__name__, args, kw, te-ts))
        return result
    return wrap
class baseQBQuery:
    """Build a QBXML request, send it to QuickBooks and query the parsed reply.

    The request body lives in ``self.QBDict``; ``create_QBXML`` serialises it,
    ``connect_to_quickbooks`` sends it and ``isDataOK`` parses the response
    into ``self.varDict``.  ``filter`` and ``all`` run that pipeline lazily
    (via ``runCheck``) and return data extracted from the parsed response.

    ``onError``, ``cleanIncludeRetElements``, ``includeRetElements_allowed``
    and ``retName`` are placeholders meant to be replaced by the ``__init__``
    of a concrete query class.
    """

    def __init__(self, *args, **kwargs, ) -> None:
        """Initialise an empty query; ``args`` and ``kwargs`` are not used here."""
        self.QBXML = None            # last request string built by create_QBXML
        self.QBDict = {}             # request body; only its first key is sent
        self.response_string = None  # raw response returned by QuickBooks
        self.Rs = None
        self.varDict = {}            # response parsed by xmltodict
        ### start ### variables to be replaced by the concrete class' __init__
        self.onError = "continueOnError"
        self.cleanIncludeRetElements = None      # callable(allowed, keys) -> keys
        self.includeRetElements_allowed = None
        self.retName = None                      # key of the "...Ret" payload
        ### end ### variables to be replaced by the concrete class' __init__
        self.listOfDict = self.ListOfDict(None, self.varDict, self.retName)
        self.statusCode = -1
        self.statusMessage = ""
        self.statusSeverity = ""

    @timing
    def create_QBXML(self):
        """Serialise the first entry of ``self.QBDict`` into a QBXML request.

        The entry is wrapped as ``QBXML > QBXMLMsgsRq`` with ``self.onError``
        as the ``onError`` attribute.  The result is stored in ``self.QBXML``
        and returned.  Example ``QBDict`` values::

            {"GeneralSummaryReportQueryRq": {"GeneralSummaryReportType": ...}}
            {"ItemInventoryQueryRq": {"FullName": ["TACO:AA:TH-003AA",
                                                   "TACO:AA:TH-010AA"]}}

        Returns:
            The request as a string.

        Raises:
            IndexError: if ``self.QBDict`` is empty.
        """
        version = "13.0"
        # Look the value up with the real key; only the XML tag is stringified.
        firstKey = list(self.QBDict)[0]
        dataDict = {
            "?qbxml": {  # header for qbxml with version attribute
                "@version": version,
                "QBXML": {
                    "QBXMLMsgsRq": {
                        "@onError": self.onError,
                        str(firstKey): self.QBDict[firstKey],
                    },
                },
            },
        }
        print(f'{dataDict = }')
        xml = xmltodict.unparse(dataDict, pretty=True)
        # The "?qbxml" key is written out like a normal element, so drop its
        # closing tag and add the trailing "?" to turn the opening tag into
        # a processing instruction.
        xml = xml.replace("</?qbxml>", "")
        xml = xml.replace(f'version="{version}"', f'version="{version}"?')
        self.QBXML = xml
        print(self.QBXML)
        return self.QBXML

    @timing
    def connect_to_quickbooks(self, qbxml_query=None):
        """Send a request to QuickBooks and store and parse the response.

        Args:
            qbxml_query: Request string to send.  When omitted (the default)
                ``self.QBXML`` is sent.

        Returns:
            The raw response string, also stored in ``self.response_string``.

        The session and the connection are always closed, even when the
        request itself raises.
        """
        request = self.QBXML if qbxml_query is None else qbxml_query
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        # NOTE(review): 'DASA2' is presumably the application name shown to
        # QuickBooks - confirm against the SDK documentation.
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # NOTE(review): an empty company-file path with mode 2 presumably
            # means "whatever company file is currently open" - confirm.
            ticket = sessionManager.BeginSession("", 2)
            try:
                self.response_string = sessionManager.ProcessRequest(ticket, request)
            finally:
                sessionManager.EndSession(ticket)  # close the company file
        finally:
            sessionManager.CloseConnection()  # close the connection
        self.isDataOK()
        return self.response_string

    def isDataOK(self):
        """Parse ``self.response_string`` and cache the response status.

        Fills ``self.varDict`` with the parsed response and sets
        ``self.statusCode``, ``self.statusMessage`` and
        ``self.statusSeverity`` from the first occurrence of the matching
        ``@status...`` attribute, or ``None`` when it is absent.  As a side
        effect ``self.listOfDict`` is re-pointed at the parsed response and
        its ``filterKey`` is left at ``["@statusSeverity"]``.  Returns None.
        """
        self.varDict = xmltodict.parse(self.response_string)
        self.listOfDict.varDict = self.varDict
        found = {}
        for attr in ("@statusCode", "@statusMessage", "@statusSeverity"):
            self.listOfDict.filterKey = [attr]
            # firstValue() returns [] when nothing matches; fall back to {}
            # so a response without status attributes does not crash.
            found[attr] = (self.listOfDict.firstValue() or {}).get(attr)
        self.statusCode = found["@statusCode"]
        self.statusMessage = found["@statusMessage"]
        self.statusSeverity = found["@statusSeverity"]
        print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = }')

    def returnRet(self, varDict):
        """Return the first ``...Ret`` entry of this query's response element.

        Looks up ``varDict['QBXML']['QBXMLMsgsRs'][<ClassName>Rs]`` and returns
        the value of the first key containing ``"Ret"``, or ``None`` when
        there is no such key.

        Raises:
            KeyError: if the expected response path is missing.
        """
        print(f'{varDict = }')
        varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
        print(f'{varDict = }')
        for key in varDict:
            if "Ret" in key:
                return varDict[key]
        return None

    def runCheck(self):
        """Make sure response data is available, querying QuickBooks if needed.

        Returns:
            True when a parsed response, raw response or ``Rs`` already exists,
            or after a request built from ``self.QBDict`` has been sent;
            False when there is nothing to send.
        """
        if self.varDict or self.response_string or self.Rs:
            return True
        if self.QBDict:
            self.create_QBXML()
            self.connect_to_quickbooks()
            return True
        return False

    def filter(self, key=None):
        """Return a ``ListOfDict`` view of the response restricted to *key*.

        Args:
            key: A key name, a list of key names, or a dict whose keys are
                used.  Any other type yields an empty list.

        Returns:
            ``[]`` when no data is available or *key* has an unsupported
            type; otherwise a new ``ListOfDict``.  A fresh object is returned
            on every call (instead of reusing ``self.listOfDict``) so results
            obtained for different keys stay independent of each other.
        """
        if not self.runCheck():
            return []
        if isinstance(key, str):
            key = [key]
        elif isinstance(key, dict):
            key = list(key)
        elif not isinstance(key, list):
            return []
        # The cleaner is supplied by concrete classes; the base-class default
        # is None, in which case the keys are used as given.
        if callable(self.cleanIncludeRetElements):
            key = self.cleanIncludeRetElements(self.includeRetElements_allowed, key)
        print(f'f {key = }')
        if key:
            return self.ListOfDict(key, self.varDict, self.retName)
        # An empty key list would make ListOfDict fall back to retName, so a
        # placeholder key is passed instead (presumably one that never matches).
        return self.ListOfDict(["abc"], self.varDict, self.retName)

    def all(self) -> dict:
        """Return the value stored under ``self.retName`` in the response.

        Returns:
            The first ``retName`` value found in the parsed response, or
            ``None`` when no data is available or the key does not occur.
        """
        if not self.runCheck():
            return None
        match = self.ListOfDict(None, self.varDict, self.retName).firstValue()
        if not match:
            return None
        return match[self.retName]

    class ListOfDict:
        """Search a nested dict/list structure for dicts holding given keys.

        A "match" is a dict containing at least one of the filter keys; it is
        reported as a new dict holding only the filter keys that were found.
        Matching dicts are not searched any deeper.
        """

        def __init__(self, key, var, retName) -> None:
            """Store the filter keys (``[retName]`` when *key* is falsy) and the data."""
            self.filterKey = key if key else [retName]
            self.varDict = var

        def __repr__(self) -> str:
            return str(self.all())

        def filter(self, filterKey):
            """Replace the filter keys used by subsequent searches."""
            self.filterKey = filterKey

        def _filter_keys(self):
            """Return the filter keys as a list (a bare string is one key)."""
            if self.filterKey is None:
                return []
            if isinstance(self.filterKey, str):
                return [self.filterKey]
            return list(self.filterKey)

        def _iter_matches(self, node, keys):
            """Yield every match found in *node*, depth first."""
            if isinstance(node, list):
                for item in node:
                    yield from self._iter_matches(item, keys)
            elif isinstance(node, dict):
                match = {k: node[k] for k in keys if k in node}
                if match:
                    yield match
                else:
                    for child in node.values():
                        yield from self._iter_matches(child, keys)
            # Anything else (strings, None, ...) is a leaf and cannot match.

        def find_listOfDict(self, var:dict=None, dataRetList:list=None, ):
            """Generate all matches in *var* (default: ``self.varDict``).

            Each match is also appended to *dataRetList* when one is given.
            Only the top-level ``None`` falls back to ``self.varDict``; a
            ``None`` met while descending is treated as a leaf, so values
            that are ``None`` cannot restart the search.
            """
            if var is None:
                var = self.varDict
            if dataRetList is None:
                dataRetList = []
            for match in self._iter_matches(var, self._filter_keys()):
                dataRetList.append(match)
                yield match
            return dataRetList

        def dct(self, var:dict=None, dataRetList:list=None):
            """Print every match and return the matched values.

            With a single filter key the bare values are returned; with
            several keys the match dicts themselves are returned.
            """
            matches = self.all(var, dataRetList)
            for xdct in matches:
                print(f'{xdct = }')
            keys = self._filter_keys()
            if len(keys) == 1:
                return [match[keys[0]] for match in matches]
            return matches

        def all(self, var:dict=None, dataRetList:list=None):
            """Return every match as a list (empty when nothing matches)."""
            return list(self.find_listOfDict(var, dataRetList))

        def allOnlyValue(self, var:dict=None, dataRetList:list=None):
            """Return every match as a list; currently identical to ``all``."""
            return list(self.find_listOfDict(var, dataRetList))

        def first(self, var:dict=None, dataRetList:list=None):
            """Return the first match, or ``None`` when nothing matches."""
            return next(self.find_listOfDict(var, dataRetList), None)

        def firstValue(self, var:dict=None, dataRetList:list=None):
            """Return the first match, or ``[]`` when nothing matches."""
            return self.first(var, dataRetList) or []

        def last(self, var:dict=None, dataRetList:list=None):
            """Return the last match, or ``[]`` when nothing matches."""
            matches = self.all(var, dataRetList)
            return matches[-1] if matches else []

        def lastValue(self, var:dict=None, dataRetList:list=None):
            """Return the last match, or ``[]`` when nothing matches."""
            return self.last(var, dataRetList) or []

        def count(self, var:dict=None, dataRetList:list=None):
            """Return the number of matches in *var* (default: ``self.varDict``)."""
            return len(self.all(var, dataRetList))

    # NOTE: keep this utility (original author's request) - it is the
    # reference example of extracting dictionary values by key.
    def gen_dict_extract(self, key, var:dict=None):
        """Generate every value stored under *key* anywhere inside *var*.

        Args:
            key: The dictionary key to look for.
            var: Nested dict to search; defaults to the parsed response
                ``self.varDict`` (the raw response string has no items and
                could never yield anything).
        """
        if var is None:
            var = self.varDict
        yield from self._extract_values(key, var)

    def _extract_values(self, key, node):
        """Yield values of *key* in *node*, descending into dicts and lists of dicts."""
        if not hasattr(node, 'items'):
            return
        for k, v in node.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                yield from self._extract_values(key, v)
            elif isinstance(v, list):
                for item in v:
                    yield from self._extract_values(key, item)

    def __str__(self, *args, **kwargs) -> str:
        """Return the string form of ``self.all()`` (may trigger a request)."""
        return str(self.all())

    def status_ok(self, QBXML):
        """Report whether a GeneralSummaryReportQueryRs element has an OK status.

        Args:
            QBXML: Object with a ``find`` method, e.g. an ElementTree element
                of the response.

        Returns:
            A ``(ok, attributes)`` tuple: ``ok`` is True when the element's
            ``statusMessage`` attribute contains ``"OK"``; ``attributes`` is
            the element's attribute mapping, or ``{}`` when the element is
            missing.
        """
        GSRQRs = QBXML.find('.//GeneralSummaryReportQueryRs')
        # Compare against None explicitly: a found element without children
        # can be falsy.
        status_code = GSRQRs.attrib if GSRQRs is not None else {}
        status = status_code.get('statusMessage')
        print(f'status={status}')
        return (status is not None and 'OK' in status), status_code
# Script entry point: running the module directly currently does nothing.
if __name__ == '__main__':
    pass