DiCE API
Example for Fragmented Jobs (distributed)

This example demonstrates the implementation of fragmented jobs. It is based on the previous example and adds distribution of fragments to other hosts in the cluster.

New Topics

  • Implementation of fragmented jobs
  • Execution of fragmented jobs

Detailed Description

Implementation of fragmented jobs


As in the previous example, the fragmented job class computes the sum from 0 to some fixed large number. This computation is split into fragments such that each fragment computes the partial sum of a range of numbers. Fragments are executed concurrently in different threads and/or on different hosts in the cluster. Note that the number of fragments as well as the size of each fragment are constants defined in the class itself. This is not a necessity and was done purely for simplicity.

The actual work of a fragmented job happens in the do_execution() method. The method execute_fragment() provides the necessary context for local execution of fragments. The method sum_results() combines the results of all fragments. These methods are identical to the previous example.

To support remote execution of fragments you need to implement the methods execute_fragment_remote() and receive_remote_result() in addition to execute_fragment(). These two methods provide the necessary context for remote execution of fragments. To actually enable remote execution of fragments, get_scheduling_mode() is overridden to return mi::neuraylib::IFragmented_job::CLUSTER instead of the default LOCAL. For several other methods the mi::neuraylib::Fragmented_job mixin provides default implementations.

Execution of fragmented jobs


The example demonstrates a simple usage scenario for the fragmented jobs: hosts are either clients or workers. The client executes a fragmented job and processes some of its fragments. The workers are passive and only process fragments of the fragmented job.

Note that the distinction between client and worker in this example has been made for the sake of simplicity. Any host can act as client and worker simultaneously.

If you want to execute fragmented jobs in the cluster you need to register your classes with the DiCE API as explained in this example. (For fragmented jobs this step is only needed if get_scheduling_mode() returns a mode other than LOCAL.)

A fragmented job is executed by passing it to the mi::neuraylib::IDice_transaction::execute_fragmented() method.

Example Source

Source Code Location: examples/example_fragmented_job.h

/******************************************************************************
 * Copyright 2024 NVIDIA Corporation. All rights reserved.
 *****************************************************************************/

// examples/example_fragmented_job.h
//
// A user-defined fragmented job shared by several examples.

#ifndef EXAMPLE_FRAGMENTED_JOB_H
#define EXAMPLE_FRAGMENTED_JOB_H

#include <cstring>
#include <iostream>

#include <mi/dice.h>

// A simple fragmented job class that computes the sum from 0 to N_FRAGMENTS * FRAGMENT_SIZE - 1.
// Each fragment computes the partial sum of FRAGMENT_SIZE consecutive numbers. To avoid the need
// for synchronization primitives, the final sum is built at the end after all threads finished.
class My_fragmented_job : public
    mi::neuraylib::Fragmented_job<0x2f0d0ac7,0x53d6,0x4bc3,0x9c,0x21,0x85,0x4a,0xca,0xa3,0x97,0x7f>
{
public:
    static const mi::Size N_FRAGMENTS = 32;
    static const mi::Uint64 FRAGMENT_SIZE = 10000000;

    My_fragmented_job()
    {
        for( mi::Uint64 s = 0; s < N_FRAGMENTS; s++)
            m_results[s] = 0;
    }

    // The fragment identified by "index" computes the sum from index * FRAGMENT_SIZE to
    // (index+1) * FRAGMENT_SIZE - 1.
    mi::Uint64 do_execution( mi::Size index, mi::Size /*count*/)
    {
        mi::Uint64 result = 0;
        for( mi::Uint64 s = index * FRAGMENT_SIZE; s < (index+1) * FRAGMENT_SIZE; ++s)
            result += s;
        return result;
    }

    // If the fragment is executed locally the result is simply stored in the result array.
    void execute_fragment(
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on client.", index, count);
        m_results[index] = do_execution( index, count);
    }

    // If the fragment is executed remotely the result is written to the serializer.
    void execute_fragment_remote(
        mi::neuraylib::ISerializer* serializer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on a worker.", index, count);
        mi::Uint64 result = do_execution( index, count);
        serializer->write( &result);
    }

    // On the local host, the result from the fragment is read from the deserializer
    // and stored in the result array.
    void receive_remote_result(
        mi::neuraylib::IDeserializer* deserializer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Receiving fragment %llu of %llu from a worker.", index, count);
        mi::Uint64 result;
        deserializer->read( &result);
        m_results[index] = result;
    }

    // Returns an RDMA buffer large enough to receive the result of one fragment.
    mi::neuraylib::IRDMA_buffer* get_rdma_result_buffer(
        mi::neuraylib::IRDMA_context* rdma_context, mi::Size /*index*/)
    {
        return rdma_context->get_read_memory( sizeof( mi::Uint64));
    }

    // If the fragment is executed remotely the result is written to an RDMA buffer.
    mi::neuraylib::IRDMA_buffer* execute_fragment_remote_rdma(
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        mi::neuraylib::IRDMA_context* rdma_context,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on a worker (RDMA variant).", index, count);
        mi::Uint64 result = do_execution( index, count);
        mi::neuraylib::IRDMA_buffer* buffer = rdma_context->get_write_memory( sizeof( result));
        if( !buffer)
            return 0;
        memcpy( buffer->get_data(), &result, sizeof( result));
        return buffer;
    }

    // On the local host, the result from the fragment is read from the RDMA buffer
    // and stored in the result array.
    void receive_remote_result_rdma(
        mi::neuraylib::IRDMA_buffer* buffer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Receiving fragment %llu of %llu from a worker (RDMA variant).", index, count);
        mi::Uint64 result;
        memcpy( &result, buffer->get_data(), sizeof( result));
        m_results[index] = result;
    }

    // Returns the sum of the results of all fragments.
    mi::Uint64 sum_results() const
    {
        mi::Uint64 result = 0;
        for( mi::Size s = 0; s < N_FRAGMENTS; ++s)
            result += m_results[s];
        return result;
    }

    // Since we want to distribute the fragmented jobs also to other hosts in the cluster we need
    // to override the default implementation (which returns LOCAL).
    mi::neuraylib::IFragmented_job::Scheduling_mode get_scheduling_mode() const
    {
        return mi::neuraylib::IFragmented_job::CLUSTER;
    }

private:
    // The result array has one slot for the result of each fragment.
    mi::Uint64 m_results[N_FRAGMENTS];
};

#endif // EXAMPLE_FRAGMENTED_JOB_H

Source Code Location: examples/example_fragmented_job_networking.cpp

/******************************************************************************
 * Copyright 2023 NVIDIA Corporation. All rights reserved.
 *****************************************************************************/

// examples/example_fragmented_job_networking.cpp
//
// The example expects the following command line arguments:
//
//   example_fragmented_job_networking <mode> <multicast_address> [<cluster_interface> [<use_rdma>]]
//
//   mode                either "client" or "worker"
//   multicast_address   UDP multicast address
//   cluster_interface   IPv4 or IPv6 address, or "."
//   use_rdma            arbitrary string to enable RDMA if available

#include <cstring>
#include <iostream>

#include <mi/dice.h>

// Include code shared by all examples.
#include "example_shared.h"

// Include definition of My_fragmented_job.
#include "example_fragmented_job.h"

// Test the My_fragmented_job class.
void test_my_fragmented_job( mi::neuraylib::IDice_transaction* transaction)
{
    // Execute an instance of the job in n fragments which are delegated to other threads and hosts.
    My_fragmented_job fragmented_job;
    mi::Size n = My_fragmented_job::N_FRAGMENTS;
    transaction->execute_fragmented( &fragmented_job, n);

    // Compare the result with the expected result.
    mi::Uint64 s = My_fragmented_job::FRAGMENT_SIZE;
    mi::Uint64 expected_result = n*s/2 * (n*s-1);
    check_success( fragmented_job.sum_results() == expected_result);
}

// Set up networking and register the serializable classes.
void configuration(
    mi::neuraylib::INeuray* neuray,
    const char* multicast_address,
    const char* cluster_interface,
    bool use_rdma)
{
    // Set UDP networking mode and the given multicast address.
    mi::base::Handle<mi::neuraylib::INetwork_configuration> network_configuration(
        neuray->get_api_component<mi::neuraylib::INetwork_configuration>());
    check_success( network_configuration.is_valid_interface());
    check_success( network_configuration->set_mode(
        mi::neuraylib::INetwork_configuration::MODE_UDP) == 0);
    check_success( network_configuration->set_multicast_address( multicast_address) == 0);
    if( strcmp( cluster_interface, ".") != 0)
        check_success( network_configuration->set_cluster_interface( cluster_interface) == 0);
    check_success( network_configuration->set_use_rdma( use_rdma) == 0);

    // Register the class My_fragmented_job with DiCE.
    mi::base::Handle<mi::neuraylib::IDice_configuration> dice_configuration(
        neuray->get_api_component<mi::neuraylib::IDice_configuration>());
    check_success( dice_configuration.is_valid_interface());
    check_success( dice_configuration->register_serializable_class<My_fragmented_job>());
}

// Obtain a DiCE transaction and run the test for the My_fragmented_job class.
void run_client( mi::neuraylib::INeuray* neuray)
{
    g_logger->message( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "This host acts as client");

    // Get the database, the global scope of the database and create a transaction in the global
    // scope.
    mi::base::Handle<mi::neuraylib::IDatabase> database(
        neuray->get_api_component<mi::neuraylib::IDatabase>());
    check_success( database.is_valid_interface());
    mi::base::Handle<mi::neuraylib::IScope> scope( database->get_global_scope());
    mi::base::Handle<mi::neuraylib::IDice_transaction> transaction(
        scope->create_transaction<mi::neuraylib::IDice_transaction>());
    check_success( transaction.is_valid_interface());

    // Test the My_fragmented_job class.
    test_my_fragmented_job( transaction.get());

    transaction->commit();
}

void run_worker()
{
    // As worker, just wait some time for jobs to execute.
    g_logger->message( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN", "This host acts as worker");
    sleep_seconds( 30);
}

int main( int argc, char* argv[])
{
    // Check command line parameters.
    if( argc < 3 || argc > 5
        || ((strcmp( argv[1], "client") != 0) && (strcmp( argv[1], "worker") != 0))) {
        std::cerr << "Usage: example_fragmented_job_networking <mode> <multicast_address> \\\n"
                  << "           [<cluster_interface> [<use_rdma>]]" << std::endl;
        keep_console_open();
        return EXIT_FAILURE;
    }
    bool is_worker = strcmp( argv[1], "worker") == 0;
    const char* multicast_address = argv[2];
    const char* cluster_interface = argc > 3 ? argv[3] : ".";
    bool use_rdma = argc > 4;

    // Access the DiCE library.
    mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
    check_success( neuray.is_valid_interface());

    // Configure the DiCE library.
    configuration( neuray.get(), multicast_address, cluster_interface, use_rdma);
    mi::base::Handle<mi::neuraylib::ILogging_configuration> logging_configuration(
        neuray->get_api_component<mi::neuraylib::ILogging_configuration>());
    g_logger = logging_configuration->get_forwarding_logger();
    logging_configuration = 0;

    // Start the DiCE library.
    mi::Sint32 result = neuray->start();
    check_start_success( result);

    // Run the tests for My_fragmented_job.
    is_worker ? run_worker() : run_client( neuray.get());

    // Shut down the DiCE library.
    check_success( neuray->shutdown() == 0);
    g_logger = 0;
    neuray = 0;

    // Unload the DiCE library.
    check_success( unload());

    keep_console_open();
    return EXIT_SUCCESS;
}