MONC
Data Types | Functions/Subroutines | Variables
reduction_inter_io_mod Module Reference

Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as calling the MPI function as it is nondeterministic when messages will arrive and hence when one reduction on a process and a reduction on another should be called. More...

Data Types

type  reduction_progress_type
 

Functions/Subroutines

subroutine, public init_reduction_inter_io (io_configuration)
 Initialises the reduction action. More...
 
subroutine handle_recv_data_from_io_server (io_configuration, data_buffer, inter_io_index)
 Handles the receiving of data from some other IO server. This is issued call back style within a thread to handle that data. More...
 
logical function, public check_reduction_inter_io_for_completion (io_configuration)
 Checks this action for completion, when all are completed then the IO server can shutdown as this is called once all MONC processes have deregistered. More...
 
subroutine, public finalise_reduction_inter_io (io_configuration)
 Finalises the reduction action, waiting for all outstanding requests and then freeing data. More...
 
subroutine, public perform_inter_io_reduction (io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
 Actually handles the processing for this data wrt the vertical reduction. More...
 
subroutine clean_progress (myrank)
 
logical function check_and_clean_progress (myrank)
 Checks all the reduction progresses and will remove any that have completed. This is designed to be called from an IO server other than 0 (the master IO server) and it checks if the outstanding async send handle has completed. Checking on the master IO server or checking any progress that is not currently sending is fine and will not impact the correctness (but obviously the progress wont be freed) More...
 
subroutine integrate_io_server_collective_values (reduction_op, reduction_progress, single_server_values, number_elements, collective_values_empty)
 Integrates the collective values from another IO server into the currently stored values. More...
 
subroutine handle_local_moncs_completed_collective (io_configuration, reduction_progress)
 Handles the case where the local MONC processes have completed their collective operation for a specific reduction and, for this IO server, it either needs to send its value to the master IO server or, if it is the master, check for completion. More...
 
subroutine handle_process_recv_from_other_io_server (io_configuration, inter_io_comm, myrank, data_buffer, number_io_servers)
 Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed. More...
 
subroutine handle_collective_completed (io_configuration, reduction_progress)
 Handles the situation where collective communication for a specific reduction has completed across all IO servers. More...
 
type(reduction_progress_type) function, pointer find_or_add_reduction_progress (timestep, reduction_operator, root, field_name, completion_procedure)
 Finds or adds a specific reduction progress based upon the timestep and reduction operator. If none can be found then a new progress is added in. With new progresses this procedure will initialise them. More...
 
type(reduction_progress_type) function, pointer find_reduction_progress (timestep, reduction_operator, field_name, issue_read_lock)
 Locates a specific reduction progress based upon the timestep, operator and field name. More...
 
subroutine remove_reduction_progress (reduction_progress)
 Removes a specific reduction progress. More...
 
character(len=string_length) function generate_reduction_key (field_name, timestep, reduction_operator)
 Generates the lookup key that is used for the map storage of reduction progresses. More...
 
type(reduction_progress_type) function, pointer retrieve_reduction_progress (mapentry)
 Helper function to retrieve the reduction progress from a mapentry. More...
 
integer function, public get_reduction_operator (op_string)
 Given the map of action attributes this procedure will identify the reduction operator that has been selected by the configuration. More...
 

Variables

integer, parameter mean =1
 
integer, parameter min =2
 
integer, parameter max =3
 
integer, parameter sum =4
 
integer, parameter my_inter_io_tag =1
 
integer, parameter perform_clean_every =200
 
character(len= *), parameter my_inter_io_name ="reductioninterio"
 
integer, volatile reduction_progress_rwlock
 
integer, volatile inter_io_description_mutex
 
integer, volatile clean_progress_mutex
 
integer, volatile reduction_count_mutex
 
integer, volatile previous_clean_reduction_count
 
integer, volatile reduction_count
 
type(hashmap_type), volatile reduction_progresses
 
logical, volatile initialised =.false.
 

Detailed Description

Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as calling the MPI function as it is nondeterministic when messages will arrive and hence when one reduction on a process and a reduction on another should be called.

Function/Subroutine Documentation

◆ check_and_clean_progress()

logical function reduction_inter_io_mod::check_and_clean_progress ( integer, intent(in)  myrank)
private

Checks all the reduction progresses and will remove any that have completed. This is designed to be called from an IO server other than 0 (the master IO server) and it checks if the outstanding async send handle has completed. Checking on the master IO server or checking any progress that is not currently sending is fine and will not impact the correctness (but obviously the progress wont be freed)

Definition at line 175 of file reduction-inter-io.F90.

175  integer, intent(in) :: myrank
176 
177  integer :: i, entries, completed, ierr, num_to_remove, have_lock
178  type(list_type) :: entries_to_remove
179  type(iterator_type) :: iterator
180  type(mapentry_type) :: mapentry
181  type(reduction_progress_type), pointer :: specific_reduction_progress
182  character(len=STRING_LENGTH) :: entry_key
183  class(*), pointer :: generic
184  logical :: destroy_lock
185 
186  check_and_clean_progress=.true.
187  have_lock=forthread_mutex_trylock(clean_progress_mutex)
188  if (have_lock == 0) then
189  call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
190  iterator=c_get_iterator(reduction_progresses)
191  do while (c_has_next(iterator))
192  mapentry=c_next_mapentry(iterator)
193  destroy_lock=.false.
194  specific_reduction_progress=>retrieve_reduction_progress(mapentry)
195  if (myrank /= specific_reduction_progress%root) then
196  call check_thread_status(forthread_mutex_lock(specific_reduction_progress%mutex))
197  if (specific_reduction_progress%async_handle /= mpi_request_null) then
198  call wait_for_mpi_request(specific_reduction_progress%async_handle)
199  if (completed == 1) then
200  if (allocated(specific_reduction_progress%send_buffer)) deallocate(specific_reduction_progress%send_buffer)
201  destroy_lock=.true.
202  call c_add_string(entries_to_remove, mapentry%key)
203  else
204  check_and_clean_progress=.false.
205  end if
206  end if
207  call check_thread_status(forthread_mutex_unlock(specific_reduction_progress%mutex))
208  if (destroy_lock) call check_thread_status(forthread_mutex_destroy(specific_reduction_progress%mutex))
209  end if
210  end do
211  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
212 
213  if (.not. c_is_empty(entries_to_remove)) then
214  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
215  iterator=c_get_iterator(entries_to_remove)
216  do while (c_has_next(iterator))
217  entry_key=c_next_string(iterator)
218  generic=>c_get_generic(reduction_progresses, entry_key)
219  call c_remove(reduction_progresses, entry_key)
220  deallocate(generic)
221  end do
222  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
223  end if
224  call c_free(entries_to_remove)
225  call check_thread_status(forthread_mutex_unlock(clean_progress_mutex))
226  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ check_reduction_inter_io_for_completion()

logical function, public reduction_inter_io_mod::check_reduction_inter_io_for_completion ( type(io_configuration_type), intent(inout)  io_configuration)

Checks this action for completion, when all are completed then the IO server can shutdown as this is called once all MONC processes have deregistered.

Parameters
io_configurationConfiguration state of the IO server
Returns
Whether the action has completed

Definition at line 83 of file reduction-inter-io.F90.

83  type(io_configuration_type), intent(inout) :: io_configuration
84 
85  check_reduction_inter_io_for_completion=check_and_clean_progress(io_configuration%my_io_rank)
Here is the call graph for this function:

◆ clean_progress()

subroutine reduction_inter_io_mod::clean_progress ( integer, intent(in)  myrank)
private

Definition at line 155 of file reduction-inter-io.F90.

155  integer, intent(in) :: myrank
156 
157  logical :: cc_dummy
158 
159  call check_thread_status(forthread_mutex_lock(reduction_count_mutex))
160  reduction_count=reduction_count+1
161  if (previous_clean_reduction_count + perform_clean_every .lt. reduction_count) then
162  previous_clean_reduction_count=reduction_count
163  call check_thread_status(forthread_mutex_unlock(reduction_count_mutex))
164  cc_dummy=check_and_clean_progress(myrank)
165  else
166  call check_thread_status(forthread_mutex_unlock(reduction_count_mutex))
167  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ finalise_reduction_inter_io()

subroutine, public reduction_inter_io_mod::finalise_reduction_inter_io ( type(io_configuration_type), intent(inout)  io_configuration)

Finalises the reduction action, waiting for all outstanding requests and then freeing data.

Parameters
io_configurationConfiguration state of the IO server

Definition at line 91 of file reduction-inter-io.F90.

91  type(io_configuration_type), intent(inout) :: io_configuration
92 
93  type(reduction_progress_type) :: progress
94  type(iterator_type) :: iterator
95 
96  if (initialised) then
97  call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
98  if (.not. c_is_empty(reduction_progresses)) then
99  iterator=c_get_iterator(reduction_progresses)
100  do while (c_has_next(iterator))
101  progress=retrieve_reduction_progress(c_next_mapentry(iterator))
102  if (progress%async_handle /= mpi_request_null) then
103  call wait_for_mpi_request(progress%async_handle)
104  end if
105  end do
106  end if
107  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
108  call check_thread_status(forthread_rwlock_destroy(reduction_progress_rwlock))
109  call check_thread_status(forthread_mutex_destroy(inter_io_description_mutex))
110  call check_thread_status(forthread_mutex_destroy(clean_progress_mutex))
111  call check_thread_status(forthread_mutex_destroy(reduction_count_mutex))
112  initialised=.false.
113  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ find_or_add_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::find_or_add_reduction_progress ( integer, intent(in)  timestep,
integer, intent(in)  reduction_operator,
integer, intent(in)  root,
character(len=*), intent(in)  field_name,
procedure(handle_completion), optional  completion_procedure 
)
private

Finds or adds a specific reduction progress based upon the timestep and reduction operator. If none can be found then a new progress is added in. With new progresses this procedure will initialise them.

Parameters
timestepThe timestep to match
reduction_operatorThe reduction operator to match
field_nameThe name of the field that the reduction type represents
num_vectorsThe number of reduction vectors (items to reduce) to be stored
Returns
A reduction progress data object

Definition at line 360 of file reduction-inter-io.F90.

360  integer, intent(in) :: timestep, reduction_operator, root
361  type(reduction_progress_type), pointer :: find_or_add_reduction_progress
362  character(len=*), intent(in) :: field_name
363  procedure(handle_completion), optional :: completion_procedure
364 
365  class(*), pointer :: generic
366  type(reduction_progress_type), pointer :: new_progress
367 
368  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name)
369  if (.not. associated(find_or_add_reduction_progress)) then
370  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
371  find_or_add_reduction_progress=>find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock=.false.)
372  if (.not. associated(find_or_add_reduction_progress)) then
373  allocate(new_progress)
374  call check_thread_status(forthread_mutex_init(new_progress%mutex, -1))
375  new_progress%timestep=timestep
376  new_progress%reduction_operator=reduction_operator
377  new_progress%contributed_moncs=0
378  new_progress%contributed_io_servers=0
379  new_progress%root=root
380  new_progress%async_handle=mpi_request_null
381  new_progress%field_name=field_name
382  if (present(completion_procedure)) then
383  new_progress%completion_procedure=>completion_procedure
384  else
385  new_progress%completion_procedure=>null()
386  end if
387  generic=>new_progress
388  call c_put_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator), &
389  generic, .false.)
390  find_or_add_reduction_progress=>new_progress
391  end if
392  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
393  end if
394  if (.not. associated(find_or_add_reduction_progress%completion_procedure) .and. present(completion_procedure)) then
395  find_or_add_reduction_progress%completion_procedure=>completion_procedure
396  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ find_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::find_reduction_progress ( integer, intent(in)  timestep,
integer, intent(in)  reduction_operator,
character(len=*), intent(in)  field_name,
logical, intent(in), optional  issue_read_lock 
)
private

Locates a specific reduction progress based upon the timestep, operator and field name.

Parameters
timestepThe timestep to search for
reduction_operatorThe reduction operator to search for
field_nameThe field name which must match
reduction_progress_locationOptional location which is set to be the index of the matching progress item
Returns
Pointer to the reduction progress or null if none is found

Definition at line 406 of file reduction-inter-io.F90.

406  integer, intent(in) :: timestep, reduction_operator
407  logical, intent(in), optional :: issue_read_lock
408  type(reduction_progress_type), pointer :: find_reduction_progress
409  character(len=*), intent(in) :: field_name
410 
411  class(*), pointer :: generic
412  logical :: do_read_lock
413 
414  if (present(issue_read_lock)) then
415  do_read_lock=issue_read_lock
416  else
417  do_read_lock=.true.
418  end if
419 
420  if (do_read_lock) call check_thread_status(forthread_rwlock_rdlock(reduction_progress_rwlock))
421  generic=>c_get_generic(reduction_progresses, generate_reduction_key(field_name, timestep, reduction_operator))
422  if (do_read_lock) call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
423  if (associated(generic)) then
424  select type(generic)
425  type is (reduction_progress_type)
426  find_reduction_progress=>generic
427  end select
428  else
429  find_reduction_progress=>null()
430  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ generate_reduction_key()

character(len=string_length) function reduction_inter_io_mod::generate_reduction_key ( character(len=*), intent(in)  field_name,
integer, intent(in)  timestep,
integer, intent(in)  reduction_operator 
)
private

Generates the lookup key that is used for the map storage of reduction progresses.

Parameters
field_nameThe field name
timestepThe timestep
reduction_operatorThe reduction operator

Definition at line 455 of file reduction-inter-io.F90.

455  character(len=*), intent(in) :: field_name
456  integer, intent(in) :: timestep, reduction_operator
457 
458  generate_reduction_key=trim(field_name)//"#"//trim(conv_to_string(timestep))//"#"// trim(conv_to_string(reduction_operator))
Here is the caller graph for this function:

◆ get_reduction_operator()

integer function, public reduction_inter_io_mod::get_reduction_operator ( character(len=*), intent(in)  op_string)

Given the map of action attributes this procedure will identify the reduction operator that has been selected by the configuration.

Parameters
action_attributesAction attributes from the IO server configuration
Returns
The reduction operator

Definition at line 487 of file reduction-inter-io.F90.

487  character(len=*), intent(in) :: op_string
488 
489  if (op_string .eq. "mean") then
490  get_reduction_operator=mean
491  else if (op_string .eq. "min") then
492  get_reduction_operator=min
493  else if (op_string .eq. "max") then
494  get_reduction_operator=max
495  else if (op_string .eq. "sum") then
496  get_reduction_operator=sum
497  else
498  call log_log(log_error, "Reduction operator '"//trim(op_string)//"' not recognised")
499  end if
Here is the call graph for this function:

◆ handle_collective_completed()

subroutine reduction_inter_io_mod::handle_collective_completed ( type(io_configuration_type), intent(inout)  io_configuration,
type(reduction_progress_type), intent(inout)  reduction_progress 
)
private

Handles the situation where collective communication for a specific reduction has completed across all IO servers.

Parameters
reduction_progressThe reduction progress data type
number_io_serversThe total number of IO servers

Definition at line 338 of file reduction-inter-io.F90.

338  type(io_configuration_type), intent(inout) :: io_configuration
339  type(reduction_progress_type), intent(inout) :: reduction_progress
340 
341  if (reduction_progress%reduction_operator == mean) then
342  reduction_progress%values=reduction_progress%values/io_configuration%number_of_global_moncs
343  end if
344  call reduction_progress%completion_procedure(io_configuration, reduction_progress%values, &
345  reduction_progress%field_name, reduction_progress%timestep)
346  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
347  call check_thread_status(forthread_mutex_destroy(reduction_progress%mutex))
348  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
349  call remove_reduction_progress(reduction_progress)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_local_moncs_completed_collective()

subroutine reduction_inter_io_mod::handle_local_moncs_completed_collective ( type(io_configuration_type), intent(inout)  io_configuration,
type(reduction_progress_type), intent(inout)  reduction_progress 
)
private

Handles the case where the local MONC processes have completed their collective operation for a specific reduction and, for this IO server, it either needs to send its value to the master IO server or, if it is the master, check for completion.

Parameters
io_configurationConfiguration state of the IO server
reduction_progressThe specific reduction progress data item that represents this reduction
z_sizeSize in Z
reduction_progress_locationLocation in the reduction progresses list that this single progress item resides at

Definition at line 272 of file reduction-inter-io.F90.

272  type(io_configuration_type), intent(inout) :: io_configuration
273  type(reduction_progress_type), intent(inout) :: reduction_progress
274 
275  integer :: ierr, inter_io_comm_index
276 
277  if (io_configuration%my_io_rank == reduction_progress%root .and. &
278  reduction_progress%contributed_io_servers == io_configuration%number_of_io_servers) then
279  call handle_collective_completed(io_configuration, reduction_progress)
280  else
281  if (io_configuration%my_io_rank /= reduction_progress%root) then
282  inter_io_comm_index=find_inter_io_from_name(io_configuration, my_inter_io_name)
283 
284  reduction_progress%send_buffer=package_inter_io_communication_message(reduction_progress%field_name, &
285  reduction_progress%timestep, reduction_progress%values, reduction_progress%reduction_operator)
286  call lock_mpi()
287  call mpi_isend(reduction_progress%send_buffer, size(reduction_progress%send_buffer), &
288  mpi_byte, reduction_progress%root, &
289  io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
290  io_configuration%io_communicator, reduction_progress%async_handle, ierr)
291  call unlock_mpi()
292  ! Deallocate the current value as this is finished with and has been packed into the send buffer
293  if (allocated(reduction_progress%values)) deallocate(reduction_progress%values)
294  end if
295  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
296  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_process_recv_from_other_io_server()

subroutine reduction_inter_io_mod::handle_process_recv_from_other_io_server ( type(io_configuration_type), intent(inout)  io_configuration,
type(io_configuration_inter_communication_description), intent(inout)  inter_io_comm,
integer, intent(in)  myrank,
character, dimension(:), intent(inout)  data_buffer,
integer, intent(in)  number_io_servers 
)
private

Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed.

Parameters
number_io_serversThe total number of IO servers
z_sizeNumber of levels in the vertical

Definition at line 304 of file reduction-inter-io.F90.

304  type(io_configuration_type), intent(inout) :: io_configuration
305  type(io_configuration_inter_communication_description), intent(inout) :: inter_io_comm
306  character, dimension(:), intent(inout) :: data_buffer
307  integer, intent(in) :: number_io_servers, myrank
308 
309  type(reduction_progress_type), pointer :: reduction_progress
310  character(len=STRING_LENGTH) :: field_name
311  integer :: timestep, reduction_op
312  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: field_values
313  logical :: collective_values_new
314 
315  call unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, reduction_op)
316  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, myrank, field_name)
317 
318  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
319  collective_values_new=.not. allocated(reduction_progress%values)
320  if (collective_values_new) allocate(reduction_progress%values(size(field_values)))
321 
322  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
323  call integrate_io_server_collective_values(reduction_op, reduction_progress, &
324  field_values, size(field_values), collective_values_new)
325  if (reduction_progress%contributed_io_servers == number_io_servers) then
326  call handle_collective_completed(io_configuration, reduction_progress)
327  deallocate(field_values)
328  return
329  end if
330  deallocate(field_values)
331  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
Here is the call graph for this function:
Here is the caller graph for this function:

◆ handle_recv_data_from_io_server()

subroutine reduction_inter_io_mod::handle_recv_data_from_io_server ( type(io_configuration_type), intent(inout)  io_configuration,
character, dimension(:), intent(inout)  data_buffer,
integer, intent(in)  inter_io_index 
)
private

Handles the receiving of data from some other IO server. This is issued call back style within a thread to handle that data.

Parameters
io_configurationConfiguration state of the IO server
inter_io_indexIndex of the inter IO communication description

Definition at line 70 of file reduction-inter-io.F90.

70  type(io_configuration_type), intent(inout) :: io_configuration
71  character, dimension(:), intent(inout) :: data_buffer
72  integer, intent(in) :: inter_io_index
73 
74  call handle_process_recv_from_other_io_server(io_configuration, io_configuration%inter_io_communications(inter_io_index), &
75  io_configuration%my_io_rank, data_buffer, io_configuration%number_of_io_servers)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ init_reduction_inter_io()

subroutine, public reduction_inter_io_mod::init_reduction_inter_io ( type(io_configuration_type), intent(inout)  io_configuration)

Initialises the reduction action.

Parameters
io_configurationThe IO server configuration

Definition at line 51 of file reduction-inter-io.F90.

51  type(io_configuration_type), intent(inout) :: io_configuration
52 
53  if (.not. initialised) then
54  initialised=.true.
55  previous_clean_reduction_count=0
56  reduction_count=0
57  call check_thread_status(forthread_rwlock_init(reduction_progress_rwlock, -1))
58  call check_thread_status(forthread_mutex_init(inter_io_description_mutex, -1))
59  call check_thread_status(forthread_mutex_init(clean_progress_mutex, -1))
60  call check_thread_status(forthread_mutex_init(reduction_count_mutex, -1))
61  call register_inter_io_communication(io_configuration, my_inter_io_tag, handle_recv_data_from_io_server, my_inter_io_name)
62  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ integrate_io_server_collective_values()

subroutine reduction_inter_io_mod::integrate_io_server_collective_values ( integer, intent(in)  reduction_op,
type(reduction_progress_type), intent(inout)  reduction_progress,
real(kind=double_precision), dimension(:), intent(in)  single_server_values,
integer, intent(in)  number_elements,
logical, intent(in)  collective_values_empty 
)
private

Integrates the collective values from another IO server into the currently stored values.

Parameters
reduction_opThe reduction operator to perform
reduction_progressThe progress data type which is updated
single_server_valuesThe values from the IO server which need to be integrated
dim_one_sizeSize in first dimension
target_indexThe index where we are putting the values into the current value array of reduction progress
collective_values_emptyWhether the collective values is empty

Definition at line 238 of file reduction-inter-io.F90.

238  integer, intent(in) :: reduction_op, number_elements
239  logical, intent(in) :: collective_values_empty
240  type(reduction_progress_type), intent(inout) :: reduction_progress
241  real(kind=DOUBLE_PRECISION), dimension(:), intent(in) :: single_server_values
242 
243  integer :: k
244 
245  if (collective_values_empty) then
246  reduction_progress%values=single_server_values
247  else
248  if (reduction_op == mean .or. reduction_op == sum) then
249  reduction_progress%values=reduction_progress%values+single_server_values
250  else if (reduction_op == min .or. reduction_op == max) then
251  do k=1, number_elements
252  if (reduction_op == min) then
253  if (single_server_values(k) .lt. reduction_progress%values(k)) &
254  reduction_progress%values(k)=single_server_values(k)
255  else if (reduction_op == max) then
256  if (single_server_values(k) .gt. reduction_progress%values(k)) &
257  reduction_progress%values(k)=single_server_values(k)
258  end if
259  end do
260  end if
261  end if
Here is the caller graph for this function:

◆ perform_inter_io_reduction()

subroutine, public reduction_inter_io_mod::perform_inter_io_reduction ( type(io_configuration_type), intent(inout)  io_configuration,
real(kind=double_precision), dimension(:)  field_values,
integer, intent(in)  field_size,
character(len=*), intent(in)  reduction_field_name,
integer, intent(in)  reduction_op,
integer, intent(in)  root,
integer, intent(in)  timestep,
procedure(handle_completion completion_procedure 
)

Actually handles the processing for this data wrt the vertical reduction.

Parameters
io_configurationConfiguration of the IO server
field_valuesThe values to communicate
field_sizeNumber of elements to communicate
reduction_field_nameField name that the reduction will be performed over
reduction_opThe reduction operator to use
rootThe root IO server process
timestepThe timestep this is issued at
completion_procedureCallback completion procedure

Definition at line 127 of file reduction-inter-io.F90.

127  type(io_configuration_type), intent(inout) :: io_configuration
128  real(kind=DOUBLE_PRECISION), dimension(:) :: field_values
129  integer, intent(in) :: field_size, reduction_op, root, timestep
130  character(len=*), intent(in) :: reduction_field_name
131  procedure(handle_completion) :: completion_procedure
132 
133  type(reduction_progress_type), pointer :: reduction_progress
134  logical :: collective_values_new
135 
136  call clean_progress(io_configuration%my_io_rank)
137  reduction_progress=>find_or_add_reduction_progress(timestep, reduction_op, root, reduction_field_name, completion_procedure)
138 
139  call check_thread_status(forthread_mutex_lock(reduction_progress%mutex))
140  reduction_progress%contributed_moncs=reduction_progress%contributed_moncs+1
141 
142  collective_values_new=.not. allocated(reduction_progress%values)
143  if (collective_values_new) allocate(reduction_progress%values(field_size))
144 
145  call integrate_io_server_collective_values(reduction_op, reduction_progress, field_values, field_size, collective_values_new)
146  if (reduction_progress%contributed_moncs == io_configuration%number_of_moncs) then
147  reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
148  call handle_local_moncs_completed_collective(io_configuration, reduction_progress)
149  else
150  call check_thread_status(forthread_mutex_unlock(reduction_progress%mutex))
151  end if
Here is the call graph for this function:
Here is the caller graph for this function:

◆ remove_reduction_progress()

subroutine reduction_inter_io_mod::remove_reduction_progress ( type(reduction_progress_type), intent(in)  reduction_progress)
private

Removes a specific reduction progress.

Parameters
reduction_progressThe reduction progress to remove from the list

Definition at line 436 of file reduction-inter-io.F90.

436  type(reduction_progress_type), intent(in) :: reduction_progress
437 
438  class(*), pointer :: generic
439  character(len=STRING_LENGTH) :: specific_key
440 
441  specific_key=generate_reduction_key(reduction_progress%field_name, reduction_progress%timestep,&
442  reduction_progress%reduction_operator)
443  call check_thread_status(forthread_rwlock_wrlock(reduction_progress_rwlock))
444  generic=>c_get_generic(reduction_progresses, specific_key)
445  call c_remove(reduction_progresses, specific_key)
446  call check_thread_status(forthread_rwlock_unlock(reduction_progress_rwlock))
447  if (associated(generic)) deallocate(generic)
Here is the call graph for this function:
Here is the caller graph for this function:

◆ retrieve_reduction_progress()

type(reduction_progress_type) function, pointer reduction_inter_io_mod::retrieve_reduction_progress ( type(mapentry_type), intent(in)  mapentry)
private

Helper function to retrieve the reduction progress from a mapentry.

Parameters
mapentryThe map entry to retrieve from
Returns
The progress data object in the map entry or null if none is found

Definition at line 465 of file reduction-inter-io.F90.

465  type(mapentry_type), intent(in) :: mapentry
466  type(reduction_progress_type), pointer :: retrieve_reduction_progress
467 
468  class(*), pointer :: generic
469 
470  generic=>c_get_generic(mapentry)
471 
472  if (associated(generic)) then
473  select type(generic)
474  type is (reduction_progress_type)
475  retrieve_reduction_progress=>generic
476  end select
477  else
478  retrieve_reduction_progress=>null()
479  end if
Here is the caller graph for this function:

Variable Documentation

◆ clean_progress_mutex

integer, volatile reduction_inter_io_mod::clean_progress_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ initialised

logical, volatile reduction_inter_io_mod::initialised =.false.
private

Definition at line 42 of file reduction-inter-io.F90.

42  logical, volatile :: initialised=.false.

◆ inter_io_description_mutex

integer, volatile reduction_inter_io_mod::inter_io_description_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ max

integer, parameter reduction_inter_io_mod::max =3
private

Definition at line 27 of file reduction-inter-io.F90.

◆ mean

integer, parameter reduction_inter_io_mod::mean =1
private

Definition at line 27 of file reduction-inter-io.F90.

27  integer, parameter :: mean=1, min=2, max=3, sum=4

◆ min

integer, parameter reduction_inter_io_mod::min =2
private

Definition at line 27 of file reduction-inter-io.F90.

◆ my_inter_io_name

character(len=*), parameter reduction_inter_io_mod::my_inter_io_name ="reductioninterio"
private

Definition at line 29 of file reduction-inter-io.F90.

29  character(len=*), parameter :: my_inter_io_name="reductioninterio"

◆ my_inter_io_tag

integer, parameter reduction_inter_io_mod::my_inter_io_tag =1
private

Definition at line 28 of file reduction-inter-io.F90.

28  integer, parameter :: my_inter_io_tag=1, perform_clean_every=200

◆ perform_clean_every

integer, parameter reduction_inter_io_mod::perform_clean_every =200
private

Definition at line 28 of file reduction-inter-io.F90.

◆ previous_clean_reduction_count

integer, volatile reduction_inter_io_mod::previous_clean_reduction_count
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_count

integer, volatile reduction_inter_io_mod::reduction_count
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_count_mutex

integer, volatile reduction_inter_io_mod::reduction_count_mutex
private

Definition at line 39 of file reduction-inter-io.F90.

◆ reduction_progress_rwlock

integer, volatile reduction_inter_io_mod::reduction_progress_rwlock
private

Definition at line 39 of file reduction-inter-io.F90.

39  integer, volatile :: reduction_progress_rwlock, inter_io_description_mutex, clean_progress_mutex, &
40  reduction_count_mutex, previous_clean_reduction_count, reduction_count

◆ reduction_progresses

type(hashmap_type), volatile reduction_inter_io_mod::reduction_progresses
private

Definition at line 41 of file reduction-inter-io.F90.

41  type(hashmap_type), volatile :: reduction_progresses

◆ sum

integer, parameter reduction_inter_io_mod::sum =4
private

Definition at line 27 of file reduction-inter-io.F90.