MONC
reduction-inter-io.F90
Go to the documentation of this file.
1 
15  use logging_mod, only : log_error, log_log
18  use mpi, only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_status_ignore, mpi_character, mpi_byte
20  implicit none
21 
22 #ifndef TEST_MODE
23  private
24 #endif
25 
26 
27  integer, parameter :: mean=1, min=2, max=3, sum=4
28  integer, parameter :: my_inter_io_tag=1, perform_clean_every=200
29  character(len=*), parameter :: my_inter_io_name="reductioninterio"
30 
32  real(DEFAULT_PRECISION), dimension(:), allocatable :: values
33  character, dimension(:), allocatable :: send_buffer
34  character(len=STRING_LENGTH) :: field_name
35  integer :: contributed_moncs, contributed_io_servers, timestep, reduction_operator, async_handle, mutex, root
36  procedure(handle_completion), pointer, nopass :: completion_procedure
37  end type reduction_progress_type
38 
42  logical, volatile :: initialised=.false.
43 
46 contains
47 
50  subroutine init_reduction_inter_io(io_configuration)
51  type(io_configuration_type), intent(inout) :: io_configuration
52 
53  if (.not. initialised) then
54  initialised=.true.
62  end if
63  end subroutine init_reduction_inter_io
64 
69  subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
70  type(io_configuration_type), intent(inout) :: io_configuration
71  character, dimension(:), intent(inout) :: data_buffer
72  integer, intent(in) :: inter_io_index
73 
74  call handle_process_recv_from_other_io_server(io_configuration, io_configuration%inter_io_communications(inter_io_index), &
75  io_configuration%my_io_rank, data_buffer, io_configuration%number_of_io_servers)
76  end subroutine handle_recv_data_from_io_server
77 
82  logical function check_reduction_inter_io_for_completion(io_configuration)
83  type(io_configuration_type), intent(inout) :: io_configuration
84 
87 
90  subroutine finalise_reduction_inter_io(io_configuration)
91  type(io_configuration_type), intent(inout) :: io_configuration
92 
93  type(reduction_progress_type) :: progress
94  type(iterator_type) :: iterator
95 
96  if (initialised) then
98  if (.not. c_is_empty(reduction_progresses)) then
100  do while (c_has_next(iterator))
101  progress=retrieve_reduction_progress(c_next_mapentry(iterator))
102  if (progress%async_handle /= mpi_request_null) then
103  call wait_for_mpi_request(progress%async_handle)
104  end if
105  end do
106  end if
112  initialised=.false.
113  end if
114  end subroutine finalise_reduction_inter_io
115 
125  subroutine perform_inter_io_reduction(io_configuration, field_values, field_size, reduction_field_name, reduction_op, &
126  root, timestep, completion_procedure)
127  type(io_configuration_type), intent(inout) :: io_configuration
128  real(kind=DOUBLE_PRECISION), dimension(:) :: field_values
129  integer, intent(in) :: field_size, reduction_op, root, timestep
130  character(len=*), intent(in) :: reduction_field_name
131  procedure(handle_completion) :: completion_procedure
132 
133  type(reduction_progress_type), pointer :: reduction_progress
134  logical :: collective_values_new
135 
136  call clean_progress(io_configuration%my_io_rank)
137  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, root, reduction_field_name, completion_procedure)
138 
139  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
140  reduction_progress%contributed_moncs=reduction_progress%contributed_moncs+1
141 
142  collective_values_new=.not. allocated(reduction_progress%values)
143  if (collective_values_new) allocate(reduction_progress%values(field_size))
144 
145  call integrate_io_server_collective_values(reduction_op, reduction_progress, field_values, field_size, collective_values_new)
146  if (reduction_progress%contributed_moncs == io_configuration%number_of_moncs) then
147  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
148  call handle_local_moncs_completed_collective(io_configuration, reduction_progress)
149  else
150  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
151  end if
152  end subroutine perform_inter_io_reduction
153 
154  subroutine clean_progress(myrank)
155  integer, intent(in) :: myrank
156 
157  logical :: cc_dummy
158 
164  cc_dummy=check_and_clean_progress(myrank)
165  else
167  end if
168  end subroutine clean_progress
169 
174  logical function check_and_clean_progress(myrank)
175  integer, intent(in) :: myrank
176 
177  integer :: i, entries, completed, ierr, num_to_remove, have_lock
178  type(list_type) :: entries_to_remove
179  type(iterator_type) :: iterator
180  type(mapentry_type) :: mapentry
181  type(reduction_progress_type), pointer :: specific_reduction_progress
182  character(len=STRING_LENGTH) :: entry_key
183  class(*), pointer :: generic
184  logical :: destroy_lock
185 
188  if (have_lock == 0) then
191  do while (c_has_next(iterator))
192  mapentry=c_next_mapentry(iterator)
193  destroy_lock=.false.
194  specific_reduction_progress=>retrieve_reduction_progress(mapentry)
195  if (myrank /= specific_reduction_progress%root) then
196  call check_thread_status(forthread_mutex_lock(specific_reduction_progress%mutex))
197  if (specific_reduction_progress%async_handle /= mpi_request_null) then
198  call wait_for_mpi_request(specific_reduction_progress%async_handle)
199  if (completed == 1) then
200  if (allocated(specific_reduction_progress%send_buffer)) deallocate(specific_reduction_progress%send_buffer)
201  destroy_lock=.true.
202  call c_add_string(entries_to_remove, mapentry%key)
203  else
205  end if
206  end if
207  call check_thread_status(forthread_mutex_unlock(specific_reduction_progress%mutex))
208  if (destroy_lock) call check_thread_status(forthread_mutex_destroy(specific_reduction_progress%mutex))
209  end if
210  end do
212 
213  if (.not. c_is_empty(entries_to_remove)) then
215  iterator=c_get_iterator(entries_to_remove)
216  do while (c_has_next(iterator))
217  entry_key=c_next_string(iterator)
218  generic=>c_get_generic(reduction_progresses, entry_key)
219  call c_remove(reduction_progresses, entry_key)
220  deallocate(generic)
221  end do
223  end if
224  call c_free(entries_to_remove)
226  end if
227  end function check_and_clean_progress
228 
236  subroutine integrate_io_server_collective_values(reduction_op, reduction_progress, single_server_values, &
237  number_elements, collective_values_empty)
238  integer, intent(in) :: reduction_op, number_elements
239  logical, intent(in) :: collective_values_empty
240  type(reduction_progress_type), intent(inout) :: reduction_progress
241  real(kind=DOUBLE_PRECISION), dimension(:), intent(in) :: single_server_values
242 
243  integer :: k
244 
245  if (collective_values_empty) then
246  reduction_progress%values=single_server_values
247  else
248  if (reduction_op == mean .or. reduction_op == sum) then
249  reduction_progress%values=reduction_progress%values+single_server_values
250  else if (reduction_op == min .or. reduction_op == max) then
251  do k=1, number_elements
252  if (reduction_op == min) then
253  if (single_server_values(k) .lt. reduction_progress%values(k)) &
254  reduction_progress%values(k)=single_server_values(k)
255  else if (reduction_op == max) then
256  if (single_server_values(k) .gt. reduction_progress%values(k)) &
257  reduction_progress%values(k)=single_server_values(k)
258  end if
259  end do
260  end if
261  end if
263 
271  subroutine handle_local_moncs_completed_collective(io_configuration, reduction_progress)
272  type(io_configuration_type), intent(inout) :: io_configuration
273  type(reduction_progress_type), intent(inout) :: reduction_progress
274 
275  integer :: ierr, inter_io_comm_index
276 
277  if (io_configuration%my_io_rank == reduction_progress%root .and. &
278  reduction_progress%contributed_io_servers == io_configuration%number_of_io_servers) then
279  call handle_collective_completed(io_configuration, reduction_progress)
280  else
281  if (io_configuration%my_io_rank /= reduction_progress%root) then
282  inter_io_comm_index=find_inter_io_from_name(io_configuration, my_inter_io_name)
283 
284  reduction_progress%send_buffer=package_inter_io_communication_message(reduction_progress%field_name, &
285  reduction_progress%timestep, reduction_progress%values, reduction_progress%reduction_operator)
286  call lock_mpi()
287  call mpi_isend(reduction_progress%send_buffer, size(reduction_progress%send_buffer), &
288  mpi_byte, reduction_progress%root, &
289  io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
290  io_configuration%io_communicator, reduction_progress%async_handle, ierr)
291  call unlock_mpi()
292  ! Deallocate the current value as this is finished with and has been packed into the send buffer
293  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
294  end if
295  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
296  end if
298 
303  subroutine handle_process_recv_from_other_io_server(io_configuration, inter_io_comm, myrank, data_buffer, number_io_servers)
304  type(io_configuration_type), intent(inout) :: io_configuration
305  type(io_configuration_inter_communication_description), intent(inout) :: inter_io_comm
306  character, dimension(:), intent(inout) :: data_buffer
307  integer, intent(in) :: number_io_servers, myrank
308 
309  type(reduction_progress_type), pointer :: reduction_progress
310  character(len=STRING_LENGTH) :: field_name
311  integer :: timestep, reduction_op
312  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: field_values
313  logical :: collective_values_new
314 
315  call unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, reduction_op)
316  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, myrank, field_name)
317 
318  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
319  collective_values_new=.not. allocated(reduction_progress%values)
320  if (collective_values_new) allocate(reduction_progress%values(size(field_values)))
321 
322  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
323  call integrate_io_server_collective_values(reduction_op, reduction_progress, &
324  field_values, size(field_values), collective_values_new)
325  if (reduction_progress%contributed_io_servers == number_io_servers) then
326  call handle_collective_completed(io_configuration, reduction_progress)
327  deallocate(field_values)
328  return
329  end if
330  deallocate(field_values)
331  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
333 
337  subroutine handle_collective_completed(io_configuration, reduction_progress)
338  type(io_configuration_type), intent(inout) :: io_configuration
339  type(reduction_progress_type), intent(inout) :: reduction_progress
340 
341  if (reduction_progress%reduction_operator == mean) then
342  reduction_progress%values=reduction_progress%values/io_configuration%number_of_global_moncs
343  end if
344  call reduction_progress%completion_procedure(io_configuration, reduction_progress%values, &
345  reduction_progress%field_name, reduction_progress%timestep)
346  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
347  call check_thread_status(forthread_mutex_destroy(reduction_progress%mutex))
348  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
349  call remove_reduction_progress(reduction_progress)
350  end subroutine handle_collective_completed
351 
359  function find_or_add_reduction_progress(timestep, reduction_operator, root, field_name, completion_procedure)
360  integer, intent(in) :: timestep, reduction_operator, root
361  type(reduction_progress_type), pointer :: find_or_add_reduction_progress
362  character(len=*), intent(in) :: field_name
363  procedure(handle_completion), optional :: completion_procedure
364 
365  class(*), pointer :: generic
366  type(reduction_progress_type), pointer :: new_progress
367 
368  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name)
369  if (.not. associated(find_or_add_reduction_progress)) then
371  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock=.false.)
372  if (.not. associated(find_or_add_reduction_progress)) then
373  allocate(new_progress)
374  call check_thread_status(forthread_mutex_init(new_progress%mutex, -1))
375  new_progress%timestep=timestep
376  new_progress%reduction_operator=reduction_operator
377  new_progress%contributed_moncs=0
378  new_progress%contributed_io_servers=0
379  new_progress%root=root
380  new_progress%async_handle=mpi_request_null
381  new_progress%field_name=field_name
382  if (present(completion_procedure)) then
383  new_progress%completion_procedure=>completion_procedure
384  else
385  new_progress%completion_procedure=>null()
386  end if
387  generic=>new_progress
388  call c_put_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator), &
389  generic, .false.)
390  find_or_add_reduction_progress=>new_progress
391  end if
393  end if
394  if (.not. associated(find_or_add_reduction_progress%completion_procedure) .and. present(completion_procedure)) then
395  find_or_add_reduction_progress%completion_procedure=>completion_procedure
396  end if
397  end function find_or_add_reduction_progress
398 
405  function find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock)
406  integer, intent(in) :: timestep, reduction_operator
407  logical, intent(in), optional :: issue_read_lock
408  type(reduction_progress_type), pointer :: find_reduction_progress
409  character(len=*), intent(in) :: field_name
410 
411  class(*), pointer :: generic
412  logical :: do_read_lock
413 
414  if (present(issue_read_lock)) then
415  do_read_lock=issue_read_lock
416  else
417  do_read_lock=.true.
418  end if
419 
421  generic=>c_get_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator))
423  if (associated(generic)) then
424  select type(generic)
425  type is (reduction_progress_type)
426  find_reduction_progress=>generic
427  end select
428  else
429  find_reduction_progress=>null()
430  end if
431  end function find_reduction_progress
432 
435  subroutine remove_reduction_progress(reduction_progress)
436  type(reduction_progress_type), intent(in) :: reduction_progress
437 
438  class(*), pointer :: generic
439  character(len=STRING_LENGTH) :: specific_key
440 
441  specific_key=generate_reduction_key(reduction_progress%field_name, reduction_progress%timestep,&
442  reduction_progress%reduction_operator)
444  generic=>c_get_generic(reduction_progresses, specific_key)
445  call c_remove(reduction_progresses, specific_key)
447  if (associated(generic)) deallocate(generic)
448  end subroutine remove_reduction_progress
449 
454  character(len=STRING_LENGTH) function generate_reduction_key(field_name, timestep, reduction_operator)
455  character(len=*), intent(in) :: field_name
456  integer, intent(in) :: timestep, reduction_operator
457 
458  generate_reduction_key=trim(field_name)//"#"//trim(conv_to_string(timestep))//"#"// trim(conv_to_string(reduction_operator))
459  end function generate_reduction_key
460 
464  function retrieve_reduction_progress(mapentry)
465  type(mapentry_type), intent(in) :: mapentry
466  type(reduction_progress_type), pointer :: retrieve_reduction_progress
467 
468  class(*), pointer :: generic
469 
470  generic=>c_get_generic(mapentry)
471 
472  if (associated(generic)) then
473  select type(generic)
474  type is (reduction_progress_type)
475  retrieve_reduction_progress=>generic
476  end select
477  else
478  retrieve_reduction_progress=>null()
479  end if
480  end function retrieve_reduction_progress
481 
486  integer function get_reduction_operator(op_string)
487  character(len=*), intent(in) :: op_string
488 
489  if (op_string .eq. "mean") then
491  else if (op_string .eq. "min") then
493  else if (op_string .eq. "max") then
495  else if (op_string .eq. "sum") then
497  else
498  call log_log(log_error, "Reduction operator '"//trim(op_string)//"' not recognised")
499  end if
500  end function get_reduction_operator
501 end module reduction_inter_io_mod
integer, volatile previous_clean_reduction_count
integer, volatile clean_progress_mutex
integer function forthread_rwlock_init(rwlock_id, attr_id)
Definition: forthread.F90:504
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
logical, volatile initialised
Returns whether a collection is empty.
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
subroutine, public init_reduction_inter_io(io_configuration)
Initialises the reduction action.
subroutine integrate_io_server_collective_values(reduction_op, reduction_progress, single_server_values, number_elements, collective_values_empty)
Integrates the collective values from another IO server into the currently stored values...
integer, parameter my_inter_io_tag
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
subroutine handle_process_recv_from_other_io_server(io_configuration, inter_io_comm, myrank, data_buffer, number_io_servers)
Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed.
subroutine, public perform_inter_io_reduction(io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
Actually handles the processing for this data wrt the vertical reduction.
character function, dimension(:), allocatable, public package_inter_io_communication_message(field_name, timestep, field_values, other_int)
Packages up fields into an io binary message (allocated here) which is used for sending.
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
Handles the receiving of data from some other IO server. This is issued call back style within a thre...
logical function, public check_reduction_inter_io_for_completion(io_configuration)
Checks this action for completion, when all are completed then the IO server can shutdown as this is ...
Logging utility.
Definition: logging.F90:2
subroutine handle_local_moncs_completed_collective(io_configuration, reduction_progress)
Handles the case where the local MONC processes have completed their collective operation for a speci...
integer, volatile reduction_count_mutex
integer function forthread_rwlock_trywrlock(lock_id)
Definition: forthread.F90:541
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
Abstraction layer around MPI, this issues and marshals the lower level communication details...
integer function forthread_rwlock_rdlock(lock_id)
Definition: forthread.F90:514
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
subroutine, public wait_for_mpi_request(request, status)
Waits for a specific MPI request to complete, either by managing thread safety and interleaving or ju...
Converts data types to strings.
Definition: conversions.F90:36
integer function forthread_rwlock_wrlock(lock_id)
Definition: forthread.F90:532
character(len= *), parameter my_inter_io_name
integer function forthread_mutex_trylock(mutex_id)
Definition: forthread.F90:293
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
subroutine handle_collective_completed(io_configuration, reduction_progress)
Handles the situation where collective communication for a specific reduction has completed across al...
integer, parameter, public double_precision
Double precision (64 bit) kind.
Definition: datadefn.F90:14
Map data structure that holds string (length 20 maximum) key value pairs.
Definition: collections.F90:86
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
Definition: collections.F90:7
character(len=string_length) function generate_reduction_key(field_name, timestep, reduction_operator)
Generates the lookup key that is used for the map storage of reduction progresses.
type(hashmap_type), volatile reduction_progresses
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, other_int)
Unpackages some binary data into its individual fields. The field values are allocated here and the s...
Inter IO server communication specific functionality. This manages all of the communication that migh...
integer, volatile reduction_count
List data structure which implements a doubly linked list. This list will preserve its order...
Definition: collections.F90:60
logical function check_and_clean_progress(myrank)
Checks all the reduction progresses and will remove any that have completed. This is designed to be c...
integer function forthread_rwlock_destroy(rwlock_id)
Definition: forthread.F90:495
integer function, public find_inter_io_from_name(io_configuration, name)
Locates a the index of an inter IO entry from the operator name or returns 0 if none is found...
integer, volatile reduction_progress_rwlock
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
Definition: threadpool.F90:102
subroutine, public finalise_reduction_inter_io(io_configuration)
Finalises the reduction action, waiting for all outstanding requests and then freeing data...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
integer function forthread_rwlock_unlock(lock_id)
Definition: forthread.F90:550
subroutine remove_reduction_progress(reduction_progress)
Removes a specific reduction progress.
Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as...
Puts a generic key-value pair into the map.
subroutine, public register_inter_io_communication(io_configuration, message_tag, handling_procedure, name)
Registers an inter IO communication operation.
integer function, public get_reduction_operator(op_string)
Given the map of action attributes this procedure will identify the reduction operator that has been ...
Retrieves the generic value held at the specific map index or null if index > map elements...
type(reduction_progress_type) function, pointer find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock)
Locates a specific reduction progress based upon the timestep, operator and field name...
type(reduction_progress_type) function, pointer retrieve_reduction_progress(mapentry)
Helper function to retrieve the reduction progress from a mapentry.
Determines whether or not a map contains a specific key.
Adds a string to the end of the list.
subroutine clean_progress(myrank)
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
integer, parameter perform_clean_every
type(reduction_progress_type) function, pointer find_or_add_reduction_progress(timestep, reduction_operator, root, field_name, completion_procedure)
Finds or adds a specific reduction progress based upon the timestep and reduction operator...
integer, volatile inter_io_description_mutex
Removes a specific element from the list or map.