This example demonstrates the implementation of fragmented jobs. It is based on the previous example and adds distribution of fragments to other hosts in the cluster.
#ifndef EXAMPLE_FRAGMENTED_JOB_H
#define EXAMPLE_FRAGMENTED_JOB_H
#include <iostream>
class My_fragmented_job : public
mi::neuraylib::Fragmented_job<0x2f0d0ac7,0x53d6,0x4bc3,0x9c,0x21,0x85,0x4a,0xca,0xa3,0x97,0x7f>
{
public:
My_fragmented_job()
{
m_results[s] = 0;
}
{
for(
mi::Uint64 s = index *
FRAGMENT_SIZE; s < (index+1) *
FRAGMENT_SIZE; ++s)
result += s;
return result;
}
void execute_fragment(
{
"Executing fragment %llu of %llu on client.", index, count);
m_results[index] = do_execution( index, count);
}
{
"Executing fragment %llu of %llu on a worker.", index, count);
serializer->
write( &result);
}
{
"Receiving fragment %llu of %llu from a worker.", index, count);
deserializer->
read( &result);
m_results[index] = result;
}
{
}
{
"Executing fragment %llu of %llu on a worker (RDMA variant).", index, count);
if( !buffer)
return 0;
memcpy( buffer->
get_data(), &result,
sizeof( result));
return buffer;
}
{
"Receiving fragment %llu of %llu from a worker (RDMA variant).", index, count);
memcpy( &result, buffer->
get_data(),
sizeof( result));
m_results[index] = result;
}
{
for(
mi::Size s = 0; s <
N_FRAGMENTS; ++s)
result += m_results[s];
return result;
}
{
}
private:
};
#endif
This mixin class can be used to implement the mi::neuraylib::IFragmented_job interface.
Definition: dice.h:1688
virtual void receive_remote_result(IDeserializer *deserializer, IDice_transaction *transaction, Size index, Size count)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1738
virtual IRDMA_buffer * get_rdma_result_buffer(IRDMA_context *rdma_context, Size index)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1752
virtual IRDMA_buffer * execute_fragment_remote_rdma(IDice_transaction *transaction, Size index, Size count, IRDMA_context *rdma_context, const IJob_execution_context *context)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1761
virtual void execute_fragment_remote(ISerializer *serializer, IDice_transaction *transaction, Size index, Size count, const IJob_execution_context *context)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1722
virtual void receive_remote_result_rdma(IRDMA_buffer *buffer, IDice_transaction *transaction, Size index, Size count)
Empty body. Not used since get_scheduling_mode() requests local execution.
Definition: dice.h:1778
virtual IFragmented_job::Scheduling_mode get_scheduling_mode() const
Returns mi::neuraylib::IFragmented_job::LOCAL.
Definition: dice.h:1691
Source for deserializing objects from byte streams.
Definition: ideserializer.h:35
virtual bool read(bool *value, Size count=1)=0
Reads values of type bool from the deserializer.
A transaction provides a consistent view on the database.
Definition: dice.h:272
Scheduling_mode
Constants for possible scheduling modes.
Definition: dice.h:1364
@ CLUSTER
The fragments will be spread across all hosts in the cluster.
Definition: dice.h:1375
Provides information about the context in which a job is executed.
Definition: iserializer.h:291
An RDMA buffer represents a piece of pinned memory which can be used to transfer data over RDMA.
Definition: irdma_context.h:52
virtual const Uint8 * get_data() const =0
Returns a const pointer to the data in this buffer.
The RDMA context works as a cache for RDMA buffers.
Definition: irdma_context.h:121
virtual IRDMA_buffer * get_write_memory(Size size, Sint32 gpu_id=-1)=0
Allocates an RDMA buffer which can be used to write locally to and to read from on a remote host.
virtual IRDMA_buffer * get_read_memory(Size size, Sint32 gpu_id=-1)=0
Allocates an RDMA buffer which can be used to read locally from a buffer that was written on a remote...
Target for serializing objects to byte streams.
Definition: iserializer.h:171
virtual bool write(const bool *value, Size count=1)=0
Writes values of type bool to the serializer.
void printf(Message_severity level, const char *module_category, const char *message,...) __attribute__((format(printf
Emits a message to the application's log.
@ MESSAGE_SEVERITY_INFO
This is a normal operational message.
Definition: enums.h:39
unsigned long long Uint64
64-bit unsigned integer.
Definition: types.h:62
Uint64 Size
Unsigned integral type that is large enough to hold the size of all types.
Definition: types.h:112
#include <iostream>
#include "example_shared.h"
#include "example_fragmented_job.h"
{
My_fragmented_job fragmented_job;
mi::Size n = My_fragmented_job:
:N_FRAGMENTS;
check_success( fragmented_job.sum_results() == expected_result);
}
void configuration(
const char* multicast_address,
const char* cluster_interface,
bool use_rdma)
{
check_success( network_configuration->set_mode(
check_success( network_configuration->set_multicast_address( multicast_address) == 0);
if( strcmp( cluster_interface, ".") != 0)
check_success( network_configuration->set_cluster_interface( cluster_interface) == 0);
check_success( network_configuration->set_use_rdma( use_rdma) == 0);
check_success( dice_configuration.is_valid_interface());
check_success( dice_configuration->register_serializable_class<My_fragmented_job>());
}
{
check_success( database.is_valid_interface());
check_success( transaction.is_valid_interface());
test_my_fragmented_job( transaction.get());
}
void run_worker()
{
sleep_seconds( 30);
}
int main( int argc, char* argv[])
{
if( argc < 3 || argc > 5
|| ((strcmp( argv[1], "client") != 0) && (strcmp( argv[1], "worker") != 0))) {
std::cerr << "Usage: example_fragmented_job_networking <mode> <multicast_address> \\\n"
<< " [<cluster_interface> [<use_rdma>]]" << std::endl;
keep_console_open();
return EXIT_FAILURE;
}
bool is_worker = strcmp( argv[1], "worker") == 0;
const char* multicast_address = argv[2];
const char* cluster_interface = argc > 3 ? argv[3] : ".";
bool use_rdma = argc > 4;
check_success( neuray.is_valid_interface());
configuration( neuray.get(), multicast_address, cluster_interface, use_rdma);
g_logger = logging_configuration->get_forwarding_logger();
logging_configuration = 0;
check_start_success( result);
is_worker ? run_worker() : run_client( neuray.get());
check_success( neuray->
shutdown() == 0);
g_logger = 0;
neuray = 0;
check_success( unload());
keep_console_open();
return EXIT_SUCCESS;
}
This interface is used to interact with the distributed database.
Definition: idatabase.h:293
This interface allows configuration of DiCE.
Definition: dice.h:82
virtual Sint32 commit()=0
Commits the transaction.
virtual Sint32 execute_fragmented(IFragmented_job *job, Size count)=0
Executes a fragmented job synchronously.
This interface is used for configuring the logging for the DiCE library.
Definition: ilogging_configuration.h:68
This interface is used to query and change the networking configuration.
Definition: inetwork_configuration.h:33
@ MODE_UDP
Networking is switched to UDP mode with multicast.
Definition: inetwork_configuration.h:53
This is an object representing the DiCE library.
Definition: ineuray.h:44
virtual Sint32 shutdown(bool blocking=true)=0
Shuts down the library.
virtual base::IInterface * get_api_component(const base::Uuid &uuid) const =0
Returns an API component from the DiCE API.
virtual Sint32 start(bool blocking=true)=0
Starts the operation of the DiCE library.
virtual void message(Message_severity level, const char *module_category, const char *message)
Emits a message to the application's log.
Definition: ilogger.h:215
signed int Sint32
32-bit signed integer.
Definition: types.h:46