DiCE API
Example for the Node Manager

This example demonstrates the use of the node manager. See Node manager for a general introduction to key concepts of the node manager.

New Topics

  • Node manager process on worker nodes
  • Node manager process on client nodes

Detailed Description

Node manager process on worker nodes


The node manager process running on the worker nodes is quite simple. Its main purpose is to make the node available to the node manager and to execute child processes as requested by the node manager. This process can be generic and independent of the application logic, whereas the child processes it executes are typically specific to the application.
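The worker example below installs a child process resolver that accepts every requested program, which matches the default behavior. A deployment that wants to restrict what the node manager may launch on a worker could apply a policy like the one in this plain C++ sketch. It does not call the DiCE API: the functions `allowed_programs` and `resolve_program` and the allow-list itself are hypothetical. In a real resolver the decision would be made inside `IChild_process_resolver::resolve_process()`; how a rejection is signalled there should be checked in the API reference.

```cpp
#include <cassert>
#include <optional>
#include <set>
#include <string>

// Hypothetical allow-list of programs this worker node agrees to start.
// The client example requests "./example_fragmented_job_networking".
const std::set<std::string>& allowed_programs()
{
    static const std::set<std::string> programs = {
        "./example_fragmented_job_networking",
    };
    return programs;
}

// Returns the program to launch (unchanged) if it is on the allow-list,
// or std::nullopt if the request should be refused. The resolver in the
// worker example, in contrast, accepts every program name unchanged.
std::optional<std::string> resolve_program( const std::string& program_name)
{
    if( allowed_programs().count( program_name) == 0)
        return std::nullopt;
    return program_name;
}
```

With this policy, the request issued by the client example is accepted, while any other program name, such as `/bin/sh`, is refused.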

The example below is very simple: it lets you configure the networking mode (UDP or TCP), the node manager address, and the multicast address range used for child processes. In addition, custom worker node properties can be set on the command line in the form key=value.
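The command line handling in the worker example (mode, two addresses, then any number of key=value properties) can be isolated into a small, testable function. The following is a minimal standalone sketch; the struct `Worker_options` and the function `parse_worker_options` are hypothetical and not part of DiCE. Like the example, it splits each property at the first `=`, but it reports malformed input by throwing instead of aborting via `check_success()`, and it additionally rejects an empty key.

```cpp
#include <cassert>
#include <cstring>
#include <map>
#include <stdexcept>
#include <string>

// Parsed command line of a worker process (hypothetical helper, not part of DiCE).
struct Worker_options
{
    bool use_tcp = false;                          // "TCP" selects TCP, "UDP" selects UDP multicast
    std::string node_manager_address;              // e.g. "224.1.1.1:1111" or "192.168.1.1:1111"
    std::string multicast_base_address;            // e.g. "224.2.2.2:2222"
    std::map<std::string, std::string> properties; // custom key=value properties
};

// Parses: <mode> <nm_address> <base_address> [<key=value> ...]
// Throws std::invalid_argument on malformed input.
Worker_options parse_worker_options( int argc, const char* const argv[])
{
    if( argc < 4)
        throw std::invalid_argument( "too few arguments");
    Worker_options options;
    if( std::strcmp( argv[1], "TCP") == 0)
        options.use_tcp = true;
    else if( std::strcmp( argv[1], "UDP") != 0)
        throw std::invalid_argument( "mode must be UDP or TCP");
    options.node_manager_address = argv[2];
    options.multicast_base_address = argv[3];
    for( int i = 4; i < argc; ++i) {
        const std::string arg( argv[i]);
        // Split at the first '=' so that values may themselves contain '='.
        const std::string::size_type equal = arg.find( '=');
        if( equal == std::string::npos || equal == 0)
            throw std::invalid_argument( "property must have the form key=value: " + arg);
        options.properties[arg.substr( 0, equal)] = arg.substr( equal + 1);
    }
    return options;
}
```

Separating parsing from the DiCE calls keeps `main()` focused on starting the worker, and the parser can be tested without a running node manager.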

In this example, the node manager process on the worker node(s) should be started first. The node manager process on the client node(s) should be started afterwards; if there are several client nodes, they can be started in parallel or one after another.

Node manager process on client nodes


The node manager process running on the client nodes is a bit more complex. Typically, this process is your application that contains additional code to make use of the worker nodes managed by the node manager.

This example for the client node process is based on the Example for Fragmented Jobs (distributed): the "client" part of that example is contained in this example itself, while the "worker" part is launched as a child process by the node manager. Additionally, the example shows how to enumerate the known worker nodes and their properties, and how to install callbacks that report worker and client nodes joining or leaving the cluster as well as changes to worker node and cluster properties.
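The client example enumerates the known worker nodes and passes filters to `join_or_create_cluster()` together with a minimum and maximum number of worker nodes (both 1 in the example). The sketch below models that selection step in plain C++ to show how a property-based filter narrows the candidates. It is a simplified, hypothetical model: `Worker_info`, `Worker_filter`, and `select_workers` are not DiCE types, and the node manager's actual selection logic is not shown in this example and may consider further factors.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for what IWorker_node_descriptor exposes in the example:
// an address, whether the node is in a cluster, and its key=value properties.
struct Worker_info
{
    std::string address;
    bool in_cluster = false;
    std::map<std::string, std::string> properties;
};

// Analogous to IWorker_node_filter::is_eligible(): decides per worker node.
using Worker_filter = std::function<bool( const Worker_info&)>;

// Hypothetical selection step: picks up to max_nodes eligible workers in order and
// returns an empty vector if fewer than min_nodes qualify.
std::vector<Worker_info> select_workers(
    const std::vector<Worker_info>& known, std::size_t min_nodes, std::size_t max_nodes,
    const Worker_filter& is_eligible)
{
    std::vector<Worker_info> selected;
    for( const Worker_info& worker : known) {
        if( selected.size() == max_nodes)
            break;
        if( is_eligible( worker))
            selected.push_back( worker);
    }
    if( selected.size() < min_nodes)
        selected.clear();
    return selected;
}
```

An accept-all filter, like `Accept_all_worker_nodes` in the example, simply takes the first candidate; a filter that checks `some_property=bar` would instead pick only workers started with that property.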


Example Source

Source Code Location: examples/example_node_manager_worker.cpp

/******************************************************************************
* Copyright 2023 NVIDIA Corporation. All rights reserved.
*****************************************************************************/
// examples/example_node_manager_worker.cpp
//
// Demonstrates a simple node manager process for worker nodes.
//
// The example expects the following command line arguments:
//
// example_node_manager_worker <mode> <nm_address> <base_address> [<key=value> [...]]
//
// mode          either "UDP" or "TCP"
// nm_address    UDP multicast address or TCP unicast address used by the node manager
// base_address  UDP multicast address range used by child processes
// key=value     one or more properties in the form key=value
//
// Examples:
//
// UDP multicast
//
// @192.168.1.1: ./example_node_manager_worker UDP 224.1.1.1:1111 224.2.2.2:2222 some_property=foo
// @192.168.1.2: ./example_node_manager_worker UDP 224.1.1.1:1111 224.2.2.2:2222 some_property=bar
//
// TCP with head node
//
// @192.168.1.1: ./example_node_manager_worker TCP 192.168.1.1:1111 224.2.2.2:2222 ...
// @192.168.1.2: ./example_node_manager_worker TCP 192.168.1.1:1111 224.2.2.2:2222 ...
#include <cstring>
#include <iostream>
#include <string>
#include <mi/dice.h>
// Include code shared by all examples.
#include "example_shared.h"
// An implementation of IChild_process_resolver that accepts all command strings to start child
// processes. This is also the default behavior if no child process resolver is installed.
class Default_child_process_resolver
  : public mi::base::Interface_implement<mi::neuraylib::IChild_process_resolver>
{
public:
    Default_child_process_resolver( mi::neuraylib::IFactory* factory)
      : m_factory( mi::base::make_handle_dup( factory)) { }

    const mi::IString* resolve_process( const char* program_name, const char* /*program_arguments*/)
    {
        // Return the requested program name unchanged.
        mi::IString* result = m_factory->create<mi::IString>();
        result->set_c_str( program_name);
        return result;
    }

private:
    // Factory used to create the returned strings.
    mi::base::Handle<mi::neuraylib::IFactory> m_factory;
};
int main( int argc, char* argv[])
{
    // Check command line parameters
    if( argc < 4 || (strcmp( argv[1], "UDP") != 0 && strcmp( argv[1], "TCP") != 0)) {
        std::cerr << "Usage: example_node_manager_worker UDP|TCP <nm_address> <base_address> \\\n"
                     "       [<key=value> ...]" << std::endl;
        keep_console_open();
        return EXIT_FAILURE;
    }
    const char* mode = argv[1];
    const char* node_manager_address = argv[2];
    const char* multicast_base_address = argv[3];

    // Access the DiCE library
    mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
    check_success( neuray.is_valid_interface());

    // Create an instance of a node manager worker
    mi::base::Handle<mi::neuraylib::INode_manager_factory> node_manager_factory(
        neuray->get_api_component<mi::neuraylib::INode_manager_factory>());
    mi::base::Handle<mi::neuraylib::INode_manager_worker> worker(
        node_manager_factory->create_worker());
    node_manager_factory = 0;

    // Install a child process resolver. The implementation used here is equivalent to not using
    // a child process resolver at all and is merely used for demonstration purposes.
    mi::base::Handle<mi::neuraylib::IFactory> factory(
        neuray->get_api_component<mi::neuraylib::IFactory>());
    mi::base::Handle<mi::neuraylib::IChild_process_resolver> child_process_resolver(
        new Default_child_process_resolver( factory.get()));
    worker->set_child_process_resolver( child_process_resolver.get());
    child_process_resolver = 0;
    factory = 0;

    // Set worker node properties specified on the command line
    for( int i = 4; i < argc; i++) {
        const char* equal = strchr( argv[i], '=');
        check_success( equal);
        std::string key( argv[i], static_cast<mi::Size>( equal-argv[i]));
        std::string value( equal+1);
        worker->set_property( key.c_str(), value.c_str());
    }

    // Configure the networking mode and address, start the node manager worker
    worker->set_multicast_base_address( multicast_base_address);
    mi::Sint32 result = worker->start( node_manager_address, strcmp( mode, "TCP") == 0);
    check_success( result == 0);
    std::cerr << "Worker node successfully started." << std::endl;

    // Keep the worker running for 30 seconds while other hosts join or leave the cluster
    sleep_seconds( 30);

    // Shut down the node manager worker
    worker->shutdown();
    worker = 0;
    neuray = 0;

    // Unload the DiCE library
    check_success( unload());
    keep_console_open();
    return EXIT_SUCCESS;
}

Source Code Location: examples/example_node_manager_client.cpp

/******************************************************************************
* Copyright 2023 NVIDIA Corporation. All rights reserved.
*****************************************************************************/
// examples/example_node_manager_client.cpp
//
// Demonstrates a simple node manager process for client nodes.
//
// The example expects the following command line arguments:
//
// example_node_manager_client <mode> <nm_address> <base_address>
//
// mode          either "UDP" or "TCP"
// nm_address    UDP multicast address or TCP unicast address used by the node manager
// base_address  UDP multicast address range used by child processes
//
// Examples:
//
// UDP multicast
//
// @192.168.1.1: ./example_node_manager_client UDP 224.1.1.1:1111 224.2.2.2:2222
// @192.168.1.2: ./example_node_manager_client UDP 224.1.1.1:1111 224.2.2.2:2222
//
// TCP with head node
//
// @192.168.1.1: ./example_node_manager_client TCP 192.168.1.1:1111 224.2.2.2:2222
// @192.168.1.2: ./example_node_manager_client TCP 192.168.1.1:1111 224.2.2.2:2222
#include <cstring>
#include <iostream>
#include <string>
#include <mi/dice.h>
// Include code shared by all examples.
#include "example_shared.h"
// Include definition of My_fragmented_job.
#include "example_fragmented_job.h"
// Dumps a map of properties in the form key=value, one item per line.
void dump_properties( const mi::IMap* properties)
{
    mi::Size n_properties = properties->get_length();
    for( mi::Size j = 0; j < n_properties; ++j) {
        const char* key = properties->get_key( j);
        mi::base::Handle<const mi::IString> value( properties->get_value<mi::IString>( key));
        std::cerr << "    " << key << "=" << value->get_c_str() << std::endl;
    }
}
// An implementation of IWorker_node_filter that accepts all worker nodes.
class Accept_all_worker_nodes
  : public mi::base::Interface_implement<mi::neuraylib::IWorker_node_filter>
{
public:
    bool is_eligible( const mi::neuraylib::IWorker_node_descriptor*) { return true; }
};
// An implementation of ICluster_filter that accepts all clusters.
class Accept_all_clusters
  : public mi::base::Interface_implement<mi::neuraylib::ICluster_filter>
{
public:
    bool is_eligible( const mi::neuraylib::ICluster_descriptor*) { return true; }
};
// An implementation of IWorker_node_property_callback that outputs the changed property (or all
// properties if no single change is signalled).
class Worker_node_property_callback
  : public mi::base::Interface_implement<mi::neuraylib::IWorker_node_property_callback>
{
public:
    void property_callback(
        const mi::neuraylib::IWorker_node_descriptor* worker_descriptor,
        const char* changed_property_name)
    {
        mi::base::Handle<const mi::IString> address( worker_descriptor->get_address());
        mi::base::Handle<const mi::IMap> properties( worker_descriptor->get_properties());
        if( changed_property_name) {
            mi::base::Handle<const mi::IString> property(
                properties->get_value<mi::IString>( changed_property_name));
            std::cerr << "Worker node " << address->get_c_str() << " changed property \""
                      << changed_property_name << "\" to \"" << property->get_c_str() << "\"."
                      << std::endl;
        } else {
            std::cerr << "Worker node " << address->get_c_str() << " communicated its properties:"
                      << std::endl;
            dump_properties( properties.get());
        }
    }
};
// An implementation of ICluster_property_callback that outputs the changed property (or all
// properties if no single change is signalled).
class Cluster_property_callback
  : public mi::base::Interface_implement<mi::neuraylib::ICluster_property_callback>
{
public:
    void property_callback(
        const mi::neuraylib::ICluster_descriptor* cluster_descriptor,
        const char* changed_property_name)
    {
        // The cluster is identified by its reserved multicast address.
        mi::base::Handle<const mi::IString> address( cluster_descriptor->get_multicast_address());
        mi::base::Handle<const mi::IMap> properties( cluster_descriptor->get_properties());
        if( changed_property_name) {
            mi::base::Handle<const mi::IString> property(
                properties->get_value<mi::IString>( changed_property_name));
            std::cerr << "Cluster " << address->get_c_str() << " changed property \""
                      << changed_property_name << "\" to \"" << property->get_c_str() << "\"."
                      << std::endl;
        } else {
            std::cerr << "Cluster " << address->get_c_str() << " communicated its properties:"
                      << std::endl;
            dump_properties( properties.get());
        }
    }
};
// An implementation of IClient_node_callback that outputs the client node that joined or left the
// cluster.
class Client_node_callback
  : public mi::base::Interface_implement<mi::neuraylib::IClient_node_callback>
{
public:
    void membership_callback( const char* address, bool flag)
    {
        std::cerr << "Client node " << address << (flag ? " joined" : " left")
                  << " the cluster." << std::endl;
    }
};
// An implementation of IWorker_node_callback that outputs the worker node that joined or left the
// cluster.
class Worker_node_callback
  : public mi::base::Interface_implement<mi::neuraylib::IWorker_node_callback>
{
public:
    void membership_callback( mi::neuraylib::IWorker_node_descriptor* worker_descriptor, bool flag)
    {
        mi::base::Handle<const mi::IString> address( worker_descriptor->get_address());
        std::cerr << "Worker node " << address->get_c_str() << (flag ? " joined" : " left")
                  << " the cluster." << std::endl;
    }
};
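// Set up network and register the serializable classes.
void configuration( mi::neuraylib::INeuray* neuray, const char* address)
{
    // Set UDP networking mode and the given multicast address.
    mi::base::Handle<mi::neuraylib::INetwork_configuration> network_configuration(
        neuray->get_api_component<mi::neuraylib::INetwork_configuration>());
    check_success( network_configuration->set_mode(
        mi::neuraylib::INetwork_configuration::MODE_UDP) == 0);
    check_success( network_configuration->set_multicast_address( address) == 0);

    // Register the class My_fragmented_job with DiCE.
    mi::base::Handle<mi::neuraylib::IDice_configuration> dice_configuration(
        neuray->get_api_component<mi::neuraylib::IDice_configuration>());
    check_success( dice_configuration.is_valid_interface());
    check_success( dice_configuration->register_serializable_class<My_fragmented_job>());
}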
// Obtain a DiCE transaction and run the test for the My_fragmented_job class.
void run( mi::neuraylib::INeuray* neuray)
{
    g_logger->message( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "This host acts as client");

    // Get the database, the global scope of the database and create a transaction in the global
    // scope.
    mi::base::Handle<mi::neuraylib::IDatabase> database(
        neuray->get_api_component<mi::neuraylib::IDatabase>());
    check_success( database.is_valid_interface());
    mi::base::Handle<mi::neuraylib::IScope> scope( database->get_global_scope());
    mi::base::Handle<mi::neuraylib::IDice_transaction> transaction(
        scope->create_transaction<mi::neuraylib::IDice_transaction>());
    check_success( transaction.is_valid_interface());

    // Execute an instance of the job in n fragments which are delegated to other threads and hosts.
    My_fragmented_job fragmented_job;
    mi::Size n = My_fragmented_job::N_FRAGMENTS;
    transaction->execute_fragmented( &fragmented_job, n);

    // Compare the result with the expected result.
    mi::Uint64 s = My_fragmented_job::FRAGMENT_SIZE;
    mi::Uint64 expected_result = n*s/2 * (n*s-1);
    check_success( fragmented_job.sum_results() == expected_result);

    transaction->commit();
}
int main( int argc, char* argv[])
{
    // Check command line parameters
    if( argc != 4 || (strcmp( argv[1], "UDP") != 0 && strcmp( argv[1], "TCP") != 0)) {
        std::cerr << "Usage: example_node_manager_client UDP|TCP <nm_address> <base_address>"
                  << std::endl;
        keep_console_open();
        return EXIT_FAILURE;
    }
    const char* mode = argv[1];
    const char* node_manager_address = argv[2];
    const char* multicast_base_address = argv[3];

    // Access the DiCE library
    mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
    check_success( neuray.is_valid_interface());

    // Create an instance of a node manager client
    mi::base::Handle<mi::neuraylib::INode_manager_factory> node_manager_factory(
        neuray->get_api_component<mi::neuraylib::INode_manager_factory>());
    mi::base::Handle<mi::neuraylib::INode_manager_client> client(
        node_manager_factory->create_client());
    node_manager_factory = 0;

    // Configure the networking mode and address, start the node manager client
    client->set_multicast_base_address( multicast_base_address);
    mi::Sint32 result = client->start( node_manager_address, strcmp( mode, "TCP") == 0);
    check_success( result == 0);
    std::cerr << "Client node successfully started." << std::endl;

    // Output information about known worker nodes
    std::cerr << "Currently known worker nodes:" << std::endl;
    mi::Size n_workers = client->get_number_of_worker_nodes();
    for( mi::Size i = 0; i < n_workers; ++i) {
        mi::base::Handle<mi::neuraylib::IWorker_node_descriptor> desc(
            client->get_worker_node( i));
        if( !desc.is_valid_interface())
            continue;
        mi::base::Handle<const mi::IString> address( desc->get_address());
        std::cerr << "Address: " << address->get_c_str()
                  << ", in cluster: " << (desc->is_in_cluster() ? "yes": "no")
                  << ", properties:" << std::endl;
        mi::base::Handle<const mi::IMap> properties( desc->get_properties());
        dump_properties( properties.get());
    }

    // Attempt to join an existing cluster of at least one worker node. If there is no such cluster
    // yet, create a cluster of one arbitrary worker node and execute the example for fragmented
    // jobs in the network as child process on the worker node.
    Accept_all_clusters accept_all_clusters;
    Accept_all_worker_nodes accept_all_worker_nodes;
    mi::base::Handle<mi::neuraylib::INode_manager_cluster> cluster(
        client->join_or_create_cluster( 1, 1, &accept_all_clusters, &accept_all_worker_nodes,
            "./example_fragmented_job_networking", "worker %m"));
    check_success( cluster.is_valid_interface());
    check_success( cluster->get_cluster_status() == mi::neuraylib::CLUSTER_ESTABLISHED);

    // Install our callbacks
    mi::base::Handle<mi::neuraylib::IWorker_node_callback> worker_node_callback(
        new Worker_node_callback);
    cluster->add_worker_node_callback( worker_node_callback.get());
    mi::base::Handle<mi::neuraylib::IClient_node_callback> client_node_callback(
        new Client_node_callback);
    cluster->add_client_node_callback( client_node_callback.get());
    mi::base::Handle<mi::neuraylib::IWorker_node_property_callback> worker_node_property_callback(
        new Worker_node_property_callback);
    cluster->add_worker_property_callback( worker_node_property_callback.get());
    mi::base::Handle<mi::neuraylib::ICluster_property_callback> cluster_property_callback(
        new Cluster_property_callback);
    cluster->add_cluster_property_callback( cluster_property_callback.get());

    // Use the cluster. The following section including the called methods is essentially the
    // "client" part of the example for fragmented job executed in the network.
    {
        // Configure the DiCE library with the multicast address reserved for the cluster
        mi::base::Handle<const mi::neuraylib::ICluster_descriptor> cluster_descriptor(
            cluster->get_cluster_descriptor());
        mi::base::Handle<const mi::IString> address( cluster_descriptor->get_multicast_address());
        configuration( neuray.get(), address->get_c_str());
        mi::base::Handle<mi::neuraylib::ILogging_configuration> logging_configuration(
            neuray->get_api_component<mi::neuraylib::ILogging_configuration>());
        g_logger = logging_configuration->get_forwarding_logger();
        logging_configuration = 0;

        // Start the DiCE library
        mi::Sint32 result = neuray->start();
        check_start_success( result);

        // Run the tests for My_fragmented_job
        run( neuray.get());

        // Shut down the DiCE library
        check_success( neuray->shutdown() == 0);
        g_logger = 0;
    }

    // Leave cluster
    cluster->shutdown();
    cluster = 0;

    // Shut down the node manager client
    client->shutdown();
    client = 0;
    neuray = 0;

    // Unload the DiCE library
    check_success( unload());
    keep_console_open();
    return EXIT_SUCCESS;
}
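The `run()` function above compares the job result with `n*s/2 * (n*s-1)`. Assuming, as that formula implies, that the fragments of `My_fragmented_job` (defined in `example_fragmented_job.h`, not shown here) together sum the integers 0 through n*s-1, the expected value is the arithmetic series N(N-1)/2 with N = n*s. The standalone sketch below checks this; the function names are hypothetical. Because the division by 2 is applied first, the formula as written is exact only when n*s is even, which is presumably the case for the constants used in the example.

```cpp
#include <cassert>
#include <cstdint>

// Sum of 0 + 1 + ... + (count - 1), computed by brute force.
std::uint64_t brute_force_sum( std::uint64_t count)
{
    std::uint64_t sum = 0;
    for( std::uint64_t k = 0; k < count; ++k)
        sum += k;
    return sum;
}

// Formula as written in run(): n*s/2 * (n*s-1). The integer division happens
// first, so the result is exact only when n*s is even.
std::uint64_t expected_as_in_example( std::uint64_t n, std::uint64_t s)
{
    return n*s/2 * (n*s-1);
}

// Closed form that is exact for every n*s: one of N and N-1 is always even,
// so the product is divisible by 2 before the division is applied.
std::uint64_t expected_general( std::uint64_t n, std::uint64_t s)
{
    const std::uint64_t total = n * s;
    return total * (total - 1) / 2;
}
```

For n = 4 and s = 3 both formulas give 66, the sum of 0 through 11. For n = 1 and s = 3 the formula from `run()` gives 2, while the correct sum of 0, 1, and 2 is 3.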