Example for Multiple Clusters

This example demonstrates the use of additional clusters. It is heavily based on the five preceding examples and extends the concepts shown there to additional clusters.

New Topics

  • Joining multiple clusters
  • Distributing data and jobs in multiple clusters

Detailed Description

Joining multiple clusters


The examples for networking, fragmented jobs and the distributed cache demonstrate how to create a cluster and how to distribute jobs or data in the cluster. The cluster that is implicitly joined via mi::neuraylib::INeuray::start() and left via mi::neuraylib::INeuray::shutdown() is also called the main cluster.
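
To make this concrete, the main cluster lifecycle looks roughly as follows (a minimal sketch based on the main() function below; load_and_get_ineuray() comes from example_shared.h, and error handling and the remaining configuration are omitted):

  mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
  mi::base::Handle<mi::neuraylib::INetwork_configuration> network_configuration(
      neuray->get_api_component<mi::neuraylib::INetwork_configuration>());
  network_configuration->set_mode( mi::neuraylib::INetwork_configuration::MODE_UDP);
  network_configuration->set_multicast_address( "224.1.1.1:1111");
  neuray->start();     // implicitly joins the main cluster
  // ... distribute data and jobs in the main cluster ...
  neuray->shutdown();  // implicitly leaves the main cluster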

In addition, it is also possible to dynamically join and leave other clusters, henceforth referred to as additional clusters. All clusters, the main cluster as well as any additional clusters, are independent of each other.

Note that a priori there is no difference between the main cluster and additional clusters. The differences (and hence the different terms) are a consequence of how these clusters are used by the different hosts. A certain cluster might be the main cluster for one host, but an additional cluster for a different host. A host may be part of an additional cluster without being part of a main cluster.

Noteworthy differences between main and additional clusters are:

  • additional clusters can be dynamically joined and left (and re-joined) while the main cluster is joined and left during startup and shutdown, and
  • cluster-wide logging is only available via the main cluster (see the sketch after this list).
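
For example, the forwarding logger and the log priority are always configured via the instance of mi::neuraylib::INeuray, that is, for the main cluster (a minimal sketch based on the main() function below):

  mi::base::Handle<mi::neuraylib::ILogging_configuration> logging_configuration(
      neuray->get_api_component<mi::neuraylib::ILogging_configuration>());
  mi::base::Handle<mi::base::ILogger> logger( logging_configuration->get_forwarding_logger());
  // Increasing the log priority keeps this host from sending its own log messages
  // to some other host in the cluster.
  logging_configuration->set_log_priority( 1);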

The main() function of the example shows how to use an additional cluster: The API component mi::neuraylib::ICluster_factory is used to create an instance of mi::neuraylib::ICluster for each additional UDP multicast address given on the command line. This interface is used to configure the cluster-specific functionality for the additional cluster (while the instance of mi::neuraylib::INeuray is used to configure the cluster-specific functionality for the main cluster as well as non-cluster-specific functionality). Finally, join() is called to join the cluster.
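
In outline, the steps just described look as follows (a minimal sketch; the multicast address is taken from the command lines above, and error handling is reduced to check_success() from example_shared.h):

  mi::base::Handle<mi::neuraylib::ICluster_factory> cluster_factory(
      neuray->get_api_component<mi::neuraylib::ICluster_factory>());
  mi::base::Handle<mi::neuraylib::ICluster> cluster( cluster_factory->create_cluster());
  mi::base::Handle<mi::neuraylib::INetwork_configuration> cluster_network_configuration(
      cluster->get_api_component<mi::neuraylib::INetwork_configuration>());
  cluster_network_configuration->set_mode( mi::neuraylib::INetwork_configuration::MODE_UDP);
  cluster_network_configuration->set_multicast_address( "224.2.2.2:2222");
  check_success( cluster->join() == 0);
  // ... use the additional cluster ...
  check_success( cluster->leave() == 0);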

Distributing data and jobs in multiple clusters


The additional clusters are used in the same way as the main cluster. Just make sure that the API components for the database and the distributed cache are obtained from the correct instance of mi::neuraylib::ICluster instead of from mi::neuraylib::INeuray as for the main cluster.
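
For instance, to obtain the database and the distributed cache (a minimal sketch; neuray and cluster are assumed to be valid handles to mi::neuraylib::INeuray and mi::neuraylib::ICluster, respectively):

  // Main cluster: obtain API components from the INeuray instance.
  mi::base::Handle<mi::neuraylib::IDatabase> database(
      neuray->get_api_component<mi::neuraylib::IDatabase>());
  // Additional cluster: obtain API components from the corresponding ICluster instance.
  mi::base::Handle<mi::neuraylib::IDatabase> cluster_database(
      cluster->get_api_component<mi::neuraylib::IDatabase>());
  mi::base::Handle<mi::neuraylib::IDistributed_cache> cluster_distributed_cache(
      cluster->get_api_component<mi::neuraylib::IDistributed_cache>());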

The example runs the tests from the previous examples for each cluster in turn, starting with the main cluster and then looping through the additional clusters. One host, the client, triggers the jobs and distributed cache actions, while all other hosts, the workers, just wait some time to execute jobs or to receive data via the distributed cache.

Example Source

Source Code Location: examples/example_multiple_clusters.cpp

/******************************************************************************
* Copyright 2023 NVIDIA Corporation. All rights reserved.
*****************************************************************************/
// examples/example_multiple_clusters.cpp
//
// Demonstrates usage of multiple clusters (main cluster and additional clusters).
//
// The example expects the following command line arguments:
//
// example_multiple_clusters <mode> <multicast_address0> [<multicast_address1> [...]]
//
// mode either "client" or "worker"
// multicast_address0 UDP multicast address for the main cluster
// multicast_address1 UDP multicast address for the first additional cluster
// multicast_address2 UDP multicast address for the second additional cluster
// ...
//
// Examples:
//
// Single host
//
// @192.168.1.1: ./example_multiple_clusters client 224.1.1.1:1111
//
// One cluster with two hosts
//
// @192.168.1.1: ./example_multiple_clusters worker 224.1.1.1:1111
// @192.168.1.2: ./example_multiple_clusters client 224.1.1.1:1111
//
// Two clusters with two hosts each
//
// @192.168.1.1: ./example_multiple_clusters worker 224.1.1.1:1111
// @192.168.1.2: ./example_multiple_clusters worker 224.2.2.2:2222
// @192.168.1.3: ./example_multiple_clusters client 224.1.1.1:1111 224.2.2.2:2222
#include <iostream>
#include <vector>
#include <mi/dice.h>
// Include code shared by all examples.
#include "example_shared.h"
// Include definition of My_element and My_job.
#include "example_element_and_job.h"
// Include definition of My_fragmented_job.
#include "example_fragmented_job.h"
// Include definition of My_serializable_1, My_serializable_2, and Object_receiver.
#include "example_distributed_cache.h"
// This example implementation of the IHost_callback interface just prints a line
// for every event.
class Host_callback : public mi::base::Interface_implement<mi::neuraylib::IHost_callback>
{
public:
Host_callback() : m_own_host_id( 0) { }
void connection_callback( mi::Uint32 host_id, bool flag)
{
if( flag) {
fprintf( stderr,
"The connection to the cluster was established. Own host id is %u.\n", host_id);
m_own_host_id = host_id;
}
else
fprintf( stderr, "The connection to the cluster was lost.\n");
}
void membership_callback( mi::Uint32 host_id, bool flag)
{
if( flag)
fprintf( stderr, "Host %u joined the cluster.\n", host_id);
else
fprintf( stderr, "Host %u left the cluster.\n", host_id);
}
void property_callback( mi::Uint32 host_id, const mi::neuraylib::IHost_properties* properties)
{
fprintf( stderr, "Host %u communicated its properties:\n", host_id);
mi::base::Handle<const mi::IString> value( properties->get_property( "application_name"));
if( value.is_valid_interface())
fprintf( stderr, " application_name: %s\n", value->get_c_str());
}
void synchronizer_callback( mi::Uint32 host_id)
{
fprintf( stderr, "The synchronizer is now host %u", host_id);
if( m_own_host_id == host_id)
fprintf( stderr, " (this host)");
fprintf( stderr, ".\n");
}
void database_status_callback( const char* status)
{
fprintf( stderr, "The database reports its status as \"%s\".\n", status);
}
private:
mi::Uint32 m_own_host_id;
};
// Register the serializable classes.
void configuration( mi::neuraylib::INeuray* neuray)
{
// Register the classes My_element, My_job, and My_fragmented_job with DiCE.
mi::base::Handle<mi::neuraylib::IDice_configuration> dice_configuration(
neuray->get_api_component<mi::neuraylib::IDice_configuration>());
check_success( dice_configuration.is_valid_interface());
check_success( dice_configuration->register_serializable_class<My_element>());
check_success( dice_configuration->register_serializable_class<My_job>());
check_success( dice_configuration->register_serializable_class<My_fragmented_job>());
}
// Configure a cluster (main cluster or additional cluster).
void cluster_configuration(
mi::neuraylib::INetwork_configuration* network_configuration,
mi::neuraylib::IHost_callback* host_callback,
const char* multicast_address)
{
// Set networking mode
check_success( network_configuration->set_mode(
mi::neuraylib::INetwork_configuration::MODE_UDP) == 0);
// Set the multicast address
check_success( network_configuration->set_multicast_address( multicast_address) == 0);
// Register callback handler
network_configuration->register_host_callback( host_callback);
}
// Test the classes My_element and My_job.
void test_my_element_and_my_job( mi::neuraylib::IDice_transaction* transaction)
{
// Create a new database element.
mi::base::Handle<My_element> element( new My_element( 42));
// Check that there is no database element yet with the name "the_element". Database elements
// are typically identified via their tags. Names are just for convenience of the user.
mi::neuraylib::Tag element_tag = transaction->name_to_tag( "the_element");
check_success( !element_tag);
// Store the database element and obtain the resulting tag. Note that after the element is
// stored in the database the pointer to it may *not* be used anymore. Access the element via
// the database if you need it later.
element_tag = transaction->store( element.get(), mi::neuraylib::Tag(), "the_element");
check_success( element_tag);
// Check that there is now a database element with the name "the_element".
mi::neuraylib::Tag element_tag2 = transaction->name_to_tag( "the_element");
check_success( element_tag2 == element_tag);
// Access the database element and check the value.
mi::base::Handle<const My_element> access1( transaction->access<My_element>( element_tag));
check_success( access1->get_data() == 42);
// Edit the stored database element. This will create a new version of the database element,
// such that access1 is still valid.
mi::base::Handle<My_element> edit( transaction->edit<My_element>( element_tag));
edit->set_data( 2);
// Commit the edited database element. The same can be achieved by letting the object go out
// of scope (or by calling edit.reset()).
edit = 0;
// Check that the initial access is unchanged.
check_success( access1->get_data() == 42);
// Re-access the database element and check that it shows the new value.
mi::base::Handle<const My_element> access2( transaction->access<My_element>( element_tag));
check_success( access2->get_data() == 2);
// Create a job and reference the database element stored above. The job will access it during
// its execution. The execution may be done on a different host. In this case the database
// element will be transported to that host.
mi::base::Handle<My_job> job( new My_job( element_tag));
// Check that there is no job yet with the name "the_job". Jobs are typically identified via
// their tags. Names are just for convenience of the user.
mi::neuraylib::Tag job_tag = transaction->name_to_tag( "the_job");
check_success( !job_tag);
// Store the job in the database. As for the database element do *not* use the pointer to the
// job anymore.
job_tag = transaction->store( job.get(), mi::neuraylib::Tag(), "the_job");
check_success( job_tag);
// Check that there is now a job with the name "the_job".
mi::neuraylib::Tag job_tag2 = transaction->name_to_tag( "the_job");
check_success( job_tag2 == job_tag);
// Advise the job. This tells the scheduler that the job result will be needed soon. Advising
// the job instead of directly accessing its result allows the scheduler to start executing it
// asynchronously in the background.
transaction->advise( job_tag);
// Access the job result. This method does not return before the job has been executed. It
// returns a database element of type My_element.
mi::base::Handle<const My_element> access3( transaction->access<My_element>( job_tag));
// Check the result of the job execution.
check_success( access3->get_data() == 4);
}
// Test the My_fragmented_job class.
void test_my_fragmented_job( mi::neuraylib::IDice_transaction* transaction)
{
// Execute an instance of the job in n fragments which are delegated to other threads and hosts.
My_fragmented_job fragmented_job;
mi::Size n = My_fragmented_job::N_FRAGMENTS;
transaction->execute_fragmented( &fragmented_job, n);
// Compare the result with the expected result.
mi::Uint64 s = My_fragmented_job::FRAGMENT_SIZE;
mi::Uint64 expected_result = n*s/2 * (n*s-1);
check_success( fragmented_job.sum_results() == expected_result);
}
// Test the distributed cache by adding some objects to the distributed cache and removing them
// again.
void test_distributed_cache( mi::neuraylib::IDistributed_cache* distributed_cache)
{
mi::Uint64 N = 5;
// Generate N objects and store them in the distributed cache.
// Alternate between both serializable classes. We use the index squared as example data.
IMy_serializable** objects = new IMy_serializable*[N];
for( mi::Uint64 i = 0; i < N; ++i) {
IMy_serializable* object = ( (i+1)%2 )
? static_cast<IMy_serializable*>( new My_serializable_1)
: static_cast<IMy_serializable*>( new My_serializable_2);
object->set_data( i*i);
g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
"Storing object with index %llu at address %p with data %llu.",
i, object, object->get_data());
distributed_cache->store_object( object);
objects[i] = object;
sleep_seconds( 1);
}
// Remove objects from the distributed cache and release them.
for( mi::Uint64 i = 0; i < N; ++i) {
IMy_serializable* object = objects[i];
objects[i] = 0;
g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
"Removing object with index %llu at address %p with data %llu.",
i, object, object->get_data());
distributed_cache->remove_object( object);
object->release();
sleep_seconds( 1);
}
delete[] objects;
}
// Run the tests for elements, jobs, fragmented jobs, and the distributed cache on the cluster
// identified by \p database and \p distributed_cache.
void run_tests(
mi::neuraylib::IDatabase* database, mi::neuraylib::IDistributed_cache* distributed_cache)
{
// Get the global scope of the database and create a transaction in the global scope.
mi::base::Handle<mi::neuraylib::IScope> scope( database->get_global_scope());
mi::base::Handle<mi::neuraylib::IDice_transaction> transaction(
scope->create_transaction<mi::neuraylib::IDice_transaction>());
check_success( transaction.is_valid_interface());
// Test the My_element and My_job class.
test_my_element_and_my_job( transaction.get());
// Test the My_fragmented_job class.
test_my_fragmented_job( transaction.get());
transaction->commit();
// Test the distributed cache.
test_distributed_cache( distributed_cache);
}
// On the client call run_tests() for the main cluster as well as for all additional clusters.
void run_client(
mi::neuraylib::INeuray* neuray,
const std::vector<mi::base::Handle<mi::neuraylib::ICluster> >& clusters)
{
g_logger->message(
mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "This host acts as client.");
g_logger->message(
mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "Running tests on main cluster.");
mi::base::Handle<mi::neuraylib::IDatabase> database(
neuray->get_api_component<mi::neuraylib::IDatabase>());
check_success( database.is_valid_interface());
mi::base::Handle<mi::neuraylib::IDistributed_cache> distributed_cache(
neuray->get_api_component<mi::neuraylib::IDistributed_cache>());
check_success( distributed_cache.is_valid_interface());
run_tests( database.get(), distributed_cache.get());
for( mi::Size i = 0; i < clusters.size(); ++i) {
g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
"Running tests on additional cluster %llu.", i);
mi::base::Handle<mi::neuraylib::ICluster> cluster( clusters[i]);
mi::base::Handle<mi::neuraylib::IDatabase> cluster_database(
cluster->get_api_component<mi::neuraylib::IDatabase>());
check_success( cluster_database.is_valid_interface());
mi::base::Handle<mi::neuraylib::IDistributed_cache> cluster_distributed_cache(
cluster->get_api_component<mi::neuraylib::IDistributed_cache>());
check_success( cluster_distributed_cache.is_valid_interface());
run_tests( cluster_database.get(), cluster_distributed_cache.get());
}
}
// On the workers just wait some time for jobs or fragmented jobs to execute. Also monitor the
// objects added to and removed from the distributed cache.
void run_worker(
mi::neuraylib::INeuray* neuray,
const std::vector<mi::base::Handle<mi::neuraylib::ICluster> >& clusters)
{
g_logger->message(
mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "This host acts as worker.");
// Create a receiver instance for each serializable class and register them with
// the corresponding class IDs with each distributed cache.
Object_receiver object_receiver_1( 1);
Object_receiver object_receiver_2( 2);
mi::base::Handle<mi::neuraylib::IDistributed_cache> distributed_cache(
neuray->get_api_component<mi::neuraylib::IDistributed_cache>());
check_success( distributed_cache.is_valid_interface());
distributed_cache->register_receiver( My_serializable_1::IID(), &object_receiver_1);
distributed_cache->register_receiver( My_serializable_2::IID(), &object_receiver_2);
for( mi::Size i = 0; i < clusters.size(); ++i) {
mi::base::Handle<mi::neuraylib::ICluster> cluster( clusters[i]);
mi::base::Handle<mi::neuraylib::IDistributed_cache> cluster_distributed_cache(
cluster->get_api_component<mi::neuraylib::IDistributed_cache>());
check_success( cluster_distributed_cache.is_valid_interface());
cluster_distributed_cache->register_receiver( My_serializable_1::IID(), &object_receiver_1);
cluster_distributed_cache->register_receiver( My_serializable_2::IID(), &object_receiver_2);
}
// Wait for some time
sleep_seconds( 30);
// Unregister receiver instances
distributed_cache->unregister_receiver( My_serializable_1::IID());
distributed_cache->unregister_receiver( My_serializable_2::IID());
for( mi::Size i = 0; i < clusters.size(); ++i) {
mi::base::Handle<mi::neuraylib::ICluster> cluster( clusters[i]);
mi::base::Handle<mi::neuraylib::IDistributed_cache> cluster_distributed_cache(
cluster->get_api_component<mi::neuraylib::IDistributed_cache>());
check_success( cluster_distributed_cache.is_valid_interface());
cluster_distributed_cache->unregister_receiver( My_serializable_1::IID());
cluster_distributed_cache->unregister_receiver( My_serializable_2::IID());
}
}
int main( int argc, char* argv[])
{
// Check command line parameters
if( argc < 3 || ((strcmp( argv[1], "client") != 0) && (strcmp( argv[1], "worker") != 0))) {
std::cerr << "Usage: example_networking <mode> <multicast_address0> \\\n"
<< " [<multicast_address1> [...]]" << std::endl;
keep_console_open();
return EXIT_FAILURE;
}
// Indicates whether this host acts as client or worker.
bool is_worker = strcmp( argv[1], "worker") == 0;
// Access the DiCE library
mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
check_success( neuray.is_valid_interface());
// Configure the DiCE library
configuration( neuray.get());
// Create callback handler
mi::base::Handle<mi::neuraylib::IHost_callback> host_callback( new Host_callback());
// Configure the main cluster
mi::base::Handle<mi::neuraylib::INetwork_configuration> network_configuration(
neuray->get_api_component<mi::neuraylib::INetwork_configuration>());
check_success( network_configuration.is_valid_interface());
cluster_configuration( network_configuration.get(), host_callback.get(), argv[2]);
mi::base::Handle<mi::neuraylib::ILogging_configuration> logging_configuration(
neuray->get_api_component<mi::neuraylib::ILogging_configuration>());
g_logger = logging_configuration->get_forwarding_logger();
// Increase the log priority on the client such that at least it does not send its own log
// messages to some other host.
if( !is_worker)
logging_configuration->set_log_priority( 1);
logging_configuration = 0;
// Start the DiCE library
mi::Sint32 result = neuray->start();
check_start_success( result);
// For each optional multicast address configure and join an additional cluster.
std::vector<mi::base::Handle<mi::neuraylib::ICluster> > clusters;
mi::base::Handle<mi::neuraylib::ICluster_factory> cluster_factory(
neuray->get_api_component<mi::neuraylib::ICluster_factory>());
check_success( cluster_factory.is_valid_interface());
for( mi::Size i = 0; i < static_cast<mi::Size>( argc-3); ++i) {
mi::base::Handle<mi::neuraylib::ICluster> cluster( cluster_factory->create_cluster());
check_success( cluster.is_valid_interface());
mi::base::Handle<mi::neuraylib::INetwork_configuration> cluster_network_configuration(
cluster->get_api_component<mi::neuraylib::INetwork_configuration>());
cluster_configuration( cluster_network_configuration.get(), host_callback.get(), argv[i+3]);
check_success( cluster->join() == 0);
clusters.push_back( cluster);
}
// Run the tests.
is_worker ? run_worker( neuray.get(), clusters) : run_client( neuray.get(), clusters);
// For each optional multicast address leave the additional cluster and unregister the host
// callback.
for( mi::Size i = 0; i < static_cast<mi::Size>( argc-3); ++i) {
mi::base::Handle<mi::neuraylib::ICluster> cluster( clusters[i]);
mi::base::Handle<mi::neuraylib::INetwork_configuration> cluster_network_configuration(
cluster->get_api_component<mi::neuraylib::INetwork_configuration>());
check_success( cluster->leave() == 0);
cluster_network_configuration->unregister_host_callback( host_callback.get());
}
clusters.clear();
// Shut down the DiCE library
check_success( neuray->shutdown() == 0);
g_logger = 0;
// Unregister the callback handler for the main cluster
network_configuration->unregister_host_callback( host_callback.get());
network_configuration = 0;
neuray = 0;
// Unload the DiCE library
check_success( unload());
keep_console_open();
return EXIT_SUCCESS;
}