mirror of
https://github.com/bcomsugi/dasaproject.git
synced 2026-01-10 05:52:38 +07:00
Compare commits
2 Commits
faf5becb20
...
6815133b01
| Author | SHA1 | Date | |
|---|---|---|---|
| 6815133b01 | |||
| e38d9c35e0 |
1235
QBClass/QBClasses.py
Normal file
1235
QBClass/QBClasses.py
Normal file
File diff suppressed because it is too large
Load Diff
0
QBClass/__init__.py
Normal file
0
QBClass/__init__.py
Normal file
511
QBClass/server.py
Normal file
511
QBClass/server.py
Normal file
@ -0,0 +1,511 @@
|
||||
import pprint
|
||||
import xmltodict
|
||||
import win32com.client
|
||||
# import xml.etree.ElementTree as ET
|
||||
import xml.dom.minidom as minidom
|
||||
from time import time
|
||||
from .utils import cleanIncludeRetElements
|
||||
import json
|
||||
|
||||
def timing(f):
|
||||
# @wraps(f)
|
||||
def wrap(*args, **kw):
|
||||
ts = time()
|
||||
result = f(*args, **kw)
|
||||
te = time()
|
||||
print('func:%r args:[%r, %r] took: %2.6f sec' % \
|
||||
(f.__name__, args, kw, te-ts))
|
||||
return result
|
||||
return wrap
|
||||
|
||||
|
||||
class baseQBQuery:
|
||||
def __init__(self, *args, **kwargs, ) -> None:
|
||||
# print(f'{kwargs = }')
|
||||
# print(f'{args = }')
|
||||
self.QBXML = None
|
||||
self.QBDict = {}
|
||||
self.response_string = None
|
||||
self.Rs = None
|
||||
self.varDict = {}
|
||||
### start ### variable to be replace with other class init value
|
||||
self.onError = "continueOnError"
|
||||
# self.cleanIncludeRetElements = None
|
||||
self.includeRetElements_allowed = None
|
||||
self.retName = None
|
||||
self.defaultFilterKey = None
|
||||
self.class_debug = False
|
||||
### end ### variable to be replace with other class init value
|
||||
self.listOfDict = self.ListOfDict(None, self.varDict, self.retName, False)
|
||||
self.requestID = None
|
||||
self.statusCode = -1
|
||||
self.statusMessage = ""
|
||||
self.statusSeverity = ""
|
||||
self.statusOk = False
|
||||
if self.__class__.__name__=="baseQBQuery":
|
||||
print("baseqbquey same with classname")
|
||||
else:
|
||||
print("accessed from child class")
|
||||
print("basequery is accessed from ", self.__class__.__name__ + "Rq")
|
||||
|
||||
# @timing
|
||||
def create_QBXML(self):
|
||||
version = "13.0"
|
||||
dataDict = { ### Header for qmxml with version attribute
|
||||
"?qbxml": {
|
||||
"@version": version,
|
||||
}
|
||||
}
|
||||
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Simple Example ###
|
||||
# "@onError": "continueOnError",
|
||||
# "GeneralSummaryReportQueryRq": {
|
||||
# "GeneralSummaryReportType": self.GeneralSummaryReportType,
|
||||
# }
|
||||
# }
|
||||
# }
|
||||
|
||||
# dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
|
||||
# "@onError": "continueOnError",
|
||||
# "ItemInventoryQueryRq": {
|
||||
# "FullName": ["TACO:AA:TH-003AA",
|
||||
# "TACO:AA:TH-010AA"]
|
||||
# },
|
||||
# }
|
||||
# }
|
||||
firstKey = str(list(self.QBDict.keys())[0])
|
||||
dataDict["?qbxml"]["QBXML"] = {"QBXMLMsgsRq": { ### Example with multiple FullName Item ###
|
||||
"@onError": self.onError,
|
||||
firstKey: self.QBDict[firstKey]}}
|
||||
# print(f'{dataDict = }')
|
||||
# # QBXML = '<?qbxml version="13.0"?>' + xmltodict.unparse(dataDict, pretty=True)
|
||||
self.QBXML = xmltodict.unparse(dataDict, pretty=True).replace("</?qbxml>", "").replace(f'version="{version}"', f'version="{version}"?')
|
||||
print(self.QBXML, type(self.QBXML))
|
||||
return self.QBXML
|
||||
|
||||
# @timing
|
||||
def connect_to_quickbooks(self, qbxml_query=None):
|
||||
# Connect to Quickbooks
|
||||
sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
|
||||
sessionManager.OpenConnection('', 'DASA2')
|
||||
# ticket = sessionManager.BeginSession("z:\\DBW Bogor.qbw", 2)
|
||||
ticket = sessionManager.BeginSession("", 2)
|
||||
|
||||
# Send query and receive response
|
||||
self.response_string = sessionManager.ProcessRequest(ticket, self.QBXML)
|
||||
|
||||
# Disconnect from Quickbooks
|
||||
sessionManager.EndSession(ticket) # Close the company file
|
||||
sessionManager.CloseConnection() # Close the connection
|
||||
|
||||
# Beautify response_string
|
||||
# print(f'{self.response_string = }')
|
||||
xml = minidom.parseString(self.response_string.replace("\n", ""))
|
||||
self.response_string = xml.toprettyxml()
|
||||
# print(f'{self.response_string = }')
|
||||
|
||||
self.statusOk = self.isDataOK()
|
||||
return self.statusOk
|
||||
return self.response_string
|
||||
|
||||
def isDataOK(self):
|
||||
# print("isdataok")
|
||||
# QBXML = ET.fromstring(self.response_string)
|
||||
# print(xmltodict.parse(self.response_string))
|
||||
self.varDict = xmltodict.parse(self.response_string)
|
||||
if self.class_debug:
|
||||
pprint.pprint("isDataOK", self.varDict)
|
||||
self.listOfDict.varDict = self.varDict
|
||||
self.listOfDict.filterKey = ["@requestID"]
|
||||
self.requestID = self.listOfDict.firstValue().get('@requestID',None)
|
||||
self.listOfDict.filterKey = ["@statusCode"]
|
||||
self.statusCode = self.listOfDict.firstValue().get('@statusCode',None)
|
||||
self.listOfDict.filterKey = ["@statusMessage"]
|
||||
self.statusMessage = self.listOfDict.firstValue().get('@statusMessage',None)
|
||||
self.listOfDict.filterKey = ["@statusSeverity"]
|
||||
self.statusSeverity = self.listOfDict.firstValue().get('@statusSeverity')
|
||||
self.listOfDict.filterKey = [self.retName]
|
||||
if self.class_debug:
|
||||
print(f'isDataOK -> {self.listOfDict.firstValue() = }')
|
||||
if self.listOfDict.firstValue().get(self.retName,None)==None:
|
||||
return False
|
||||
|
||||
print(f'{self.statusCode = }, {self.statusMessage = }, {self.statusSeverity = }')
|
||||
varDict = self.varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
|
||||
return True
|
||||
# isStatusOK=None
|
||||
|
||||
# for _ in self.findKeyInDict("FullName", ): ###berhasil
|
||||
# print(f'{_ = }')
|
||||
# for _ in self.gen_dict_extract("@statusMessage", self.varDict):
|
||||
# print(_)
|
||||
# if 'Status OK'.lower()==_.lower():
|
||||
# print(_)
|
||||
# isStatusOK = True
|
||||
# break
|
||||
# else:
|
||||
# isStatusOK=False
|
||||
|
||||
# if self.ListOfDict.find_firstListOfDict("@statusMessage")['@statusMessage'].lower()=="status OK".lower():
|
||||
# # print(f'{self.retName = }')
|
||||
# self.Rs = self.find_firstListOfDict(self.retName)[self.retName]
|
||||
# # self.Rs=self.returnRet(self.varDict)
|
||||
# # # print(self.findKeyInDict("FullName", )) ###test
|
||||
# # print(self.findKeyInDict("FullName", self.findKeyInDict("QBXMLMsgsRs1", ))) ###test
|
||||
# # # print(self.findKeyInDict("@statusMessage", )) ###test
|
||||
# # for _ in self.findKeyInDict("QBXMLMsgsRs",): ###trial blm berhasil
|
||||
# # print(f'2{_ = }')
|
||||
# # print(f'{self.Rs = }')
|
||||
# # print(type(self.Rs))
|
||||
# # print(self.find_firstListOfDict("FullName")['FullName'])
|
||||
# # print(self.find_firstListOfDict("FullName"))
|
||||
# # print(self.find_allListOfDict("FullName"))
|
||||
|
||||
def returnRet(self, varDict:dict = None):
|
||||
if varDict== None:
|
||||
varDict=self.varDict
|
||||
# pprint.pprint(self.varDict)
|
||||
|
||||
# print(f'{varDict = }')
|
||||
varDict = varDict['QBXML']['QBXMLMsgsRs'][self.__class__.__name__+"Rs"]
|
||||
# print(f'{varDict = }')
|
||||
|
||||
for idx, key in enumerate(varDict):
|
||||
# print(idx, key, len(varDict))
|
||||
if self.retName in key:
|
||||
return varDict[key]
|
||||
return None
|
||||
|
||||
def runCheck(self):
|
||||
# print("runCheck")
|
||||
if self.varDict:
|
||||
return True
|
||||
if self.response_string:
|
||||
return True
|
||||
if self.Rs:
|
||||
return True
|
||||
if self.QBDict:
|
||||
self.create_QBXML()
|
||||
if self.connect_to_quickbooks():
|
||||
return True
|
||||
return False
|
||||
|
||||
def __repr__(self) -> str:
|
||||
self.all()
|
||||
# print(f'{self.returnRet() = }')
|
||||
return self.response_string
|
||||
|
||||
def count(self) -> int:
|
||||
# objs = self.filter(self.defaultFilterKey).all()
|
||||
# print(f"{objs = }", type(objs))
|
||||
return len(self.filter(self.defaultFilterKey).all())
|
||||
|
||||
|
||||
def filter(self, key=None):
|
||||
print(f'{key = }')
|
||||
# print(f'{self.statusOk = }')
|
||||
if not self.runCheck():
|
||||
print("not runcheck")
|
||||
return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
|
||||
return []
|
||||
if isinstance(key, str):
|
||||
key = [key]
|
||||
elif isinstance(key, list):
|
||||
pass
|
||||
elif isinstance(key, dict):
|
||||
key = [x for x,y in key.items()]
|
||||
elif key is None:
|
||||
# print(f"key is none. {self.retName = }")
|
||||
return self.ListOfDict(self.retName, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)#.firstValue()#[self.retName]
|
||||
else:
|
||||
return []
|
||||
key = cleanIncludeRetElements(self.includeRetElements_allowed, key)
|
||||
# print(f'f {key = }')
|
||||
if key:
|
||||
return self.ListOfDict(key, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
|
||||
else:
|
||||
return self.ListOfDict(["abc"], self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk)
|
||||
### dont use this way, better returning class because the value if you assign to variable, the valu will be the last filterKey inputed
|
||||
### if return class, every filterKey is an object, different from other filterKey
|
||||
self.listOfDict.varDict = self.varDict
|
||||
self.listOfDict.filterKey = key
|
||||
return self.listOfDict
|
||||
###
|
||||
|
||||
def all(self) -> dict:
|
||||
if not self.runCheck():
|
||||
return None
|
||||
# return self.ListOfDict(None, self.varDict, self.retName).firstValue()
|
||||
temp = self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue()[self.retName]
|
||||
if self.requestID:
|
||||
temp['requestID']=self.requestID
|
||||
# print(f'{temp = }')
|
||||
return temp
|
||||
# return self.ListOfDict(None, self.varDict, self.retName, self.includeRetElements_allowed, self.statusOk).firstValue()[self.retName]
|
||||
### dont use this way
|
||||
self.listOfDict.varDict = self.varDict
|
||||
self.listOfDict.filterKey = self.retName
|
||||
return self.listOfDict
|
||||
###
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.all())
|
||||
|
||||
|
||||
class ListOfDict:
|
||||
def __init__(self, key, var, retName, includeRetElements_allowed:list ,statusOk:bool = True) -> None:
|
||||
# print(f'{key =}, {var =}')
|
||||
# self.first = self.find_firstListOfDict(key)
|
||||
if key:
|
||||
if isinstance(key, str):
|
||||
self.filterKey = [key]
|
||||
else:
|
||||
self.filterKey = key
|
||||
else:
|
||||
self.filterKey = [retName]
|
||||
# print(f"{self.filterKey = }")
|
||||
self.varDict = var
|
||||
self.statusOk = statusOk
|
||||
self._includeRetElements_allowed = includeRetElements_allowed
|
||||
# print("listofDict")
|
||||
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return str(self.all())
|
||||
|
||||
# def filter(self, filterKey):
|
||||
# self.filterKey=filterKey
|
||||
|
||||
|
||||
def getValuesOf(self, key:str=None, var:dict=None, dataRetList:list=None) :
|
||||
if key==None:
|
||||
key = self.filterKey
|
||||
elif isinstance(key, str):
|
||||
key=[key]
|
||||
elif isinstance(key, list):
|
||||
pass
|
||||
else:
|
||||
raise TypeError(f'{key=} should be string not {type(key)}')
|
||||
print(key)
|
||||
key = cleanIncludeRetElements(self._includeRetElements_allowed, key)
|
||||
print(key)
|
||||
if len(key)==0:
|
||||
key = self.filterKey
|
||||
else:
|
||||
key = key
|
||||
# print(f'getvaluesof {key = }')
|
||||
# for xdct in self.findKeyInDict(var, dataRetList):
|
||||
# print(f'{xdct = }', type(xdct), self.filterKey[0], key)
|
||||
lstresult = []
|
||||
for x in self.findKeyInDict(var, dataRetList):
|
||||
templstresult = []
|
||||
for y in key:
|
||||
templstresult.append(x.get(y, ""))
|
||||
lstresult.append(templstresult)
|
||||
print(f'{lstresult[-1] =}')
|
||||
return lstresult
|
||||
_lst = [x[key] for x in self.findKeyInDict(var, dataRetList)]
|
||||
# print(_dct)
|
||||
return _lst
|
||||
|
||||
def all(self, var:dict=None, dataRetList:list=None) -> list:
|
||||
# print(f'{self.statusOk = }')
|
||||
if not self.statusOk:
|
||||
return []
|
||||
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
|
||||
# _lst = [x[self.filterKey] for x in self.findKeyInDict(var, dataRetList)]
|
||||
# if _lst:
|
||||
return _lst
|
||||
# else:
|
||||
# return []
|
||||
|
||||
def allOnlyValue(self, var:dict=None, dataRetList:list=None):
|
||||
if not self.statusOk:
|
||||
return []
|
||||
_lst = [x for x in self.findKeyInDict(var, dataRetList)]
|
||||
return _lst
|
||||
|
||||
def first(self, var:dict=None, dataRetList:list=None) -> dict:
|
||||
if not self.statusOk:
|
||||
return {}
|
||||
return next(self.findKeyInDict( var, dataRetList), {})
|
||||
|
||||
def firstValue(self, var:dict=None, dataRetList:list=None) ->dict:
|
||||
if not self.statusOk:
|
||||
print("firstValue statusOk is False")
|
||||
return {}
|
||||
# return self.first(var, dataRetList)[self.filterKey]
|
||||
_val=self.first(var, dataRetList)
|
||||
# print(f'{_val = }')
|
||||
if _val:
|
||||
# return _val[self.filterKey]
|
||||
return _val
|
||||
else:
|
||||
return {}
|
||||
|
||||
def last(self, var:dict=None, dataRetList:list=None) -> dict:
|
||||
if not self.statusOk:
|
||||
return {}
|
||||
# *_, last = self.findKeyInDict( var, dataRetList)
|
||||
_val= self.all(var, dataRetList)
|
||||
if _val:return _val[-1]
|
||||
else: return {}
|
||||
|
||||
def lastValue(self, var:dict=None, dataRetList:list=None) -> dict:
|
||||
if not self.statusOk:
|
||||
return {}
|
||||
_val=self.last(var, dataRetList)
|
||||
# print(f"lastValue {_val =}")
|
||||
if _val:
|
||||
# return _val[self.filterKey]
|
||||
return _val
|
||||
else:
|
||||
return {}
|
||||
|
||||
def count(self, var:dict=None, dataRetList:list=None) -> int:
|
||||
if not self.statusOk:
|
||||
return 0
|
||||
# print(len(self.all()))
|
||||
return len(self.all())
|
||||
|
||||
# def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
|
||||
# # print("genfinekeys")
|
||||
# if var==None:
|
||||
# var=self.varDict
|
||||
# # print(f"{var = }")
|
||||
# if dataRetList is None:
|
||||
# dataRetList = []
|
||||
# if isinstance(var, list):
|
||||
# # print("list var")
|
||||
# for _ in var:
|
||||
# yield from self.findKeyInDict( _, )
|
||||
# elif isinstance(var, dict):
|
||||
# # print("dict var")
|
||||
# if self.filterKey in var:
|
||||
# dataRetList.append({self.filterKey: var[self.filterKey]})
|
||||
# print(f"{dataRetList = }")
|
||||
# yield {self.filterKey: var[self.filterKey]}
|
||||
# else:
|
||||
# # print(f'dict else var={var}')
|
||||
# for _ in var:
|
||||
# # print(_)
|
||||
# yield from self.findKeyInDict(var[_], )
|
||||
# return dataRetList
|
||||
|
||||
def findKeyInDict(self, var:dict=None, dataRetList:list=None, ):
|
||||
# print("genfinekeys")
|
||||
if var==None:
|
||||
var=self.varDict
|
||||
# print(f"{var = }")
|
||||
if dataRetList is None:
|
||||
dataRetList = []
|
||||
if isinstance(var, list):
|
||||
# print("list var")
|
||||
for _ in var:
|
||||
yield from self.findKeyInDict( _, )
|
||||
elif isinstance(var, dict):
|
||||
# print("dict var")
|
||||
found = False
|
||||
tempDct = {}
|
||||
for fKey in self.filterKey:
|
||||
# if self.filterKey in var:
|
||||
if fKey in var:
|
||||
found = True
|
||||
tempDct[fKey]=var[fKey]
|
||||
# print(f'{tempDct = }')
|
||||
if found:
|
||||
# dataRetList.append({self.filterKey: var[self.filterKey]})
|
||||
dataRetList.append(tempDct)
|
||||
# print(f"{dataRetList = }")
|
||||
yield tempDct #{self.filterKey: var[self.filterKey]}
|
||||
else:
|
||||
# print(f'dict else var={var}')
|
||||
for _ in var:
|
||||
# print(_)
|
||||
yield from self.findKeyInDict(var[_], )
|
||||
return dataRetList
|
||||
|
||||
|
||||
# def find_allListOfDict(self, key, var:dict=None, dataRetList:list=None):
|
||||
# return [x for x in self.findKeyInDict(key, var, dataRetList)]
|
||||
|
||||
# def find_firstListOfDictValue(self, key, var:dict=None, dataRetList:list=None):
|
||||
# return self.find_firstListOfDict(key, var, dataRetList)[key]
|
||||
|
||||
# def find_firstListOfDict(self, key, var:dict=None, dataRetList:list=None):
|
||||
# return next(self.findKeyInDict(key, var, dataRetList), None)
|
||||
|
||||
# def findKeyInDict(self, key, var:dict=None, dataRetList:list=None, ):
|
||||
# # print("genfinekeys")
|
||||
# if var==None:
|
||||
# var=self.varDict
|
||||
# # print(f"{var = }")
|
||||
# if dataRetList is None:
|
||||
# dataRetList = []
|
||||
# if isinstance(var, list):
|
||||
# # print("list var")
|
||||
# for _ in var:
|
||||
# yield from self.findKeyInDict(key, _, )
|
||||
# elif isinstance(var, dict):
|
||||
# # print("dict var")
|
||||
# if key in var:
|
||||
# dataRetList.append({key: var[key]})
|
||||
# # print(f"{dataRetList = }")
|
||||
# yield {key: var[key]}
|
||||
# else:
|
||||
# # print(f'dict else var={var}')
|
||||
# for _ in var:
|
||||
# # print(_)
|
||||
# yield from self.findKeyInDict(key, var[_], )
|
||||
# return dataRetList
|
||||
|
||||
|
||||
#### dont delete.
|
||||
### Example of extracting dictionary value by key
|
||||
def gen_dict_extract(self, key, var:dict=None): ### Utils
|
||||
if var==None:
|
||||
var=self.response_string
|
||||
# print("var")
|
||||
if hasattr(var,'items'): # hasattr(var,'items') for python 3, hasattr(var,'iteritems') for python 2
|
||||
# print("hassattr")
|
||||
for k, v in var.items(): # var.items() for python 3, var.iteritems() for python 2
|
||||
# print(k,v)
|
||||
if k == key:
|
||||
yield v
|
||||
if isinstance(v, dict):
|
||||
for result in self.gen_dict_extract(key, v):
|
||||
yield result
|
||||
elif isinstance(v, list):
|
||||
for d in v:
|
||||
for result in self.gen_dict_extract(key, d):
|
||||
yield result
|
||||
|
||||
def __str__(self, *args, **kwargs) -> str:
|
||||
# return str(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
|
||||
# print("__str__")
|
||||
return str(self.all())
|
||||
return self.__class__.__name__
|
||||
return str(self.get_datarow())
|
||||
|
||||
|
||||
# def get_datarow(self, *args):
|
||||
# return self._get_datarow(self.connect_to_quickbooks(self.create_QBXML()))
|
||||
|
||||
# def get_dict(self, *args):
|
||||
# return pd.DataFrame(self._get_datarow(self.connect_to_quickbooks(self.create_QBXML())))
|
||||
|
||||
def status_ok(self, QBXML):
|
||||
GSRQRs=QBXML.find('.//GeneralSummaryReportQueryRs')
|
||||
status_code = GSRQRs.attrib #.get('statusCode')
|
||||
# print(GSRQRs.attrib)
|
||||
# print(GSRQRs.attrib['statusCode'])
|
||||
status=GSRQRs.attrib.get('statusMessage')
|
||||
|
||||
print(f'status={status}')
|
||||
if 'OK' in status:
|
||||
return True, status_code
|
||||
else:
|
||||
return False, status_code
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
142
QBClass/utils.py
Normal file
142
QBClass/utils.py
Normal file
@ -0,0 +1,142 @@
|
||||
|
||||
from time import time
|
||||
from typing import Union
|
||||
|
||||
|
||||
def getItemFullName(shortItem:str)->str:
|
||||
# def searchitem(SearchItem = '001aba', QTY = 0, dbTable="itemlist", column="FullName"):
|
||||
FullName = shortItem
|
||||
return FullName
|
||||
|
||||
def getCustomerFullName(shortCustomerName:str)->str:
|
||||
# def searchitem(SearchItem = '001aba', QTY = 0, dbTable="itemlist", column="FullName"):
|
||||
FullName = shortCustomerName
|
||||
return FullName
|
||||
|
||||
def getRefNumber()->str:
|
||||
return "refnumber345"
|
||||
|
||||
def prepareData(dt:str):
|
||||
dtLines = dt.strip().split('\n')
|
||||
smoothdtLines:list[str] = []
|
||||
for dtLine in dtLines:
|
||||
if dtLine.strip()=='':
|
||||
continue
|
||||
smoothdtLines.append(dtLine)
|
||||
for smIdx, smdtLine in enumerate(smoothdtLines):
|
||||
itemName, Qty, memoLine = ('', '', '')
|
||||
if smIdx==0 and ':' in smdtLine:
|
||||
CustomerName = smdtLine.split(':')[1].strip()
|
||||
CustomerRef_FullName = getCustomerFullName(CustomerName)
|
||||
RefNumber = getRefNumber()
|
||||
continue
|
||||
elif smIdx==0 and ':' not in smdtLine:
|
||||
return False
|
||||
else: #proccess the second till end lines
|
||||
for splitChar in [':', '=']:
|
||||
if splitChar in smdtLine:
|
||||
objects = smdtLine.split(splitChar)
|
||||
if len(objects)==2:
|
||||
itemName, Qty = tuple(smdtLine.split(splitChar))
|
||||
if len(objects)==3:
|
||||
itemName, Qty, memoLine = tuple(smdtLine.split(splitChar))
|
||||
break
|
||||
itemName = itemName.strip()
|
||||
Qty=Qty.strip()
|
||||
memoLine = memoLine.strip()
|
||||
print(CustomerRef_FullName, RefNumber, itemName, Qty, memoLine)
|
||||
ItemRef_FullName = getItemFullName(itemName)
|
||||
|
||||
if memoLine:
|
||||
print(memoLine)
|
||||
else:
|
||||
print("no memo")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def makeLinkedTxnList(x:list, key:str=None)->list:
|
||||
txnIDs=[]
|
||||
if isinstance(x, dict):
|
||||
x=[x]
|
||||
print(f'{type(x)=}')
|
||||
for y in x: #y = each dict of LinkedTxn, and it has a listofdict or a dict inside
|
||||
linkedTxns=y.get('LinkedTxn',None)
|
||||
if linkedTxns is None:
|
||||
return
|
||||
if isinstance(linkedTxns, dict):
|
||||
linkedTxns = [linkedTxns]
|
||||
for linkedTxn in linkedTxns:
|
||||
# print(f'{key = }')
|
||||
if key and linkedTxn.get('TxnType')==key:
|
||||
txnIDs.append(linkedTxn.get('TxnID',None))
|
||||
print(f'{txnIDs = }')
|
||||
return txnIDs
|
||||
|
||||
def makeAList(x:Union[str , dict , list], dictvalue:bool=False, listofdict:bool=False)->list :
|
||||
if isinstance(x,str):
|
||||
x=[x]
|
||||
elif isinstance(x, list) and not listofdict: #lists of str or int values
|
||||
pass
|
||||
elif isinstance(x, list) and not dictvalue: #lists of dict, and get all the keys
|
||||
res=[]
|
||||
for z in x:
|
||||
for k in z:
|
||||
res.append(k)
|
||||
return res
|
||||
elif isinstance(x, list) and dictvalue: #lists of dict, and get all the values
|
||||
res=[]
|
||||
for z in x:
|
||||
for k,v in z.items():
|
||||
res.append(v)
|
||||
return res
|
||||
elif isinstance(x, list): #lists of str or int values
|
||||
res=[]
|
||||
for z in x:
|
||||
for k in z:
|
||||
res.append(k)
|
||||
return res
|
||||
elif isinstance(x, dict) and not dictvalue: #get a list of keys from dict
|
||||
x=[k for k in x]
|
||||
elif isinstance(x, dict): #get a list of values from dict
|
||||
x=[v for k,v in x.items()]
|
||||
|
||||
return x
|
||||
|
||||
def timing(f):
|
||||
# @wraps(f)
|
||||
def wrap(*args, **kw):
|
||||
ts = time()
|
||||
result = f(*args, **kw)
|
||||
te = time()
|
||||
print('func:%r args:[%r, %r] took: %2.6f sec' % \
|
||||
(f.__name__, args, kw, te-ts))
|
||||
return result
|
||||
return wrap
|
||||
|
||||
|
||||
def cleanIncludeRetElements(includeRetElements_allowed:list, includeRetElements:Union[list, str], default_val:str=None):
|
||||
if default_val==None:
|
||||
default_val = []
|
||||
# else:
|
||||
# default = [default]
|
||||
iREs = []
|
||||
# print(f'{includeRetElements_allowed = }\n{includeRetElements = }')
|
||||
if isinstance(includeRetElements, str): #if user put 1 str argument in IncludeRetElements, change it to list
|
||||
includeRetElements = [includeRetElements]
|
||||
for iRE in includeRetElements:
|
||||
for iRE_a in includeRetElements_allowed:
|
||||
if iRE.lower() == iRE_a.lower():
|
||||
iREs.append(iRE_a)
|
||||
break
|
||||
if len(iREs)>0:
|
||||
return iREs
|
||||
else:
|
||||
return [default_val]
|
||||
|
||||
if __name__=='__main__':
|
||||
#Example makelist
|
||||
print(makeAList("hallo"), makeAList({'world':1, 'case':'another'}), makeAList(['here', 'we', 'go']), makeAList(1), makeAList({'world':1, 'case':'another'}, True))
|
||||
alistofdict = [{'RefNumber': '0001'}, {'RefNumber': '0022'}, {'RefNumber': '0004'}, {'RefNumber': '0006'}, {'RefNumber': '0008'}, {'RefNumber': '0009'}, {'RefNumber': '0010'}, {'RefNumber': '0015'}, {'RefNumber': '0065'}, {'RefNumber': '0017'}, {'RefNumber': '0019'}, {'RefNumber': '0023'}, {'RefNumber': '0024'}, {'RefNumber': '0025'}, {'RefNumber': '0027'}, {'RefNumber': '0029'}, {'RefNumber': '0030'}, {'RefNumber': '0045'}, {'RefNumber': '0032'}, {'RefNumber': '0034'}, {'RefNumber': '0035'}, {'RefNumber': '0036'}, {'RefNumber': '0040'}, {'RefNumber': '0003'}, {'RefNumber': '0031'}, {'RefNumber': '0033'}, {'RefNumber': '0037'}, {'RefNumber': '0038'}, {'RefNumber': '0039'}, {'RefNumber': '0047'}, {'RefNumber': '0005'}, {'RefNumber': '0007'}, {'RefNumber': '0012'}, {'RefNumber': '0013'}, {'RefNumber': '0014'}, {'RefNumber': '0016'}, {'RefNumber': '0011'}, {'RefNumber': '0021'}, {'RefNumber': '0002'}, {'RefNumber': '0018'}, {'RefNumber': '0020'}, {'RefNumber': '0026'}, {'RefNumber': '0028'}, {'RefNumber': '0041'}, {'RefNumber': '0051'}, {'RefNumber': '0062'}, {'RefNumber': '0042'}, {'RefNumber': '0043'}, {'RefNumber': '0053'}, {'RefNumber': '0055'}, {'RefNumber': '0057'}, {'RefNumber': '0058'}, {'RefNumber': '0059'}, {'RefNumber': '0061'}, {'RefNumber': '0068'}, {'RefNumber': '0044'}, {'RefNumber': '0046'}, {'RefNumber': '0048'}, {'RefNumber': '0049'}, {'RefNumber': '0050'}, {'RefNumber': '0086'}, {'RefNumber': '0054'}, {'RefNumber': '0056'}, {'RefNumber': '0060'}, {'RefNumber': '0052'}, {'RefNumber': '0064'}, {'RefNumber': '0066'}, {'RefNumber': '0070'}, {'RefNumber': '0071'}, {'RefNumber': '0072'}, {'RefNumber': '0075'}, {'RefNumber': '0076'}, {'RefNumber': '0078'}, {'RefNumber': '0080'}, {'RefNumber': '0081'}, {'RefNumber': '0085'}, {'RefNumber': '0067'}, {'RefNumber': '0069'}, {'RefNumber': '0073'}, {'RefNumber': '0074'}, {'RefNumber': '0077'}, {'RefNumber': '0079'}, {'RefNumber': '0082'}, {'RefNumber': '0083'}, {'RefNumber': '0084'}, {'RefNumber': '0087'}, {'RefNumber': '0089'}, {'RefNumber': '0091'}, {'RefNumber': '0092'}, {'RefNumber': '0093'}, {'RefNumber': '0094'}, {'RefNumber': '0095'}, {'RefNumber': '0096'}, {'RefNumber': '0167'}, {'RefNumber': '0088'}, {'RefNumber': '0097'}, {'RefNumber': '0098'}, {'RefNumber': '0099'}, {'RefNumber': '0100'}, {'RefNumber': '0103'}]
|
||||
print(makeAList(alistofdict,listofdict=True, dictvalue=True))
|
||||
21
main.py
21
main.py
@ -18,6 +18,7 @@ import pdfexcel4DNwithxlrd
|
||||
from ItemInventoryQuery import ItemInventoryQuery
|
||||
from SO_to_Inv import readSO, CustomerQuery
|
||||
import os
|
||||
from QBClass.QBClasses import SalesOrderAdd
|
||||
|
||||
import pprint
|
||||
|
||||
@ -223,4 +224,22 @@ async def get_generalsalesreport(request: Request):
|
||||
print(f'{datetime.datetime.now()} get_gsr 4 finish -> {type(getdict)}, {getdict}')
|
||||
print("")
|
||||
|
||||
return datas1[0], responseRt.get_total(), datas1[1]
|
||||
return datas1[0], responseRt.get_total(), datas1[1]
|
||||
|
||||
|
||||
@app.post('/dasa2/salesorderadd')
|
||||
async def salesorderadd(request: Request):
|
||||
salesorderdict = await request.body()
|
||||
print("")
|
||||
try:
|
||||
salesorderdict = json.loads(salesorderdict)
|
||||
except:
|
||||
print('error salesorderadd()')
|
||||
return {'message':'error getting SalesOrder Data. it is not json'}
|
||||
print(f'{type(salesorderdict)}, {salesorderdict = }')
|
||||
|
||||
so = SalesOrderAdd(**salesorderdict)
|
||||
print(so.all())
|
||||
return so.all()
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user