# dasaproject/SO_to_Inv/PriceLevelQuery.py
# 2024-01-06 03:01:02 +07:00
#
# 479 lines
# 23 KiB
# Python

import xml.etree.ElementTree as ET
import win32com.client
import xmltodict
import pprint
import datetime
import pandas as pd
from datetime import date
import timeit
import pythoncom
import os
class PriceLevelQuery:
    """Build QBXML requests, send them to QuickBooks and parse the replies.

    The main request is ``PriceLevelQueryRq`` (see ``create_QBXML``); the class
    also carries sales-order / invoice helpers that share the same plumbing.

    Keyword arguments accepted by the constructor:
        IncludeLineItems: stored as-is (default ``'true'``); not read by any
            method of this class.
        IncludeRetElement: element names emitted as ``<IncludeRetElement>``.
        TxnDateRangeFilter: stored as-is; not read by any method of this class.
        FullName: price-level names emitted as ``<FullName>`` filters. A list,
            a tuple or a single string is accepted.
        NameFilter: text for a ``StartsWith`` ``<NameFilter>``.
        ItemRef: item full name emitted as ``<ItemRef><FullName>``.
    """

    # Per-unit discount used when parsing sales-order lines. The original code
    # marked this multiplier "testing only"; it is kept so results do not
    # change.  TODO(review): replace with the real discount rule.
    _TEST_DISCOUNT_PER_UNIT = 2000

    # Characters Excel rejects in worksheet names, mapped to "_".
    _SHEET_NAME_FIX = str.maketrans({ch: '_' for ch in '[]:*?/\\'})

    def __init__(self, **kwargs) -> None:
        self.IncludeLineItems = kwargs.get('IncludeLineItems', 'true')
        self.IncludeRetElement = self._as_list(kwargs.get('IncludeRetElement'))
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')
        self.FullName = self._as_list(kwargs.get('FullName'))
        print(f'FULLNAME:{self.FullName}')
        self.NameFilter = kwargs.get('NameFilter')
        self.ItemRef = kwargs.get('ItemRef')
        self.QBXML = None
        self.response_string = None
        self.status_ok = False
        self.status_msg = None
        # Filled by _get_sales_order_header(); read by the invoice helpers.
        self.SalesOrderList = []
        # Delivery-note inputs; callers assign these before using
        # get_ext_doc_no_list() / prepareInvoice() without arguments.
        self.DN = None
        self.dfDN = None

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_list(value):
        """Normalise a filter argument to a list (``None``/invalid -> ``[]``)."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [value] if value else []
        try:
            return list(value)
        except TypeError:
            return []

    @staticmethod
    def _text(node, path):
        """Return the text of ``node.find(path)``, or ``None`` if it is absent."""
        found = node.find(path)
        return None if found is None else found.text

    @staticmethod
    def _to_float(text, default=0.0):
        """Convert *text* to float; missing or blank values give *default*."""
        try:
            return float(text)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _new_request():
        """Create the ``<QBXML><QBXMLMsgsRq onError="stopOnError">`` envelope.

        Returns:
            ``(root, msgs)`` - the root element and the message-set element
            that request elements are appended to.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs = ET.SubElement(root, "QBXMLMsgsRq")
        msgs.set("onError", "stopOnError")
        msgs.tail = "\n"
        msgs.text = "\n "
        return root, msgs

    @staticmethod
    def _render(root):
        """Serialise *root* and prepend the XML and qbxml 13.0 declarations."""
        prolog = '<?xml version="1.0" encoding="utf-8"?>\n<?qbxml version="13.0"?>\n'
        return prolog + ET.tostring(root, encoding="unicode")

    @staticmethod
    def _dn_base_quantity(_dict):
        """Convert a delivery-note quantity using the UOM / item-number rules.

        ``ROLL_<n>`` multiplies by ``n``; ``BOX`` multiplies by a factor that
        depends on the ``Item No`` prefix/suffix; anything else is returned
        unchanged. ``Item No`` is only read for ``BOX`` rows.
        """
        quantity = _dict['Quantity']
        uom = _dict['UOM'].upper()
        if uom.startswith('ROLL_'):
            print(f"addDNQtyToSalesOrderList->DNqty:{_dict['Quantity']}, Roll_:{_dict['UOM'].split('_')[1]}")
            return quantity * int(_dict['UOM'].split("_")[1])
        if uom != 'BOX':
            return quantity

        item_no = _dict['Item No'].upper()
        if item_no == "CUTTER":
            print("addDNQtyToSalesOrderList->cutter")
        elif item_no.startswith(("EB-", "TA-")):
            print("addDNQtyToSalesOrderList->LEM")
            size_code = _dict['Item No'].split("-")[1]
            if size_code.endswith("1006"):
                quantity = quantity * 12
            elif size_code.endswith("1025"):
                quantity = quantity * 6
            elif size_code.endswith("1100"):
                print("1100 lem")
        elif item_no.startswith(("TL-", "TFL-")):
            print("addDNQtyToSalesOrderList->Lock")
            quantity = quantity * 20
        elif item_no == "EDG-TRIMMER":
            quantity = quantity * 12  # box of 12
        return quantity

    # ------------------------------------------------------------------
    # request builders
    # ------------------------------------------------------------------
    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a child element and return it.

        Args:
            ET: the ElementTree module (or a compatible object providing
                ``SubElement``).
            parentNode: element the child is appended to.
            thisNode: tag name of the new child.
            text: element text; non-string values are converted with ``str``
                so the tree can always be serialised.
            whiteSpace: number of spaces placed after the newline in the tail.
            attrib: optional dict of attributes; other types are ignored.
        """
        ele = ET.SubElement(parentNode, thisNode)
        if isinstance(attrib, dict):
            for key, value in attrib.items():
                ele.set(key, value)
        ele.text = text if text is None or isinstance(text, str) else str(text)
        ele.tail = "\n" + " " * whiteSpace
        return ele

    def create_QBXML(self):
        """Build the ``PriceLevelQueryRq`` document from the constructor filters.

        The result is stored in ``self.QBXML`` and returned.
        """
        root, msgs = self._new_request()
        request = self.create_sub_element(ET, msgs, "PriceLevelQueryRq", "\n ")
        for name in self.FullName:
            self.create_sub_element(ET, request, "FullName", name, 6)
        if self.NameFilter:
            name_filter = self.create_sub_element(ET, request, "NameFilter", "\n ", 6)
            self.create_sub_element(ET, name_filter, "MatchCriterion", "StartsWith")
            self.create_sub_element(ET, name_filter, "Name", self.NameFilter)
        if self.ItemRef:
            item_ref = self.create_sub_element(ET, request, "ItemRef", "\n ")
            self.create_sub_element(ET, item_ref, "FullName", self.ItemRef)
        for ret_element in self.IncludeRetElement:
            self.create_sub_element(ET, request, "IncludeRetElement", ret_element, 4)
        self.QBXML = self._render(root)
        return self.QBXML

    def create_invoiceadd_QBXML(self):
        """Build an ``InvoiceAddRq`` that links every line in ``SalesOrderList``.

        The customer is taken from the first sales order (the original code
        read index 1, which failed when only one order was loaded). Today's
        date is used for ``TxnDate``, ``RefNumber`` and ``ShipDate``. A final
        "400_Sales Discount" line carries the summed ``Disc_Amount``.

        Raises:
            ValueError: if no sales orders have been loaded.
        """
        if not self.SalesOrderList:
            raise ValueError("create_invoiceadd_QBXML: SalesOrderList is empty; "
                             "load sales orders first")

        root, msgs = self._new_request()
        request = self.create_sub_element(ET, msgs, "InvoiceAddRq", "\n ", 2)
        invoice_add = self.create_sub_element(ET, request, "InvoiceAdd", "\n ", 4)
        customer_ref = self.create_sub_element(ET, invoice_add, "CustomerRef", "\n ", 8)
        self.create_sub_element(ET, customer_ref, "FullName",
                                self.SalesOrderList[0]['CustomerFullName'], 8)
        template_ref = self.create_sub_element(ET, invoice_add, "TemplateRef", "\n ", 8)
        self.create_sub_element(ET, template_ref, "FullName", "DBW Invoice (11%)", 8)
        today = str(date.today())
        self.create_sub_element(ET, invoice_add, "TxnDate", today, 8)
        self.create_sub_element(ET, invoice_add, "RefNumber", today, 8)
        self.create_sub_element(ET, invoice_add, "ShipDate", today, 8)

        disc_amount = 0
        for salesorder in self.SalesOrderList:
            SOTxnId = salesorder['TxnID']
            # Orders parsed without lines may carry no discount at all.
            disc_amount += int(salesorder.get('Disc_Amount', 0))
            print(f'create_invoiceadd_QBXML->SOTxnId: {SOTxnId}')
            for itemline in salesorder['SalesOrderLineRet']:
                line_add = self.create_sub_element(ET, invoice_add, "InvoiceLineAdd", "\n ", 10)
                link = self.create_sub_element(ET, line_add, "LinkToTxn", "\n ", 10)
                self.create_sub_element(ET, link, "TxnID", SOTxnId, 14)
                self.create_sub_element(ET, link, "TxnLineID", itemline['TxnLineID'], 12)

        # One discount line after the last sales order.
        print(f'disc_amount:{format(disc_amount, ".2f")}')
        discount_line = self.create_sub_element(ET, invoice_add, "InvoiceLineAdd", "\n ", 10)
        discount_item = self.create_sub_element(ET, discount_line, "ItemRef", "\n ", 12)
        self.create_sub_element(ET, discount_item, "FullName", "400_Sales Discount", 12)
        self.create_sub_element(ET, discount_line, "Rate", str(disc_amount), 10)

        qbxml_query = self._render(root)
        print(f'create_invoiceadd_QBXML->Create_Invoiceadd_QBXML: {qbxml_query}')
        return qbxml_query

    def create_open_sales_order_qbxml(self, txnid: list, IncludeLineItems=True):
        """Build a ``SalesOrderQueryRq`` for the given transaction ids.

        Returns:
            The request text, or ``None`` when *txnid* is empty.
        """
        if not txnid:
            return None
        root, msgs = self._new_request()
        request = self.create_sub_element(ET, msgs, "SalesOrderQueryRq", "\n ")
        for one_id in txnid:
            self.create_sub_element(ET, request, "TxnID", one_id, 6)
        if IncludeLineItems:
            self.create_sub_element(ET, request, "IncludeLineItems", 'true', 4)
        qbxml_query = self._render(root)
        print(f'create_QBXML->qbxml_query: {qbxml_query}')
        return qbxml_query

    # ------------------------------------------------------------------
    # QuickBooks round trip
    # ------------------------------------------------------------------
    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* through the QBXMLRP2 request processor.

        Opens a connection under the application name ``'DASA2'``, begins a
        session with an empty file name and mode ``2``, processes the request
        and always ends the session and closes the connection, even when the
        request raises. The response is stored in ``self.response_string``.

        NOTE(review): mode 2 is presumably ``qbFileOpenDoNotCare`` and the
        empty file name presumably means "currently open company file" -
        confirm against the QuickBooks SDK.
        """
        # COM must be initialised on the calling thread before Dispatch.
        pythoncom.CoInitialize()
        sessionManager = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
        sessionManager.OpenConnection('', 'DASA2')
        try:
            ticket = sessionManager.BeginSession("", 2)
            try:
                response_string = sessionManager.ProcessRequest(ticket, qbxml_query)
            finally:
                sessionManager.EndSession(ticket)  # close the company file
        finally:
            sessionManager.CloseConnection()  # close the connection
        self.response_string = response_string
        return self.response_string

    def __str__(self, *args) -> str:
        """Return ``str()`` of ``get_sales_order_header()`` (queries QuickBooks)."""
        return str(self.get_sales_order_header())

    def get_sales_order_header(self, *args):
        """Send the ``create_QBXML`` request and parse any ``SalesOrderRet``."""
        return self._get_sales_order_header(self.connect_to_quickbooks(self.create_QBXML()))

    def get_status(self, QBXML=None):
        """Read the status attributes of the ``PriceLevelQueryRs`` element.

        Args:
            QBXML: response text; falls back to ``self.response_string``.

        Returns:
            ``(ok, attributes)`` where *ok* is true when the ``statusMessage``
            attribute contains ``'OK'`` and *attributes* is the attribute dict
            of the response element (``{}`` when there is no response or no
            ``PriceLevelQueryRs`` element). ``self.status_ok`` and
            ``self.status_msg`` are updated to match.
        """
        source = QBXML if QBXML else self.response_string
        attributes = {}
        status = None
        if source:
            node = ET.fromstring(source).find(".//PriceLevelQueryRs")
            if node is not None:
                attributes = node.attrib
                status = attributes.get('statusMessage')
        print(f'get_status={status}')
        self.status_msg = attributes
        self.status_ok = status is not None and 'OK' in status
        return self.status_ok, attributes

    # ------------------------------------------------------------------
    # sales orders
    # ------------------------------------------------------------------
    def _get_sales_order_header(self, response_string):
        """Parse every ``SalesOrderRet`` in *response_string*.

        Each order becomes a dict with ``RefNumber``, ``CustomerFullName``,
        ``TxnID``, ``TotalAmount``, ``IsFullyInvoiced``, ``IsManuallyClosed``,
        ``Disc_Amount`` and ``SalesOrderLineRet`` - the latter holding only
        lines whose quantity minus invoiced quantity is non-zero. Elements
        missing from the response yield ``None`` (text) or ``0.0`` (numbers)
        instead of raising. The list is stored in ``self.SalesOrderList``.
        """
        print('_get_sales_order_header')
        tree = ET.fromstring(response_string)
        orders = []
        for order_node in tree.findall('.//SalesOrderRet'):
            order = {
                'RefNumber': self._text(order_node, 'RefNumber'),
                'CustomerFullName': self._text(order_node, 'CustomerRef/FullName'),
                'TxnID': self._text(order_node, 'TxnID'),
                'TotalAmount': self._text(order_node, 'TotalAmount'),
                'IsFullyInvoiced': self._text(order_node, 'IsFullyInvoiced'),
                'IsManuallyClosed': self._text(order_node, 'IsManuallyClosed'),
                'SalesOrderLineRet': [],
            }
            disc_amount = 0
            for line_node in order_node.findall('SalesOrderLineRet'):
                quantity_text = self._text(line_node, 'Quantity')
                invoiced_text = self._text(line_node, 'Invoiced')
                quantity = self._to_float(quantity_text)
                disc_amount += quantity * self._TEST_DISCOUNT_PER_UNIT
                back_ordered = quantity - self._to_float(invoiced_text)
                if not back_ordered:
                    continue
                order['SalesOrderLineRet'].append({
                    'TxnLineID': self._text(line_node, 'TxnLineID'),
                    'ItemFullName': self._text(line_node, 'ItemRef/FullName'),
                    'Quantity': quantity_text,
                    'UOM': self._text(line_node, 'UnitOfMeasure'),
                    'Rate': self._to_float(self._text(line_node, 'Rate')),
                    'Amount': self._to_float(self._text(line_node, 'Amount')),
                    'BackOrdered': back_ordered,
                    'Invoiced': invoiced_text,
                    'LineIsManuallyClosed': self._text(line_node, 'IsManuallyClosed'),
                })
            # Always present, so invoice building never hits a missing key.
            order['Disc_Amount'] = disc_amount
            orders.append(order)
        self.SalesOrderList = orders
        return self.SalesOrderList

    def addDiscountToInvoiceList(self, _dict: dict):
        """Placeholder; only prints its own name."""
        print("addDiscountToInvoiceList")

    def addDNQtyToSalesOrderList(self, _dict: dict):
        """Attach a delivery-note quantity to the matching sales-order line.

        A line matches when the order ``RefNumber`` equals ``_dict['RefNum']``,
        the item full name equals ``_dict['FullName']`` and the UOM matches
        case-insensitively. When the converted quantity fits within the line's
        ``BackOrdered`` amount it is stored as ``DNQuantity``.

        Returns:
            ``(added, error_msg)``. When several lines fail for different
            reasons the most specific reason is reported (insufficient
            back-order, then UOM mismatch, then item mismatch / no lines)
            rather than whichever line happened to be checked last. An
            unknown ``RefNum`` reports an explicit "not found" message.
        """
        added = False
        found_refnum = False
        error_rank, error_msg = 0, None

        for order in self.SalesOrderList:
            if order['RefNumber'] != _dict['RefNum']:
                continue
            found_refnum = True
            lines = order['SalesOrderLineRet']
            if not lines:
                if error_rank <= 1:
                    error_rank = 1
                    error_msg = f"this refnum {_dict['RefNum']} have no QB PO Return Line"
                continue

            for _poline in lines:
                rank, message = 0, None
                if _poline['ItemFullName'] != _dict['FullName']:
                    rank = 1
                    message = f"poline[ItemFullName] <> DN[FullName]; {_poline['ItemFullName']} <> {_dict['FullName']}, maybe there are 2 same namefromtaco in QB"
                elif (_poline['UOM'] or '').upper() != _dict['UOM'].upper():
                    rank = 2
                    message = f"UOM different {_poline['UOM']} <> {_dict['UOM']}"
                else:
                    QuantityIn_dict = self._dn_base_quantity(_dict)
                    if _poline['BackOrdered'] >= QuantityIn_dict:
                        _poline['DNQuantity'] = float(QuantityIn_dict)
                        added = True
                    else:
                        print(f"{_poline['ItemFullName']} BackOrdered < Qty in DN {_poline['BackOrdered']}<{QuantityIn_dict}")
                        rank = 3
                        message = f"BackOrdered < Qty in DN {_poline['BackOrdered']}<{QuantityIn_dict}"
                # Keep the most specific failure; ties go to the latest line.
                if message is not None and rank >= error_rank:
                    error_rank, error_msg = rank, message

        if not found_refnum and error_msg is None:
            error_msg = f"refnum {_dict['RefNum']} not found in SalesOrderList"
        return added, error_msg

    def prepareInvoice(self, df: pd.DataFrame = None):
        """Apply every delivery-note row to the loaded sales orders.

        Args:
            df: delivery-note rows; falls back to ``self.dfDN``.

        Returns:
            ``(all_added, not_added_rows)``. Rows that were applied get an
            ``ADDED`` key, rows that failed with a message get ``ERROR``.

        Raises:
            ValueError: if neither *df* nor ``self.dfDN`` is available.
        """
        if df is None:
            df = getattr(self, 'dfDN', None)
        if df is None:
            raise ValueError("prepareInvoice: pass a DataFrame or set self.dfDN first")

        rows = df.to_dict('records')
        for row in rows:
            was_added, error = self.addDNQtyToSalesOrderList(row)
            if was_added:
                row['ADDED'] = True
            elif error:
                row['ERROR'] = error

        not_added = []
        for row in rows:
            if 'ADDED' not in row:
                print(f"prepareInvoice->not added: {row}")
                not_added.append(row)
            else:
                print(f"ADDED: {row['Item No']}")
        print(f'{len(rows) - len(not_added)} of {len(rows)} are added')
        return not not_added, not_added

    def validate_date(self, date_text):
        """Parse ``YYYY-MM-DD`` text into a ``date``; return ``None`` if invalid."""
        if date_text is None:
            return None
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None

    def get_ext_doc_no_list(self, dndict=None):
        """Derive sales-order reference numbers from delivery-note lines.

        ``RefNum`` is ``"L"`` plus whatever follows the first ``"L-"`` in the
        ``No.SO/Ext.Doc.No.`` column; a value without ``"L-"`` raises
        ``IndexError``.

        Args:
            dndict: dict whose ``'lines'`` entry holds the rows; falls back to
                ``self.DN``.

        Returns:
            ``(df, refnums)`` where *refnums* is the unique list, or ``None``
            when it is empty.

        Raises:
            ValueError: if neither *dndict* nor ``self.DN`` is available.
        """
        dnlist = dndict['lines'] if dndict else getattr(self, 'DN', None)
        if dnlist is None:
            raise ValueError("get_ext_doc_no_list: pass dndict or set self.DN first")
        df = pd.DataFrame(dnlist)
        df['RefNum'] = df['No.SO/Ext.Doc.No.'].apply(lambda x: "L" + x.split("L-")[1])
        ext_doc_no = df['RefNum'].unique().tolist()
        return df, (ext_doc_no or None)

    # ------------------------------------------------------------------
    # price levels
    # ------------------------------------------------------------------
    def get_pricelevel(self, response_string=None):
        """Parse per-item price levels and export them to Excel.

        Queries QuickBooks when *response_string* is not given. On an OK
        status the rows are written, sorted by price level and item, to
        ``ItemInventory/PriceLevel.xlsx`` (the folder is created if missing).
        The sheet is named after the last price level in the response,
        sanitised and cut to Excel's 31-character limit, or ``PriceLevel``
        when no name is available.

        Returns:
            ``(columns, status)`` with *columns* a dict of the three parallel
            lists ``PriceLevelName``, ``FullName`` and ``CustomPrice``, or
            ``(None, status)`` when the status is not OK.
        """
        if not response_string:
            response_string = self.connect_to_quickbooks(self.create_QBXML())
        statusok, status = self.get_status(response_string)
        if not statusok:
            return None, status

        tree = ET.fromstring(response_string)
        level_names, fullnamelist, custompricelist = [], [], []
        last_level_name = None
        for level_node in tree.findall('.//PriceLevelRet'):
            last_level_name = self._text(level_node, './/Name')
            print(f'pricelevelname:{last_level_name}')
            for per_item in level_node.findall('.//PriceLevelPerItemRet'):
                level_names.append(last_level_name)
                fullnamelist.append(self._text(per_item, './/FullName'))
                # CustomPrice can be absent, e.g. for percentage-based entries.
                custompricelist.append(self._text(per_item, 'CustomPrice'))

        PriceLevellist = {
            'PriceLevelName': level_names,
            'FullName': fullnamelist,
            'CustomPrice': custompricelist,
        }
        PriceLeveldf = pd.DataFrame.from_dict(PriceLevellist)
        PriceLeveldf.sort_values(by=['PriceLevelName', 'FullName'], inplace=True)
        PriceLeveldf = PriceLeveldf.reset_index(drop=True)
        print(PriceLeveldf)
        print(os.getcwd())

        sheet_name = (last_level_name or 'PriceLevel').translate(self._SHEET_NAME_FIX)[:31]
        os.makedirs('ItemInventory', exist_ok=True)
        PriceLeveldf.to_excel(os.path.join('ItemInventory', 'PriceLevel.xlsx'),
                              sheet_name=sheet_name, index=False)
        return PriceLevellist, status

    def get_open_sales_order(self, txnid: list):
        """Fetch and parse the sales orders with the given transaction ids.

        Each entry may be an id or a one-element row (list or tuple) holding
        the id; empty rows are skipped.

        Returns:
            The parsed sales-order list, or ``None`` when no id is left.
        """
        print(f'txnid: {txnid}', type(txnid))
        ids = []
        for entry in (txnid or []):
            if isinstance(entry, (list, tuple)):
                if entry:
                    ids.append(entry[0])
            else:
                ids.append(entry)
        if not ids:
            print("There is No Open Sales Order")
            return None
        request = self.create_open_sales_order_qbxml(ids)
        return self._get_sales_order_header(self.connect_to_quickbooks(request))

    def name_list(self):
        """Query QuickBooks and return the text of every ``Name`` element.

        Returns:
            The list of names, or ``None`` when the status is not OK or no
            ``Name`` element is present.
        """
        self.get_status(self.connect_to_quickbooks(self.create_QBXML()))
        namelist = []
        if self.status_ok:
            tree = ET.fromstring(self.response_string)
            namelist = [node.text for node in tree.iter('Name')]
        return namelist or None
# Module banner: printed whenever this file is imported or executed.
print('### PriceLevelQuery ###')

if __name__ == '__main__':
    # Manual smoke run: list the price-level names known to QuickBooks and
    # report how long the round trip took.
    started_at = timeit.default_timer()

    # Example names kept for reference; the filter is switched off right
    # after, so the query below is unfiltered.
    FullName = ['t 202202', 'M 202202', 'b 202202']
    FullName = None

    query = PriceLevelQuery(FullName=FullName, IncludeRetElement=['Name'])
    # Alternative experiments (not run): filter by ItemRef="ECO:0:ECO-002",
    # export with get_pricelevel(), or build an invoice from open sales orders
    # via get_open_sales_order() / create_invoiceadd_QBXML().
    names = query.name_list()
    print(names)

    elapsed = timeit.default_timer() - started_at
    print("The time difference is :", elapsed)