MONC
io_server_mod Module Reference

The main IO server functionality, which waits for commands and data and deals with both. The server can be thought of as similar to a bus with command and data channels.

Functions/Subroutines

subroutine, public io_server_run (options_database, io_communicator_arg, provided_threading, total_global_processes, continuation_run, io_configuration_file)
 Called to start the IO server; when this subroutine returns, the IO server has finished. The runtime is spent in here awaiting commands and then dealing with them. Termination occurs when all MONC processes have deregistered. Note that at least one MONC process must first register for this to be triggered.
 
logical function await_command (command, source, data_buffer)
 Awaits a command or shutdown from MONC processes and other IO servers.
 
subroutine termination_callback (io_configuration, values, field_name, timestep)
 This is the termination callback, which is called once all MONCs have deregistered, no inter IO communication sends are active and all threads are idle. It shuts down the inter IO listening and kickstarts finalisation and closure.
 
subroutine handle_command_message (command, source, data_buffer)
 Called to handle a specific command that has been received.
 
subroutine handle_inter_io_communication_command (arguments, data_buffer)
 Handles inter IO server communications.
 
subroutine free_individual_registered_monc_aspects ()
 Frees up the memory associated with individual registered MONCs. This is done at the end for all MONCs as we can't deallocate dynamically in a threaded environment without excessive ordering and locking in case some data processing is queued or in progress.
 
subroutine handle_deregistration_command (arguments, data_buffer)
 Deregisters a specific MONC source process.
 
subroutine pull_back_data_message_and_handle (source, data_set)
 Retrieves the message from MONC off the data channel and passes it to a thread in the thread pool for processing. It is done this way to enforce ordering between the command (including the data set ID) and the raw data itself.
 
subroutine handle_data_message (arguments, data_buffer)
 Handles the command for data download from a specific process. This will allocate the receive buffer and then call to get the data. Once the data has been received it is run against the handling rules.
 
subroutine handle_monc_registration (arguments, data_buffer)
 Handles registration from some MONC process. The source process sends a data description to this IO server, which tells the IO server the size of the data arrays (these might differ between processes in the case of uneven decomposition). Based upon this, a communication (MPI) data type is constructed and the data size in bytes determined.
 
integer function, dimension(2) send_configuration_to_registree (source)
 Sends the data and field descriptions to the MONC process that just registered with the IO server.
 
subroutine init_data_definition (source, monc_defn)
 Initialises the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition, but the size of the arrays depends upon the MONC process (due to uneven distribution of data, etc.). This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings cover all fields in every data definition and are applied to each data definition, which simply ignores non-matching fields.
 
subroutine get_monc_information_data (source)
 Retrieves MONC information data. This is always sent by MONC (and received), but it is only acted upon if the data has not already been set.
 
subroutine register_present_field_names_to_federators (data_description, recv_count)
 Registers with the writer federator the set of fields (prognostic and diagnostic) that are available. This is based on the array/optional fields present from MONC and the non-optional scalars. It is quite an expensive operation, so it is only done once.
 
subroutine handle_monc_dimension_information (data_description, monc_defn)
 Handles the provided local MONC dimension and data layout information.
 

Variables

integer mpi_type_data_sizing_description
 The MPI type for field sizing (i.e. array sizes etc. sent when MONCs register).
 
integer mpi_type_definition_description
 The MPI data type for data descriptions sent to MONCs.
 
integer mpi_type_field_description
 The MPI data type for field descriptions sent to MONCs.
 
type(io_configuration_type), save, volatile io_configuration
 Internal representation of the IO configuration.
 
logical, volatile contine_poll_messages
 Whether to continue waiting for command messages from any MONC processes.
 
logical, volatile initialised_present_data
 
logical, volatile contine_poll_interio_messages
 
logical, volatile already_registered_finishing_call
 
type(field_description_type), dimension(:), allocatable registree_field_descriptions
 
type(definition_description_type), dimension(:), allocatable registree_definition_descriptions
 
integer, volatile monc_registration_lock
 

Detailed Description

The main IO server functionality, which waits for commands and data and deals with both. The lower level details of communication, configuration parsing, etc. are all held elsewhere. The server can be thought of as similar to a bus with command and data channels. The command gives context to what is on the data channel, and not all commands require data (deregistration of a MONC process, for example).

Function/Subroutine Documentation

◆ await_command()

logical function io_server_mod::await_command ( integer, intent(out)  command,
integer, intent(out)  source,
character, dimension(:), allocatable  data_buffer 
)
private

Awaits a command or shutdown from MONC processes and other IO servers.

Parameters
command: The command received (output)
source: The source process received (output)
Returns
Whether to continue polling for commands (and whether to process the current output)

Definition at line 140 of file ioserver.F90.

140      integer, intent(out) :: command, source
141      character, dimension(:), allocatable :: data_buffer
142
143      logical :: completed, inter_io_complete
144
145      completed=.false.
146      await_command=.false.
147      do while(.not. completed)
148        if (.not. contine_poll_messages .and. .not. contine_poll_interio_messages) return
149        if (contine_poll_messages) then
150          if (test_for_command(command, source)) then
151            await_command=.true.
152            return
153          end if
154        end if
155        if (contine_poll_interio_messages .and. allocated(io_configuration%inter_io_communications)) then
156          inter_io_complete=test_for_inter_io(io_configuration%inter_io_communications, &
157               io_configuration%number_inter_io_communications, io_configuration%io_communicator, command, source, data_buffer)
158          if (inter_io_complete) then
159            await_command=.true.
160            return
161          end if
162        end if
163        if (.not. contine_poll_messages .and. .not. already_registered_finishing_call) then
164          if (check_diagnostic_federator_for_completion(io_configuration) .and. threadpool_is_idle()) then
165            already_registered_finishing_call=.true.
166            call perform_global_callback(io_configuration, "termination", 1, termination_callback)
167          end if
168        end if
169        if (.not. completed) call pause_for_mpi_interleaving()
170      end do

◆ free_individual_registered_monc_aspects()

subroutine io_server_mod::free_individual_registered_monc_aspects ( )
private

Frees up the memory associated with individual registered MONCs. This is done at the end for all MONCs as we can't deallocate dynamically in a threaded environment without excessive ordering and locking in case some data processing is queued or in progress.

Definition at line 224 of file ioserver.F90.

224      integer :: i, specific_monc_data_type
225      type(iterator_type) :: types_iterator
226
227      do i=1, size(io_configuration%registered_moncs)
228        types_iterator=c_get_iterator(io_configuration%registered_moncs(i)%registered_monc_types)
229        do while (c_has_next(types_iterator))
230          specific_monc_data_type=c_get_integer(c_next_mapentry(types_iterator))
231          call free_mpi_type(specific_monc_data_type)
232        end do
233        if (allocated(io_configuration%registered_moncs(i)%field_start_locations)) &
234             deallocate(io_configuration%registered_moncs(i)%field_start_locations)
235        if (allocated(io_configuration%registered_moncs(i)%field_end_locations)) &
236             deallocate(io_configuration%registered_moncs(i)%field_end_locations)
237        if (allocated(io_configuration%registered_moncs(i)%definition_names)) &
238             deallocate(io_configuration%registered_moncs(i)%definition_names)
239        if (allocated(io_configuration%registered_moncs(i)%dimensions)) deallocate(io_configuration%registered_moncs(i)%dimensions)
240      end do

◆ get_monc_information_data()

subroutine io_server_mod::get_monc_information_data ( integer, intent(in)  source)
private

Retrieves MONC information data. This is always sent by MONC (and received), but it is only acted upon if the data has not already been set.

Parameters
source: MONC source process

Definition at line 442 of file ioserver.F90.

442      integer, intent(in) :: source
443
444      character, dimension(:), allocatable :: buffer
445      character(len=STRING_LENGTH) :: q_field_name
446      integer :: buffer_size, z_size, num_q_fields, n, current_point, recv_count
447      type(data_sizing_description_type) :: field_description
448      real(kind=DEFAULT_PRECISION) :: dreal
449      logical :: field_found
450
451
452      z_size=c_get_integer(io_configuration%dimension_sizing, "z")
453      num_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
454
455      buffer_size=(kind(dreal)*z_size) + (string_length * num_q_fields)
456      allocate(buffer(buffer_size))
457      recv_count=data_receive(mpi_byte, buffer_size, source, buffer)
458      if (.not. io_configuration%general_info_set) then
459        call check_thread_status(forthread_mutex_lock(io_configuration%general_info_mutex))
460        if (.not. io_configuration%general_info_set) then
461          io_configuration%general_info_set=.true.
462          allocate(io_configuration%zn_field(z_size))
463          io_configuration%zn_field=transfer(buffer(1:kind(dreal)*z_size), io_configuration%zn_field)
464          current_point=(kind(dreal)*z_size)
465          if (num_q_fields .gt. 0) then
466            do n=1, num_q_fields
467              q_field_name=transfer(buffer(current_point+1:current_point+string_length), q_field_name)
468              current_point=current_point+string_length
469              call replace_character(q_field_name, " ", "_")
470              call c_add_string(io_configuration%q_field_names, q_field_name)
471            end do
472          end if
473        end if
474        call provide_q_field_names_to_writer_federator(io_configuration%q_field_names)
475        call check_thread_status(forthread_mutex_unlock(io_configuration%general_info_mutex))
476      end if
477      deallocate(buffer)
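
The listing above unpacks a single byte buffer that holds z_size reals followed by num_q_fields fixed-length names, using the transfer intrinsic. The following is a minimal, self-contained sketch of that layout and is not the MONC implementation: the module name, pack_info, unpack_info, real_bytes and the value of name_len are all hypothetical, and the element size is taken from storage_size rather than from kind(dreal), since the standard does not guarantee that a kind number equals a size in bytes.

```fortran
module info_buffer_sketch
  use iso_fortran_env, only : character_storage_size
  implicit none
  integer, parameter :: dp=kind(1.0d0)
  integer, parameter :: name_len=16   ! hypothetical stand-in for STRING_LENGTH
contains
  ! Number of default characters occupied by one real(kind=dp) value
  integer function real_bytes()
    real_bytes=storage_size(1.0_dp)/character_storage_size
  end function real_bytes

  ! Packs the reals first and then each fixed-length name into one character buffer
  function pack_info(zn, names) result(buffer)
    real(kind=dp), intent(in) :: zn(:)
    character(len=name_len), intent(in) :: names(:)
    character, allocatable :: buffer(:)
    integer :: n, pos

    pos=real_bytes()*size(zn)
    allocate(buffer(pos + name_len*size(names)))
    buffer(1:pos)=transfer(zn, buffer)
    do n=1, size(names)
      buffer(pos+1:pos+name_len)=transfer(names(n), buffer)
      pos=pos+name_len
    end do
  end function pack_info

  ! Reverses pack_info, walking the buffer with a running offset as the listing does
  subroutine unpack_info(buffer, z_size, num_names, zn, names)
    character, intent(in) :: buffer(:)
    integer, intent(in) :: z_size, num_names
    real(kind=dp), allocatable, intent(out) :: zn(:)
    character(len=name_len), allocatable, intent(out) :: names(:)
    integer :: n, pos

    allocate(zn(z_size), names(num_names))
    pos=real_bytes()*z_size
    zn=transfer(buffer(1:pos), zn)
    do n=1, num_names
      names(n)=transfer(buffer(pos+1:pos+name_len), names(n))
      pos=pos+name_len
    end do
  end subroutine unpack_info
end module info_buffer_sketch
```

Packing three reals and two names with this sketch gives a buffer of 3*real_bytes()+2*name_len characters, and unpacking it with the same counts recovers the original values.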

◆ handle_command_message()

subroutine io_server_mod::handle_command_message ( integer, intent(in)  command,
integer, intent(in)  source,
character, dimension(:), intent(inout), allocatable  data_buffer 
)
private

Called to handle a specific command that has been received.

Parameters
command: The command which has been received from some process
source: The PID of the source (MONC) process

Definition at line 192 of file ioserver.F90.

192      integer, intent(in) :: command, source
193      character, dimension(:), allocatable, intent(inout) :: data_buffer
194
195      if (command == register_command) then
196        call threadpool_start_thread(handle_monc_registration, (/ source /))
197      else if (command == deregister_command) then
198        call threadpool_start_thread(handle_deregistration_command, (/ source /))
199      else if (command == inter_io_communication) then
200        call threadpool_start_thread(handle_inter_io_communication_command, (/ source /), data_buffer=data_buffer)
201        deallocate(data_buffer)
202      else if (command .ge. data_command_start) then
203        call pull_back_data_message_and_handle(source, command-data_command_start)
204      end if
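
The branch above shows how a single integer command carries both the kind of request and, for data commands, the data set ID (the offset from data_command_start). A minimal sketch of that decoding follows. The numeric values of the constants and the kind_* labels are hypothetical, chosen only for illustration; the real constants are defined elsewhere in MONC.

```fortran
module command_dispatch_sketch
  implicit none
  ! Hypothetical values for illustration; the real constants are defined elsewhere in MONC
  integer, parameter :: register_command=1, inter_io_communication=2, &
       deregister_command=3, data_command_start=10
  ! Hypothetical labels for the four branches, plus one for commands that match none
  integer, parameter :: kind_ignored=0, kind_register=1, kind_deregister=2, &
       kind_inter_io=3, kind_data=4
contains
  ! Classifies a command in the same order as the branch in the listing. For data
  ! commands the data set ID is command-data_command_start, otherwise it is -1.
  subroutine classify_command(command, command_kind, data_set)
    integer, intent(in) :: command
    integer, intent(out) :: command_kind, data_set

    data_set=-1
    if (command == register_command) then
      command_kind=kind_register
    else if (command == deregister_command) then
      command_kind=kind_deregister
    else if (command == inter_io_communication) then
      command_kind=kind_inter_io
    else if (command .ge. data_command_start) then
      command_kind=kind_data
      data_set=command-data_command_start
    else
      command_kind=kind_ignored
    end if
  end subroutine classify_command
end module command_dispatch_sketch
```

With these assumed constants, a command of data_command_start+2 is classified as a data command for data set 2, while any value below data_command_start that is not one of the three named commands falls through every branch, as it does in the listing.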

◆ handle_data_message()

subroutine io_server_mod::handle_data_message ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles the command for data download from a specific process. This will allocate the receive buffer and then call to get the data. Once the data has been received it is run against the handling rules.

Parameters
arguments: Element 1 is the source and element 2 is the data_set
data_buffer: The actual data from MONC read from the data channel

Definition at line 296 of file ioserver.F90.

296      integer, dimension(:), intent(in) :: arguments
297      character, dimension(:), allocatable, intent(inout), optional :: data_buffer
298
299      integer :: monc_location, data_set, source, matched_datadefn_index
300
301      source=arguments(1)
302      data_set=arguments(2)
303
304      call check_thread_status(forthread_rwlock_rdlock(monc_registration_lock))
305      monc_location=get_monc_location(io_configuration, source)
306
307      call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
308      io_configuration%registered_moncs(monc_location)%active_threads=&
309           io_configuration%registered_moncs(monc_location)%active_threads+1
310      call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
311
312      matched_datadefn_index=retrieve_data_definition(io_configuration, &
313           io_configuration%registered_moncs(monc_location)%definition_names(data_set))
314
315      if (matched_datadefn_index .gt. 0) then
316        call inform_writer_federator_time_point(io_configuration, source, data_set, data_buffer)
317        call pass_fields_to_diagnostics_federator(io_configuration, source, data_set, data_buffer)
318        call provide_monc_data_to_writer_federator(io_configuration, source, data_set, data_buffer)
319        call check_writer_for_trigger(io_configuration, source, data_set, data_buffer)
320      else
321        call log_log(log_warn, "IO server can not find matching data definition with name "&
322             //io_configuration%registered_moncs(monc_location)%definition_names(data_set))
323      end if
324
325      call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
326      io_configuration%registered_moncs(monc_location)%active_threads=&
327           io_configuration%registered_moncs(monc_location)%active_threads-1
328      call check_thread_status(forthread_cond_signal(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable))
329      call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
330      call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))

◆ handle_deregistration_command()

subroutine io_server_mod::handle_deregistration_command ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Deregisters a specific MONC source process.

Parameters
source: The MONC process PID that we are deregistering (passed as element 1 of arguments)

Definition at line 246 of file ioserver.F90.

246      integer, dimension(:), intent(in) :: arguments
247      character, dimension(:), allocatable, intent(inout), optional :: data_buffer
248
249      integer :: monc_location, source
250
251      source=arguments(1)
252      monc_location=get_monc_location(io_configuration, source)
253      call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
254      do while (io_configuration%registered_moncs(monc_location)%active_threads .gt. 0)
255        call check_thread_status(forthread_cond_wait(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable,&
256             io_configuration%registered_moncs(monc_location)%active_mutex))
257      end do
258      call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
259      call check_thread_status(forthread_rwlock_wrlock(monc_registration_lock))
260      io_configuration%active_moncs=io_configuration%active_moncs-1
261      if (io_configuration%active_moncs==0) contine_poll_messages=.false.
262      call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))

◆ handle_inter_io_communication_command()

subroutine io_server_mod::handle_inter_io_communication_command ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles inter IO server communications.

Parameters
arguments: The thread based arguments; this is the index of the inter IO server description

Definition at line 210 of file ioserver.F90.

210      integer, dimension(:), intent(in) :: arguments
211      character, dimension(:), allocatable, intent(inout), optional :: data_buffer
212
213      integer :: source
214
215      source=arguments(1)
216
217      call io_configuration%inter_io_communications(source)%handling_procedure(io_configuration, data_buffer, source)

◆ handle_monc_dimension_information()

subroutine io_server_mod::handle_monc_dimension_information ( type(data_sizing_description_type), dimension(:)  data_description,
type(io_configuration_registered_monc_type), intent(inout)  monc_defn 
)
private

Handles the provided local MONC dimension and data layout information.

Parameters
data_description: The data descriptions sent over from MONC
monc_defn: The corresponding MONC definition data structure

Definition at line 517 of file ioserver.F90.

517      type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
518      type(data_sizing_description_type), dimension(:) :: data_description
519
520      type(data_sizing_description_type) :: field_description
521      integer :: i
522      logical :: field_found
523
524      field_found=get_data_description_from_name(data_description, local_sizes_key, field_description)
525      if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local size information")
526      do i=1,3
527        monc_defn%local_dim_sizes(i)=field_description%dim_sizes(i)
528      end do
529      field_found=get_data_description_from_name(data_description, local_start_points_key, field_description)
530      if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local start point information")
531      do i=1,3
532        monc_defn%local_dim_starts(i)=field_description%dim_sizes(i)
533      end do
534      field_found=get_data_description_from_name(data_description, local_end_points_key, field_description)
535      if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local end point information")
536      do i=1,3
537        monc_defn%local_dim_ends(i)=field_description%dim_sizes(i)
538      end do
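
In the listing above, the local sizes, start points and end points all arrive as ordinary sizing descriptions: each is looked up by a key and its first three dim_sizes entries are copied out. A minimal sketch of that lookup pattern is given below. The type layout, the key strings and the routine names are hypothetical stand-ins; the real data_sizing_description_type and the *_key constants are defined elsewhere in MONC.

```fortran
module dimension_lookup_sketch
  implicit none
  integer, parameter :: name_len=32   ! hypothetical stand-in for STRING_LENGTH
  ! Hypothetical, simplified layout of a sizing description
  type :: sizing_sketch_type
    character(len=name_len) :: field_name
    integer :: dim_sizes(4)
  end type sizing_sketch_type
  ! Hypothetical key strings; the real keys are constants defined elsewhere
  character(len=*), parameter :: sizes_key="local_sizes", &
       starts_key="local_start_points", ends_key="local_end_points"
contains
  ! Finds the description whose name matches key; returns .false. if none does
  logical function find_description(descriptions, key, found)
    type(sizing_sketch_type), intent(in) :: descriptions(:)
    character(len=*), intent(in) :: key
    type(sizing_sketch_type), intent(out) :: found
    integer :: i

    found%field_name=""
    found%dim_sizes=0
    find_description=.false.
    do i=1, size(descriptions)
      if (descriptions(i)%field_name == key) then
        found=descriptions(i)
        find_description=.true.
        return
      end if
    end do
  end function find_description

  ! Copies the three-dimensional local layout out of the keyed descriptions.
  ! ok is .false. if any of the three keys is missing.
  subroutine read_local_layout(descriptions, sizes, starts, ends, ok)
    type(sizing_sketch_type), intent(in) :: descriptions(:)
    integer, intent(out) :: sizes(3), starts(3), ends(3)
    logical, intent(out) :: ok
    type(sizing_sketch_type) :: found

    ok=find_description(descriptions, sizes_key, found)
    sizes=found%dim_sizes(1:3)
    if (.not. find_description(descriptions, starts_key, found)) ok=.false.
    starts=found%dim_sizes(1:3)
    if (.not. find_description(descriptions, ends_key, found)) ok=.false.
    ends=found%dim_sizes(1:3)
  end subroutine read_local_layout
end module dimension_lookup_sketch
```

Unlike the listing, which logs an error for a malformed registration, the sketch simply reports the missing key through ok and leaves the corresponding values at zero.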

◆ handle_monc_registration()

subroutine io_server_mod::handle_monc_registration ( integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(inout), optional, allocatable  data_buffer 
)
private

Handles registration from some MONC process. The source process sends a data description to this IO server, which tells the IO server the size of the data arrays (these might differ between processes in the case of uneven decomposition). Based upon this, a communication (MPI) data type is constructed and the data size in bytes determined.

Parameters
source: The PID of the MONC process that is registering itself (passed as element 1 of arguments)

Definition at line 338 of file ioserver.F90.

338      integer, dimension(:), intent(in) :: arguments
339      character, dimension(:), allocatable, intent(inout), optional :: data_buffer
340
341      integer :: configuration_send_request(2), ierr, number_data_definitions, this_monc_index, source
342
343      source=arguments(1)
344      configuration_send_request=send_configuration_to_registree(source)
345      number_data_definitions=io_configuration%number_of_data_definitions
346
347      call check_thread_status(forthread_rwlock_wrlock(monc_registration_lock))
348
349      io_configuration%number_of_moncs=io_configuration%number_of_moncs+1
350      this_monc_index=io_configuration%number_of_moncs
351      if (io_configuration%number_of_moncs .gt. size(io_configuration%registered_moncs)) then
352        call log_log(log_error, "You have a high ratio of computational cores to IO servers, the limit is currently 100")
353        ! The extension of the MONC registration array is broken as the pointers involved in the map does not get copied across
354        ! we could manually do this, but that is for another day! If you need to extend these limits either increase the constants
355        ! or fix the extension, I don't think it will be too hard to fix the extension bit (copy the maps manually)
356        call extend_registered_moncs_array(io_configuration)
357      end if
358
359      io_configuration%active_moncs=io_configuration%active_moncs+1
360      call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
361
362      call c_put_integer(io_configuration%monc_to_index, conv_to_string(source), this_monc_index)
363
364      call check_thread_status(forthread_mutex_init(io_configuration%registered_moncs(this_monc_index)%active_mutex, -1))
365      call check_thread_status(forthread_cond_init(&
366           io_configuration%registered_moncs(this_monc_index)%deactivate_condition_variable, -1))
367      io_configuration%registered_moncs(this_monc_index)%active_threads=0
368      io_configuration%registered_moncs(this_monc_index)%source_id=source
369
370      allocate(io_configuration%registered_moncs(this_monc_index)%field_start_locations(number_data_definitions), &
371           io_configuration%registered_moncs(this_monc_index)%field_end_locations(number_data_definitions), &
372           io_configuration%registered_moncs(this_monc_index)%definition_names(number_data_definitions), &
373           io_configuration%registered_moncs(this_monc_index)%dimensions(number_data_definitions))
374
375      ! Wait for configuration to have been sent to registree
376      call waitall_for_mpi_requests(configuration_send_request, 2)
377      call init_data_definition(source, io_configuration%registered_moncs(this_monc_index))

◆ init_data_definition()

subroutine io_server_mod::init_data_definition ( integer, intent(in)  source,
type(io_configuration_registered_monc_type), intent(inout)  monc_defn 
)
private

Initialises the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition, but the size of the arrays depends upon the MONC process (due to uneven distribution of data, etc.). This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings cover all fields in every data definition and are applied to each data definition, which simply ignores non-matching fields.

Parameters
source: The source MONC PID
monc_defn: The corresponding MONC definition data structure

Definition at line 407 of file ioserver.F90.

407      integer, intent(in) :: source
408      type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
409
410      type(data_sizing_description_type) :: data_description(io_configuration%number_of_distinct_data_fields+4)
411      integer :: created_mpi_type, data_size, recv_count, i
412      type(data_sizing_description_type) :: field_description
413      logical :: field_found
414
415      recv_count=data_receive(mpi_type_data_sizing_description, io_configuration%number_of_distinct_data_fields+4, &
416           source, description_data=data_description)
417
418      call handle_monc_dimension_information(data_description, monc_defn)
419
420      do i=1, io_configuration%number_of_data_definitions
421        created_mpi_type=build_mpi_datatype(io_configuration%data_definitions(i), data_description, data_size, &
422             monc_defn%field_start_locations(i), monc_defn%field_end_locations(i), monc_defn%dimensions(i))
423
424        call c_put_integer(monc_defn%registered_monc_types, conv_to_string(i), created_mpi_type)
425        call c_put_integer(monc_defn%registered_monc_buffer_sizes, conv_to_string(i), data_size)
426
427        monc_defn%definition_names(i)=io_configuration%data_definitions(i)%name
428      end do
429      if (.not. initialised_present_data) then
430        initialised_present_data=.true.
431        field_found=get_data_description_from_name(data_description, number_q_indicies_key, field_description)
432        call c_put_integer(io_configuration%dimension_sizing, "active_q_indicies", field_description%dim_sizes(1))
433        call register_present_field_names_to_federators(data_description, recv_count)
434      end if
435      call get_monc_information_data(source)

◆ io_server_run()

subroutine, public io_server_mod::io_server_run ( type(hashmap_type), intent(inout)  options_database,
integer, intent(in)  io_communicator_arg,
integer, intent(in)  provided_threading,
integer, intent(in)  total_global_processes,
logical, intent(in)  continuation_run,
character(len=long_string_length), intent(in)  io_configuration_file 
)

Called to start the IO server; when this subroutine returns, the IO server has finished. The runtime is spent in here awaiting commands and then dealing with them. Termination occurs when all MONC processes have deregistered. Note that at least one MONC process must first register for this to be triggered.

Parameters
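io_communicator_arg: The IO communicator containing just the IO servers
io_xml_configuration: Textual XML configuration that is used to set up the IO server (in the listing below this is a local variable, read from the checkpoint file or from io_configuration_file, rather than an argument)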

Definition at line 66 of file ioserver.F90.

 66      type(hashmap_type), intent(inout) :: options_database
 67      integer, intent(in) :: io_communicator_arg, provided_threading, total_global_processes
 68      logical, intent(in) :: continuation_run
 69      character(len=LONG_STRING_LENGTH), intent(in) :: io_configuration_file
 70
 71      integer :: command, source, my_rank, ierr
 72      character, dimension(:), allocatable :: data_buffer, io_xml_configuration
 73      type(hashmap_type) :: diagnostic_generation_frequency
 74
 75      if (continuation_run) then
 76        ! Handle case where we need to allocate this due to no IO server config
 77        call read_io_server_configuration(options_get_string(options_database, "checkpoint"), &
 78             io_xml_configuration, io_communicator_arg)
 79      end if
 80
 81      if (.not. allocated(io_xml_configuration)) then
 82        io_xml_configuration=get_io_xml(io_configuration_file)
 83        if (continuation_run) then
 84          call mpi_comm_rank(io_communicator_arg, my_rank, ierr)
 85          if (my_rank == 0) then
 86            call log_log(log_warn, "No IO server configuration in checkpoint file - starting from XML provided file instead")
 87          end if
 88        end if
 89      end if
 90
 91      call configuration_parse(options_database, io_xml_configuration, io_configuration)
 92      deallocate(io_xml_configuration)
 93      call threadpool_init(io_configuration)
 94      call initialise_mpi_communication(provided_threading)
 95      call check_thread_status(forthread_rwlock_init(monc_registration_lock, -1))
 96      call check_thread_status(forthread_mutex_init(io_configuration%general_info_mutex, -1))
 97      initialised_present_data=.false.
 98      contine_poll_messages=.true.
 99      contine_poll_interio_messages=.true.
100      already_registered_finishing_call=.false.
101      io_configuration%io_communicator=io_communicator_arg
102      io_configuration%number_of_io_servers=get_number_io_servers(io_communicator_arg)
103      io_configuration%number_of_global_moncs=total_global_processes-io_configuration%number_of_io_servers
104      io_configuration%my_io_rank=get_my_io_rank(io_communicator_arg)
105      call initialise_logging(io_configuration%my_io_rank)
106      registree_definition_descriptions=build_definition_description_type_from_configuration(io_configuration)
107      registree_field_descriptions=build_field_description_type_from_configuration(io_configuration)
108      diagnostic_generation_frequency=initialise_diagnostic_federator(io_configuration)
109      call initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
110      call c_free(diagnostic_generation_frequency)
111      call initialise_writer_field_manager(io_configuration, continuation_run)
112
113      mpi_type_data_sizing_description=build_mpi_type_data_sizing_description()
114      mpi_type_definition_description=build_mpi_type_definition_description()
115      mpi_type_field_description=build_mpi_type_field_description()
116
117      call register_command_receive()
118
119      do while (await_command(command, source, data_buffer))
120        call handle_command_message(command, source, data_buffer)
121      end do
122      call threadpool_deactivate()
123      call finalise_writer_field_manager()
124      call finalise_writer_federator()
125      call finalise_diagnostic_federator(io_configuration)
126      call check_thread_status(forthread_rwlock_destroy(monc_registration_lock))
127      call free_individual_registered_monc_aspects()
128      call cancel_requests()
129      call free_mpi_type(mpi_type_data_sizing_description)
130      call free_mpi_type(mpi_type_definition_description)
131      call free_mpi_type(mpi_type_field_description)
132      call threadpool_finalise()
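
The termination rule described for this subroutine (the loop ends once the count of active MONCs returns to zero, which can only happen after at least one has registered) can be illustrated with a much reduced, single-threaded sketch. Everything here is a simplification and an assumption: there is no MPI, no thread pool and no inter IO polling, the command values are hypothetical, and an in-memory array stands in for the command channel. Where the real server would keep polling, the sketch gives up when the array is exhausted.

```fortran
module server_loop_sketch
  implicit none
  ! Hypothetical command values for illustration only
  integer, parameter :: register_command=1, deregister_command=3
  integer :: active_moncs=0
  logical :: continue_poll_messages=.true.
  integer, allocatable :: pending(:)   ! stand-in for the command channel
  integer :: next_pending=1
contains
  ! Returns .true. with the next command, or .false. once polling should stop
  logical function await_command(command)
    integer, intent(out) :: command

    command=0
    await_command=.false.
    if (.not. continue_poll_messages) return
    ! The real server keeps polling here; the sketch stops when the queue is empty
    if (.not. allocated(pending)) return
    if (next_pending > size(pending)) return
    command=pending(next_pending)
    next_pending=next_pending+1
    await_command=.true.
  end function await_command

  ! Tracks registrations; the last deregistration switches command polling off
  subroutine handle_command(command)
    integer, intent(in) :: command

    if (command == register_command) then
      active_moncs=active_moncs+1
    else if (command == deregister_command) then
      active_moncs=active_moncs-1
      if (active_moncs == 0) continue_poll_messages=.false.
    end if
  end subroutine handle_command

  ! Runs the await/handle loop over the given commands and returns how many were handled
  integer function run_server(commands)
    integer, intent(in) :: commands(:)
    integer :: command

    pending=commands
    next_pending=1
    active_moncs=0
    continue_poll_messages=.true.
    run_server=0
    do while (await_command(command))
      call handle_command(command)
      run_server=run_server+1
    end do
  end function run_server
end module server_loop_sketch
```

Feeding the sketch two registrations, two deregistrations and a further registration handles only the first four commands, because the second deregistration clears the polling flag. With no commands at all the flag is never cleared, which mirrors the note that at least one MONC process must register before termination can be triggered.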

◆ pull_back_data_message_and_handle()

subroutine io_server_mod::pull_back_data_message_and_handle ( integer, intent(in)  source,
integer, intent(in)  data_set 
)
private

Retrieves the message from MONC off the data channel and passes it to a thread in the thread pool for processing. It is done this way to enforce ordering between the command (including the data set ID) and the raw data itself.

Parameters
source: Source PID of the MONC process
data_set: ID of the data set being communicated

Definition at line 270 of file ioserver.F90.

270      integer, intent(in) :: source, data_set
271
272      integer :: specific_monc_data_type, specific_monc_buffer_size, recv_count, monc_location
273      character, dimension(:), allocatable :: data_buffer
274
275      call check_thread_status(forthread_rwlock_rdlock(monc_registration_lock))
276      monc_location=get_monc_location(io_configuration, source)
277
278      specific_monc_data_type=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_types, &
279           conv_to_string(data_set))
280      specific_monc_buffer_size=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_buffer_sizes, &
281           conv_to_string(data_set))
282
283      allocate(data_buffer(specific_monc_buffer_size))
284      recv_count=data_receive(specific_monc_data_type, 1, source, dump_data=data_buffer, data_dump_id=data_set)
285
286      call check_thread_status(forthread_rwlock_unlock(monc_registration_lock))
287      call threadpool_start_thread(handle_data_message, (/ source, data_set /), data_buffer=data_buffer)
288      deallocate(data_buffer)

◆ register_present_field_names_to_federators()

subroutine io_server_mod::register_present_field_names_to_federators ( type(data_sizing_description_type), dimension(:), intent(in)  data_description,
integer, intent(in)  recv_count 
)
private

Registers with the writer federator the set of fields (prognostic and diagnostic) that are available. This is based on the array/optional fields present from MONC and the non-optional scalars. It is quite an expensive operation, so it is only done once.

Parameters
data_description  Array of data descriptions from MONC
recv_count        Number of data descriptions

Definition at line 486 of file ioserver.F90.

486  type(data_sizing_description_type), dimension(:), intent(in) :: data_description
487  integer, intent(in) :: recv_count
488 
489  type(hashset_type) :: present_field_names
490  type(hashmap_type) :: diagnostics_field_names_and_roots
491  integer :: i, j
492 
493  do i=1, recv_count
494  call c_add_string(present_field_names, data_description(i)%field_name)
495  end do
496  do i=1, io_configuration%number_of_data_definitions
497  do j=1, io_configuration%data_definitions(i)%number_of_data_fields
498  if (io_configuration%data_definitions(i)%fields(j)%field_type == scalar_field_type .and. .not. &
499  io_configuration%data_definitions(i)%fields(j)%optional) then
500  call c_add_string(present_field_names, io_configuration%data_definitions(i)%fields(j)%name)
501  end if
502  end do
503  end do
504  call c_add_string(present_field_names, "time")
505  call c_add_string(present_field_names, "timestep")
506  call inform_writer_federator_fields_present(io_configuration, present_field_names)
507  diagnostics_field_names_and_roots=determine_diagnostics_fields_available(present_field_names)
508  call inform_writer_federator_fields_present(io_configuration, diag_field_names_and_roots=diagnostics_field_names_and_roots)
509  call c_free(present_field_names)
510  call c_free(diagnostics_field_names_and_roots)
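
The way the present field set is assembled (received field names, then non-optional scalars, then the always-present `time` and `timestep`) can be sketched without the MONC collection types. This is a minimal illustration under stated assumptions: the fixed-size array and `add_unique` stand in for `hashset_type` and `c_add_string`, and the field names and optional flags are invented.

```fortran
! Minimal sketch of assembling a set of "present" field names.
! The array-backed set and all input data are hypothetical stand-ins.
program present_fields_sketch
  implicit none
  integer, parameter :: name_len = 32, max_names = 16
  character(len=name_len) :: present(max_names)
  integer :: n_present, i
  ! Hypothetical names received from a MONC process
  character(len=name_len), parameter :: received(3) = &
       [character(len=name_len) :: "u", "v", "th"]
  ! Hypothetical configured scalar fields and whether each is optional
  character(len=name_len), parameter :: scalar_names(3) = &
       [character(len=name_len) :: "time", "x_size", "dtm"]
  logical, parameter :: scalar_optional(3) = [ .false., .false., .true. ]

  n_present = 0
  do i = 1, size(received)
    call add_unique(trim(received(i)))
  end do
  ! Non-optional scalars are always sent, so they count as present
  do i = 1, size(scalar_names)
    if (.not. scalar_optional(i)) call add_unique(trim(scalar_names(i)))
  end do
  ! Added unconditionally, as in the listing above
  call add_unique("time")
  call add_unique("timestep")

  do i = 1, n_present
    print '(a)', trim(present(i))
  end do
contains
  ! Inserts name unless it is already in the set
  subroutine add_unique(name)
    character(len=*), intent(in) :: name
    integer :: k
    do k = 1, n_present
      if (trim(present(k)) == name) return
    end do
    if (n_present >= max_names) error stop "present set is full"
    n_present = n_present + 1
    present(n_present) = name
  end subroutine add_unique
end program present_fields_sketch
```

Set semantics matter here because a name such as `time` can arrive from more than one source and should only be registered once.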

◆ send_configuration_to_registree()

integer function, dimension(2) io_server_mod::send_configuration_to_registree ( integer, intent(in)  source)
private

Sends the data and field descriptions to the MONC process that just registered with the IO server.

Parameters
source  The MPI rank (MPI_COMM_WORLD) of the registree
Returns
The two nonblocking send request handles, which can be waited on for completion later (to overlap compute and communication)

Definition at line 384 of file ioserver.F90.

384  integer, intent(in) :: source
385  integer :: send_configuration_to_registree(2)
386 
387  integer :: ierr, srequest(2)
388 
389  call lock_mpi()
390  call mpi_isend(registree_definition_descriptions, size(registree_definition_descriptions), mpi_type_definition_description, &
391  source, data_tag, mpi_comm_world, srequest(1), ierr)
392  call mpi_isend(registree_field_descriptions, size(registree_field_descriptions), mpi_type_field_description, &
393  source, data_tag, mpi_comm_world, srequest(2), ierr)
394  call unlock_mpi()
395 
396  send_configuration_to_registree=srequest
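
A caller would typically complete the two returned request handles with `mpi_waitall` once it has other work out of the way. The following is a minimal sketch of that pattern, not the IO server's actual calling code: it runs on a single rank that sends to itself, and the integer buffers, their sizes and the tag value are hypothetical stand-ins for the description arrays and `data_tag`.

```fortran
! Minimal sketch: post two nonblocking sends, do other work, then wait on both.
! Runs on a single rank by sending to itself; buffers and tag are hypothetical.
program isend_waitall_sketch
  use mpi
  implicit none
  integer, parameter :: data_tag = 1
  integer :: ierr, my_rank
  integer :: srequest(2), rrequest(2)
  integer :: definitions(4), fields(6)
  integer :: recv_definitions(4), recv_fields(6)

  call mpi_init(ierr)
  call mpi_comm_rank(mpi_comm_world, my_rank, ierr)
  definitions = 1
  fields = 2

  ! Receives are posted in the same order as the sends. Both messages share
  ! one tag, so MPI's non-overtaking rule pairs first send with first receive.
  call mpi_irecv(recv_definitions, size(recv_definitions), mpi_integer, my_rank, &
       data_tag, mpi_comm_world, rrequest(1), ierr)
  call mpi_irecv(recv_fields, size(recv_fields), mpi_integer, my_rank, &
       data_tag, mpi_comm_world, rrequest(2), ierr)

  call mpi_isend(definitions, size(definitions), mpi_integer, my_rank, &
       data_tag, mpi_comm_world, srequest(1), ierr)
  call mpi_isend(fields, size(fields), mpi_integer, my_rank, &
       data_tag, mpi_comm_world, srequest(2), ierr)

  ! ... other work could overlap with the transfers here ...

  ! The send buffers must not be modified until these waits complete
  call mpi_waitall(2, srequest, mpi_statuses_ignore, ierr)
  call mpi_waitall(2, rrequest, mpi_statuses_ignore, ierr)

  if (all(recv_definitions == 1) .and. all(recv_fields == 2)) then
    print '(a)', "both messages arrived in posting order"
  end if
  call mpi_finalize(ierr)
end program isend_waitall_sketch
```

Because both sends in the listing use the same tag and communicator, the receiving MONC process has to post its receives in the same order (definitions first, then fields) for the messages to land in the right buffers.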

◆ termination_callback()

subroutine io_server_mod::termination_callback ( type(io_configuration_type), intent(inout)  io_configuration,
real(default_precision), dimension(:)  values,
character(len=string_length)  field_name,
integer  timestep 
)
private

This is the termination callback, which is called once all MONCs have deregistered, no inter IO communication sends are active and all threads are idle. It shuts down the inter IO listening and kick-starts finalisation and closure.

Parameters
io_configuration  The IO server configuration
values            Values (ignored)
field_name        Field name identifier
timestep          Timestep identifier

Definition at line 180 of file ioserver.F90.

180  type(io_configuration_type), intent(inout) :: io_configuration
181  real(DEFAULT_PRECISION), dimension(:) :: values
182  character(len=STRING_LENGTH) :: field_name
183  integer :: timestep
184 
185  contine_poll_interio_messages=.false.

Variable Documentation

◆ already_registered_finishing_call

logical, volatile io_server_mod::already_registered_finishing_call
private

Definition at line 50 of file ioserver.F90.

◆ contine_poll_interio_messages

logical, volatile io_server_mod::contine_poll_interio_messages
private

Definition at line 50 of file ioserver.F90.

50  logical, volatile :: contine_poll_interio_messages, already_registered_finishing_call

◆ contine_poll_messages

logical, volatile io_server_mod::contine_poll_messages
private

Whether to continue waiting for command messages from any MONC processes.

Definition at line 48 of file ioserver.F90.

48  logical, volatile :: contine_poll_messages, & !< Whether to continue waiting command messages from any MONC processes
49  initialised_present_data

◆ initialised_present_data

logical, volatile io_server_mod::initialised_present_data
private

Definition at line 48 of file ioserver.F90.

◆ io_configuration

type(io_configuration_type), save, volatile io_server_mod::io_configuration
private

Internal representation of the IO configuration.

Definition at line 47 of file ioserver.F90.

47  type(io_configuration_type), volatile, save :: io_configuration

◆ monc_registration_lock

integer, volatile io_server_mod::monc_registration_lock
private

Definition at line 54 of file ioserver.F90.

54  integer, volatile :: monc_registration_lock

◆ mpi_type_data_sizing_description

integer io_server_mod::mpi_type_data_sizing_description
private

The MPI type for field sizing (i.e. array size etc. sent when MONCs register)

Definition at line 44 of file ioserver.F90.

44  integer :: mpi_type_data_sizing_description, & !< The MPI type for field sizing (i.e. array size etc send when MONCs register)
45  mpi_type_definition_description, & !< The MPI data type for data descriptions sent to MONCs
46  mpi_type_field_description

◆ mpi_type_definition_description

integer io_server_mod::mpi_type_definition_description
private

The MPI data type for data descriptions sent to MONCs.

Definition at line 44 of file ioserver.F90.

◆ mpi_type_field_description

integer io_server_mod::mpi_type_field_description
private

The MPI data type for field descriptions sent to MONCs.

Definition at line 44 of file ioserver.F90.

◆ registree_definition_descriptions

type(definition_description_type), dimension(:), allocatable io_server_mod::registree_definition_descriptions
private

Definition at line 52 of file ioserver.F90.

52  type(definition_description_type), dimension(:), allocatable :: registree_definition_descriptions

◆ registree_field_descriptions

type(field_description_type), dimension(:), allocatable io_server_mod::registree_field_descriptions
private

Definition at line 51 of file ioserver.F90.

51  type(field_description_type), dimension(:), allocatable :: registree_field_descriptions