This commit is contained in:
bcomsugi 2025-02-20 06:31:59 +07:00
parent 86ff995133
commit 33273aa292
5 changed files with 981173 additions and 132 deletions

View File

@ -1,132 +0,0 @@
# from . import QBClasses
from pprint import pprint
from QBClass.QBClasses import InvoiceQuery, SalesOrderQuery
# import timeit
import time
print('succes Loading modules')
def timer(func):
def wrapper(*args, **kwargs):
nonlocal total
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
total += duration
print(f"Execution time: {duration} Total: {total}")
return result
total = 0
return wrapper
@timer
def get_all_so_from_invoice():
# iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true')
start = time.time()
print('Get Invoice Query List. Processing..... wait for at minute(1 month=90secs)')
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-09-1', TxnDateRangeFilter_ToTxnDate='2024-09-31', IncludeLinkedTxns='true', IncludeLineItems='true')
# pprint(iq.all(), sort_dicts=False)
# print(iq.all())
print(f"Execution time InvoiceQuery: {time.time()-start} {len(iq.all()) = }")
so_list = []
iq_list = []
dup_so_list = []
for idx, txn in enumerate(iq.all()):
# iq_list.append(txn)
# print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
if linkedtxn['TxnType']=='SalesOrder':
if linkedtxn['RefNumber'] not in so_list:
so_list.append(linkedtxn['RefNumber'])
else:
dup_so_list.append(linkedtxn['RefNumber'])
print(f'{dup_so_list = }')
print()
so_dict = {}
print(f"Execution time before SO: {time.time()-start}")
print('Get Sales Order Query List. Processing..... wait for at minute(1 month=130 secs)')
so = SalesOrderQuery(RefNumber = so_list, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
print(f"Execution time SalesOrderQuery: {time.time()-start}")
for idx, txn in enumerate(so.all()):
so_dict[txn['RefNumber']] = txn
# pprint(so.all(), sort_dicts=False)
res = next(iter(so_dict))
print(f'{so_dict[res] = }')
print(f'{len(iq.all()) = } {len(so.all()) = } {len(so_list) = } {len(dup_so_list) = } {len(so_dict) = }')
@timer
def process():
# iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true')
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-02-01', TxnDateRangeFilter_ToTxnDate='2024-02-29', IncludeLinkedTxns='true', IncludeLineItems='true')
# pprint(iq.all(), sort_dicts=False)
# print(iq.all())
for idx, txn in enumerate(iq.all()):
print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
# print(f"{txn['Subtotal'] = }")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
try:
if linkedtxn['TxnType']=='SalesOrder':
so = SalesOrderQuery(RefNumber = linkedtxn['RefNumber'], IncludeLinkedTxns='true', debug=False)
is_soLinkedToOneInvoice = False
if 'LinkedTxn' in so.all():
if not isinstance(so.all()['LinkedTxn'], list):
# print(so.all())
so_linkedTxn = [so.all()['LinkedTxn']]
else:
so_linkedTxn = so.all()['LinkedTxn']
# print(so.all())
for solinkedtxn in so_linkedTxn:
# print(len(so_linkedTxn))
if solinkedtxn['TxnType']=='Invoice' and len(so_linkedTxn)==1:
# print(so.all()['RefNumber'], 'the only one SO')
is_soLinkedToOneInvoice=True
# pass
else:
is_soLinkedToOneInvoice=False
print(so.all()['RefNumber'], 'NOT the only One, this SO have other Invoice number')
if float(linkedtxn['Amount'])<0:
if so.all()['TotalAmount']!=linkedtxn['Amount'][1:]:
if is_soLinkedToOneInvoice: #maybe the SO is manually closed, check it item by item, find which item is not in invoice
if so.all()['IsManuallyClosed'] == 'true':
pass
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
else:
print('SO TotalAmount<>Amount in Invoice. not Manually closed and not fully Invoiced')
pprint(f'{linkedtxn = }', sort_dicts=False)
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
print(so.all())
else:
pass # this is one SO is fully invoiced
else:
print('Linkedtxn amount is positif(should be negatif')
except Exception as e:
print('ERROR')
pprint(linkedtxn, sort_dicts=False)
print(f"{so.all()['TxnID'] = }")
print(so.all())
print(e)
break
# print(timeit.repeat(process, repeat=1))
# process()
get_all_so_from_invoice()

279
exim.py Normal file
View File

@ -0,0 +1,279 @@
# from . import QBClasses
from pprint import pprint
# from QBClass.QBClasses import InvoiceQuery, SalesOrderQuery
from QBClass.QBClasses import InvoiceQuery, SalesOrderQuery
# import timeit
import time
import json
import pandas as pd
print('succes Loading modules')
def timer(func):
def wrapper(*args, **kwargs):
nonlocal total
start = time.time()
result = func(*args, **kwargs)
duration = time.time() - start
total += duration
print(f"Execution time: {duration} Total: {total}")
return result
total = 0
return wrapper
@timer
def get_all_so_from_invoice(MaxReturned=None, FromTxnDate=None, ToTxnDate=None):
print(MaxReturned, FromTxnDate, ToTxnDate)
start = time.time()
print('Get Invoice Query List. Processing..... wait for at minute(1 month=90secs)')
if MaxReturned:
iq = InvoiceQuery(MaxReturned= MaxReturned, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
elif FromTxnDate and ToTxnDate:
print('here')
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate=FromTxnDate, TxnDateRangeFilter_ToTxnDate=ToTxnDate, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
else:
iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2021-01-01', TxnDateRangeFilter_ToTxnDate='2021-03-30', IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
# pprint(iq.all(), sort_dicts=False)
# print(iq.all())
print(f"Execution time InvoiceQuery: {time.time()-start} {len(iq.all()) = }")
so_list = []
iq_list = []
dup_so_list = []
for idx, txn in enumerate(iq.all()):
# iq_list.append(txn)
# print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
if linkedtxn['TxnType']=='SalesOrder':
if linkedtxn['RefNumber'] not in so_list:
so_list.append(linkedtxn['RefNumber'])
else:
dup_so_list.append(linkedtxn['RefNumber'])
print(f'{dup_so_list = }')
print()
so_dict = {}
print(f"Execution time before SO: {time.time()-start}")
print('Get Sales Order Query List. Processing..... wait for at minute(1 month=130 secs)')
so = SalesOrderQuery(RefNumber = so_list, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
duplicateSO = []
soWithNoLinkedTxn = []
print(f"Execution time SalesOrderQuery: {time.time()-start}")
for idx, txn in enumerate(so.all()):
if 'LinkedTxn' in txn:
if txn['RefNumber'] not in so_dict:
so_dict[txn['RefNumber']] = txn
else:
duplicateSO.append(txn['RefNumber'])
else:
soWithNoLinkedTxn.append(txn)
# pprint(so.all(), sort_dicts=False)
res = next(iter(so_dict))
# print(f'{so_dict[res] = }')
# pprint(so_dict[res])
print(f'{soWithNoLinkedTxn = }')
print(f'{len(iq.all()) = } {len(so.all()) = } {len(so_list) = } {len(dup_so_list) = } {len(so_dict) = }')
print(f'{duplicateSO = }')
print()
### start processing like below module ###
pass
return iq.all(), so_dict
@timer
def process():
iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
# iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-02-01', TxnDateRangeFilter_ToTxnDate='2024-02-29', IncludeLinkedTxns='true', IncludeLineItems='true')
pprint(iq.all(), sort_dicts=False)
# print(iq.all())
nolinkInv = []
soNotInOneInv = []
for idx, txn in enumerate(iq.all()):
print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
# print(f"{txn['Subtotal'] = }")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
try:
if linkedtxn['TxnType']=='SalesOrder':
so = SalesOrderQuery(RefNumber = linkedtxn['RefNumber'], IncludeLinkedTxns='true', debug=False)
is_soLinkedToOneInvoice = False
if 'LinkedTxn' in so.all():
if not isinstance(so.all()['LinkedTxn'], list):
# print(so.all())
so_linkedTxn = [so.all()['LinkedTxn']] #make a list
else:
so_linkedTxn = so.all()['LinkedTxn'] #just copy, already list
# print(so.all())
for solinkedtxn in so_linkedTxn:
# print(len(so_linkedTxn))
if solinkedtxn['TxnType']=='Invoice' and len(so_linkedTxn)==1:
# print(so.all()['RefNumber'], 'the only one SO')
is_soLinkedToOneInvoice=True
# pass
else:
is_soLinkedToOneInvoice=False
print(so.all()['RefNumber'], 'NOT the only One, this SO have other Invoice number') #make append to a list
soNotInOneInv.append(so.all()['RefNumber'])
if float(linkedtxn['Amount'])<0:
if so.all()['TotalAmount']!=linkedtxn['Amount'][1:]:
if is_soLinkedToOneInvoice: #maybe the SO is manually closed, check it item by item, find which item is not in invoice
if so.all()['IsManuallyClosed'] == 'true':
pass
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
else:
print('SO TotalAmount<>Amount in Invoice. not Manually closed and not fully Invoiced')
pprint(f'{linkedtxn = }', sort_dicts=False)
print(f"{so.all()['TxnID'] = } {so.all()['RefNumber'] = }")
print(so.all())
else:
pass # this SO is fully invoiced, starting process to export the details
else:
print('Linkedtxn amount is positif(should be negatif')
except Exception as e:
print('ERROR')
pprint(linkedtxn, sort_dicts=False)
print(f"{so.all()['TxnID'] = }")
print(so.all())
print(e)
break
else:
nolinkInv.append(txn)
print(f'{len(nolinkInv) = }')
@timer
def process_data(iq_list, so_dict):
# iq = InvoiceQuery(MaxReturned= 20, IncludeLinkedTxns='true', IncludeLineItems='true', debug=False)
# iq = InvoiceQuery(TxnDateRangeFilter_FromTxnDate='2024-02-01', TxnDateRangeFilter_ToTxnDate='2024-02-29', IncludeLinkedTxns='true', IncludeLineItems='true')
# pprint(iq_list, sort_dicts=False)
# print(iq_list)
nolinkInv = []
soNotInOneInv = []
manuallyClosedSO = []
openSO = []
fullyInvoicedSO = []
so_list = [x for x in so_dict]
print(f'{len(so_list) = }')
for idx, txn in enumerate(iq_list):
# print(f"{idx = } {txn['RefNumber'] = } {txn['TxnDate'] = } {txn['Subtotal'] = } ")
# print(f"{txn['Subtotal'] = }")
if 'LinkedTxn' in txn:
# pprint(txn['LinkedTxn'], sort_dicts=False)
if not isinstance(txn['LinkedTxn'], list): #if there is no receive payment and only 1 linked traction, need to change to a list. RECORD it
txn_linkedTxn = [txn['LinkedTxn']]
else:
txn_linkedTxn = txn['LinkedTxn']
for linkedtxn in txn_linkedTxn:
try:
if linkedtxn['TxnType']=='SalesOrder':
# so_list.remove(linkedtxn['RefNumber'])
# so = SalesOrderQuery(RefNumber = linkedtxn['RefNumber'], IncludeLinkedTxns='true', debug=False)
sodt = so_dict[linkedtxn['RefNumber']]
is_soLinkedToOneInvoice = False
if sodt['IsManuallyClosed']=='true':
manuallyClosedSO.append(sodt['RefNumber'])
if sodt['IsFullyInvoiced']=='true':
fullyInvoicedSO.append(sodt['RefNumber'])
if 'LinkedTxn' in sodt:
if not isinstance(sodt['LinkedTxn'], list):
# print(sodt)
so_linkedTxn = [sodt['LinkedTxn']] #make a list
else:
so_linkedTxn = sodt['LinkedTxn'] #just copy, already list
# print(sodt)
for solinkedtxn in so_linkedTxn:
# print(len(so_linkedTxn))
if solinkedtxn['TxnType']=='Invoice' and len(so_linkedTxn)==1:
# print(sodt['RefNumber'], 'the only one SO')
is_soLinkedToOneInvoice=True
# pass
else:
is_soLinkedToOneInvoice=False
print(sodt['RefNumber'], 'NOT the only One, this SO have other Invoice number') #make append to a list
soNotInOneInv.append(sodt['RefNumber'])
if float(linkedtxn['Amount'])<0:
if sodt['TotalAmount']!=linkedtxn['Amount'][1:]:
if is_soLinkedToOneInvoice: #maybe the SO is manually closed, check it item by item, find which item is not in invoice
if sodt['IsManuallyClosed'] == 'true':
pass
print(f"{sodt['TxnID'] = } {sodt['RefNumber'] = }")
manuallyClosedSO.append(sodt['RefNumber'])
else:
print('SO TotalAmount<>Amount in Invoice. not Manually closed and not fully Invoiced')
pprint(f'{linkedtxn = }', sort_dicts=False)
print(f"{sodt['TxnID'] = } {sodt['RefNumber'] = }")
print(sodt)
openSO.append(sodt['RefNumber'])
else:
pass # this SO is fully invoiced, starting process to export the details
else:
print('Linkedtxn amount is positif(should be negatif')
except Exception as e:
print('ERROR')
pprint(linkedtxn, sort_dicts=False)
print(f"{sodt['TxnID'] = }")
print(sodt)
print(e)
break
else:
nolinkInv.append(txn)
print(f'{len(nolinkInv) = }\n{nolinkInv = }\n{soNotInOneInv = }\n{manuallyClosedSO = }\n{openSO = }\n{len(fullyInvoicedSO) = }')
c = [item for item in so_list if item not in fullyInvoicedSO]
print(f'not fuuly invoice, leftover SO: {c}')
def writeToFile(iq_list, so_dict):
try:
iq_list_json = json.dumps(iq_list, indent=2)
so_dict_json = json.dumps(so_dict, indent=2)
# Writing to sample.json
with open("iq.json", "w") as outfile:
outfile.write(iq_list_json)
with open("iq_so_dict.json", "w") as outfile:
outfile.write(so_dict_json)
return True
except Exception as e:
print(e)
return False
def readJsonFromFile(iq_filename, so_filename):
try:
with open(iq_filename, "r") as infile:
iq_list = json.load(infile)
with open(so_filename, "r") as infile:
so_dict = json.load(infile)
return iq_list, so_dict
except Exception as e:
print(e)
return [],{}
# print(timeit.repeat(process, repeat=1))
# process()
# iq_list, so_dict = get_all_so_from_invoice(MaxReturned=20)
iq_list, so_dict = get_all_so_from_invoice(FromTxnDate="2024-08-01", ToTxnDate="2024-08-31")
writeToFile(iq_list, so_dict)
iq_list, so_dict = readJsonFromFile("iq.json", "iq_so_dict.json")
print(f'{len(iq_list) = } {len(so_dict) = }')
process_data(iq_list, so_dict)

437061
iq.json Normal file

File diff suppressed because it is too large Load Diff

543705
iq_so_dict.json Normal file

File diff suppressed because it is too large Load Diff

128
test from mike.py Normal file
View File

@ -0,0 +1,128 @@
import win32com.client
import xml.etree.ElementTree as ET
class QuickBooksSDK:
def __init__(self):
""" Initialize QBSDK Request Processor """
self.qb = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
# self.session_id = None
# def open_connection(self):
# """ Open connection to QuickBooks """
# self.qb.OpenConnection("", "My Python App")
# self.qb.BeginSession("", 2) # 2 = Use current QuickBooks session
# def close_connection(self):
# """ Close connection to QuickBooks """
# self.qb.EndSession()
# self.qb.CloseConnection()
def open_connection(self):
""" Open connection to QuickBooks """
self.qb = win32com.client.Dispatch("QBXMLRP2.RequestProcessor")
self.qb.OpenConnection("", "My QuickBooks App")
self.session_id = self.qb.BeginSession("", 2) # 2 = Open in "No UI" mode
print("Connection to QuickBooks established.", self.session_id)
def close_connection(self):
""" Close QuickBooks session and connection """
if self.session_id:
self.qb.EndSession(self.session_id)
self.qb.CloseConnection()
print("Connection to QuickBooks closed.")
# def send_request(self, qbxml: str) -> str:
# """ Send qbXML request and return response """
# print("Sending XML Request to QuickBooks:\n", qbxml) # Debugging print
# response = self.qb.ProcessRequest(qbxml)
# return response
def send_request(self, qbxml: str) -> str:
""" Send QBXML request to QuickBooks and return response """
try:
print("\n🔷 SENDING REQUEST TO QUICKBOOKS:")
print(qbxml) # Print the exact request
print(self.session_id)
response = self.qb.ProcessRequest(self.session_id, qbxml)
print("\n✅ RESPONSE RECEIVED FROM QUICKBOOKS:")
print(response if response else "No response received!")
return response
except Exception as e:
print("\n❌ ERROR Processing Request:", str(e))
return ""
def parse_sales_order_response(self, xml_response: str):
""" Parse SalesOrderQueryRs XML response into a Python dictionary """
print(f'{xml_response = }')
root = ET.fromstring(xml_response)
sales_orders = []
for order in root.findall(".//SalesOrderRet"):
order_data = {
"TxnID": order.findtext("TxnID", ""),
"TxnDate": order.findtext("TxnDate", ""),
"CustomerRef": order.findtext("CustomerRef/FullName", ""),
"TotalAmount": order.findtext("TotalAmount", ""),
"LineItems": []
}
for line in order.findall(".//SalesOrderLineRet"):
line_item = {
"ItemRef": line.findtext("ItemRef/FullName", ""),
"Quantity": line.findtext("Quantity", ""),
"Rate": line.findtext("Rate", ""),
"Amount": line.findtext("Amount", "")
}
order_data["LineItems"].append(line_item)
sales_orders.append(order_data)
return sales_orders
def get_sales_orders(self, from_date: str, to_date: str) -> str:
""" Fetch sales orders from QuickBooks within date range """
qbxml = f"""
<?xml version="1.0" encoding="utf-8"?>
<?qbxml version="13.0"?>
<QBXML>
<QBXMLMsgsRq onError="stopOnError">
<SalesOrderQueryRq requestID="1">
<TxnDateRangeFilter>
<FromTxnDate>2024-01-01</FromTxnDate>
<ToTxnDate>2024-01-05</ToTxnDate>
</TxnDateRangeFilter>
<IncludeLineItems>1</IncludeLineItems>
</SalesOrderQueryRq>
</QBXMLMsgsRq>
</QBXML>
"""
qbxml = qbxml.strip()
print("\n🔍 CHECKING QBXML FORMAT BEFORE SENDING:")
print(qbxml) # Print request before sending
response = self.send_request(qbxml)
return response
# **🔹 Main Execution**
if __name__ == "__main__":
qb = QuickBooksSDK()
try:
qb.open_connection()
# Query sales orders from January 1, 2024 to January 5, 2024
sales_orders = qb.get_sales_orders("2024-01-01", "2024-01-05")
# Print hasil dalam format dictionary
print("Sales Orders:", sales_orders)
finally:
qb.close_connection()