import argparse import asyncio from dotenv import load_dotenv from naptha_sdk.client.naptha import Naptha from naptha_sdk.user import get_public_key, generate_user from naptha_sdk.schemas import ModuleRun import os import shlex import time import yaml import json load_dotenv() def load_yaml_to_dict(file_path): with open(file_path, 'r') as file: # Load the YAML content into a Python dictionary yaml_content = yaml.safe_load(file) return yaml_content def creds(naptha): return naptha.services.show_credits() async def list_services(naptha): services = await naptha.hub.list_services() for service in services: print(service) async def list_nodes(naptha): nodes = await naptha.hub.list_nodes() for node in nodes: print(node) async def list_modules(naptha): modules = await naptha.hub.list_modules() for module in modules: print(module) async def list_tasks(naptha): tasks = await naptha.hub.list_tasks() for task in tasks: print(task) async def list_rfps(naptha): rfps = await naptha.hub.list_rfps() for rfp in rfps: print(rfp) def generate_new_user(): _, private_key = generate_user() print("PRIVATE_KEY: ", private_key) async def run( naptha, module_name, parameters=None, worker_nodes=None, yaml_file=None, ): if yaml_file and parameters: raise ValueError("Cannot pass both yaml_file and parameters") if yaml_file: parameters = load_yaml_to_dict(yaml_file) module_run_input = { 'consumer_id': naptha.user["id"], "module_name": module_name, 'worker_nodes': worker_nodes, "module_params": parameters, } print(f"Running module {module_name} with parameters: {module_run_input}") print("Checking user...") user = await naptha.node.check_user(user_input=naptha.user) if user["is_registered"] == True: print("Found user...", user) elif user["is_registered"] == False: print("No user found. Registering user...") user = await naptha.node.register_user(user_input=user) print(f"User registered: {user}.") print("Running...") module_run = await naptha.node.run_task(module_run_input) if isinstance(module_run, dict): module_run = ModuleRun(**module_run) print(f"Module Run ID: {module_run.id}") current_results_len = 0 while True: module_run = await naptha.node.check_task(module_run) if isinstance(module_run, dict): module_run = ModuleRun(**module_run) output = f"{module_run.status} {module_run.module_type} {module_run.module_name}" if len(module_run.child_runs) > 0: output += f", task {len(module_run.child_runs)} {module_run.child_runs[-1].module_name} (node: {module_run.child_runs[-1].worker_nodes[0]})" print(output) if len(module_run.results) > current_results_len: print("Output: ", module_run.results[-1]) current_results_len += 1 if module_run.status == 'completed': break if module_run.status == 'error': break time.sleep(3) if module_run.status == 'completed': print(module_run.results) else: print(module_run.error_message) async def read_storage(naptha, hash_or_name, output_dir='./files', ipfs=False): """Read from storage, IPFS, or IPNS.""" try: await naptha.node.read_storage(hash_or_name.strip(), output_dir, ipfs=ipfs) except Exception as err: print(f"Error: {err}") async def write_storage(naptha, storage_input, ipfs=False, publish_to_ipns=False, update_ipns_name=None): """Write to storage, optionally to IPFS and/or IPNS.""" try: response = await naptha.node.write_storage(storage_input, ipfs=ipfs, publish_to_ipns=publish_to_ipns, update_ipns_name=update_ipns_name) print(response) except Exception as err: print(f"Error: {err}") async def main(): user, _ = generate_user(os.getenv("PRIVATE_KEY")) hub_url = os.getenv("HUB_URL") hub_username = os.getenv("HUB_USER") hub_password = os.getenv("HUB_PASS") hf_username = os.getenv("HF_USERNAME") hf_access_token = os.getenv("HF_ACCESS_TOKEN") node_url = os.getenv("NODE_URL", None) routing_url = os.getenv("ROUTING_URL", None) indirect_node_id = os.getenv("INDIRECT_NODE_ID", None) naptha = await Naptha( user=user, hub_username=hub_username, hub_password=hub_password, hf_username=hf_username, hf_access_token=hf_access_token, hub_url=hub_url, node_url=node_url, routing_url=routing_url, indirect_node_id=indirect_node_id, ) parser = argparse.ArgumentParser(description="CLI with for Naptha") subparsers = parser.add_subparsers(title="commands", dest="command") # Node commands nodes_parser = subparsers.add_parser("nodes", help="List available nodes.") # Module commands modules_parser = subparsers.add_parser("modules", help="List available modules.") # Task commands tasks_parser = subparsers.add_parser("tasks", help="List available tasks.") # RFP commands rfps_parser = subparsers.add_parser("rfps", help="List available RFPs.") # Run command run_parser = subparsers.add_parser("run", help="Execute run command.") run_parser.add_argument("module", help="Select the module to run") run_parser.add_argument("-p", '--parameters', type=str, help='Parameters in "key=value" format') run_parser.add_argument("-n", "--worker_nodes", help="Worker nodes to take part in module runs.") run_parser.add_argument("-f", "--file", help="YAML file with module parameters") user_parser = subparsers.add_parser("user", help="Generate user.") # Credits command credits_parser = subparsers.add_parser("credits", help="Show available credits.") services_parser = subparsers.add_parser("services", help="Show available services.") # Read storage commands read_storage_parser = subparsers.add_parser("read_storage", help="Read from storage.") read_storage_parser.add_argument("-id", "--module_run_id", help="Module run ID to read from") read_storage_parser.add_argument("-o", "--output_dir", default="files", help="Output directory to write to") read_storage_parser.add_argument("--ipfs", help="Read from IPFS", action="store_true") # Write storage commands write_storage_parser = subparsers.add_parser("write_storage", help="Write to storage.") write_storage_parser.add_argument("-i", "--storage_input", help="Path to file or directory to write to storage") write_storage_parser.add_argument("--ipfs", help="Write to IPFS", action="store_true") write_storage_parser.add_argument("--publish_to_ipns", help="Publish to IPNS", action="store_true") write_storage_parser.add_argument("--update_ipns_name", help="Update IPNS name") # Parse arguments args = parser.parse_args() if args.command == "credits": creds(naptha) elif args.command == "services": await list_services(naptha) elif args.command == "nodes": await list_nodes(naptha) elif args.command == "modules": await list_modules(naptha) elif args.command == "tasks": await list_tasks(naptha) elif args.command == "rfps": await list_rfps(naptha) elif args.command == "user": generate_new_user() elif args.command == "run": if hasattr(args, 'parameters') and args.parameters is not None: try: # First, try to parse as JSON parsed_params = json.loads(args.parameters) except json.JSONDecodeError: # If JSON parsing fails, fall back to the original method params = shlex.split(args.parameters) parsed_params = {} for param in params: key, value = param.split('=') parsed_params[key] = value else: parsed_params = None if hasattr(args, 'worker_nodes') and args.worker_nodes is not None: worker_nodes = args.worker_nodes.split(',') else: worker_nodes = None await run(naptha, args.module, parsed_params, worker_nodes, args.file) elif args.command == "read_storage": await read_storage(naptha, args.module_run_id, args.output_dir, args.ipfs) elif args.command == "write_storage": await write_storage(naptha, args.storage_input, args.ipfs, args.publish_to_ipns, args.update_ipns_name) else: parser.print_help() def cli(): asyncio.run(main()) if __name__ == "__main__": asyncio.run(main())