DiCE API nvidia_logo_transpbg.gif Up
Example for Fragmented Jobs (locally)
[Previous] [Next] [Up]

This example demonstrates the implementation of fragmented jobs. This example focuses on a simple setting where fragments are only executed in different threads on the same host. The next example extends this example by distributing the execution of fragments to other hosts in the cluster.

New Topics

  • Implementation of fragmented jobs
  • Execution of fragmented jobs

Detailed Description

Implementation of fragmented jobs


In this example the fragmented job class computes the sum from 0 to some fixed large number. This computation is split into fragments such that each fragment computes the partial sum of a range of numbers. Fragments are executed concurrently in different threads. Note that the number of fragments as well as the size of each fragment are constants defined in the class itself. Of course, this is not a necessity and was done purely for simplicity.

The method execute_fragment() provides the necessary context for local execution of fragments. For reasons that will become clear with the next example, the actual work is done in the do_execution() method. The method sum_results() combines the results of all fragments. There are several other methods for which the mi::neuraylib::Fragmented_job mixin provides default implementations.

Execution of fragmented jobs


A fragmented job is executed by passing it to the mi::neuraylib::IDice_transaction::execute_fragmented() method.

Example Source

Source Code Location: examples/example_fragmented_job.h

/******************************************************************************
* Copyright 2024 NVIDIA Corporation. All rights reserved.
*****************************************************************************/
// examples/example_fragmented_job.h
//
// A user-defined fragmented job shared by several examples.
#ifndef EXAMPLE_FRAGMENTED_JOB_H
#define EXAMPLE_FRAGMENTED_JOB_H
#include <iostream>
#include <mi/dice.h>
// A simple fragmented job class that computes the sum from 0 to N_FRAGMENTS * FRAGMENT_SIZE - 1.
// Each fragment computes the partial sum of FRAGMENT_SIZE consecutive numbers. To avoid the need
// for synchronization primitives, every fragment stores its partial sum in its own slot of the
// result array and the final sum is built at the end after all fragments have finished.
class My_fragmented_job : public
    mi::neuraylib::Fragmented_job<0x2f0d0ac7,0x53d6,0x4bc3,0x9c,0x21,0x85,0x4a,0xca,0xa3,0x97,0x7f>
{
public:
    // Number of fragments (and number of slots in the result array).
    static const mi::Size N_FRAGMENTS = 32;

    // Number of consecutive integers summed up by a single fragment.
    static const mi::Uint64 FRAGMENT_SIZE = 10000000;

    // Initializes all result slots with 0.
    My_fragmented_job()
    {
        for( mi::Uint64 s = 0; s < N_FRAGMENTS; s++)
            m_results[s] = 0;
    }

    // The fragment identified by "index" computes the sum from index * FRAGMENT_SIZE to
    // (index+1) * FRAGMENT_SIZE - 1. The fragment count is not needed for the computation.
    mi::Uint64 do_execution( mi::Size index, mi::Size /*count*/)
    {
        mi::Uint64 result = 0;
        for( mi::Uint64 s = index * FRAGMENT_SIZE; s < (index+1) * FRAGMENT_SIZE; ++s)
            result += s;
        return result;
    }

    // If the fragment is executed locally the result is simply stored in the result array.
    void execute_fragment(
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on client.", index, count);
        m_results[index] = do_execution( index, count);
    }

    // If the fragment is executed remotely the result is written to the serializer.
    void execute_fragment_remote(
        mi::neuraylib::ISerializer* serializer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on a worker.", index, count);
        mi::Uint64 result = do_execution( index, count);
        serializer->write( &result);
    }

    // On the local host, the result from the fragment is read from the deserializer
    // and stored in the result array.
    void receive_remote_result(
        mi::neuraylib::IDeserializer* deserializer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Receiving fragment %llu of %llu from a worker.", index, count);
        mi::Uint64 result;
        deserializer->read( &result);
        m_results[index] = result;
    }

    // RDMA variant: requests a read buffer that is just large enough to hold the partial sum
    // of one fragment.
    mi::neuraylib::IRDMA_buffer* get_rdma_result_buffer(
        mi::neuraylib::IRDMA_context* rdma_context, mi::Size /*index*/)
    {
        return rdma_context->get_read_memory( sizeof( mi::Uint64));
    }

    // RDMA variant of execute_fragment_remote(): the result is copied into an RDMA write buffer
    // instead of being written to a serializer. Returns 0 if no buffer could be obtained.
    mi::neuraylib::IRDMA_buffer* execute_fragment_remote_rdma(
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count,
        mi::neuraylib::IRDMA_context* rdma_context,
        const mi::neuraylib::IJob_execution_context* /*context*/)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Executing fragment %llu of %llu on a worker (RDMA variant).", index, count);
        mi::Uint64 result = do_execution( index, count);
        mi::neuraylib::IRDMA_buffer* buffer = rdma_context->get_write_memory( sizeof( result));
        if( !buffer)
            return 0;
        memcpy( buffer->get_data(), &result, sizeof( result));
        return buffer;
    }

    // RDMA variant of receive_remote_result(): the result is copied out of the RDMA buffer
    // and stored in the result array.
    void receive_remote_result_rdma(
        mi::neuraylib::IRDMA_buffer* buffer,
        mi::neuraylib::IDice_transaction* /*transaction*/,
        mi::Size index,
        mi::Size count)
    {
        g_logger->printf( mi::base::MESSAGE_SEVERITY_INFO, "DICE:MAIN",
            "Receiving fragment %llu of %llu from a worker (RDMA variant).", index, count);
        mi::Uint64 result;
        memcpy( &result, buffer->get_data(), sizeof( result));
        m_results[index] = result;
    }

    // Returns the sum of the results of all fragments.
    mi::Uint64 sum_results() const
    {
        mi::Uint64 result = 0;
        for( mi::Size s = 0; s < N_FRAGMENTS; ++s)
            result += m_results[s];
        return result;
    }

    // Since we want to distribute the fragmented jobs also to other hosts in the cluster we need
    // to override the default implementation (which returns LOCAL).
    mi::neuraylib::IFragmented_job::Scheduling_mode get_scheduling_mode() const
    {
        return mi::neuraylib::IFragmented_job::CLUSTER;
    }

private:
    // The result array has one slot for the result of each fragment.
    mi::Uint64 m_results[N_FRAGMENTS];
};
#endif // EXAMPLE_FRAGMENTED_JOB_H
This mixin class can be used to implement the mi::neuraylib::IFragmented_job interface.
Definition: dice.h:1688
virtual void receive_remote_result(IDeserializer *deserializer, IDice_transaction *transaction, Size index, Size count)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1738
virtual IRDMA_buffer * get_rdma_result_buffer(IRDMA_context *rdma_context, Size index)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1752
virtual IRDMA_buffer * execute_fragment_remote_rdma(IDice_transaction *transaction, Size index, Size count, IRDMA_context *rdma_context, const IJob_execution_context *context)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1761
virtual void execute_fragment_remote(ISerializer *serializer, IDice_transaction *transaction, Size index, Size count, const IJob_execution_context *context)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1722
virtual void receive_remote_result_rdma(IRDMA_buffer *buffer, IDice_transaction *transaction, Size index, Size count)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1778
virtual IFragmented_job::Scheduling_mode get_scheduling_mode() const
Returns mi::neuraylib::IFragmented_job::LOCAL.
Definition: dice.h:1691
Source for deserializing objects from byte streams.
Definition: ideserializer.h:35
virtual bool read(bool *value, Size count=1)=0
Reads values of type bool from the deserializer.
A transaction provides a consistent view on the database.
Definition: dice.h:272
Scheduling_mode
Constants for possible scheduling modes.
Definition: dice.h:1364
@ CLUSTER
The fragments will be spread across all hosts in the cluster.
Definition: dice.h:1375
Provides information about the context in which a job is executed.
Definition: iserializer.h:291
An RDMA buffer represents a piece of pinned memory which can be used to transfer data over RDMA.
Definition: irdma_context.h:52
virtual const Uint8 * get_data() const =0
Returns a const pointer to the data in this buffer.
The RDMA context works as a cache for RDMA buffers.
Definition: irdma_context.h:121
virtual IRDMA_buffer * get_write_memory(Size size, Sint32 gpu_id=-1)=0
Allocates an RDMA buffer which can be used to write locally to and to read from on a remote host.
virtual IRDMA_buffer * get_read_memory(Size size, Sint32 gpu_id=-1)=0
Allocates an RDMA buffer which can be used to read locally from a buffer that was written on a remote...
Target for serializing objects to byte streams.
Definition: iserializer.h:171
virtual bool write(const bool *value, Size count=1)=0
Writes values of type bool to the serializer.
DiCE API.
void printf(Message_severity level, const char *module_category, const char *message,...) __attribute__((format(printf
Emits a message to the application's log.
@ MESSAGE_SEVERITY_INFO
This is a normal operational message.
Definition: enums.h:39
unsigned long long Uint64
64-bit unsigned integer.
Definition: types.h:62
Uint64 Size
Unsigned integral type that is large enough to hold the size of all types.
Definition: types.h:112

Source Code Location: examples/example_fragmented_job.cpp

/******************************************************************************
* Copyright 2023 NVIDIA Corporation. All rights reserved.
*****************************************************************************/
// examples/example_fragmented_job.cpp
#include <iostream>
#include <mi/dice.h>
// Include code shared by all examples.
#include "example_shared.h"
// Include definition of My_fragmented_job.
#include "example_fragmented_job.h"
// Test the My_fragmented_job class.
//
// Executes one instance of the job in My_fragmented_job::N_FRAGMENTS fragments within the given
// transaction and verifies that the combined result equals the sum of all integers from 0 to
// N_FRAGMENTS * FRAGMENT_SIZE - 1.
void test_my_fragmented_job( mi::neuraylib::IDice_transaction* transaction)
{
    // Execute an instance of the job in n fragments which are delegated to other threads.
    My_fragmented_job fragmented_job;
    const mi::Size n = My_fragmented_job::N_FRAGMENTS;

    // Do not ignore the status code: a non-zero value is treated as a failed execution, in
    // which case the partial results below would be meaningless.
    const mi::Sint32 status = transaction->execute_fragmented( &fragmented_job, n);
    check_success( status == 0);

    // Compare the result with the expected result. The sum 0 + 1 + ... + (total-1) equals
    // total * (total-1) / 2; the division is done first, which is exact because total is even.
    const mi::Uint64 s = My_fragmented_job::FRAGMENT_SIZE;
    const mi::Uint64 total = n * s;
    const mi::Uint64 expected_result = total/2 * (total-1);
    check_success( fragmented_job.sum_results() == expected_result);
}
// Obtain a DiCE transaction and run the test for the My_fragmented_job class.
//
// "neuray" is the DiCE library instance from which the database API component is obtained.
void run( mi::neuraylib::INeuray* neuray)
{
    // Get the database, the global scope of the database and create a transaction in the global
    // scope.
    mi::base::Handle<mi::neuraylib::IDatabase> database(
        neuray->get_api_component<mi::neuraylib::IDatabase>());
    check_success( database.is_valid_interface());
    mi::base::Handle<mi::neuraylib::IScope> scope( database->get_global_scope());
    mi::base::Handle<mi::neuraylib::IDice_transaction> transaction(
        scope->create_transaction<mi::neuraylib::IDice_transaction>());
    check_success( transaction.is_valid_interface());

    // Test the My_fragmented_job class.
    test_my_fragmented_job( transaction.get());

    // commit() returns a status code; treat a non-zero value as failure instead of ignoring it.
    check_success( transaction->commit() == 0);
}
int main( int /*argc*/, char* /*argv*/[])
{
// Access the DiCE library
mi::base::Handle<mi::neuraylib::INeuray> neuray( load_and_get_ineuray());
check_success( neuray.is_valid_interface());
g_logger = logging_configuration->get_forwarding_logger();
logging_configuration = 0;
// Start the DiCE library
mi::Sint32 result = neuray->start();
check_start_success( result);
// Run the tests for My_fragmented_job
run( neuray.get());
// Shut down the DiCE library
check_success( neuray->shutdown() == 0);
g_logger = 0;
neuray = 0;
// Unload the DiCE library
check_success( unload());
keep_console_open();
return EXIT_SUCCESS;
}
This interface is used to interact with the distributed database.
Definition: idatabase.h:293
virtual Sint32 commit()=0
Commits the transaction.
virtual Sint32 execute_fragmented(IFragmented_job *job, Size count)=0
Executes a fragmented job synchronously.
This interface is used for configuring the logging for the DiCE library.
Definition: ilogging_configuration.h:68
This is an object representing the DiCE library.
Definition: ineuray.h:44
virtual Sint32 shutdown(bool blocking=true)=0
Shuts down the library.
virtual base::IInterface * get_api_component(const base::Uuid &uuid) const =0
Returns an API component from the DiCE API.
virtual Sint32 start(bool blocking=true)=0
Starts the operation of the DiCE library.
signed int Sint32
32-bit signed integer.
Definition: types.h:46
[Previous] [Next] [Up]