From 94bc44ad0e87ae64df65876577312f3b9d048108 Mon Sep 17 00:00:00 2001 From: Reckless_Satoshi <90936742+Reckless-Satoshi@users.noreply.github.com> Date: Thu, 16 Mar 2023 00:53:37 +0000 Subject: [PATCH] Improve tasks reliability (#390) * Add broadcasted flag * Improve celery tasks reliability --- api/admin.py | 1 + api/lightning/node.py | 4 +- api/management/commands/follow_invoices.py | 69 ++++++--- api/models.py | 2 + api/tasks.py | 139 ++++++++++-------- docker-compose.yml | 2 +- .../EncryptedSocketChat/index.tsx | 9 +- requirements.txt | 4 +- 8 files changed, 139 insertions(+), 91 deletions(-) diff --git a/api/admin.py b/api/admin.py index 959fe5d4..4aba39ab 100644 --- a/api/admin.py +++ b/api/admin.py @@ -290,6 +290,7 @@ class OnchainPaymentAdmin(AdminChangeLinksMixin, admin.ModelAdmin): "address", "concept", "status", + "broadcasted", "num_satoshis", "hash", "swap_fee_rate", diff --git a/api/lightning/node.py b/api/lightning/node.py index 31cecdf0..218ba388 100644 --- a/api/lightning/node.py +++ b/api/lightning/node.py @@ -158,7 +158,9 @@ class LNNode: request, metadata=[("macaroon", MACAROON.hex())] ) - onchainpayment.txid = response.txid + if response.txid: + onchainpayment.txid = response.txid + onchainpayment.broadcasted = True onchainpayment.save() return True diff --git a/api/management/commands/follow_invoices.py b/api/management/commands/follow_invoices.py index e7342fc5..30593264 100644 --- a/api/management/commands/follow_invoices.py +++ b/api/management/commands/follow_invoices.py @@ -152,45 +152,66 @@ class Command(BaseCommand): queryset_retries = LNPayment.objects.filter( type=LNPayment.Types.NORM, - status__in=[LNPayment.Status.VALIDI, LNPayment.Status.FAILRO], + status=LNPayment.Status.FAILRO, in_flight=False, + routing_attempts__in=[1, 2], last_routing_time__lt=( timezone.now() - timedelta(minutes=int(config("RETRY_TIME"))) ), ) - queryset = queryset.union(queryset_retries) + # Payments that still have the in_flight flag whose last payment attempt was +3 min ago + # are probably stuck. We retry them. The follow_send_invoice() task can also do TrackPaymentV2 if the + # previous attempt is still ongoing + queryset_stuck = LNPayment.objects.filter( + type=LNPayment.Types.NORM, + status__in=[LNPayment.Status.FAILRO, LNPayment.Status.FLIGHT], + in_flight=True, + last_routing_time__lt=(timezone.now() - timedelta(minutes=3)), + ) - if len(queryset) > 0: - for lnpayment in queryset: + queryset = queryset.union(queryset_retries).union(queryset_stuck) + + for lnpayment in queryset: + # Checks that this onchain payment is part of an order with a settled escrow + if not hasattr(lnpayment, "order_paid_LN"): + self.stdout.write(f"Ln payment {str(lnpayment)} has no parent order!") + return + order = lnpayment.order_paid_LN + if order.trade_escrow.status == LNPayment.Status.SETLED: follow_send_payment.delay(lnpayment.payment_hash) def send_onchain_payments(self): queryset = OnchainPayment.objects.filter( status=OnchainPayment.Status.QUEUE, + broadcasted=False, ) - if len(queryset) > 0: - for onchainpayment in queryset: - # Checks that this onchain payment is part of an order with a settled escrow - if not hasattr(onchainpayment, "order_paid_TX"): - self.stdout.write( - f"Onchain payment {str(onchainpayment)} has no parent order!" - ) - return - order = onchainpayment.order_paid_TX - if order.trade_escrow.status == LNPayment.Status.SETLED: - # Sends out onchainpayment - LNNode.pay_onchain( - onchainpayment, - OnchainPayment.Status.QUEUE, - OnchainPayment.Status.MEMPO, - ) - else: - self.stdout.write( - f"Onchain payment {str(onchainpayment)} for order {str(order)} escrow is not settled!" - ) + for onchainpayment in queryset: + # Checks that this onchain payment is part of an order with a settled escrow + if not hasattr(onchainpayment, "order_paid_TX"): + self.stdout.write( + f"Onchain payment {str(onchainpayment)} has no parent order!" + ) + return + order = onchainpayment.order_paid_TX + if ( + order.trade_escrow.status == LNPayment.Status.SETLED + and order.trade_escrow.num_satoshis >= onchainpayment.num_satoshis + ): + # Sends out onchainpayment + LNNode.pay_onchain( + onchainpayment, + OnchainPayment.Status.QUEUE, + OnchainPayment.Status.MEMPO, + ) + onchainpayment.save() + + else: + self.stdout.write( + f"Onchain payment {str(onchainpayment)} for order {str(order)} escrow is not settled!" + ) def update_order_status(self, lnpayment): """Background process following LND hold invoices diff --git a/api/models.py b/api/models.py index 94cc0f6c..2d702ed2 100644 --- a/api/models.py +++ b/api/models.py @@ -203,6 +203,8 @@ class OnchainPayment(models.Model): choices=Status.choices, null=False, default=Status.CREAT ) + broadcasted = models.BooleanField(default=False, null=False, blank=False) + # payment info address = models.CharField( max_length=100, unique=False, default=None, null=True, blank=True diff --git a/api/tasks.py b/api/tasks.py index d53533b4..a9e024a5 100644 --- a/api/tasks.py +++ b/api/tasks.py @@ -86,6 +86,8 @@ def follow_send_payment(hash): from api.models import LNPayment, Order lnpayment = LNPayment.objects.get(payment_hash=hash) + lnpayment.last_routing_time = timezone.now() + lnpayment.save() # Default is 0ppm. Set by the user over API. Client's default is 1000 ppm. fee_limit_sat = int( @@ -101,74 +103,75 @@ def follow_send_payment(hash): ) order = lnpayment.order_paid_LN + if order.trade_escrow.num_satoshis < lnpayment.num_satoshis: + print(f"Order: {order.id} Payout is larger than collateral !?") + return + + def handle_response(response): + lnpayment.status = LNPayment.Status.FLIGHT + lnpayment.in_flight = True + lnpayment.save() + order.status = Order.Status.PAY + order.save() + + if response.status == 0: # Status 0 'UNKNOWN' + # Not sure when this status happens + print(f"Order: {order.id} UNKNOWN. Hash {hash}") + lnpayment.in_flight = False + lnpayment.save() + + if response.status == 1: # Status 1 'IN_FLIGHT' + print(f"Order: {order.id} IN_FLIGHT. Hash {hash}") + + if response.status == 3: # Status 3 'FAILED' + lnpayment.status = LNPayment.Status.FAILRO + lnpayment.last_routing_time = timezone.now() + lnpayment.routing_attempts += 1 + lnpayment.failure_reason = response.failure_reason + lnpayment.in_flight = False + if lnpayment.routing_attempts > 2: + lnpayment.status = LNPayment.Status.EXPIRE + lnpayment.routing_attempts = 0 + lnpayment.save() + + order.status = Order.Status.FAI + order.expires_at = timezone.now() + timedelta( + seconds=order.t_to_expire(Order.Status.FAI) + ) + order.save() + print( + f"Order: {order.id} FAILED. Hash: {hash} Reason: {LNNode.payment_failure_context[response.failure_reason]}" + ) + return { + "succeded": False, + "context": f"payment failure reason: {LNNode.payment_failure_context[response.failure_reason]}", + } + + if response.status == 2: # Status 2 'SUCCEEDED' + print(f"SUCCEEDED. Order: {order.id}. Hash: {hash}") + lnpayment.status = LNPayment.Status.SUCCED + lnpayment.fee = float(response.fee_msat) / 1000 + lnpayment.preimage = response.payment_preimage + lnpayment.save() + order.status = Order.Status.SUC + order.expires_at = timezone.now() + timedelta( + seconds=order.t_to_expire(Order.Status.SUC) + ) + order.save() + results = {"succeded": True} + return results + try: for response in LNNode.routerstub.SendPaymentV2( request, metadata=[("macaroon", MACAROON.hex())] ): - lnpayment.in_flight = True - lnpayment.save() - - if response.status == 0: # Status 0 'UNKNOWN' - # Not sure when this status happens - lnpayment.in_flight = False - lnpayment.save() - - if response.status == 1: # Status 1 'IN_FLIGHT' - print("IN_FLIGHT") - lnpayment.status = LNPayment.Status.FLIGHT - lnpayment.in_flight = True - lnpayment.save() - order.status = Order.Status.PAY - order.save() - - if response.status == 3: # Status 3 'FAILED' - print("FAILED") - lnpayment.status = LNPayment.Status.FAILRO - lnpayment.last_routing_time = timezone.now() - lnpayment.routing_attempts += 1 - lnpayment.failure_reason = response.failure_reason - lnpayment.in_flight = False - if lnpayment.routing_attempts > 2: - lnpayment.status = LNPayment.Status.EXPIRE - lnpayment.routing_attempts = 0 - lnpayment.save() - - order.status = Order.Status.FAI - order.expires_at = timezone.now() + timedelta( - seconds=order.t_to_expire(Order.Status.FAI) - ) - order.save() - context = { - "routing_failed": LNNode.payment_failure_context[ - response.failure_reason - ], - "IN_FLIGHT": False, - } - - # If failed due to not route, reset mission control. (This won't scale well, just a temporary fix) - # ResetMC deactivate temporary for tests - # if response.failure_reason==2: - # LNNode.resetmc() - - return False, context - - if response.status == 2: # Status 2 'SUCCEEDED' - print("SUCCEEDED") - lnpayment.status = LNPayment.Status.SUCCED - lnpayment.fee = float(response.fee_msat) / 1000 - lnpayment.preimage = response.payment_preimage - lnpayment.save() - order.status = Order.Status.SUC - order.expires_at = timezone.now() + timedelta( - seconds=order.t_to_expire(Order.Status.SUC) - ) - order.save() - return True, None + handle_response(response) except Exception as e: + if "invoice expired" in str(e): - print("INVOICE EXPIRED") + print(f"Order: {order.id}. INVOICE EXPIRED. Hash: {hash}") lnpayment.status = LNPayment.Status.EXPIRE lnpayment.last_routing_time = timezone.now() lnpayment.in_flight = False @@ -178,8 +181,20 @@ def follow_send_payment(hash): seconds=order.t_to_expire(Order.Status.FAI) ) order.save() - context = {"routing_failed": "The payout invoice has expired"} - return False, context + results = {"succeded": False, "context": "The payout invoice has expired"} + return results + + if "payment is in transition" in str(e): + print(f"Order: {order.id}. ALREADY IN TRANSITION. Hash: {hash}.") + + request = LNNode.routerrpc.TrackPaymentRequest( + payment_hash=bytes.fromhex(hash) + ) + + for response in LNNode.routerstub.TrackPaymentV2( + request, metadata=[("macaroon", MACAROON.hex())] + ): + handle_response(response) @shared_task(name="payments_cleansing") diff --git a/docker-compose.yml b/docker-compose.yml index 49e8f423..3cd81c6b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,7 +100,7 @@ services: volumes: - .:/usr/src/robosats - ./node/lnd:/lnd - command: celery -A robosats worker --loglevel=WARNING + command: celery -A robosats worker --loglevel=INFO --concurrency 4 --max-tasks-per-child=4 --max-memory-per-child=200000 depends_on: - redis network_mode: service:tor diff --git a/frontend/src/components/TradeBox/EncryptedChat/EncryptedSocketChat/index.tsx b/frontend/src/components/TradeBox/EncryptedChat/EncryptedSocketChat/index.tsx index b8f23be9..92d4df2c 100644 --- a/frontend/src/components/TradeBox/EncryptedChat/EncryptedSocketChat/index.tsx +++ b/frontend/src/components/TradeBox/EncryptedChat/EncryptedSocketChat/index.tsx @@ -1,4 +1,4 @@ -import React, { useEffect, useState } from 'react'; +import React, { useEffect, useLayoutEffect, useState } from 'react'; import { useTranslation } from 'react-i18next'; import { Button, Tooltip, TextField, Grid, Paper } from '@mui/material'; import { encryptMessage, decryptMessage } from '../../../../pgp'; @@ -73,6 +73,13 @@ const EncryptedSocketChat: React.FC = ({ } }, [status]); + useLayoutEffect(() => { + // On component unmount close reconnecting-websockets + return () => { + connection?.close(); + }; + }, []); + useEffect(() => { if (messages.length > messageCount) { audio.play(); diff --git a/requirements.txt b/requirements.txt index 9750c4ed..4866ead7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ django-timezone-field==4.2.3 djangorestframework==3.13.1 channels==3.0.4 channels-redis==3.3.1 -celery==5.2.3 +celery==5.2.7 grpcio==1.43.0 googleapis-common-protos==1.53.0 grpcio-tools==1.43.0 @@ -21,7 +21,7 @@ ring==0.9.1 git+https://github.com/Reckless-Satoshi/Robohash.git scipy==1.8.0 gunicorn==20.1.0 -psycopg2==2.9.3 +psycopg2==2.9.5 SQLAlchemy==1.4.31 django-import-export==2.7.1 requests[socks]