# Mirror of https://github.com/bcomsugi/dasaproject.git
# Synced 2026-01-09 15:22:37 +07:00 (701 lines, 38 KiB, Python)
import datetime
import os
import pprint
import timeit
import xml.dom.minidom
import xml.etree.ElementTree as ET
from datetime import date
from decimal import Decimal

import pandas as pd
import pythoncom
import win32com.client
import xmltodict

# from icecream import ic
# ic.configureOutput(includeContext=True, )


class SalesOrderQuery:
    """Build qbXML sales-order / invoice requests and parse QuickBooks replies.

    The constructor reads two Excel sheets from ``<cwd>/ItemInventory``:
    ``PriceLevel.xlsx`` (columns FullName, PriceLevelName, CustomPrice) and
    ``CustomerList.xlsx`` (columns FullName, PriceLevelName, SPName).  They are
    used to compute the per-piece discount between a sales-order rate and the
    customer's special price level.

    NOTE(review): this class was reconstructed from an extraction that had
    lost all indentation; the whitespace inside string literals (``"\\n "``,
    the ``toprettyxml`` indent) is reproduced exactly as it appeared there and
    may have been wider in the upstream file -- confirm against the repo.
    """

    # DateMacro values accepted by the constructor; any other value is ignored.
    _DATE_MACROS = (
        'All', 'Today', 'ThisWeek', 'ThisWeekToDate', 'ThisMonth',
        'ThisMonthToDate', 'ThisQuarter', 'ThisQuarterToDate', 'ThisYear',
        'ThisYearToDate', 'Yesterday', 'LastWeek', 'LastWeekToDate',
        'LastMonth', 'LastMonthToDate', 'LastQuarter', 'LastQuarterToDate',
        'LastYear', 'LastYearToDate', 'NextWeek', 'NextFourWeeks',
        'NextMonth', 'NextQuarter', 'NextYear',
    )

    # Prolog prepended to every serialized request.
    _QBXML_PROLOG = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<?qbxml version="13.0"?>\n'
    )

    _CUSTOMER_COLUMNS = ['FullName', 'PriceLevelName', 'SPName']
    _PRICE_COLUMNS = ['FullName', 'PriceLevelName', 'CustomPrice']

    def __init__(self, **kwargs) -> None:
        """Load the price/customer sheets and store the query filters.

        Recognised keyword arguments: ``cwd``, ``TxnID``, ``FullName``, ``DN``,
        ``Reuse``, ``SalesOrderList``, ``SalesOrderType``, ``IncludeLineItems``,
        ``IncludeLinkedTxns``, ``IncludeRetElement``, ``TxnDateRangeFilter``,
        ``DateMacro``, ``FromTxnDate``, ``ToTxnDate`` (``YYYY-MM-DD``),
        ``ReportEntityFilter`` and ``RefNumber``.
        """
        self.PriceLevelName = None
        self.SPPriceLevelName = None
        self.cwd = kwargs['cwd'] if 'cwd' in kwargs else os.getcwd()
        self.TxnID = kwargs.get('TxnID')

        self.item_inventory_path = "ItemInventory"
        self.price_level_filename = "PriceLevel.xlsx"
        self._df_price_level = self._read_sheet(
            self.price_level_filename, self._PRICE_COLUMNS)
        print(self._df_price_level)

        self.FullName = kwargs.get('FullName')

        self.CustomerPriceLevelName_filename = "CustomerList.xlsx"
        self._df_customer = self._read_sheet(
            self.CustomerPriceLevelName_filename,
            self._CUSTOMER_COLUMNS).fillna('')
        print(self._df_customer)

        # [FullName, PriceLevelName, SPName] of the customer, or None.
        self.Customer = None
        if self.FullName:
            self.Customer = self._lookup_customer(self.FullName)
            if self.Customer is not None:
                print(f'Customer:{self.Customer}')

        self.DN = kwargs.get('DN', {})
        self.Reuse = kwargs.get('Reuse')

        self.SalesOrderList = kwargs.get('SalesOrderList', [])
        self.InvoiceList = None
        self.SalesOrderType = kwargs.get(
            'SalesOrderType', 'SalesByCustomerSummary')
        self.IncludeLineItems = kwargs.get('IncludeLineItems', 'true')
        self.IncludeLinkedTxns = kwargs.get('IncludeLinkedTxns', 'true')
        self.IncludeRetElement = kwargs.get('IncludeRetElement', [])
        self.TxnDateRangeFilter = kwargs.get('TxnDateRangeFilter')

        date_macro = kwargs.get('DateMacro')
        self.DateMacro = (
            date_macro if date_macro in self._DATE_MACROS else None)
        self.FromTxnDate = self.validate_date(kwargs.get('FromTxnDate'))
        self.ToTxnDate = self.validate_date(kwargs.get('ToTxnDate'))
        self.ReportEntityFilter = kwargs.get('ReportEntityFilter')
        self.RefNumber = kwargs.get('RefNumber')

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _read_sheet(self, filename, columns):
        """Read *columns* of an ItemInventory workbook, in the given order.

        ``usecols`` alone keeps the file's column order, so the frame is
        re-indexed to make positional access elsewhere predictable.
        """
        path = os.path.join(self.cwd, self.item_inventory_path, filename)
        return pd.read_excel(path, usecols=columns)[columns]

    def _lookup_customer(self, full_name):
        """Return ``[FullName, PriceLevelName, SPName]`` or None if unknown."""
        rows = self._df_customer.loc[
            self._df_customer["FullName"] == full_name]
        if rows.empty:
            return None
        return rows[self._CUSTOMER_COLUMNS].values.tolist()[0]

    @staticmethod
    def _text(node, path):
        """Return the text of the first *path* match under *node*, or None."""
        found = node.find(path)
        return None if found is None else found.text

    def _new_request(self, request_tag, text="\n ", whiteSpace=0):
        """Create the QBXML/QBXMLMsgsRq envelope plus one request element.

        Returns ``(root, request_element)``.
        """
        root = ET.Element("QBXML")
        root.tail = "\n"
        root.text = "\n "
        msgs = ET.SubElement(root, "QBXMLMsgsRq")
        msgs.set("onError", "stopOnError")
        msgs.tail = "\n"
        msgs.text = "\n "
        request = self.create_sub_element(
            ET, msgs, request_tag, text, whiteSpace)
        return root, request

    def _serialize(self, root):
        """Serialize *root* and prepend the XML / qbXML prolog."""
        return self._QBXML_PROLOG + ET.tostring(root, encoding="unicode")

    @staticmethod
    def _dn_quantity_in_so_units(_dict):
        """Convert a delivery-note row's quantity to sales-order units.

        ``ROLL_<n>`` multiplies by ``n``; for ``BOX`` the factor depends on
        the ``Item No`` prefix/suffix (hard-coded pack sizes).
        """
        quantity = _dict['Quantity']
        uom = _dict['UOM'].upper()
        if uom.startswith('ROLL_'):
            print(f"addDNQtyToSalesOrderList->DNqty:{_dict['Quantity']}, Roll_:{_dict['UOM'].split('_')[1]}")
            return _dict['Quantity'] * int(_dict['UOM'].split("_")[1])
        if uom != 'BOX':
            return quantity

        item_no = _dict['Item No']
        item_upper = item_no.upper()
        if item_upper == "CUTTER":
            print("addDNQtyToSalesOrderList->cutter")
        elif item_upper.startswith(("EB-", "TA-")):
            print("addDNQtyToSalesOrderList->LEM")
            code = item_no.split("-")[1]
            if code.endswith("1006"):
                quantity = quantity * 12
            elif code.endswith("1025"):
                quantity = quantity * 6
            elif code.endswith("1100"):
                print("1100 lem")
        elif item_upper.startswith(("TL-", "TFL-")):
            print("addDNQtyToSalesOrderList->Lock")
            quantity = quantity * 20
        elif item_upper == "EDG-TRIMMER":
            quantity = quantity * 12  # box of 12
        return quantity

    # ------------------------------------------------------------------
    # XML utilities
    # ------------------------------------------------------------------
    def pprintXml(self, qbxml_query):
        """Return *qbxml_query* re-indented with blank lines removed.

        Returns None when *qbxml_query* is not a ``str``.
        """
        if not isinstance(qbxml_query, str):
            return None
        dom = xml.dom.minidom.parseString(qbxml_query)
        lines = dom.toprettyxml(" ").split('\n')
        return '\n'.join(line for line in lines if line.strip())

    def create_sub_element(self, ET, parentNode, thisNode, text="\n", whiteSpace=0, attrib=None):
        """Append a ``thisNode`` child to *parentNode* and return it.

        *text* becomes the element text, *attrib* (a dict, otherwise ignored)
        its attributes, and the tail is a newline followed by *whiteSpace*
        spaces.
        """
        if not isinstance(attrib, dict):
            attrib = {}
        ele = ET.SubElement(parentNode, thisNode)
        for key, value in attrib.items():
            ele.set(key, value)
        ele.text = text
        ele.tail = "\n" + " " * whiteSpace
        return ele

    # ------------------------------------------------------------------
    # request builders
    # ------------------------------------------------------------------
    def create_QBXML(self):
        """Build the SalesOrderQueryRq document for the configured filter.

        Exactly one filter is emitted, in priority order: ``TxnID`` (single
        value or list), ``RefNumber`` (Contains match), ``FullName``,
        ``DateMacro``, then ``FromTxnDate`` / ``ToTxnDate``.
        """
        root, query_rq = self._new_request("SalesOrderQueryRq")
        add = self.create_sub_element

        if self.TxnID:
            print(self.TxnID)
            txn_ids = self.TxnID if isinstance(self.TxnID, list) else [self.TxnID]
            for txn_id in txn_ids:
                add(ET, query_rq, "TxnID", txn_id)
        elif self.RefNumber:
            ref_filter = add(ET, query_rq, 'RefNumberFilter', "\n ")
            add(ET, ref_filter, 'MatchCriterion', "Contains")
            add(ET, ref_filter, 'RefNumber', self.RefNumber)
        elif self.FullName:
            entity_filter = add(ET, query_rq, 'EntityFilter', "\n ")
            add(ET, entity_filter, "FullName", self.FullName, 6)
        elif self.DateMacro:
            date_filter = add(ET, query_rq, "TxnDateRangeFilter", "\n ")
            add(ET, date_filter, "DateMacro", self.DateMacro)
        elif (isinstance(self.FromTxnDate, datetime.date)
                or isinstance(self.ToTxnDate, datetime.date)):
            date_filter = add(ET, query_rq, "TxnDateRangeFilter", "\n ")
            if isinstance(self.FromTxnDate, datetime.date):
                add(ET, date_filter, "FromTxnDate",
                    self.FromTxnDate.strftime('%Y-%m-%d'), 4)
            if isinstance(self.ToTxnDate, datetime.date):
                add(ET, date_filter, "ToTxnDate",
                    self.ToTxnDate.strftime('%Y-%m-%d'))

        if self.IncludeLineItems:
            add(ET, query_rq, "IncludeLineItems", self.IncludeLineItems, 4)
        if self.IncludeLinkedTxns:
            add(ET, query_rq, "IncludeLinkedTxns", self.IncludeLinkedTxns, 4)
        for ret_element in self.IncludeRetElement:
            add(ET, query_rq, "IncludeRetElement", ret_element, 4)

        return self._serialize(root)

    def get_customer_pricelevel(self, response_string):
        """Extract the price level and "special cust" value from a CustomerQuery reply.

        Stores both on the instance and returns
        ``(PriceLevelName, SP_PriceLevelName)``; either may be None.
        """
        QBXML = ET.fromstring(response_string)

        # Compare with None explicitly: Element truthiness depends on the
        # number of children and is deprecated.
        price_level_ref = QBXML.find('.//PriceLevelRef')
        PriceLevelName = None
        if price_level_ref is not None:
            PriceLevelName = self._text(price_level_ref, "FullName")
        print(f'PriceLevelName:{PriceLevelName}')

        SP_PriceLevelName = None
        for data_ext in QBXML.findall('.//DataExtRet'):
            name = self._text(data_ext, 'DataExtName') or ''
            if name.lower() == 'special cust':
                SP_PriceLevelName = (
                    self._text(data_ext, 'DataExtValue') or '').upper()
                break

        self.PriceLevelName = PriceLevelName
        self.SPPriceLevelName = SP_PriceLevelName
        return PriceLevelName, SP_PriceLevelName

    def create_customerquery_QBXML(self):
        """Query QuickBooks for ``self.FullName`` and return its price levels.

        Sends a CustomerQueryRq and returns the result of
        :meth:`get_customer_pricelevel` on the reply.
        """
        root, customer_rq = self._new_request("CustomerQueryRq")
        add = self.create_sub_element
        add(ET, customer_rq, "FullName", self.FullName, 6)
        for ret_element in ('FullName', 'PriceLevelRef', 'DataExtRet'):
            add(ET, customer_rq, "IncludeRetElement", ret_element, 4)
        add(ET, customer_rq, "OwnerID", "0", 6)

        qbxml_query = self._serialize(root)
        print(f'create_customer_QBXML->qbxml_query: {qbxml_query}')
        response_string = self.connect_to_quickbooks(qbxml_query)
        return self.get_customer_pricelevel(response_string)

    def create_invoiceadd_QBXML(self, soDict=None):
        """Build an InvoiceAddRq that invoices every back-ordered line.

        *soDict*, when given, supplies ``data`` (replaces
        ``self.SalesOrderList``) and optionally ``txn_date`` and
        ``ref_number``.  For each order with a ``TxnID`` one linked
        InvoiceLineAdd is emitted per line whose ``BackOrdered`` is positive,
        followed by a "400_Sales Discount" line when the order's
        ``Disc_Amount`` is non-zero.  Orders without a ``TxnID`` but with
        ``other_itemFullName`` / ``other_qty`` / ``other_rate`` add a plain
        item line.  Returns the pretty-printed request.

        NOTE(review): the pairing of the ``other_*`` branch with the
        ``TxnID`` test was inferred from de-indented source -- confirm.
        """
        print('create_ionvoiceadd_QBXML')
        txn_date = str(date.today())
        ref_number = None
        if soDict:
            self.SalesOrderList = soDict['data']
            txn_date = soDict.get('txn_date', str(date.today()))
            ref_number = soDict.get('ref_number', None)

        root, invoice_add_rq = self._new_request("InvoiceAddRq", "\n ", 2)
        add = self.create_sub_element
        invoice_add = add(ET, invoice_add_rq, "InvoiceAdd", "\n ", 4)
        customer_ref = add(ET, invoice_add, "CustomerRef", "\n ", 8)
        add(ET, customer_ref, "FullName",
            self.SalesOrderList[0]['CustomerFullName'], 8)
        template_ref = add(ET, invoice_add, "TemplateRef", "\n ", 8)
        add(ET, template_ref, "FullName", "DBW Invoice (11%)", 8)
        add(ET, invoice_add, "TxnDate", txn_date, 8)
        if ref_number:
            add(ET, invoice_add, "RefNumber", ref_number, 8)

        for salesorder in self.SalesOrderList:
            disc_amount = 0  # discount is accumulated per sales order
            if 'TxnID' in salesorder:
                so_txn_id = salesorder['TxnID']
                print(f'create_invoiceadd_QBXML->SOTxnId: {so_txn_id}')
                for itemline in salesorder['SalesOrderLineRet']:
                    back_ordered = str(itemline['BackOrdered'])
                    if float(back_ordered) <= 0:
                        continue
                    disc_amount += (
                        float(back_ordered) * float(itemline['discPerPcs']))
                    line_add = add(
                        ET, invoice_add, "InvoiceLineAdd", "\n ", 10)
                    add(ET, line_add, "Quantity", back_ordered, 12)
                    link = add(ET, line_add, "LinkToTxn", "\n ", 10)
                    add(ET, link, "TxnID", so_txn_id, 14)
                    add(ET, link, "TxnLineID", itemline['TxnLineID'], 12)

                # The stored Disc_Amount decides whether a discount line is
                # written; the rate written is the value recomputed above.
                if salesorder['Disc_Amount'] != 0:
                    line_add = add(
                        ET, invoice_add, "InvoiceLineAdd", "\n ", 10)
                    item_ref = add(ET, line_add, "ItemRef", "\n ", 12)
                    add(ET, item_ref, "FullName", "400_Sales Discount", 12)
                    add(ET, line_add, "Rate", str(disc_amount), 10)
            elif ('other_itemFullName' in salesorder
                    and 'other_qty' in salesorder
                    and 'other_rate' in salesorder):
                line_add = add(ET, invoice_add, "InvoiceLineAdd", "\n ", 10)
                item_ref = add(ET, line_add, "ItemRef", "\n ", 12)
                add(ET, item_ref, "FullName",
                    salesorder['other_itemFullName'], 12)
                add(ET, line_add, "Quantity",
                    str(salesorder['other_qty']), 12)
                add(ET, line_add, "Rate", str(salesorder['other_rate']), 10)

        return self.pprintXml(self._serialize(root))

    def create_open_sales_order_qbxml(self, txnid: list, IncludeLineItems=True):
        """Build a SalesOrderQueryRq for the given TxnIDs.

        Returns None when *txnid* is empty.
        """
        if not txnid:
            return None
        root, query_rq = self._new_request("SalesOrderQueryRq")
        for txn_id in txnid:
            self.create_sub_element(ET, query_rq, "TxnID", txn_id, 6)
        if IncludeLineItems:
            self.create_sub_element(
                ET, query_rq, "IncludeLineItems", 'true', 4)

        qbxml_query = self._serialize(root)
        print(f'create_open_sale_order_qbxml->qbxml_query: {qbxml_query}')
        return qbxml_query

    # ------------------------------------------------------------------
    # QuickBooks transport
    # ------------------------------------------------------------------
    def connect_to_quickbooks(self, qbxml_query):
        """Send *qbxml_query* to QuickBooks and return the pretty-printed reply.

        Opens a QBXMLRP2 connection as application 'DASA2', begins a session
        on the currently open company file, and always ends the session and
        closes the connection, even if the request raises.
        """
        pythoncom.CoInitialize()
        session_manager = win32com.client.Dispatch(
            "QBXMLRP2.RequestProcessor")
        session_manager.OpenConnection('', 'DASA2')
        try:
            ticket = session_manager.BeginSession("", 2)
            try:
                response_string = session_manager.ProcessRequest(
                    ticket, qbxml_query)
            finally:
                session_manager.EndSession(ticket)  # close the company file
        finally:
            session_manager.CloseConnection()
        return self.pprintXml(response_string)

    def __str__(self, *args) -> str:
        """Run the configured query and return the parsed orders as text."""
        return str(self.get_sales_order_header())

    def get_sales_order_header(self, *args):
        """Run the configured sales-order query and parse the reply."""
        return self._get_sales_order_header(
            self.connect_to_quickbooks(self.create_QBXML()))

    # ------------------------------------------------------------------
    # response parsing
    # ------------------------------------------------------------------
    def status_ok(self, QBXML):  # for InvoiceAddRs
        """Return ``(ok, attributes)`` for an InvoiceAdd reply.

        ``ok`` is True when the ``statusMessage`` attribute contains "OK";
        ``attributes`` is the InvoiceAddRs attribute mapping (empty when the
        element is missing).
        """
        response = ET.fromstring(QBXML).find(".//InvoiceAddRs")
        if response is None:
            print('status=None')
            return False, {}
        attributes = response.attrib
        status = attributes.get('statusMessage')
        print(f'status={status}')
        return 'OK' in (status or ''), attributes

    def get_saved_refnumber(self, QBXML):  # for InvoiceAddRs
        """Return RefNumber, customer, date and balance of a saved invoice.

        Missing elements yield None values instead of raising.
        """
        tree = ET.fromstring(QBXML)
        refnumber = self._text(tree, ".//RefNumber")
        print(f"saved refnumber:{refnumber}")
        return {
            'RefNumber': refnumber,
            'Customer_FullName': self._text(tree, ".//CustomerRef/FullName"),
            'TxnDate': self._text(tree, ".//TxnDate"),
            'BalanceRemaining': self._text(tree, ".//BalanceRemaining"),
        }

    def get_discperpcs(self, ItemFullName, Rate):
        """Return the per-piece discount of *Rate* over the special price.

        Returns 0 when the customer has no special price level or the rate is
        below the price list, and None when no usable price is found for the
        item at the customer's special price level.
        """
        if not self.Customer or not self.Customer[2]:
            return 0

        prices = self._df_price_level
        matches = prices.loc[
            (prices['FullName'] == ItemFullName)
            & (prices['PriceLevelName'] == self.Customer[2]),
            'CustomPrice',
        ].tolist()
        if not matches or pd.isna(matches[0]):
            print('Pricelevelname not found')
            return None

        pricelist = matches[0]
        try:
            # Excel prices arrive as int/float; Decimal rates cannot be mixed
            # with float, so convert through str first.
            price = (Decimal(str(pricelist))
                     if isinstance(Rate, Decimal) else pricelist)
            discPerPcs = Rate - price
        except (TypeError, ArithmeticError):
            print('Pricelevelname not found')
            return None

        if discPerPcs < 0:
            print(f"WARNING: Rate is Lower than Pricelist Cust:{self.Customer} ItemName:{ItemFullName} Rate:{Rate} < {pricelist}")
            return 0
        return discPerPcs

    def _get_sales_order_header(self, response_string, includefullInvoiced=False):
        """Parse a SalesOrderQuery reply into ``self.SalesOrderList``.

        Fully invoiced orders are skipped unless *includefullInvoiced* is
        true.  Lines without an item and "Sales ... Disc" lines are skipped.
        Returns the list of order dicts, or False when a discount cannot be
        determined for an open line.
        """
        print('_get_sales_order_header')
        print(f'responsestring sales order header:{self.pprintXml(response_string)}')
        QBXML = ET.fromstring(response_string)
        orders = []

        if self.Customer is None:
            first_customer = QBXML.find(
                './/SalesOrderRet/CustomerRef/FullName')
            if first_customer is not None:
                # Unknown customers stay None (no special price), matching
                # the constructor's behaviour.
                self.Customer = self._lookup_customer(first_customer.text)
                print(f'Customer:{self.Customer}')

        for order in QBXML.findall('.//SalesOrderRet'):
            RefNumber = order.find('RefNumber').text
            TxnDate = order.find('TxnDate').text
            TxnNumber = order.find('TxnNumber').text
            CustomerFullName = order.find('CustomerRef/FullName').text
            TxnID = order.find('TxnID').text
            TotalAmount = order.find('TotalAmount').text
            IsFullyInvoiced = order.find('IsFullyInvoiced').text
            IsManuallyClosed = order.find('IsManuallyClosed').text
            if not includefullInvoiced and IsFullyInvoiced.lower() == 'true':
                continue

            order_dict = {
                'RefNumber': RefNumber,
                'CustomerFullName': CustomerFullName,
                'TxnID': TxnID,
                'TxnDate': TxnDate,
                'TxnNumber': TxnNumber,
                'TotalAmount': TotalAmount,
                'IsFullyInvoiced': IsFullyInvoiced,
                'IsManuallyClosed': IsManuallyClosed,
                'SalesOrderLineRet': [],
            }

            disc_amount = 0
            for line in order.findall('SalesOrderLineRet'):
                discPerItem = 0
                discPerPcs = 0
                convertQTY = 1
                TxnLineID = line.find('TxnLineID').text
                item_el = line.find('ItemRef/FullName')
                if item_el is None:
                    print("no itemfullname")
                    continue  # skip this order line
                ItemFullName = item_el.text
                if 'Sales' in ItemFullName and 'Disc' in ItemFullName:
                    continue  # skip the sales discount line

                Quantity = line.find('Quantity').text
                UnitOfMeasure = line.find('UnitOfMeasure').text

                # Conversion quantity: "<unit>_<n>" in the UOM, or
                # "... of <n>" in OverrideUOMSetRef for BOX lines.
                if '_' in UnitOfMeasure:
                    convertQTY = int(UnitOfMeasure.split('_')[1])
                override_el = line.find('OverrideUOMSetRef/FullName')
                if override_el is not None:
                    override_uom = override_el.text
                    if 'of' in override_uom and UnitOfMeasure.upper() == 'BOX':
                        convertQTY = int(override_uom.split('of')[1])
                    print(f'OverrideUOMSetRef:{override_uom}')

                Rate = Decimal(line.find('Rate').text)
                Amount = Decimal(line.find('Amount').text)
                Invoiced = line.find('Invoiced').text
                LineIsManuallyClosed = line.find('IsManuallyClosed').text
                BackOrdered = Decimal(Quantity) - Decimal(Invoiced)

                if BackOrdered > 0 and LineIsManuallyClosed.lower() == 'false':
                    discPerPcs = self.get_discperpcs(ItemFullName, Rate)
                    if discPerPcs is None:
                        return False
                    discPerItem = BackOrdered * discPerPcs
                    disc_amount += discPerItem

                order_dict['SalesOrderLineRet'].append({
                    'TxnLineID': TxnLineID,
                    'ItemFullName': ItemFullName,
                    'Quantity': Quantity,
                    'UOM': UnitOfMeasure,
                    'Rate': Rate,
                    'Amount': Amount,
                    'BackOrdered': BackOrdered,
                    'Invoiced': Invoiced,
                    'LineIsManuallyClosed': LineIsManuallyClosed,
                    'discPerItem': discPerItem,  # backorder qty * disc per pcs
                    'discPerPcs': discPerPcs,
                    'convertQTY': convertQTY,
                })

            order_dict['Disc_Amount'] = disc_amount
            orders.append(order_dict)

        self.SalesOrderList = orders
        return self.SalesOrderList

    def addDiscountToInvoiceList(self, _dict: dict):
        """Placeholder; currently only prints its own name."""
        print("addDiscountToInvoiceList")

    def addDNQtyToSalesOrderList(self, _dict: dict):
        """Record a delivery-note row's quantity on the matching order line.

        *_dict* needs ``RefNum``, ``FullName``, ``UOM``, ``Quantity`` and, for
        BOX rows, ``Item No``.  On a match whose ``BackOrdered`` covers the
        converted quantity, ``DNQuantity`` is set on the line.  Returns
        ``(added, error_message)``; the message reflects the last line that
        was examined.
        """
        added = False
        Error_msg = None
        for poidx, _po in enumerate(self.SalesOrderList):
            if _po['RefNumber'] != _dict['RefNum']:
                continue
            if not _po['SalesOrderLineRet']:
                Error_msg = f"this refnum {_dict['RefNum']} have no QB PO Return Line"
                continue
            for polineidx, _poline in enumerate(_po['SalesOrderLineRet']):
                if _poline['ItemFullName'] != _dict['FullName']:
                    Error_msg = f"poline[ItemFullName] <> DN[FullName]; {_poline['ItemFullName']} <> {_dict['FullName']}, maybe there are 2 same namefromtaco in QB"
                    continue
                if _poline['UOM'].upper() != _dict['UOM'].upper():
                    Error_msg = f"UOM different {_poline['UOM']} <> {_dict['UOM']}"
                    continue

                QuantityIn_dict = self._dn_quantity_in_so_units(_dict)
                if _poline['BackOrdered'] >= QuantityIn_dict:
                    self.SalesOrderList[poidx]['SalesOrderLineRet'][polineidx]['DNQuantity'] = float(QuantityIn_dict)
                    added = True
                else:
                    print(f"{_poline['ItemFullName']} BackOrdered < Qty in DN {_poline['BackOrdered']}<{QuantityIn_dict}")
                    Error_msg = f"BackOrdered < Qty in DN {_poline['BackOrdered']}<{QuantityIn_dict}"
        return added, Error_msg

    def prepareInvoice(self, df: pd.DataFrame = None):
        """Apply every delivery-note row of *df* to the sales-order list.

        Returns ``(all_added, rows_not_added)``.  With ``df=None`` there is
        nothing to apply and ``(True, [])`` is returned.
        """
        rows = df.to_dict('records') if df is not None else []

        for row in rows:
            added, error_msg = self.addDNQtyToSalesOrderList(row)
            if added:
                row['ADDED'] = True
            elif error_msg:
                row['ERROR'] = error_msg

        not_added = []
        for row in rows:
            if 'ADDED' not in row:
                print(f"prepareInvoice->not added: {row}")
                not_added.append(row)
            else:
                print(f"ADDED: {row['Item No']}")

        print(f'{len(rows) - len(not_added)} of {len(rows)} are added')
        return not not_added, not_added

    def validate_date(self, date_text):
        """Parse ``YYYY-MM-DD`` text into a ``datetime.date``.

        ``date`` / ``datetime`` objects are accepted as-is (datetimes are
        truncated to their date).  Returns None for None or unparsable input.
        """
        if date_text is None:
            return None
        if isinstance(date_text, datetime.datetime):
            return date_text.date()
        if isinstance(date_text, datetime.date):
            return date_text
        try:
            return datetime.datetime.strptime(date_text, '%Y-%m-%d').date()
        except (ValueError, TypeError):
            return None

    def get_ext_doc_no_list(self, dndict=None):
        """Return ``(df, ref_numbers)`` for the delivery-note lines.

        Lines come from ``dndict['lines']`` or, without *dndict*, from
        ``self.DN``.  A ``RefNum`` column is derived from the text after
        "L-" in ``No.SO/Ext.Doc.No.``; ``ref_numbers`` is the list of unique
        values, or None when there are none.
        """
        dnlist = dndict['lines'] if dndict else self.DN
        df = pd.DataFrame(dnlist)
        df['RefNum'] = df['No.SO/Ext.Doc.No.'].apply(
            lambda x: "L" + x.split("L-")[1])
        ext_doc_no = df['RefNum'].unique().tolist()
        return df, (ext_doc_no or None)

    def get_open_so(self, response_string=None):
        """List open orders as ``[TxnID, TxnDate, TotalAmount, RefNumber]``.

        An order is open when both IsFullyInvoiced and IsManuallyClosed are
        'false'.  Without *response_string* the configured query is sent to
        QuickBooks first.
        """
        if not response_string:
            response_string = self.connect_to_quickbooks(self.create_QBXML())

        open_orders = []
        for order in ET.fromstring(response_string).findall('.//SalesOrderRet'):
            is_fully_invoiced = order.find('IsFullyInvoiced').text
            is_manually_closed = order.find('IsManuallyClosed').text
            if is_fully_invoiced == 'false' and is_manually_closed == 'false':
                open_orders.append([
                    order.find('TxnID').text,
                    order.find('TxnDate').text,
                    order.find('TotalAmount').text,
                    order.find('RefNumber').text,
                ])
        return open_orders

    def get_open_sales_order(self, txnlist: list):
        """Fetch and parse the sales orders named in *txnlist*.

        Each entry is either a TxnID or a sequence whose first item is the
        TxnID (as returned by :meth:`get_open_so`).  Returns None when the
        list is empty.
        """
        print(f'txnid: {txnlist}', type(txnlist))
        txnid = [
            entry[0] if isinstance(entry, (list, tuple)) else entry
            for entry in txnlist
        ]
        print(txnid)
        if not txnid:
            print("There is No Open Sales Order")
            return None
        return self._get_sales_order_header(
            self.connect_to_quickbooks(
                self.create_open_sales_order_qbxml(txnid)))


print('### SalesOrder ###')

if __name__ == '__main__':
    # Manual smoke test: query sales orders by reference number, list the
    # open ones, reload them with line items and print the invoice request.
    starttime = timeit.default_timer()

    # Other filters can be used instead of RefNumber, e.g.
    #   SalesOrderQuery(FullName='Abadi Serpong', IncludeRetElement=[...])
    ini = SalesOrderQuery(
        RefNumber='B23070685',
        IncludeRetElement=[
            'TxnID', 'TimeCreated', 'TimeModified', 'TxnNumber',
            'CustomerRef', 'TxnDate', 'RefNumber', 'IsManuallyClosed',
            'IsFullyInvoiced', 'TotalAmount',
        ],
    )
    # ini.create_customerquery_QBXML() is not used here: reading the Excel
    # customer list is faster than querying QuickBooks.

    print(f'createQBXML:{ini.create_QBXML()}')

    # Reuse this reply for get_open_so instead of querying QuickBooks twice.
    response_string = ini.connect_to_quickbooks(ini.create_QBXML())
    open_sales_orders = ini.get_open_so(response_string)
    print(f'open sales orders:{open_sales_orders}')

    if open_sales_orders:
        itu = ini.get_open_sales_order(open_sales_orders)
        print(f'get_open_sales_order:{itu}')
        if itu:
            # Only prints the request; send it with
            # ini.connect_to_quickbooks(...) to actually create the invoice.
            print(ini.create_invoiceadd_QBXML())

    print("The time difference is :", timeit.default_timer() - starttime)