MONC
ioserver.F90
Go to the documentation of this file.
1 
36  use mpi, only : mpi_comm_world, mpi_statuses_ignore, mpi_byte
38  implicit none
39 
40 #ifndef TEST_MODE
41  private
42 #endif
43 
44  integer :: mpi_type_data_sizing_description, & !< The MPI type for field sizing (i.e. array size etc send when MONCs register)
45  mpi_type_definition_description, & !< The MPI data type for data descriptions sent to MONCs
47  type(io_configuration_type), volatile, save :: io_configuration
48  logical, volatile :: contine_poll_messages, & !< Whether to continue waiting command messages from any MONC processes
51  type(field_description_type), dimension(:), allocatable :: registree_field_descriptions
53 
54  integer, volatile :: monc_registration_lock
55 
56  public io_server_run
57 contains
58 
64  subroutine io_server_run(options_database, io_communicator_arg, &
65  provided_threading, total_global_processes, continuation_run, io_configuration_file)
66  type(hashmap_type), intent(inout) :: options_database
67  integer, intent(in) :: io_communicator_arg, provided_threading, total_global_processes
68  logical, intent(in) :: continuation_run
69  character(len=LONG_STRING_LENGTH), intent(in) :: io_configuration_file
70 
71  integer :: command, source, my_rank, ierr
72  character, dimension(:), allocatable :: data_buffer, io_xml_configuration
73  type(hashmap_type) :: diagnostic_generation_frequency
74 
75  if (continuation_run) then
76  ! Handle case where we need to allocate this due to no IO server config
77  call read_io_server_configuration(options_get_string(options_database, "checkpoint"), &
78  io_xml_configuration, io_communicator_arg)
79  end if
80 
81  if (.not. allocated(io_xml_configuration)) then
82  io_xml_configuration=get_io_xml(io_configuration_file)
83  if (continuation_run) then
84  call mpi_comm_rank(io_communicator_arg, my_rank, ierr)
85  if (my_rank == 0) then
86  call log_log(log_warn, "No IO server configuration in checkpoint file - starting from XML provided file instead")
87  end if
88  end if
89  end if
90 
91  call configuration_parse(options_database, io_xml_configuration, io_configuration)
92  deallocate(io_xml_configuration)
94  call initialise_mpi_communication(provided_threading)
96  call check_thread_status(forthread_mutex_init(io_configuration%general_info_mutex, -1))
101  io_configuration%io_communicator=io_communicator_arg
102  io_configuration%number_of_io_servers=get_number_io_servers(io_communicator_arg)
103  io_configuration%number_of_global_moncs=total_global_processes-io_configuration%number_of_io_servers
104  io_configuration%my_io_rank=get_my_io_rank(io_communicator_arg)
105  call initialise_logging(io_configuration%my_io_rank)
108  diagnostic_generation_frequency=initialise_diagnostic_federator(io_configuration)
109  call initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
110  call c_free(diagnostic_generation_frequency)
111  call initialise_writer_field_manager(io_configuration, continuation_run)
112 
116 
118 
119  do while (await_command(command, source, data_buffer))
120  call handle_command_message(command, source, data_buffer)
121  end do
122  call threadpool_deactivate()
128  call cancel_requests()
132  call threadpool_finalise()
133  end subroutine io_server_run
134 
139  logical function await_command(command, source, data_buffer)
140  integer, intent(out) :: command, source
141  character, dimension(:), allocatable :: data_buffer
142 
143  logical :: completed, inter_io_complete
144 
145  completed=.false.
146  await_command=.false.
147  do while(.not. completed)
148  if (.not. contine_poll_messages .and. .not. contine_poll_interio_messages) return
149  if (contine_poll_messages) then
150  if (test_for_command(command, source)) then
151  await_command=.true.
152  return
153  end if
154  end if
155  if (contine_poll_interio_messages .and. allocated(io_configuration%inter_io_communications)) then
156  inter_io_complete=test_for_inter_io(io_configuration%inter_io_communications, &
157  io_configuration%number_inter_io_communications, io_configuration%io_communicator, command, source, data_buffer)
158  if (inter_io_complete) then
159  await_command=.true.
160  return
161  end if
162  end if
163  if (.not. contine_poll_messages .and. .not. already_registered_finishing_call) then
167  end if
168  end if
169  if (.not. completed) call pause_for_mpi_interleaving()
170  end do
171  end function await_command
172 
179  subroutine termination_callback(io_configuration, values, field_name, timestep)
180  type(io_configuration_type), intent(inout) :: io_configuration
181  real(DEFAULT_PRECISION), dimension(:) :: values
182  character(len=STRING_LENGTH) :: field_name
183  integer :: timestep
184 
186  end subroutine termination_callback
187 
191  subroutine handle_command_message(command, source, data_buffer)
192  integer, intent(in) :: command, source
193  character, dimension(:), allocatable, intent(inout) :: data_buffer
194 
195  if (command == register_command) then
197  else if (command == deregister_command) then
199  else if (command == inter_io_communication) then
200  call threadpool_start_thread(handle_inter_io_communication_command, (/ source /), data_buffer=data_buffer)
201  deallocate(data_buffer)
202  else if (command .ge. data_command_start) then
204  end if
205  end subroutine handle_command_message
206 
209  subroutine handle_inter_io_communication_command(arguments, data_buffer)
210  integer, dimension(:), intent(in) :: arguments
211  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
212 
213  integer :: source
214 
215  source=arguments(1)
216 
217  call io_configuration%inter_io_communications(source)%handling_procedure(io_configuration, data_buffer, source)
219 
224  integer :: i, specific_monc_data_type
225  type(iterator_type) :: types_iterator
226 
227  do i=1, size(io_configuration%registered_moncs)
228  types_iterator=c_get_iterator(io_configuration%registered_moncs(i)%registered_monc_types)
229  do while (c_has_next(types_iterator))
230  specific_monc_data_type=c_get_integer(c_next_mapentry(types_iterator))
231  call free_mpi_type(specific_monc_data_type)
232  end do
233  if (allocated(io_configuration%registered_moncs(i)%field_start_locations)) &
234  deallocate(io_configuration%registered_moncs(i)%field_start_locations)
235  if (allocated(io_configuration%registered_moncs(i)%field_end_locations)) &
236  deallocate(io_configuration%registered_moncs(i)%field_end_locations)
237  if (allocated(io_configuration%registered_moncs(i)%definition_names)) &
238  deallocate(io_configuration%registered_moncs(i)%definition_names)
239  if (allocated(io_configuration%registered_moncs(i)%dimensions)) deallocate(io_configuration%registered_moncs(i)%dimensions)
240  end do
242 
245  subroutine handle_deregistration_command(arguments, data_buffer)
246  integer, dimension(:), intent(in) :: arguments
247  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
248 
249  integer :: monc_location, source
250 
251  source=arguments(1)
252  monc_location=get_monc_location(io_configuration, source)
253  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
254  do while (io_configuration%registered_moncs(monc_location)%active_threads .gt. 0)
255  call check_thread_status(forthread_cond_wait(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable,&
256  io_configuration%registered_moncs(monc_location)%active_mutex))
257  end do
258  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
260  io_configuration%active_moncs=io_configuration%active_moncs-1
261  if (io_configuration%active_moncs==0) contine_poll_messages=.false.
263  end subroutine handle_deregistration_command
264 
269  subroutine pull_back_data_message_and_handle(source, data_set)
270  integer, intent(in) :: source, data_set
271 
272  integer :: specific_monc_data_type, specific_monc_buffer_size, recv_count, monc_location
273  character, dimension(:), allocatable :: data_buffer
274 
276  monc_location=get_monc_location(io_configuration, source)
277 
278  specific_monc_data_type=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_types, &
279  conv_to_string(data_set))
280  specific_monc_buffer_size=c_get_integer(io_configuration%registered_moncs(monc_location)%registered_monc_buffer_sizes, &
281  conv_to_string(data_set))
282 
283  allocate(data_buffer(specific_monc_buffer_size))
284  recv_count=data_receive(specific_monc_data_type, 1, source, dump_data=data_buffer, data_dump_id=data_set)
285 
287  call threadpool_start_thread(handle_data_message, (/ source, data_set /), data_buffer=data_buffer)
288  deallocate(data_buffer)
289  end subroutine pull_back_data_message_and_handle
290 
295  subroutine handle_data_message(arguments, data_buffer)
296  integer, dimension(:), intent(in) :: arguments
297  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
298 
299  integer :: monc_location, data_set, source, matched_datadefn_index
300 
301  source=arguments(1)
302  data_set=arguments(2)
303 
305  monc_location=get_monc_location(io_configuration, source)
306 
307  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
308  io_configuration%registered_moncs(monc_location)%active_threads=&
309  io_configuration%registered_moncs(monc_location)%active_threads+1
310  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
311 
312  matched_datadefn_index=retrieve_data_definition(io_configuration, &
313  io_configuration%registered_moncs(monc_location)%definition_names(data_set))
314 
315  if (matched_datadefn_index .gt. 0) then
316  call inform_writer_federator_time_point(io_configuration, source, data_set, data_buffer)
317  call pass_fields_to_diagnostics_federator(io_configuration, source, data_set, data_buffer)
318  call provide_monc_data_to_writer_federator(io_configuration, source, data_set, data_buffer)
319  call check_writer_for_trigger(io_configuration, source, data_set, data_buffer)
320  else
321  call log_log(log_warn, "IO server can not find matching data definition with name "&
322  //io_configuration%registered_moncs(monc_location)%definition_names(data_set))
323  end if
324 
325  call check_thread_status(forthread_mutex_lock(io_configuration%registered_moncs(monc_location)%active_mutex))
326  io_configuration%registered_moncs(monc_location)%active_threads=&
327  io_configuration%registered_moncs(monc_location)%active_threads-1
328  call check_thread_status(forthread_cond_signal(io_configuration%registered_moncs(monc_location)%deactivate_condition_variable))
329  call check_thread_status(forthread_mutex_unlock(io_configuration%registered_moncs(monc_location)%active_mutex))
331  end subroutine handle_data_message
332 
337  subroutine handle_monc_registration(arguments, data_buffer)
338  integer, dimension(:), intent(in) :: arguments
339  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
340 
341  integer :: configuration_send_request(2), ierr, number_data_definitions, this_monc_index, source
342 
343  source=arguments(1)
344  configuration_send_request=send_configuration_to_registree(source)
345  number_data_definitions=io_configuration%number_of_data_definitions
346 
348 
349  io_configuration%number_of_moncs=io_configuration%number_of_moncs+1
350  this_monc_index=io_configuration%number_of_moncs
351  if (io_configuration%number_of_moncs .gt. size(io_configuration%registered_moncs)) then
352  call log_log(log_error, "You have a high ratio of computational cores to IO servers, the limit is currently 100")
353  ! The extension of the MONC registration array is broken as the pointers involved in the map does not get copied across
354  ! we could manually do this, but that is for another day! If you need to extend these limits either increase the constants
355  ! or fix the extension, I don't think it will be too hard to fix the extension bit (copy the maps manually)
357  end if
358 
359  io_configuration%active_moncs=io_configuration%active_moncs+1
361 
362  call c_put_integer(io_configuration%monc_to_index, conv_to_string(source), this_monc_index)
363 
364  call check_thread_status(forthread_mutex_init(io_configuration%registered_moncs(this_monc_index)%active_mutex, -1))
366  io_configuration%registered_moncs(this_monc_index)%deactivate_condition_variable, -1))
367  io_configuration%registered_moncs(this_monc_index)%active_threads=0
368  io_configuration%registered_moncs(this_monc_index)%source_id=source
369 
370  allocate(io_configuration%registered_moncs(this_monc_index)%field_start_locations(number_data_definitions), &
371  io_configuration%registered_moncs(this_monc_index)%field_end_locations(number_data_definitions), &
372  io_configuration%registered_moncs(this_monc_index)%definition_names(number_data_definitions), &
373  io_configuration%registered_moncs(this_monc_index)%dimensions(number_data_definitions))
374 
375  ! Wait for configuration to have been sent to registree
376  call waitall_for_mpi_requests(configuration_send_request, 2)
377  call init_data_definition(source, io_configuration%registered_moncs(this_monc_index))
378  end subroutine handle_monc_registration
379 
383  function send_configuration_to_registree(source)
384  integer, intent(in) :: source
385  integer :: send_configuration_to_registree(2)
386 
387  integer :: ierr, srequest(2)
388 
389  call lock_mpi()
391  source, data_tag, mpi_comm_world, srequest(1), ierr)
393  source, data_tag, mpi_comm_world, srequest(2), ierr)
394  call unlock_mpi()
395 
396  send_configuration_to_registree=srequest
397  end function send_configuration_to_registree
398 
406  subroutine init_data_definition(source, monc_defn)
407  integer, intent(in) :: source
408  type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
409 
410  type(data_sizing_description_type) :: data_description(io_configuration%number_of_distinct_data_fields+4)
411  integer :: created_mpi_type, data_size, recv_count, i
412  type(data_sizing_description_type) :: field_description
413  logical :: field_found
414 
415  recv_count=data_receive(mpi_type_data_sizing_description, io_configuration%number_of_distinct_data_fields+4, &
416  source, description_data=data_description)
417 
418  call handle_monc_dimension_information(data_description, monc_defn)
419 
420  do i=1, io_configuration%number_of_data_definitions
421  created_mpi_type=build_mpi_datatype(io_configuration%data_definitions(i), data_description, data_size, &
422  monc_defn%field_start_locations(i), monc_defn%field_end_locations(i), monc_defn%dimensions(i))
423 
424  call c_put_integer(monc_defn%registered_monc_types, conv_to_string(i), created_mpi_type)
425  call c_put_integer(monc_defn%registered_monc_buffer_sizes, conv_to_string(i), data_size)
426 
427  monc_defn%definition_names(i)=io_configuration%data_definitions(i)%name
428  end do
429  if (.not. initialised_present_data) then
431  field_found=get_data_description_from_name(data_description, number_q_indicies_key, field_description)
432  call c_put_integer(io_configuration%dimension_sizing, "active_q_indicies", field_description%dim_sizes(1))
433  call register_present_field_names_to_federators(data_description, recv_count)
434  end if
435  call get_monc_information_data(source)
436  end subroutine init_data_definition
437 
441  subroutine get_monc_information_data(source)
442  integer, intent(in) :: source
443 
444  character, dimension(:), allocatable :: buffer
445  character(len=STRING_LENGTH) :: q_field_name
446  integer :: buffer_size, z_size, num_q_fields, n, current_point, recv_count
447  type(data_sizing_description_type) :: field_description
448  real(kind=DEFAULT_PRECISION) :: dreal
449  logical :: field_found
450 
451 
452  z_size=c_get_integer(io_configuration%dimension_sizing, "z")
453  num_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
454 
455  buffer_size=(kind(dreal)*z_size) + (string_length * num_q_fields)
456  allocate(buffer(buffer_size))
457  recv_count=data_receive(mpi_byte, buffer_size, source, buffer)
458  if (.not. io_configuration%general_info_set) then
460  if (.not. io_configuration%general_info_set) then
461  io_configuration%general_info_set=.true.
462  allocate(io_configuration%zn_field(z_size))
463  io_configuration%zn_field=transfer(buffer(1:kind(dreal)*z_size), io_configuration%zn_field)
464  current_point=(kind(dreal)*z_size)
465  if (num_q_fields .gt. 0) then
466  do n=1, num_q_fields
467  q_field_name=transfer(buffer(current_point+1:current_point+string_length), q_field_name)
468  current_point=current_point+string_length
469  call replace_character(q_field_name, " ", "_")
470  call c_add_string(io_configuration%q_field_names, q_field_name)
471  end do
472  end if
473  end if
476  end if
477  deallocate(buffer)
478  end subroutine get_monc_information_data
479 
485  subroutine register_present_field_names_to_federators(data_description, recv_count)
486  type(data_sizing_description_type), dimension(:), intent(in) :: data_description
487  integer, intent(in) :: recv_count
488 
489  type(hashset_type) :: present_field_names
490  type(hashmap_type) :: diagnostics_field_names_and_roots
491  integer :: i, j
492 
493  do i=1, recv_count
494  call c_add_string(present_field_names, data_description(i)%field_name)
495  end do
496  do i=1, io_configuration%number_of_data_definitions
497  do j=1, io_configuration%data_definitions(i)%number_of_data_fields
498  if (io_configuration%data_definitions(i)%fields(j)%field_type == scalar_field_type .and. .not. &
499  io_configuration%data_definitions(i)%fields(j)%optional) then
500  call c_add_string(present_field_names, io_configuration%data_definitions(i)%fields(j)%name)
501  end if
502  end do
503  end do
504  call c_add_string(present_field_names, "time")
505  call c_add_string(present_field_names, "timestep")
507  diagnostics_field_names_and_roots=determine_diagnostics_fields_available(present_field_names)
508  call inform_writer_federator_fields_present(io_configuration, diag_field_names_and_roots=diagnostics_field_names_and_roots)
509  call c_free(present_field_names)
510  call c_free(diagnostics_field_names_and_roots)
512 
516  subroutine handle_monc_dimension_information(data_description, monc_defn)
517  type(io_configuration_registered_monc_type), intent(inout) :: monc_defn
518  type(data_sizing_description_type), dimension(:) :: data_description
519 
520  type(data_sizing_description_type) :: field_description
521  integer :: i
522  logical :: field_found
523 
524  field_found=get_data_description_from_name(data_description, local_sizes_key, field_description)
525  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local size information")
526  do i=1,3
527  monc_defn%local_dim_sizes(i)=field_description%dim_sizes(i)
528  end do
529  field_found=get_data_description_from_name(data_description, local_start_points_key, field_description)
530  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local start point information")
531  do i=1,3
532  monc_defn%local_dim_starts(i)=field_description%dim_sizes(i)
533  end do
534  field_found=get_data_description_from_name(data_description, local_end_points_key, field_description)
535  if (.not. field_found) call log_log(log_error, "Malformed MONC registration, no local end point information")
536  do i=1,3
537  monc_defn%local_dim_ends(i)=field_description%dim_sizes(i)
538  end do
539  end subroutine handle_monc_dimension_information
540 end module io_server_mod
Retrieves the integer value held at the specific map index or null if index > map elements...
logical function await_command(command, source, data_buffer)
Awaits a command or shutdown from MONC processes and other IO servers.
Definition: ioserver.F90:140
subroutine handle_monc_registration(arguments, data_buffer)
Handles registration from some MONC process. The source process sends some data description to this I...
Definition: ioserver.F90:338
integer, parameter, public long_string_length
Length of longer strings.
Definition: datadefn.F90:11
type(hashmap_type) function, public determine_diagnostics_fields_available(monc_field_names)
Determines the diagnostics fields that are available based upon the input MONC fields on registration...
integer function forthread_rwlock_init(rwlock_id, attr_id)
Definition: forthread.F90:504
Returns whether a collection is empty.
Puts an integer key-value pair into the map.
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
integer function forthread_cond_signal(cond_id)
Definition: forthread.F90:394
integer function, dimension(2) send_configuration_to_registree(source)
Sends the data and field descriptions to the MONC process that just registered with the IO server...
Definition: ioserver.F90:384
subroutine, public inform_writer_federator_fields_present(io_configuration, field_names, diag_field_names_and_roots)
Informs the writer federator that specific fields are present and should be reflected in the diagnost...
integer function, public get_number_io_servers(io_comm)
Retrieves the number of IO servers that are running in total.
logical function, public test_for_command(command, source)
Tests for a command message based upon the request already registered.
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
subroutine handle_monc_dimension_information(data_description, monc_defn)
Handles the provided local MONC dimension and data layout information.
Definition: ioserver.F90:517
subroutine, public extend_registered_moncs_array(io_configuration)
Extends the data definitions array from the current size to the current size + data size stride...
character(len=string_length) function, public options_get_string(options_database, key, index)
Retrieves a string value from the database that matches the provided key.
subroutine, public threadpool_init(io_configuration)
Initialises the thread pool and marks each thread as idle.
Definition: threadpool.F90:51
subroutine, public register_command_receive()
Registers a request for receiving a command from any MONC process on the command channel.
logical, volatile contine_poll_interio_messages
Definition: ioserver.F90:50
Reads the IO server state that was stored in a NetCDF checkpoint file.
String utility functionality that is commonly used throughout MONC.
Definition: string_utils.F90:2
character(len=string_length), parameter, public local_end_points_key
Definition: ioclient.F90:43
integer, parameter, public register_command
Definition: ioclient.F90:34
subroutine register_present_field_names_to_federators(data_description, recv_count)
Registers with the writer federator the set of fields (prognostic and diagnostic) that are available...
Definition: ioserver.F90:486
Logging utility.
Definition: logging.F90:2
type(definition_description_type), dimension(:), allocatable registree_definition_descriptions
Definition: ioserver.F90:52
integer, parameter, public data_size_stride
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
The writer field manager will manage aspects of the fields being provided to the writer federator...
logical function, public threadpool_is_idle()
Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work) ...
Definition: threadpool.F90:164
Abstraction layer around MPI, this issues and marshals the lower level communication details...
subroutine pull_back_data_message_and_handle(source, data_set)
Retrieves the message from MONC off the data channel and throws this to a thread in the thread pool t...
Definition: ioserver.F90:270
integer function forthread_rwlock_rdlock(lock_id)
Definition: forthread.F90:514
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
type(io_configuration_type), save, volatile io_configuration
Internal representation of the IO configuration.
Definition: ioserver.F90:47
logical function, public check_diagnostic_federator_for_completion(io_configuration)
Checks whether the diagnostics federator has completed or not, this is really checking all the underl...
integer mpi_type_field_description
The MPI data type for field descriptions sent to MONCs.
Definition: ioserver.F90:44
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
subroutine, public inform_writer_federator_time_point(io_configuration, source, data_id, data_dump)
integer, volatile monc_registration_lock
Definition: ioserver.F90:54
subroutine, public free_mpi_type(the_type)
Frees an MPI type, used in clean up.
subroutine, public threadpool_deactivate()
This waits for all busy threads to complete and then shuts all the pthreads down. The deactivation an...
Definition: threadpool.F90:174
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Gets a specific integer element out of the list, stack, queue or map with the corresponding key...
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
character(len=string_length), parameter, public local_sizes_key
Definition: ioclient.F90:43
integer function, public data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
Awaits some data on the data channel. This is of the type, size from the source provided and can eith...
subroutine, public initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
Initialises the write federator and configures it based on the user configuration. Also initialises the time manipulations.
integer function forthread_cond_init(cond_id, attr_id)
Definition: forthread.F90:356
Converts data types to strings.
Definition: conversions.F90:36
subroutine free_individual_registered_monc_aspects()
Frees up the memory associated with individual registered MONCs. This is done at the end for all MONC...
Definition: ioserver.F90:224
integer function forthread_rwlock_wrlock(lock_id)
Definition: forthread.F90:532
subroutine, public perform_global_callback(io_configuration, field_name, timestep, completion_procedure)
Performs a global callback.
logical function, public test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
Tests for inter IO server communication.
subroutine, public io_server_run(options_database, io_communicator_arg, provided_threading, total_global_processes, continuation_run, io_configuration_file)
Called to start the IO server and once this subroutine returns then it indicates that the IO server h...
Definition: ioserver.F90:66
subroutine get_monc_information_data(source)
Retrieves MONC information data, this is sent by MONC (and received) regardless, but only actioned if...
Definition: ioserver.F90:442
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
integer, parameter, public inter_io_communication
Field type identifiers.
Definition: ioclient.F90:34
subroutine handle_data_message(arguments, data_buffer)
Handles the command for data download from a specific process. This will allocate the receive buffer ...
Definition: ioserver.F90:296
This defines some constants and procedures that are useful to the IO server and clients that call it...
Definition: ioclient.F90:3
Map data structure that holds string (length 20 maximum) key value pairs.
Definition: collections.F90:86
subroutine handle_inter_io_communication_command(arguments, data_buffer)
Handles inter IO server communications.
Definition: ioserver.F90:210
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
subroutine, public threadpool_finalise()
Finalises the thread pool.
Definition: threadpool.F90:192
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
type(field_description_type), dimension(:), allocatable registree_field_descriptions
Definition: ioserver.F90:51
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer mpi_type_data_sizing_description
The MPI type for field sizing (i.e. array size etc send when MONCs register)
Definition: ioserver.F90:44
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine handle_command_message(command, source, data_buffer)
Called to handle a specific command that has been recieved.
Definition: ioserver.F90:192
integer function forthread_rwlock_tryrdlock(lock_id)
Definition: forthread.F90:523
logical, volatile contine_poll_messages
Whether to continue waiting command messages from any MONC processes.
Definition: ioserver.F90:48
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
Definition: collections.F90:7
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC&#39;s configuration in the IO data structure.
subroutine, public initialise_mpi_communication(provided_threading)
Initialises MPI communication.
integer, parameter, public data_command_start
Definition: ioclient.F90:34
subroutine, public check_writer_for_trigger(io_configuration, source, data_id, data_dump)
Checks all writer entries for any trigger fires and issues the underlying file storage.
subroutine, public configuration_parse(provided_options_database, raw_configuration, parsed_configuration)
This will parse an XML string into the IO configuration.
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
Configuration that representes the state of a registered MONC process.
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public initialise_writer_field_manager(io_configuration, continuation_run)
Initialises the writer field manager.
subroutine, public initialise_logging(pid)
Initialises the logging. This is done to make it easier for master logging only, so that we don&#39;t hav...
Definition: logging.F90:37
This federates over the writing of diagnostic and prognostic data to the file system. It also manages the time manipulation of fields and groups.
subroutine, public cancel_requests()
Cancels all outstanding communication requests.
logical function, public get_data_description_from_name(descriptions, name, field_description)
Look up the data description that corresponds to a specific field keyed by its name.
Definition: ioclient.F90:355
integer function, public retrieve_data_definition(io_configuration, key)
Retrieves a specific data definition from the configuration which matches a key.
integer function forthread_rwlock_destroy(rwlock_id)
Definition: forthread.F90:495
integer function, public build_mpi_type_definition_description()
Builds the MPI data type for sending data descriptions to registree MONCs.
Definition: ioclient.F90:76
subroutine, public read_io_server_configuration(checkpoint_filename, io_xml_configuration, io_communicator_arg)
Reads the IO server configuration, which is the XML configuration initially run with and stored in th...
integer function, public build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, field_end_locations, field_dimensions)
Builds the MPI type that corresponds to the data which will be received from a specific MONC process...
subroutine, public provide_monc_data_to_writer_federator(io_configuration, source, data_id, data_dump)
Data communicated from MONC is provided to this write federator and then included if the configuratio...
integer function forthread_cond_wait(cond_id, mutex_id)
Definition: forthread.F90:376
Manages the options database. Contains administration functions and deduce runtime options from the c...
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
Definition: threadpool.F90:102
The main IO server functionality which handles waiting for commands and data both of which are delt w...
Definition: ioserver.F90:5
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
subroutine, public pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
Entry point into the diagnostics federator this runs the diagnostics, executing the defined rules bas...
subroutine, public provide_q_field_names_to_writer_federator(q_provided_field_names)
Provides the Q field names to the write federator, this is required as on initialisation we don&#39;t kno...
integer function forthread_rwlock_unlock(lock_id)
Definition: forthread.F90:550
type(hashmap_type) function, public initialise_diagnostic_federator(io_configuration)
Initialises the diagnostics action and sets up the diagnostics master definitions.
recursive character function, dimension(:), allocatable, public get_io_xml(filename, funit_num)
Reads in textual data from a file and returns this, used to read the IO server XML configuration file...
subroutine handle_deregistration_command(arguments, data_buffer)
Deregisteres a specific MONC source process.
Definition: ioserver.F90:246
subroutine, public finalise_writer_field_manager()
Finalises the writer field manager.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
subroutine, public pause_for_mpi_interleaving()
Pauses for a specific number of ms to allow for MPI interleaving, this is to avoid starvation...
character(len=string_length), parameter, public number_q_indicies_key
Definition: ioclient.F90:43
subroutine, public replace_character(str, src_char, tgt_char)
Replaces all occurances of a character in a string with another character.
logical, volatile already_registered_finishing_call
Definition: ioserver.F90:50
integer mpi_type_definition_description
The MPI data type for data descriptions sent to MONCs.
Definition: ioserver.F90:44
subroutine, public finalise_writer_federator()
Finalises the write federator and the manipulations.
integer, parameter, public scalar_field_type
Definition: ioclient.F90:38
Adds a string to the end of the list.
integer function, public build_mpi_type_data_sizing_description()
Builds the MPI type used for sending to the IO server a description of the data, namely the size of t...
Definition: ioclient.F90:147
subroutine, public finalise_diagnostic_federator(io_configuration)
Finalises the diagnostics federator, waiting for all outstanding requests and then freeing data...
subroutine init_data_definition(source, monc_defn)
Initialise the sizing of data definitions from a MONC process. The IO server determines, from configuration, the structure of each data definition but the size of the arrays depends upon the MONC process (due to uneven distribution of data etc...) This receives the sizing message and then builds the MPI datatype for each data definition that the IO server will receive from that specific MONC process. The field sizings are for all fields in every data definition, and these are applied to each data definition which will simply ignore non matching fields.
Definition: ioserver.F90:407
integer function, public build_mpi_type_field_description()
Builds the MPI data type for sending field descriptions to registree MONCs.
Definition: ioclient.F90:108
integer, parameter, public data_tag
Definition: ioclient.F90:34
Parses the XML configuration file to produce the io configuration description which contains the data...
type(field_description_type) function, dimension(:), allocatable, public build_field_description_type_from_configuration(io_configuration)
Builds up the field definition description type from the structured definitions in the IO configurati...
integer function, public get_my_io_rank(io_comm)
Retrieves my IO server rank out of the number of IO servers that are running.
This diagnostics federator will take in data fields sent from a MONC, perform operators on these as r...
integer, parameter, public deregister_command
Definition: ioclient.F90:34
logical, volatile initialised_present_data
Definition: ioserver.F90:48
Removes a specific element from the list or map.
type(definition_description_type) function, dimension(:), allocatable, public build_definition_description_type_from_configuration(io_configuration)
Builds up the data definition description type from the structured definitions in the IO configuratio...
character(len=string_length), parameter, public local_start_points_key
Definition: ioclient.F90:43
subroutine termination_callback(io_configuration, values, field_name, timestep)
This is the termination callback which is called once all MONCs have deregistered, no sends are active by inter IO communications and all threads are idle. This shuts down the inter IO listening and kickstarts finalisation and closure.
Definition: ioserver.F90:180