36 use mpi
, only : mpi_comm_world, mpi_statuses_ignore, mpi_byte
48 logical,
volatile ::
contine_poll_messages, & !< Whether to continue waiting command messages from any MONC processes
64 subroutine io_server_run(options_database, io_communicator_arg, &
65 provided_threading, total_global_processes, continuation_run, io_configuration_file)
67 integer,
intent(in) :: io_communicator_arg, provided_threading, total_global_processes
68 logical,
intent(in) :: continuation_run
69 character(len=LONG_STRING_LENGTH),
intent(in) :: io_configuration_file
71 integer :: command, source, my_rank, ierr
72 character,
dimension(:),
allocatable :: data_buffer, io_xml_configuration
75 if (continuation_run)
then 78 io_xml_configuration, io_communicator_arg)
81 if (.not.
allocated(io_xml_configuration))
then 82 io_xml_configuration=
get_io_xml(io_configuration_file)
83 if (continuation_run)
then 84 call mpi_comm_rank(io_communicator_arg, my_rank, ierr)
85 if (my_rank == 0)
then 86 call log_log(
log_warn,
"No IO server configuration in checkpoint file - starting from XML provided file instead")
92 deallocate(io_xml_configuration)
110 call c_free(diagnostic_generation_frequency)
140 integer,
intent(out) :: command, source
141 character,
dimension(:),
allocatable :: data_buffer
143 logical :: completed, inter_io_complete
147 do while(.not. completed)
158 if (inter_io_complete)
then 181 real(DEFAULT_PRECISION),
dimension(:) :: values
182 character(len=STRING_LENGTH) :: field_name
192 integer,
intent(in) :: command, source
193 character,
dimension(:),
allocatable,
intent(inout) :: data_buffer
201 deallocate(data_buffer)
210 integer,
dimension(:),
intent(in) :: arguments
211 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
224 integer :: i, specific_monc_data_type
233 if (
allocated(
io_configuration%registered_moncs(i)%field_start_locations)) &
246 integer,
dimension(:),
intent(in) :: arguments
247 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
249 integer :: monc_location, source
254 do while (
io_configuration%registered_moncs(monc_location)%active_threads .gt. 0)
270 integer,
intent(in) :: source, data_set
272 integer :: specific_monc_data_type, specific_monc_buffer_size, recv_count, monc_location
273 character,
dimension(:),
allocatable :: data_buffer
283 allocate(data_buffer(specific_monc_buffer_size))
284 recv_count=
data_receive(specific_monc_data_type, 1, source, dump_data=data_buffer, data_dump_id=data_set)
288 deallocate(data_buffer)
296 integer,
dimension(:),
intent(in) :: arguments
297 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
299 integer :: monc_location, data_set, source, matched_datadefn_index
302 data_set=arguments(2)
313 io_configuration%registered_moncs(monc_location)%definition_names(data_set))
315 if (matched_datadefn_index .gt. 0)
then 321 call log_log(
log_warn,
"IO server can not find matching data definition with name "&
322 //
io_configuration%registered_moncs(monc_location)%definition_names(data_set))
338 integer,
dimension(:),
intent(in) :: arguments
339 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
341 integer :: configuration_send_request(2), ierr, number_data_definitions, this_monc_index, source
352 call log_log(
log_error,
"You have a high ratio of computational cores to IO servers, the limit is currently 100")
366 io_configuration%registered_moncs(this_monc_index)%deactivate_condition_variable, -1))
370 allocate(
io_configuration%registered_moncs(this_monc_index)%field_start_locations(number_data_definitions), &
371 io_configuration%registered_moncs(this_monc_index)%field_end_locations(number_data_definitions), &
372 io_configuration%registered_moncs(this_monc_index)%definition_names(number_data_definitions), &
373 io_configuration%registered_moncs(this_monc_index)%dimensions(number_data_definitions))
384 integer,
intent(in) :: source
385 integer :: send_configuration_to_registree(2)
387 integer :: ierr, srequest(2)
391 source,
data_tag, mpi_comm_world, srequest(1), ierr)
393 source,
data_tag, mpi_comm_world, srequest(2), ierr)
396 send_configuration_to_registree=srequest
407 integer,
intent(in) :: source
411 integer :: created_mpi_type, data_size, recv_count, i
413 logical :: field_found
416 source, description_data=data_description)
422 monc_defn%field_start_locations(i), monc_defn%field_end_locations(i), monc_defn%dimensions(i))
442 integer,
intent(in) :: source
444 character,
dimension(:),
allocatable :: buffer
445 character(len=STRING_LENGTH) :: q_field_name
446 integer :: buffer_size, z_size, num_q_fields, n, current_point, recv_count
448 real(kind=DEFAULT_PRECISION) :: dreal
449 logical :: field_found
455 buffer_size=(kind(dreal)*z_size) + (
string_length * num_q_fields)
456 allocate(buffer(buffer_size))
457 recv_count=
data_receive(mpi_byte, buffer_size, source, buffer)
464 current_point=(kind(dreal)*z_size)
465 if (num_q_fields .gt. 0)
then 467 q_field_name=transfer(buffer(current_point+1:current_point+
string_length), q_field_name)
487 integer,
intent(in) :: recv_count
494 call c_add_string(present_field_names, data_description(i)%field_name)
509 call c_free(present_field_names)
510 call c_free(diagnostics_field_names_and_roots)
522 logical :: field_found
525 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local size information")
527 monc_defn%local_dim_sizes(i)=field_description%dim_sizes(i)
530 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local start point information")
532 monc_defn%local_dim_starts(i)=field_description%dim_sizes(i)
535 if (.not. field_found)
call log_log(
log_error,
"Malformed MONC registration, no local end point information")
537 monc_defn%local_dim_ends(i)=field_description%dim_sizes(i)
Configuration of a specific data definition.
Retrieves the integer value held at the specific map index or null if index > map elements...
logical function await_command(command, source, data_buffer)
Awaits a command or shutdown from MONC processes and other IO servers.
subroutine handle_monc_registration(arguments, data_buffer)
Handles registration from some MONC process. The source process sends some data description to this I...
integer, parameter, public long_string_length
Length of longer strings.
type(hashmap_type) function, public determine_diagnostics_fields_available(monc_field_names)
Determines the diagnostics fields that are available based upon the input MONC fields on registration...
integer function forthread_rwlock_init(rwlock_id, attr_id)
Returns whether a collection is empty.
Puts an integer key-value pair into the map.
integer function forthread_mutex_unlock(mutex_id)
integer function forthread_cond_signal(cond_id)
integer function, dimension(2) send_configuration_to_registree(source)
Sends the data and field descriptions to the MONC process that just registered with the IO server...
Overall IO configuration.
subroutine, public inform_writer_federator_fields_present(io_configuration, field_names, diag_field_names_and_roots)
Informs the writer federator that specific fields are present and should be reflected in the diagnost...
integer function, public get_number_io_servers(io_comm)
Retrieves the number of IO servers that are running in total.
logical function, public test_for_command(command, source)
Tests for a command message based upon the request already registered.
integer, parameter, public log_error
Only log ERROR messages.
subroutine handle_monc_dimension_information(data_description, monc_defn)
Handles the provided local MONC dimension and data layout information.
subroutine, public extend_registered_moncs_array(io_configuration)
Extends the data definitions array from the current size to the current size + data size stride...
character(len=string_length) function, public options_get_string(options_database, key, index)
Retrieves a string value from the database that matches the provided key.
subroutine, public threadpool_init(io_configuration)
Initialises the thread pool and marks each thread as idle.
subroutine, public register_command_receive()
Registers a request for receiving a command from any MONC process on the command channel.
logical, volatile contine_poll_interio_messages
Reads the IO server state that was stored in a NetCDF checkpoint file.
String utility functionality that is commonly used throughout MONC.
character(len=string_length), parameter, public local_end_points_key
integer, parameter, public register_command
subroutine register_present_field_names_to_federators(data_description, recv_count)
Registers with the writer federator the set of fields (prognostic and diagnostic) that are available...
type(definition_description_type), dimension(:), allocatable registree_definition_descriptions
integer, parameter, public data_size_stride
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
The writer field manager will manage aspects of the fields being provided to the writer federator...
logical function, public threadpool_is_idle()
Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work) ...
Abstraction layer around MPI, this issues and marshals the lower level communication details...
subroutine pull_back_data_message_and_handle(source, data_set)
Retrieves the message from MONC off the data channel and throws this to a thread in the thread pool t...
integer function forthread_rwlock_rdlock(lock_id)
Contains common definitions for the data and datatypes used by MONC.
type(io_configuration_type), save, volatile io_configuration
Internal representation of the IO configuration.
logical function, public check_diagnostic_federator_for_completion(io_configuration)
Checks whether the diagnostics federator has completed or not, this is really checking all the underl...
integer mpi_type_field_description
The MPI data type for field descriptions sent to MONCs.
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
subroutine, public inform_writer_federator_time_point(io_configuration, source, data_id, data_dump)
integer, volatile monc_registration_lock
subroutine, public free_mpi_type(the_type)
Frees an MPI type, used in clean up.
subroutine, public threadpool_deactivate()
This waits for all busy threads to complete and then shuts all the pthreads down. The deactivation an...
integer function forthread_mutex_init(mutex_id, attr_id)
Gets a specific integer element out of the list, stack, queue or map with the corresponding key...
Conversion between common inbuilt FORTRAN data types.
character(len=string_length), parameter, public local_sizes_key
integer function, public data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
Awaits some data on the data channel. This is of the type, size from the source provided and can eith...
subroutine, public initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
Initialises the write federator and configures it based on the user configuration. Also initialises the time manipulations.
integer function forthread_cond_init(cond_id, attr_id)
Converts data types to strings.
subroutine free_individual_registered_monc_aspects()
Frees up the memory associated with individual registered MONCs. This is done at the end for all MONC...
integer function forthread_rwlock_wrlock(lock_id)
subroutine, public perform_global_callback(io_configuration, field_name, timestep, completion_procedure)
Performs a global callback.
logical function, public test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
Tests for inter IO server communication.
subroutine, public io_server_run(options_database, io_communicator_arg, provided_threading, total_global_processes, continuation_run, io_configuration_file)
Called to start the IO server and once this subroutine returns then it indicates that the IO server h...
subroutine get_monc_information_data(source)
Retrieves MONC information data, this is sent by MONC (and received) regardless, but only actioned if...
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
integer, parameter, public inter_io_communication
Field type identifiers.
subroutine handle_data_message(arguments, data_buffer)
Handles the command for data download from a specific process. This will allocate the receive buffer ...
This defines some constants and procedures that are useful to the IO server and clients that call it...
Map data structure that holds string (length 20 maximum) key value pairs.
subroutine handle_inter_io_communication_command(arguments, data_buffer)
Handles inter IO server communications.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public threadpool_finalise()
Finalises the thread pool.
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
type(field_description_type), dimension(:), allocatable registree_field_descriptions
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer mpi_type_data_sizing_description
The MPI type for field sizing (i.e. array size etc send when MONCs register)
integer function forthread_mutex_lock(mutex_id)
subroutine handle_command_message(command, source, data_buffer)
Called to handle a specific command that has been recieved.
integer function forthread_rwlock_tryrdlock(lock_id)
logical, volatile contine_poll_messages
Whether to continue waiting command messages from any MONC processes.
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC's configuration in the IO data structure.
subroutine, public initialise_mpi_communication(provided_threading)
Initialises MPI communication.
integer, parameter, public data_command_start
subroutine, public check_writer_for_trigger(io_configuration, source, data_id, data_dump)
Checks all writer entries for any trigger fires and issues the underlying file storage.
subroutine, public configuration_parse(provided_options_database, raw_configuration, parsed_configuration)
This will parse an XML string into the IO configuration.
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Configuration that representes the state of a registered MONC process.
integer, parameter, public string_length
Default length of strings.
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public initialise_writer_field_manager(io_configuration, continuation_run)
Initialises the writer field manager.
subroutine, public initialise_logging(pid)
Initialises the logging. This is done to make it easier for master logging only, so that we don't hav...
This federates over the writing of diagnostic and prognostic data to the file system. It also manages the time manipulation of fields and groups.
subroutine, public cancel_requests()
Cancels all outstanding communication requests.
logical function, public get_data_description_from_name(descriptions, name, field_description)
Look up the data description that corresponds to a specific field keyed by its name.
integer function, public retrieve_data_definition(io_configuration, key)
Retrieves a specific data definition from the configuration which matches a key.
integer function forthread_rwlock_destroy(rwlock_id)
integer function, public build_mpi_type_definition_description()
Builds the MPI data type for sending data descriptions to registree MONCs.
subroutine, public read_io_server_configuration(checkpoint_filename, io_xml_configuration, io_communicator_arg)
Reads the IO server configuration, which is the XML configuration initially run with and stored in th...
integer function, public build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, field_end_locations, field_dimensions)
Builds the MPI type that corresponds to the data which will be received from a specific MONC process...
subroutine, public provide_monc_data_to_writer_federator(io_configuration, source, data_id, data_dump)
Data communicated from MONC is provided to this write federator and then included if the configuratio...
integer function forthread_cond_wait(cond_id, mutex_id)
Manages the options database. Contains administration functions and deduce runtime options from the c...
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
The main IO server functionality which handles waiting for commands and data both of which are delt w...
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
subroutine, public pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
Entry point into the diagnostics federator this runs the diagnostics, executing the defined rules bas...
subroutine, public provide_q_field_names_to_writer_federator(q_provided_field_names)
Provides the Q field names to the write federator, this is required as on initialisation we don't kno...
integer function forthread_rwlock_unlock(lock_id)
type(hashmap_type) function, public initialise_diagnostic_federator(io_configuration)
Initialises the diagnostics action and sets up the diagnostics master definitions.
recursive character function, dimension(:), allocatable, public get_io_xml(filename, funit_num)
Reads in textual data from a file and returns this, used to read the IO server XML configuration file...
subroutine handle_deregistration_command(arguments, data_buffer)
Deregisteres a specific MONC source process.
subroutine, public finalise_writer_field_manager()
Finalises the writer field manager.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
subroutine, public pause_for_mpi_interleaving()
Pauses for a specific number of ms to allow for MPI interleaving, this is to avoid starvation...
character(len=string_length), parameter, public number_q_indicies_key
subroutine, public replace_character(str, src_char, tgt_char)
Replaces all occurances of a character in a string with another character.
logical, volatile already_registered_finishing_call
integer mpi_type_definition_description
The MPI data type for data descriptions sent to MONCs.
subroutine, public finalise_writer_federator()
Finalises the write federator and the manipulations.
integer, parameter, public scalar_field_type
Adds a string to the end of the list.
integer function, public build_mpi_type_data_sizing_description()
Builds the MPI type used for sending to the IO server a description of the data, namely the size of t...
subroutine, public finalise_diagnostic_federator(io_configuration)
Finalises the diagnostics federator, waiting for all outstanding requests and then freeing data...
subroutine init_data_definition(source, monc_defn)
Initialise the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition but the size of the arrays depends upon the MONC process (due to uneven distribution of data etc...) This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings are for all fields in every data definition, and these are applied to each data definition which will simply ignore non matching fields.
integer function, public build_mpi_type_field_description()
Builds the MPI data type for sending field descriptions to registree MONCs.
integer, parameter, public data_tag
Parses the XML configuration file to produce the io configuration description which contains the data...
type(field_description_type) function, dimension(:), allocatable, public build_field_description_type_from_configuration(io_configuration)
Builds up the field definition description type from the structured definitions in the IO configurati...
integer function, public get_my_io_rank(io_comm)
Retrieves my IO server rank out of the number of IO servers that are running.
This diagnostics federator will take in data fields sent from a MONC, perform operators on these as r...
integer, parameter, public deregister_command
logical, volatile initialised_present_data
Removes a specific element from the list or map.
type(definition_description_type) function, dimension(:), allocatable, public build_definition_description_type_from_configuration(io_configuration)
Builds up the data definition description type from the structured definitions in the IO configuratio...
character(len=string_length), parameter, public local_start_points_key
subroutine termination_callback(io_configuration, values, field_name, timestep)
This is the termination callback which is called once all MONCs have deregistered, no sends are active by inter IO communications and all threads are idle. This shuts down the inter IO listening and kickstarts finalisation and closure.