dasaproject/SO_to_Inv/PriceLevelQuery.py
2023-09-27 15:49:36 +07:00

444 lines
22 KiB
Python

import xml.etree.ElementTree as ET
import win32com.client
import xmltodict
import pprint
import datetime
import pandas as pd
from datetime import date
import timeit
class PriceLevelQuery:
    """Build qbXML requests, send them to QuickBooks and parse the replies.

    Despite its name the class covers three request types: a price-level
    query (``create_QBXML`` / ``get_pricelevel``), a sales-order query
    (``create_open_sales_order_qbxml`` / ``get_open_sales_order``) and an
    invoice-add request built from previously parsed sales orders
    (``create_invoiceadd_QBXML``).

    State kept on the instance:
        SalesOrderList -- list of sales-order dicts filled by
            ``_get_sales_order_header``.
        DN / dfDN -- delivery-note data that callers may assign before using
            ``get_ext_doc_no_list`` / ``prepareInvoice`` without arguments.
    """

    # Per-unit amount used to accumulate 'Disc_Amount' while parsing sales
    # orders. The original code marked this value as "testing only".
    TEST_DISCOUNT_PER_UNIT = 2000

    def __init__(self, **kwargs) -> None:
        """Store the query filters given as keyword arguments.

        Recognised keys: IncludeLineItems, IncludeRetElement,
        TxnDateRangeFilter, FullName, NameFilter, ItemRef. ``FullName`` and
        ``IncludeRetElement`` accept either a list or a single string; a
        single string is wrapped in a one-element list.
        """
        self.IncludeLineItems = kwargs.get('IncludeLineItems', 'true')
        self.IncludeRetElement = self._as_list(kwargs.get('IncludeRetElement'))
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')
        self.FullName = self._as_list(kwargs.get('FullName'))
        self.NameFilter = kwargs.get('NameFilter')
        self.ItemRef = kwargs.get('ItemRef')
        # Initialised here so methods that read them fail predictably instead
        # of raising AttributeError on a fresh instance.
        self.SalesOrderList = []
        self.DN = None
        self.dfDN = None

    @staticmethod
    def _as_list(value):
        """Return *value* as a list: list -> itself, tuple -> list, str -> [str], else []."""
        if isinstance(value, list):
            return value
        if isinstance(value, tuple):
            return list(value)
        if isinstance(value, str):
            return [value]
        return []

    @staticmethod
    def _child_text(node, path, default=None):
        """Return the text of the first element matching *path*, or *default* if absent/empty."""
        child = node.find(path)
        if child is None or child.text is None:
            return default
        return child.text

    @staticmethod
    def _to_float(text, default=0.0):
        """Convert *text* to float; a missing value (None) yields *default*."""
        return default if text is None else float(text)

    @staticmethod
    def _new_request_envelope():
        """Create the ``<QBXML><QBXMLMsgsRq onError="stopOnError">`` skeleton.

        Returns:
            (root, msgs) -- the QBXML root element and the QBXMLMsgsRq element.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs = ET.SubElement(root, "QBXMLMsgsRq")
        msgs.set("onError", "stopOnError")
        msgs.tail = "\n"
        msgs.text = "\n "
        return root, msgs

    @staticmethod
    def _serialize_request(root):
        """Serialise *root* and prepend the XML and qbXML 13.0 declarations."""
        body = ET.tostring(root, encoding="unicode")
        return '<?xml version="1.0" encoding="utf-8"?>\n<?qbxml version="13.0"?>\n' + body

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element and set its text, tail and attributes.

        Args:
            ET: the ElementTree module (or a compatible object with SubElement).
            parentNode: element that receives the new child.
            thisNode: tag name of the new child.
            text: text content of the new child.
            whiteSpace: number of spaces placed after the newline in the
                child's tail (pretty-printing only).
            attrib: optional dict of attributes; anything else is ignored.

        Returns:
            The newly created element.
        """
        ele = ET.SubElement(parentNode, thisNode)
        if isinstance(attrib, dict):
            for key, value in attrib.items():
                ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def create_QBXML(self):
        """Build the PriceLevelQueryRq request from the stored filters.

        Emits one FullName per entry in ``self.FullName``, an optional
        NameFilter (StartsWith), an optional ItemRef and one
        IncludeRetElement per entry in ``self.IncludeRetElement``.

        Returns:
            The complete qbXML request as a string.
        """
        root, msgs = self._new_request_envelope()
        query = self.create_sub_element(ET, msgs, "PriceLevelQueryRq", "\n ")
        for name in self.FullName:
            self.create_sub_element(ET, query, "FullName", name, 6)
        if self.NameFilter:
            name_filter = self.create_sub_element(ET, query, "NameFilter", "\n ", 6)
            self.create_sub_element(ET, name_filter, "MatchCriterion", "StartsWith")
            self.create_sub_element(ET, name_filter, "Name", self.NameFilter)
        if self.ItemRef:
            item_ref = self.create_sub_element(ET, query, "ItemRef", "\n ")
            self.create_sub_element(ET, item_ref, "FullName", self.ItemRef)
        for element in self.IncludeRetElement:
            self.create_sub_element(ET, query, "IncludeRetElement", element, 4)
        return self._serialize_request(root)

    def create_invoiceadd_QBXML(self):
        """Build an InvoiceAddRq that links every line of ``self.SalesOrderList``.

        The customer is taken from the first sales order, all dates and the
        RefNumber are set to today's date, and a final "400_Sales Discount"
        line carries the summed 'Disc_Amount' of all orders as its Rate.

        Returns:
            The complete qbXML request as a string (also printed).

        Raises:
            IndexError: if ``self.SalesOrderList`` is empty.
        """
        if not self.SalesOrderList:
            raise IndexError("SalesOrderList is empty; load sales orders before building an invoice")

        root, msgs = self._new_request_envelope()
        add_rq = self.create_sub_element(ET, msgs, "InvoiceAddRq", "\n ", 2)
        invoice = self.create_sub_element(ET, add_rq, "InvoiceAdd", "\n ", 4)

        customer = self.create_sub_element(ET, invoice, "CustomerRef", "\n ", 8)
        # The first order supplies the customer; index 1 (as originally
        # written) failed whenever only one sales order was loaded.
        self.create_sub_element(ET, customer, "FullName", self.SalesOrderList[0]['CustomerFullName'], 8)

        template = self.create_sub_element(ET, invoice, "TemplateRef", "\n ", 8)
        self.create_sub_element(ET, template, "FullName", "DBW Invoice (11%)", 8)

        today = str(date.today())
        self.create_sub_element(ET, invoice, "TxnDate", today, 8)
        self.create_sub_element(ET, invoice, "RefNumber", today, 8)
        self.create_sub_element(ET, invoice, "ShipDate", today, 8)

        disc_amount = 0
        for salesorder in self.SalesOrderList:
            SOTxnId = salesorder['TxnID']
            # Orders parsed without lines may lack 'Disc_Amount'.
            disc_amount += int(salesorder.get('Disc_Amount', 0))
            print(f'create_invoiceadd_QBXML->SOTxnId: {SOTxnId}')
            for itemline in salesorder['SalesOrderLineRet']:
                line_add = self.create_sub_element(ET, invoice, "InvoiceLineAdd", "\n ", 10)
                link = self.create_sub_element(ET, line_add, "LinkToTxn", "\n ", 10)
                self.create_sub_element(ET, link, "TxnID", SOTxnId, 14)
                self.create_sub_element(ET, link, "TxnLineID", itemline['TxnLineID'], 12)

        # One discount line after all linked lines.
        print(f'disc_amount:{format(disc_amount, ".2f")}')
        discount_line = self.create_sub_element(ET, invoice, "InvoiceLineAdd", "\n ", 10)
        discount_item = self.create_sub_element(ET, discount_line, "ItemRef", "\n ", 12)
        self.create_sub_element(ET, discount_item, "FullName", "400_Sales Discount", 12)
        self.create_sub_element(ET, discount_line, "Rate", str(disc_amount), 10)

        qbxml_query = self._serialize_request(root)
        print(f'create_invoiceadd_QBXML->Create_Invoiceadd_QBXML: {qbxml_query}')
        return qbxml_query

    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* through the QBXMLRP2 COM request processor.

        Opens a connection as application 'DASA2', begins a session, processes
        the request and always ends the session / closes the connection, even
        when the request raises.

        Returns:
            The raw response string returned by ``ProcessRequest``.
        """
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            # NOTE(review): "" presumably selects the currently open company
            # file and 2 the "do not care" open mode -- confirm against the SDK.
            ticket = sessionManager.BeginSession("", 2)
            try:
                return sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)
        finally:
            sessionManager.CloseConnection()

    def __str__(self, *args) -> str:
        """Return ``str()`` of ``get_sales_order_header()``; this contacts QuickBooks."""
        return str(self.get_sales_order_header())

    def get_sales_order_header(self, *args):
        """Send the ``create_QBXML`` request and parse SalesOrderRet elements from the reply."""
        return self._get_sales_order_header(self.connect_to_quickbooks(self.create_QBXML()))

    def status_ok(self, QBXML):
        """Check the PriceLevelQueryRs status of a response.

        Args:
            QBXML: response XML as a string.

        Returns:
            (ok, attributes) -- ``ok`` is True when the statusMessage
            attribute contains 'OK'; ``attributes`` is the attribute dict of
            the PriceLevelQueryRs element, or ``{}`` when the element is
            missing.
        """
        tree = ET.fromstring(QBXML)
        response = tree.find(".//PriceLevelQueryRs")
        if response is None:
            print('status=None')
            return False, {}
        status = response.attrib.get('statusMessage')
        print(f'status={status}')
        return (status is not None and 'OK' in status), response.attrib

    def _get_sales_order_header(self, response_string):
        """Parse SalesOrderRet elements into ``self.SalesOrderList``.

        Each order becomes a dict with RefNumber, CustomerFullName, TxnID,
        TotalAmount, IsFullyInvoiced, IsManuallyClosed, SalesOrderLineRet and
        Disc_Amount. Only lines whose Quantity minus Invoiced is non-zero are
        kept. Elements missing from the response yield None (text fields) or
        0.0 (numeric fields) instead of raising.

        Returns:
            The list stored in ``self.SalesOrderList``.
        """
        print('_get_sales_order_header')
        text = self._child_text
        tree = ET.fromstring(response_string)
        orders = []
        for order_ret in tree.findall('.//SalesOrderRet'):
            order = {
                'RefNumber': text(order_ret, 'RefNumber'),
                'CustomerFullName': text(order_ret, 'CustomerRef/FullName'),
                'TxnID': text(order_ret, 'TxnID'),
                'TotalAmount': text(order_ret, 'TotalAmount'),
                'IsFullyInvoiced': text(order_ret, 'IsFullyInvoiced'),
                'IsManuallyClosed': text(order_ret, 'IsManuallyClosed'),
                'SalesOrderLineRet': [],
            }
            disc_amount = 0.0
            for line_ret in order_ret.findall('SalesOrderLineRet'):
                quantity = text(line_ret, 'Quantity')
                invoiced = text(line_ret, 'Invoiced')
                quantity_value = self._to_float(quantity)
                disc_amount += quantity_value * self.TEST_DISCOUNT_PER_UNIT
                back_ordered = quantity_value - self._to_float(invoiced)
                if not back_ordered:
                    continue  # nothing left to invoice on this line
                order['SalesOrderLineRet'].append({
                    'TxnLineID': text(line_ret, 'TxnLineID'),
                    'ItemFullName': text(line_ret, 'ItemRef/FullName'),
                    'Quantity': quantity,
                    'UOM': text(line_ret, 'UnitOfMeasure'),
                    'Rate': self._to_float(text(line_ret, 'Rate')),
                    'Amount': self._to_float(text(line_ret, 'Amount')),
                    'BackOrdered': back_ordered,
                    'Invoiced': invoiced,
                    'LineIsManuallyClosed': text(line_ret, 'IsManuallyClosed'),
                })
            # Always present so consumers need not special-case empty orders.
            order['Disc_Amount'] = disc_amount
            orders.append(order)
        self.SalesOrderList = orders
        return self.SalesOrderList

    def addDiscountToInvoiceList(self, _dict: dict):
        """Placeholder: only prints its own name."""
        print("addDiscountToInvoiceList")

    @staticmethod
    def _dn_quantity_in_base_units(_dict):
        """Convert a delivery-note quantity to the unit count implied by its UOM / item number.

        ROLL_<n> multiplies by n; BOX multiplies by a per-item factor
        (EB-/TA- ...1006 -> 12, ...1025 -> 6, TL-/TFL- -> 20,
        EDG-TRIMMER -> 12); every other case leaves the quantity unchanged.
        """
        quantity = _dict['Quantity']
        uom = _dict['UOM'].upper()
        if uom.startswith('ROLL_'):
            print(f"addDNQtyToSalesOrderList->DNqty:{_dict['Quantity']}, Roll_:{_dict['UOM'].split('_')[1]}")
            return _dict['Quantity'] * int(_dict['UOM'].split("_")[1])
        if uom != 'BOX':
            return quantity

        item_no = _dict['Item No'].upper()
        if item_no == "CUTTER":
            print("addDNQtyToSalesOrderList->cutter")
        elif item_no.startswith(("EB-", "TA-")):
            print("addDNQtyToSalesOrderList->LEM")
            suffix = _dict['Item No'].split("-")[1]
            if suffix.endswith("1006"):
                quantity = quantity * 12
            elif suffix.endswith("1025"):
                quantity = quantity * 6
            elif suffix.endswith("1100"):
                print("1100 lem")
        elif item_no.startswith(("TL-", "TFL-")):
            print("addDNQtyToSalesOrderList->Lock")
            quantity = quantity * 20
        elif item_no == "EDG-TRIMMER":
            quantity = quantity * 12  # box of 12
        return quantity

    def addDNQtyToSalesOrderList(self, _dict: dict):
        """Record a delivery-note quantity on the matching sales-order line.

        A line matches when the order's RefNumber equals ``_dict['RefNum']``,
        the line's ItemFullName equals ``_dict['FullName']`` and the UOMs are
        equal ignoring case. When the converted quantity does not exceed the
        line's BackOrdered amount it is stored as ``DNQuantity`` on that line.

        Returns:
            (added, error_msg) -- ``added`` is True when at least one line was
            updated; ``error_msg`` is the last mismatch message seen (it may
            be set even when ``added`` is True) or None.
        """
        added = False
        found_refnum = False
        Error_msg = None
        for order in self.SalesOrderList:
            if order['RefNumber'] != _dict['RefNum']:
                continue
            found_refnum = True
            lines = order['SalesOrderLineRet']
            if not lines:
                Error_msg = f"this refnum {_dict['RefNum']} have no QB PO Return Line"
                continue
            for line in lines:
                if line['ItemFullName'] != _dict['FullName']:
                    Error_msg = f"poline[ItemFullName] <> DN[FullName]; {line['ItemFullName']} <> {_dict['FullName']}, maybe there are 2 same namefromtaco in QB"
                    continue
                # A line parsed without UnitOfMeasure carries UOM None.
                if (line['UOM'] or '').upper() != _dict['UOM'].upper():
                    Error_msg = f"UOM different {line['UOM']} <> {_dict['UOM']}"
                    continue
                quantity = self._dn_quantity_in_base_units(_dict)
                if line['BackOrdered'] >= quantity:
                    line['DNQuantity'] = float(quantity)
                    added = True
                else:
                    print(f"{line['ItemFullName']} BackOrdered < Qty in DN {line['BackOrdered']}<{quantity}")
                    Error_msg = f"BackOrdered < Qty in DN {line['BackOrdered']}<{quantity}"
        if not found_refnum:
            # Previously this case returned no message at all.
            Error_msg = f"refnum {_dict['RefNum']} not found in SalesOrderList"
        return added, Error_msg

    def prepareInvoice(self, df: pd.DataFrame = None):
        """Apply every delivery-note row to ``self.SalesOrderList``.

        Args:
            df: delivery-note rows; when None, ``self.dfDN`` is used.

        Returns:
            (all_added, not_added) -- ``all_added`` is True when every row was
            matched; ``not_added`` lists the row dicts that were not (each
            with an 'ERROR' entry when a reason is known).

        Raises:
            ValueError: if neither *df* nor ``self.dfDN`` is available.
        """
        source = df if df is not None else self.dfDN
        if source is None:
            raise ValueError("prepareInvoice needs a DataFrame argument or self.dfDN to be set")
        records = source.to_dict('records')

        for record in records:
            added, error_msg = self.addDNQtyToSalesOrderList(record)
            if added:
                record['ADDED'] = True
            elif error_msg:
                record['ERROR'] = error_msg

        not_added = []
        for record in records:
            if 'ADDED' not in record:
                print(f"prepareInvoice->not added: {record}")
                not_added.append(record)
            else:
                print(f"ADDED: {record['Item No']}")
        print(f'{len(records) - len(not_added)} of {len(records)} are added')
        return not not_added, not_added

    def validate_date(self, date_text):
        """Parse a 'YYYY-MM-DD' string into a ``date``; return None if it is missing or invalid."""
        if date_text is None:
            return None
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None

    def get_ext_doc_no_list(self, dndict=None):
        """Derive sales-order reference numbers from delivery-note lines.

        The text after the first "L-" in column 'No.SO/Ext.Doc.No.' is
        prefixed with "L" and stored in a new 'RefNum' column.

        Args:
            dndict: dict with a 'lines' entry; when falsy, ``self.DN`` is used.

        Returns:
            (df, refs) -- the DataFrame with the added column and the list of
            unique RefNum values, or None when there are none.

        Raises:
            IndexError: if a value in the column contains no "L-".
        """
        dnlist = dndict['lines'] if dndict else self.DN
        df = pd.DataFrame(dnlist)
        df['RefNum'] = df['No.SO/Ext.Doc.No.'].apply(lambda x: "L" + x.split("L-")[1])
        ext_doc_no = df['RefNum'].unique().tolist()
        return df, (ext_doc_no or None)

    def get_pricelevel(self, response_string=None, excel_path=r'ItemInventory\PriceLevel.xlsx'):
        """Parse a price-level response and optionally export it to Excel.

        Args:
            response_string: response XML; when falsy, the ``create_QBXML``
                request is sent to QuickBooks first.
            excel_path: file the table is written to, defaulting to
                PriceLevel.xlsx inside the ItemInventory folder. Pass None to
                skip the export.

        Returns:
            A dict of three equally long lists ('PriceLevelName', 'FullName',
            'CustomPrice') with one entry per PriceLevelPerItemRet, or None
            when ``status_ok`` reports a failure. 'CustomPrice' is None for
            items that carry no CustomPrice element.
        """
        if not response_string:
            response_string = self.connect_to_quickbooks(self.create_QBXML())
        statusok, _status = self.status_ok(response_string)
        if not statusok:
            return None

        tree = ET.fromstring(response_string)
        level_names, item_names, prices = [], [], []
        last_level_name = None
        for level in tree.findall('.//PriceLevelRet'):
            level_name = self._child_text(level, './/Name')
            last_level_name = level_name
            print(f'pricelevelname:{level_name}')
            for per_item in level.findall('.//PriceLevelPerItemRet'):
                item_names.append(self._child_text(per_item, './/FullName'))
                prices.append(self._child_text(per_item, 'CustomPrice'))
                level_names.append(level_name)

        PriceLevellist = {
            'PriceLevelName': level_names,
            'FullName': item_names,
            'CustomPrice': prices,
        }
        frame = pd.DataFrame.from_dict(PriceLevellist)
        frame.sort_values(by=['PriceLevelName', 'FullName'], inplace=True)
        frame = frame.reset_index(drop=True)
        print(frame)
        if excel_path:
            # The sheet is named after the last price level seen; fall back to
            # a fixed name when none was returned and respect Excel's
            # 31-character sheet-name limit.
            sheet_name = (last_level_name or 'PriceLevel')[:31]
            frame.to_excel(excel_path, sheet_name=sheet_name, index=False)
        return PriceLevellist

    def create_open_sales_order_qbxml(self, txnid: list, IncludeLineItems=True):
        """Build a SalesOrderQueryRq for the given transaction IDs.

        Args:
            txnid: list of TxnID strings.
            IncludeLineItems: when truthy, request line items as well.

        Returns:
            The qbXML request string (also printed), or None when *txnid* is empty.
        """
        if not txnid:
            return None
        root, msgs = self._new_request_envelope()
        query = self.create_sub_element(ET, msgs, "SalesOrderQueryRq", "\n ")
        for one_id in txnid:
            self.create_sub_element(ET, query, "TxnID", one_id, 6)
        if IncludeLineItems:
            self.create_sub_element(ET, query, "IncludeLineItems", 'true', 4)
        qbxml_query = self._serialize_request(root)
        print(f'create_QBXML->qbxml_query: {qbxml_query}')
        return qbxml_query

    def get_open_sales_order(self, txnid: list):
        """Query QuickBooks for the given sales orders and parse them.

        Args:
            txnid: list of TxnIDs; an entry may itself be a list, in which
                case its first element is used (empty inner lists are skipped).

        Returns:
            The parsed sales-order list, or None when no TxnID remains.
        """
        print(f'txnid: {txnid}', type(txnid))
        txnid = [x[0] if isinstance(x, list) else x for x in (txnid or []) if x != []]
        if not txnid:
            print("There is No Open Sales Order")
            return None
        request = self.create_open_sales_order_qbxml(txnid)
        return self._get_sales_order_header(self.connect_to_quickbooks(request))
print('### PriceLevelQuery ###')

if __name__ == '__main__':
    # Manual smoke test against a running QuickBooks instance: build a
    # price-level query for two named price levels, send it, dump the raw
    # reply, then run the full get_pricelevel() flow and report elapsed time.
    started_at = timeit.default_timer()

    query = PriceLevelQuery(FullName=['t 202202', 'M 202202'])

    request_xml = query.create_QBXML()
    print(f'createQBXML->main:{request_xml}')

    raw_reply = query.connect_to_quickbooks(request_xml)
    print(f'response_string:{raw_reply}')
    # The raw reply is discarded; get_pricelevel() below issues its own request.
    raw_reply = None

    pricelevel = query.get_pricelevel()

    print("The time difference is :", timeit.default_timer() - started_at)