This example demonstrates the use of additional clusters. It is heavily based on the five preceding examples and extends the concepts shown there to additional clusters.
#include <iostream>
#include <vector>
#include "example_shared.h"
#include "example_element_and_job.h"
#include "example_fragmented_job.h"
#include "example_distributed_cache.h"
{
public:
// Host callback that reports whether the connection to the cluster was established or lost.
//
// \param host_id   Host id delivered with the event; stored as this host's own id on connect.
// \param flag      \c true if the connection was established, \c false if it was lost.
void connection_callback(
    mi::Uint32 host_id,
    bool flag)
{
    // Deal with the disconnect case first so the connected path reads straight through.
    if( !flag) {
        fprintf( stderr, "The connection to the cluster was lost.\n");
        return;
    }

    fprintf( stderr,
        "The connection to the cluster was established. Own host id is %u.\n", host_id);
    // Remember our own host id for later comparisons against reported host ids.
    m_own_host_id = host_id;
}
// Host callback that reports a host joining or leaving the cluster.
//
// \param host_id   Id of the host whose membership changed.
// \param flag      \c true if the host joined, \c false if it left.
void membership_callback(
    mi::Uint32 host_id,
    bool flag)
{
    // Both messages take the host id as their only argument, so only the format differs.
    fprintf( stderr,
        flag ? "Host %u joined the cluster.\n" : "Host %u left the cluster.\n",
        host_id);
}
// Body of the callback that reports the properties communicated by a host: prints the host id
// and, if the string handle `value` is valid, the application name it holds.
// NOTE(review): the signature of this callback and the statement that initializes `value` are
// not visible here -- presumably `value` is obtained via get_property( "application_name");
// confirm against the original example.
{
fprintf( stderr, "Host %u communicated its properties:\n", host_id);
// Print the property only if a valid string was obtained.
if( value.is_valid_interface())
fprintf( stderr, " application_name: %s\n", value->get_c_str());
}
// Body of the callback that reports which host is now the synchronizer. The message is
// assembled from up to three fprintf() calls so that " (this host)" can be inserted when the
// reported id equals the stored own host id.
// NOTE(review): the signature of this callback is not visible here.
{
fprintf( stderr, "The synchronizer is now host %u", host_id);
// m_own_host_id is the id recorded by connection_callback().
if( m_own_host_id == host_id)
fprintf( stderr, " (this host)");
fprintf( stderr, ".\n");
}
// Host callback that reports the status of the database.
//
// \param status   Textual status description. A null pointer is tolerated and reported
//                 as "(null)".
void database_status_callback( const char* status)
{
    // Passing a null pointer for "%s" is undefined behavior, so substitute a placeholder.
    const char* const text = status ? status : "(null)";
    fprintf( stderr, "The database reports its status as \"%s\".\n", text);
}
private:
};
// Body of the DiCE configuration step: verifies that the configuration handle is valid and
// registers the serializable classes My_element, My_job and My_fragmented_job with it.
// NOTE(review): the function signature and the statement that obtains `dice_configuration`
// are not visible here.
{
check_success( dice_configuration.is_valid_interface());
// Each registration is checked individually so a failure identifies the offending class.
check_success( dice_configuration->register_serializable_class<My_element>());
check_success( dice_configuration->register_serializable_class<My_job>());
check_success( dice_configuration->register_serializable_class<My_fragmented_job>());
}
// Per-cluster configuration helper.
//
// \param multicast_address   Multicast address passed by the caller (argv[2] for the main
//                            cluster, argv[i+3] for additional clusters).
//
// NOTE(review): the call sites pass three arguments (a network configuration, a host
// callback and the multicast address), but only the last parameter and an empty body are
// visible here -- the remaining parameters and the statements of the body appear to be
// missing and need to be restored from the original example.
void cluster_configuration(
const char* multicast_address)
{
}
// Body of the element/job test. Only the verification statements are visible here.
// NOTE(review): the signature and the statements between the checks (lookups, stores,
// accesses, edits) are not visible; the comments below describe only what each visible
// check asserts.
{
// The element tag is expected to be invalid first and valid afterwards -- presumably a
// lookup before and a store in between; confirm against the original example.
check_success( !element_tag);
check_success( element_tag);
// A second tag for the element must equal the first one.
check_success( element_tag2 == element_tag);
check_success( access1->get_data() == 42);
// Modify the element through `edit`, then drop the edit handle by assigning 0.
edit->set_data( 2);
edit = 0;
// access1 still reports the old value 42, while access2 reports the edited value 2.
check_success( access1->get_data() == 42);
check_success( access2->get_data() == 2);
// Same invalid-then-valid pattern for the job tag.
check_success( !job_tag);
check_success( job_tag);
check_success( job_tag2 == job_tag);
// Advise the database that the job tag is required soon.
transaction->
advise( job_tag);
// The value obtained through access3 is expected to be 4.
check_success( access3->get_data() == 4);
}
// Body of the fragmented job test: creates a My_fragmented_job, takes the fragment count
// from My_fragmented_job::N_FRAGMENTS, and verifies the summed results of the job.
// NOTE(review): the signature, the statement that executes the job, and the computation of
// `expected_result` are not visible here.
{
My_fragmented_job fragmented_job;
// The scope-resolution operator must be a single "::" token; it was split across two lines.
mi::Size n = My_fragmented_job::N_FRAGMENTS;
check_success( fragmented_job.sum_results() == expected_result);
}
// Body of the distributed cache test. It allocates an array of N IMy_serializable pointers,
// fills it with newly created objects, and afterwards takes each object out of the array
// again and releases it before freeing the array.
// NOTE(review): the signature, both loop headers, the logging calls that own the two format
// strings, and the calls that store/remove the objects in the cache are not visible here;
// as a consequence the braces in this span are unbalanced.
{
IMy_serializable** objects = new IMy_serializable*[N];
// Even indices yield a My_serializable_1, odd indices a My_serializable_2.
IMy_serializable* object = ( (i+1)%2 )
? static_cast<IMy_serializable*>( new My_serializable_1)
: static_cast<IMy_serializable*>( new My_serializable_2);
// The payload is the square of the index.
object->set_data( i*i);
"Storing object with index %llu at address %p with data %llu.",
i, object, object->get_data());
objects[i] = object;
// Pause for one second between objects.
sleep_seconds( 1);
}
// Take the pointer out of the array and clear the slot before releasing the object.
IMy_serializable* object = objects[i];
objects[i] = 0;
"Removing object with index %llu at address %p with data %llu.",
i, object, object->get_data());
object->release();
sleep_seconds( 1);
}
// Only the pointer array itself is freed here; the objects were released above.
delete[] objects;
}
// Runs all tests for one cluster: the element/job test and the fragmented job test on a
// transaction, followed by the distributed cache test.
// NOTE(review): the parameter list and the statements that create `transaction` are not
// visible here; the call sites pass a database pointer and a distributed cache pointer.
void run_tests(
{
// The transaction must be valid before it is handed to the tests.
check_success( transaction.is_valid_interface());
test_my_element_and_my_job( transaction.get());
test_my_fragmented_job( transaction.get());
test_distributed_cache( distributed_cache);
}
// Client mode: runs the tests with the main database and distributed cache first, and then
// once for every entry in `clusters` with that cluster's database and distributed cache.
// NOTE(review): the parameter list, the statements that obtain the database / distributed
// cache handles, and the logging call that owns the format string in the loop are not
// visible here; main() calls this function with the neuray pointer and the cluster vector.
void run_client(
{
check_success( database.is_valid_interface());
check_success( distributed_cache.is_valid_interface());
run_tests( database.get(), distributed_cache.get());
// Repeat the same tests on each additional cluster.
for(
mi::Size i = 0; i < clusters.size(); ++i) {
"Running tests on additional cluster %llu.", i);
check_success( cluster_database.is_valid_interface());
check_success( cluster_distributed_cache.is_valid_interface());
run_tests( cluster_database.get(), cluster_distributed_cache.get());
}
}
// Worker mode: registers two object receivers with the distributed cache of every entry in
// `clusters`, sleeps for 30 seconds, and unregisters the receivers again.
// NOTE(review): the parameter list, the statements that obtain the distributed cache
// handles, and any registration on the main `distributed_cache` are not visible here;
// main() calls this function with the neuray pointer and the cluster vector.
void run_worker(
{
// The receivers are local objects and are registered by address below.
Object_receiver object_receiver_1( 1);
Object_receiver object_receiver_2( 2);
check_success( distributed_cache.is_valid_interface());
// Register one receiver per serializable class on each additional cluster.
for(
mi::Size i = 0; i < clusters.size(); ++i) {
check_success( cluster_distributed_cache.is_valid_interface());
cluster_distributed_cache->register_receiver( My_serializable_1::IID(), &object_receiver_1);
cluster_distributed_cache->register_receiver( My_serializable_2::IID(), &object_receiver_2);
}
// Stay alive for 30 seconds -- presumably so that a client can run its tests meanwhile.
sleep_seconds( 30);
// Unregister by class id before the local receiver objects go out of scope.
for(
mi::Size i = 0; i < clusters.size(); ++i) {
check_success( cluster_distributed_cache.is_valid_interface());
cluster_distributed_cache->unregister_receiver( My_serializable_1::IID());
cluster_distributed_cache->unregister_receiver( My_serializable_2::IID());
}
}
// Program entry point.
//
// Expects a mode ("client" or "worker") in argv[1], the multicast address of the main
// cluster in argv[2], and optionally one multicast address per additional cluster in
// argv[3] and following. Returns EXIT_FAILURE after printing the usage message if the
// arguments are invalid, EXIT_SUCCESS otherwise.
//
// NOTE(review): the declarations of `neuray`, `network_configuration`, `host_callback`,
// `logging_configuration`, `result`, `cluster` and `cluster_network_configuration` are not
// visible here.
int main( int argc, char* argv[])
{
// At least a mode and one multicast address are required, and the mode must be known.
if( argc < 3 || ((strcmp( argv[1], "client") != 0) && (strcmp( argv[1], "worker") != 0))) {
std::cerr << "Usage: example_networking <mode> <multicast_address0> \\\n"
<< " [<multicast_address1> [...]]" << std::endl;
keep_console_open();
return EXIT_FAILURE;
}
bool is_worker = strcmp( argv[1], "worker") == 0;
check_success( neuray.is_valid_interface());
// Configure the library and the main cluster (multicast address from argv[2]).
configuration( neuray.get());
cluster_configuration( network_configuration.get(), host_callback.get(), argv[2]);
g_logger = logging_configuration->get_forwarding_logger();
// Only the client changes the log priority (to 1); the worker keeps the current setting.
if( !is_worker)
logging_configuration->set_log_priority( 1);
logging_configuration = 0;
check_start_success( result);
// Configure and join one additional cluster per remaining command line argument.
std::vector<mi::base::Handle<mi::neuraylib::ICluster> > clusters;
for(
mi::Size i = 0; i < static_cast<mi::Size>( argc-3); ++i) {
cluster_configuration( cluster_network_configuration.get(), host_callback.get(), argv[i+3]);
check_success( cluster->join() == 0);
clusters.push_back( cluster);
}
// Run the mode-specific part of the example.
is_worker ? run_worker( neuray.get(), clusters) : run_client( neuray.get(), clusters);
// Leave every additional cluster and unregister the host callback from its configuration.
for(
mi::Size i = 0; i < static_cast<mi::Size>( argc-3); ++i) {
check_success( cluster->leave() == 0);
cluster_network_configuration->unregister_host_callback( host_callback.get());
}
clusters.clear();
check_success( neuray->
shutdown() == 0);
// Reset the remaining handles before unloading the library.
g_logger = 0;
network_configuration = 0;
neuray = 0;
check_success( unload());
keep_console_open();
return EXIT_SUCCESS;
}
Mixin class template for deriving interface implementations.
Definition: interface_implement.h:41
This API component is used to create additional clusters.
Definition: icluster_factory.h:33
This interface is used to interact with the distributed database.
Definition: idatabase.h:293
virtual IScope * get_global_scope() const =0
Returns the global scope which is the root of a tree of scopes.
This interface allows configuration of DiCE.
Definition: dice.h:82
A transaction provides a consistent view on the database.
Definition: dice.h:272
virtual Tag_struct name_to_tag(const char *name)=0
Returns the tag associated with a name in the database.
virtual base::IInterface * edit(Tag_struct tag)=0
Retrieves an element from the database and returns it ready for editing.
virtual const base::IInterface * access(Tag_struct tag)=0
Retrieves an element from the database.
virtual Tag_struct store(IElement *element, Tag_struct tag=NULL_TAG, const char *name=0, Privacy_level privacy_level=LOCAL_SCOPE)=0
Stores a new database element in the database.
virtual void advise(Tag_struct tag)=0
Advises the database that a given tag is required soon.
virtual Sint32 commit()=0
Commits the transaction.
virtual Sint32 execute_fragmented(IFragmented_job *job, Size count)=0
Executes a fragmented job synchronously.
The distributed cache service can be used to create temporary objects in a cluster.
Definition: idistcache.h:72
virtual Sint32 store_object(ISerializable *serializable, bool cluster_only=true)=0
Stores an object in the distributed cache.
virtual void unregister_receiver(base::Uuid class_id)=0
Unregisters a receiver with the distributed cache.
virtual void register_receiver(base::Uuid class_id, IObject_receiver *receiver)=0
Registers a receiver with the distributed cache.
virtual void remove_object(ISerializable *serializable)=0
Removes an object from the cache.
Abstract interface to report cluster status changes.
Definition: ihost_callback.h:35
This interface contains information about a local or remote host.
Definition: ihost_properties.h:27
virtual const IString * get_property(const char *key) const =0
Generic access to properties.
This interface is used for configuring the logging for the DiCE library.
Definition: ilogging_configuration.h:68
This interface is used to query and change the networking configuration.
Definition: inetwork_configuration.h:33
@ MODE_UDP
Networking is switched to UDP mode with multicast.
Definition: inetwork_configuration.h:53
virtual Sint32 set_multicast_address(const char *address)=0
Sets the base multicast address to be used for networking.
virtual Sint32 unregister_host_callback(IHost_callback *callback)=0
Unregisters a callback for cluster changes.
virtual Sint32 register_host_callback(IHost_callback *callback)=0
Registers a callback for cluster changes.
virtual Sint32 set_mode(Mode mode)=0
Sets the networking mode.
This is an object representing the DiCE library.
Definition: ineuray.h:44
virtual Sint32 shutdown(bool blocking=true)=0
Shuts down the library.
virtual base::IInterface * get_api_component(const base::Uuid &uuid) const =0
Returns an API component from the DiCE API.
virtual Sint32 start(bool blocking=true)=0
Starts the operation of the DiCE library.
A tag represents a unique identifier for database elements in the database.
Definition: iserializer.h:54
void printf(Message_severity level, const char *module_category, const char *message,...) __attribute__((format(printf
Emits a message to the application's log.
virtual void message(Message_severity level, const char *module_category, const char *message)
Emits a message to the application's log.
Definition: ilogger.h:215
@ MESSAGE_SEVERITY_INFO
This is a normal operational message.
Definition: enums.h:39
unsigned long long Uint64
64-bit unsigned integer.
Definition: types.h:62
unsigned int Uint32
32-bit unsigned integer.
Definition: types.h:49
Uint64 Size
Unsigned integral type that is large enough to hold the size of all types.
Definition: types.h:112
signed int Sint32
32-bit signed integer.
Definition: types.h:46