robosats/tests/utils/node.py
2023-11-22 13:16:25 +00:00

303 lines
8.8 KiB
Python

import codecs
import sys
import time
import requests
from decouple import config
from requests.auth import HTTPBasicAuth
from requests.exceptions import ReadTimeout
LNVENDOR = config("LNVENDOR", cast=str, default="LND")
WAIT_STEP = 0.2
def get_node(name="robot"):
"""
We have two regtest LND nodes: "coordinator" (the robosats backend) and "robot" (the robosats user)
"""
if name == "robot":
macaroon = codecs.encode(
open("/lndrobot/data/chain/bitcoin/regtest/admin.macaroon", "rb").read(),
"hex",
)
port = 8080
elif name == "coordinator":
macaroon = codecs.encode(
open("/lnd/data/chain/bitcoin/regtest/admin.macaroon", "rb").read(), "hex"
)
port = 8081
return {"port": port, "headers": {"Grpc-Metadata-macaroon": macaroon}}
def get_lnd_node_id(node_name):
node = get_node(node_name)
response = requests.get(
f'http://localhost:{node["port"]}/v1/getinfo', headers=node["headers"]
)
data = response.json()
return data["identity_pubkey"]
def get_cln_node_id():
from api.lightning.cln import CLNNode
response = CLNNode.get_info()
return response.id.hex()
def wait_for_lnd_node_sync(node_name):
node = get_node(node_name)
waited = 0
while True:
response = requests.get(
f'http://localhost:{node["port"]}/v1/getinfo', headers=node["headers"]
)
if response.json()["synced_to_chain"]:
return
else:
sys.stdout.write(
f"\rWaiting for {node_name} node chain sync {round(waited, 1)}s"
)
sys.stdout.flush()
waited += WAIT_STEP
time.sleep(WAIT_STEP)
def LND_has_active_channels(node_name):
node = get_node(node_name)
response = requests.get(
f'http://localhost:{node["port"]}/v1/getinfo', headers=node["headers"]
)
return True if response.json()["num_active_channels"] > 0 else False
def CLN_has_active_channels():
from api.lightning.cln import CLNNode
response = CLNNode.get_info()
return True if response.num_active_channels > 0 else False
def wait_for_active_channels(lnvendor, node_name="coordinator"):
waited = 0
while True:
if lnvendor == "LND":
if LND_has_active_channels(node_name):
return
else:
sys.stdout.write(
f"\rWaiting for {node_name} LND node channel to be active {round(waited, 1)}s"
)
elif lnvendor == "CLN":
if CLN_has_active_channels():
return
else:
sys.stdout.write(
f"\rWaiting for {node_name} CLN node channel to be active {round(waited, 1)}s"
)
sys.stdout.flush()
waited += WAIT_STEP
time.sleep(WAIT_STEP)
def wait_for_cln_node_sync():
from api.lightning.cln import CLNNode
waited = 0
while True:
response = CLNNode.get_info()
if response.warning_bitcoind_sync or response.warning_lightningd_sync:
sys.stdout.write(
f"\rWaiting for coordinator CLN node sync {round(waited, 1)}s"
)
sys.stdout.flush()
waited += WAIT_STEP
time.sleep(WAIT_STEP)
else:
return
def wait_for_cln_active_channels():
from api.lightning.cln import CLNNode
waited = 0
while True:
response = CLNNode.get_info()
if response.num_active_channels > 0:
return
else:
sys.stdout.write(
f"\rWaiting for coordinator CLN node channels to be active {round(waited, 1)}s"
)
sys.stdout.flush()
waited += WAIT_STEP
time.sleep(WAIT_STEP)
def wait_nodes_sync():
wait_for_lnd_node_sync("robot")
if LNVENDOR == "LND":
wait_for_lnd_node_sync("coordinator")
elif LNVENDOR == "CLN":
wait_for_cln_node_sync()
def wait_channels():
wait_for_active_channels(LNVENDOR, "coordinator")
wait_for_active_channels("LND", "robot")
def set_up_regtest_network():
if channel_is_active():
print("Regtest network was already ready. Skipping initalization.")
return
# Fund two LN nodes in regtest and open channels
# Coordinator is either LND or CLN. Robot user is always LND.
if LNVENDOR == "LND":
coordinator_node_id = get_lnd_node_id("coordinator")
coordinator_port = 9735
elif LNVENDOR == "CLN":
coordinator_node_id = get_cln_node_id()
coordinator_port = 9737
print("Coordinator Node ID: ", coordinator_node_id)
# Fund both robot and coordinator nodes
robot_funding_address = create_address("robot")
coordinator_funding_address = create_address("coordinator")
generate_blocks(coordinator_funding_address, 1)
generate_blocks(robot_funding_address, 101)
wait_nodes_sync()
# Open channel between Robot user and coordinator
print(f"\nOpening channel from Robot user node to coordinator {LNVENDOR} node")
connect_to_node("robot", coordinator_node_id, f"localhost:{coordinator_port}")
open_channel("robot", coordinator_node_id, 100_000_000, 50_000_000)
# Generate 10 blocks so the channel becomes active and wait for sync
generate_blocks(robot_funding_address, 10)
# Wait a tiny bit so payments can be done in the new channel
wait_nodes_sync()
wait_channels()
time.sleep(1)
def channel_is_active():
robot_channel_active = LND_has_active_channels("robot")
if LNVENDOR == "LND":
coordinator_channel_active = LND_has_active_channels("coordinator")
elif LNVENDOR == "CLN":
coordinator_channel_active = CLN_has_active_channels()
return robot_channel_active and coordinator_channel_active
def connect_to_node(node_name, node_id, ip_port):
node = get_node(node_name)
data = {"addr": {"pubkey": node_id, "host": ip_port}}
while True:
response = requests.post(
f'http://localhost:{node["port"]}/v1/peers',
json=data,
headers=node["headers"],
)
if response.json() == {}:
print("Peered robot node to coordinator node!")
return response.json()
else:
if "already connected to peer" in response.json()["message"]:
return response.json()
print(f"Could not peer coordinator node: {response.json()}")
time.sleep(WAIT_STEP)
def open_channel(node_name, node_id, local_funding_amount, push_sat):
node = get_node(node_name)
data = {
"node_pubkey_string": node_id,
"local_funding_amount": local_funding_amount,
"push_sat": push_sat,
}
response = requests.post(
f'http://localhost:{node["port"]}/v1/channels',
json=data,
headers=node["headers"],
)
return response.json()
def create_address_LND(node_name):
node = get_node(node_name)
response = requests.get(
f'http://localhost:{node["port"]}/v1/newaddress', headers=node["headers"]
)
return response.json()["address"]
def create_address_CLN():
from api.lightning.cln import CLNNode
return CLNNode.newaddress()
def create_address(node_name):
if node_name == "coordinator" and LNVENDOR == "CLN":
return create_address_CLN()
else:
return create_address_LND(node_name)
def generate_blocks(address, num_blocks):
print(f"Mining {num_blocks} blocks")
data = {
"jsonrpc": "1.0",
"id": "curltest",
"method": "generatetoaddress",
"params": [num_blocks, address],
}
response = requests.post(
"http://localhost:18443", json=data, auth=HTTPBasicAuth("test", "test")
)
return response.json()
def pay_invoice(node_name, invoice):
reset_mission_control(node_name)
node = get_node(node_name)
data = {"payment_request": invoice}
try:
response = requests.post(
f'http://localhost:{node["port"]}/v1/channels/transactions',
json=data,
headers=node["headers"],
# 0.15s is enough for LND to LND hodl ACCEPT
# 0.4s is enough for LND to CLN hodl ACCEPT
timeout=0.2 if LNVENDOR == "LND" else 1,
)
print(response.json())
except ReadTimeout:
# Request to pay hodl invoice has timed out: that's good!
return
def reset_mission_control(node_name):
node = get_node(node_name)
requests.post(
f'http://localhost:{node["port"]}//v2/router/resetmissioncontrol',
headers=node["headers"],
)
def add_invoice(node_name, amount):
node = get_node(node_name)
data = {"value": amount}
response = requests.post(
f'http://localhost:{node["port"]}/v1/invoices',
json=data,
headers=node["headers"],
)
return response.json()["payment_request"]