17  use mpi, only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_statuses_ignore, mpi_character, mpi_byte
19  implicit none
21 #ifndef TEST_MODE
22  private
23 #endif
  ! Constants for this inter IO communication. my_inter_io_name is the name that
  ! perform_inter_io_broadcast passes to find_inter_io_from_name to locate this communication.
  ! NOTE(review): the uses of my_inter_io_tag and perform_clean_every are not visible in this
  ! excerpt - confirm their meaning against the registration and clean-up code.
25  integer, parameter :: my_inter_io_tag=2, perform_clean_every=200
26  character(len=*), parameter :: my_inter_io_name="bcastinterio"
  ! State of a single broadcast; items are stored in broadcast_statuses under the key
  ! "<field name>#<timestep>".
  ! Set to .true. once the completion call has been issued for this broadcast
30  logical :: handled
  ! Handle of the mutex that is locked around updates to this item
31  integer :: mutex
  ! MPI request handles, one per IO server, filled in when the root issues its non-blocking sends
32  integer, dimension(:), allocatable :: send_requests
  ! Packaged message that is handed to mpi_isend; it is released together with send_requests
33  character, dimension(:), allocatable :: send_buffer
  ! Copy of values received from another IO server while no completion procedure was registered
34  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: cached_values
  ! Procedure that is called (through a thread pool call) with the broadcast values
35  procedure(handle_completion), pointer, nopass :: completion_procedure
36  end type inter_io_broadcast
  ! Parameters of one threaded completion call. These components are filled in by
  ! issue_thread_call_to_completion and read by thread_call_to_completion.
  ! NOTE(review): the enclosing type statement is not visible in this excerpt; the components match
  ! those accessed through threaded_callback_parameters_type elsewhere in the module.
  ! Timestep that the broadcast values belong to
40  integer :: timestep
  ! Name of the broadcast field
41  character(len=STRING_LENGTH) :: field_name
  ! Private copy of the broadcast values, released after the completion procedure has run
42  real(DEFAULT_PRECISION), dimension(:), allocatable :: values
  ! Completion procedure to invoke with the values above
43  procedure(handle_completion), pointer, nopass :: completion_procedure
  ! Set by init_broadcast_inter_io and cleared again by finalise_broadcast_inter_io
50  logical, volatile :: initialised=.false.
53 contains
!> Prepares the broadcast inter IO state. Only the first call does any work: the supplied
!! configuration is remembered through the module level pointer, the module is flagged as
!! initialised and the broadcast counter is reset. Further calls return immediately until
!! the module has been finalised.
!! @param io_configuration Configuration of the IO server; it needs the target attribute because
!! a module level pointer is associated with it
subroutine init_broadcast_inter_io(io_configuration)
  type(io_configuration_type), intent(inout), target :: io_configuration

  ! Already set up, so there is nothing to do
  if (initialised) return

  stored_io_configuration=>io_configuration
  initialised=.true.
  bcast_count=0
end subroutine init_broadcast_inter_io
  !> Finalises the broadcast inter IO functionality. For every stored broadcast item, any
  !! outstanding send requests are waited on and then released along with the send buffer, after
  !! which the item's mutex is destroyed. Nothing is done unless the module has been initialised.
76  subroutine finalise_broadcast_inter_io()
77  type(inter_io_broadcast), pointer :: broadcast_item
78  type(iterator_type) :: iterator
80  if (initialised) then
82  if (.not. c_is_empty(broadcast_statuses)) then
  ! NOTE(review): the statement that sets up iterator is not visible in this excerpt - confirm it
  ! is obtained from broadcast_statuses before this loop
84  do while (c_has_next(iterator))
85  broadcast_item=>retrieve_broadcast_item(c_next_mapentry(iterator))
86  call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
  ! Only items that issued sends hold requests; wait for all of them before freeing the buffer
87  if (allocated(broadcast_item%send_requests)) then
88  call waitall_for_mpi_requests(broadcast_item%send_requests, size(broadcast_item%send_requests))
89  deallocate(broadcast_item%send_requests)
90  if (allocated(broadcast_item%send_buffer)) deallocate(broadcast_item%send_buffer)
91  end if
  ! The mutex is released first and then destroyed
92  call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
93  call check_thread_status(forthread_mutex_destroy(broadcast_item%mutex))
94  end do
95  end if
  ! Allows init_broadcast_inter_io to set the module up again
102  initialised=.false.
103  end if
104  end subroutine finalise_broadcast_inter_io
  !> Inspects the stored broadcast items; the loop below stops at the first item that has not
  !! yet been handled.
  !! NOTE(review): the statements that set the function result and obtain the iterator, as well as
  !! the end of the function, are not visible in this excerpt - confirm the returned value there.
  !! @param io_configuration Configuration of the IO server (not referenced by the visible lines)
109  logical function check_broadcast_inter_io_for_completion(io_configuration)
110  type(io_configuration_type), intent(inout) :: io_configuration
112  type(inter_io_broadcast), pointer :: broadcast_item
113  type(iterator_type) :: iterator
117  if (.not. c_is_empty(broadcast_statuses)) then
119  do while (c_has_next(iterator))
120  broadcast_item=>retrieve_broadcast_item(c_next_mapentry(iterator))
  ! An unhandled item has been found, no need to look at the remaining ones
121  if (.not. broadcast_item%handled) then
123  exit
124  end if
125  end do
126  end if
!> Handles a broadcast message that has been received from another IO server. The message is
!! unpacked into its field name, timestep and values, and the broadcast item for that field and
!! timestep is looked up (or created). When a completion procedure is already associated with the
!! item, the item is marked as handled and the completion call is issued with the received values.
!! Otherwise the values are cached on the item so that perform_inter_io_broadcast can issue the
!! completion call once it is invoked for this field and timestep.
!! @param io_configuration Configuration of the IO server (not referenced by this routine)
!! @param data_buffer Raw bytes of the received message
!! @param inter_io_index Index of the inter IO communication (not referenced by this routine)
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
  type(io_configuration_type), intent(inout) :: io_configuration
  character, dimension(:), intent(inout) :: data_buffer
  integer, intent(in) :: inter_io_index

  type(inter_io_broadcast), pointer :: broadcast_item
  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: data_values
  character(len=STRING_LENGTH) :: field_name
  integer :: timestep

  call unpackage_inter_io_communication_message(data_buffer, field_name, timestep, data_values)

  broadcast_item=>find_or_add_broadcast_item(field_name, timestep)

  if (associated(broadcast_item%completion_procedure)) then
    ! The handled flag is updated under the item's mutex; the completion call itself is issued
    ! after the mutex has been released
    call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
    broadcast_item%handled=.true.
    call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
    call issue_thread_call_to_completion(field_name, timestep, data_values, broadcast_item%completion_procedure)
  else
    call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
    ! The sourced allocation both sizes cached_values and copies the data into it, so no
    ! separate assignment is needed afterwards
    allocate(broadcast_item%cached_values(size(data_values)), source=data_values)
    call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
  end if
  if (allocated(data_values)) deallocate(data_values)
end subroutine handle_recv_data_from_io_server
!> Performs an inter IO broadcast of a field at a specific timestep. Nothing happens if the
!! broadcast item for this field and timestep has already been handled. Otherwise, on the root
!! the values are packaged and sent with non-blocking sends to every other IO server, and the
!! completion procedure is issued locally too. On any other IO server the completion procedure is
!! issued only when values for this broadcast have already been received and cached.
!! @param io_configuration Configuration of the IO server
!! @param field_values The values to broadcast (used on the root)
!! @param field_size Number of values (not referenced by this routine)
!! @param field_name Name of the field being broadcast
!! @param root Rank of the IO server that holds the values to send
!! @param timestep The timestep that the values belong to
!! @param completion_procedure Procedure to call once the values are available locally
subroutine perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, &
     timestep, completion_procedure)
  type(io_configuration_type), intent(inout) :: io_configuration
  real(kind=DOUBLE_PRECISION), dimension(:) :: field_values
  integer, intent(in) :: field_size, root, timestep
  character(len=*), intent(in) :: field_name
  procedure(handle_completion) :: completion_procedure

  type(inter_io_broadcast), pointer :: broadcast_item
  integer :: inter_io_comm_index, destination, ierr

  inter_io_comm_index=find_inter_io_from_name(io_configuration, my_inter_io_name)
  broadcast_item=>find_or_add_broadcast_item(field_name, timestep, completion_procedure)

  call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
  if (.not. broadcast_item%handled) then
    if (io_configuration%my_io_rank == root) then
      broadcast_item%handled=.true.

      allocate(broadcast_item%send_requests(io_configuration%number_of_io_servers))
      broadcast_item%send_buffer=package_inter_io_communication_message(field_name, timestep, field_values)

      ! Ranks start at zero whereas the request array is one based, hence the +1 offsets
      do destination=0, io_configuration%number_of_io_servers-1
        if (destination == io_configuration%my_io_rank) then
          ! No message is sent to ourselves, a null request keeps the array fully defined
          broadcast_item%send_requests(destination+1)=mpi_request_null
          cycle
        end if
        call lock_mpi()
        call mpi_isend(broadcast_item%send_buffer, size(broadcast_item%send_buffer), mpi_byte, destination, &
             io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
             io_configuration%io_communicator, broadcast_item%send_requests(destination+1), ierr)
        call unlock_mpi()
      end do
      ! The root does not receive its own message, so it issues the completion call directly
      call issue_thread_call_to_completion(field_name, timestep, field_values, completion_procedure)
    else if (allocated(broadcast_item%cached_values)) then
      ! The values arrived before this call was made, so complete using the cached copy
      broadcast_item%handled=.true.
      call issue_thread_call_to_completion(field_name, timestep, broadcast_item%cached_values, completion_procedure)
      deallocate(broadcast_item%cached_values)
    end if
  end if
  call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
end subroutine perform_inter_io_broadcast
  !> Packages the arguments of a completion call into a freshly allocated parameter object and
  !! starts thread_call_to_completion through the thread pool.
  !! NOTE(review): the statements that store the parameter object and update
  !! thread_callback_params_id are not visible in this excerpt - confirm that the object is
  !! registered under the identifier handed to the thread.
  !! @param field_name Name of the field the values belong to
  !! @param timestep Timestep the values belong to
  !! @param values The broadcast values; a private copy is taken
  !! @param completion_procedure Procedure that the started thread will call
225  subroutine issue_thread_call_to_completion(field_name, timestep, values, completion_procedure)
226  integer, intent(in) :: timestep
227  character(len=*), intent(in) :: field_name
228  real(DEFAULT_PRECISION), dimension(:), intent(in) :: values
229  procedure(handle_completion) :: completion_procedure
231  type(threaded_callback_parameters_type), pointer :: threaded_callback_state
232  class(*), pointer :: generic
234  allocate(threaded_callback_state)
236  threaded_callback_state%field_name=field_name
237  threaded_callback_state%timestep=timestep
  ! Copy the values so that the parameter object does not depend on the caller's array
238  allocate(threaded_callback_state%values(size(values)), source=values)
239  threaded_callback_state%completion_procedure=>completion_procedure
  ! Unlimited polymorphic alias of the parameter object
242  generic=>threaded_callback_state
  ! The single integer argument is the value that thread_call_to_completion turns into a lookup key
247  call threadpool_start_thread(thread_call_to_completion, (/ thread_callback_params_id-1 /))
248  end subroutine issue_thread_call_to_completion
  !> Entry point run through the thread pool. The parameter object is looked up in
  !! thread_callback_params using the first argument converted to a string as the key and is
  !! removed from that collection; the completion procedure is then called with the stored
  !! configuration, values, field name and timestep, and the parameter object is released.
  !! @param arguments Integer arguments; the first one identifies the parameter object
  !! @param data_buffer Optional buffer (not referenced by the visible lines)
253  subroutine thread_call_to_completion(arguments, data_buffer)
254  integer, dimension(:), intent(in) :: arguments
255  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
257  class(*), pointer :: generic
258  type(threaded_callback_parameters_type), pointer :: threaded_callback_state
261  generic=>c_get_generic(thread_callback_params, trim(conv_to_string(arguments(1))))
  ! NOTE(review): no type guard is visible between the select type and the pointer assignment
  ! in this excerpt - confirm it in the full source
262  select type(generic)
264  threaded_callback_state=>generic
265  call c_remove(thread_callback_params, trim(conv_to_string(arguments(1))))
266  end select
  ! NOTE(review): threaded_callback_state is not visibly nullified before this test - confirm it
  ! cannot be undefined when the lookup finds no entry
269  if (associated(threaded_callback_state)) then
270  call threaded_callback_state%completion_procedure(stored_io_configuration, threaded_callback_state%values, &
271  threaded_callback_state%field_name, threaded_callback_state%timestep)
  ! The parameter object and its copy of the values are owned by this call, so release both
272  deallocate(threaded_callback_state%values)
273  deallocate(threaded_callback_state)
274  end if
275  end subroutine thread_call_to_completion
285  else
287  end if
288  end subroutine clean_broadcast_progress_if_needed
  !> Cleans up broadcast items that are no longer needed. An item that issued sends is cleaned once
  !! mpi_testall reports its requests complete (requests and send buffer are released); an item
  !! without send requests is cleaned once it has been handled (cached values are released). The
  !! mutex of each cleaned item is destroyed and its key recorded; afterwards the recorded entries
  !! are removed from broadcast_statuses and deallocated.
  !! NOTE(review): the statements that assign have_lock and set up the first iterator are not
  !! visible in this excerpt - confirm them in the full source.
291  subroutine clean_broadcast_progress()
292  type(inter_io_broadcast), pointer :: specific_broadcast_item_at_index
  ! NOTE(review): completion_flag is an integer that is compared with 1 below, whereas the MPI
  ! standard declares the flag argument of MPI_Testall as LOGICAL - confirm this is portable
293  integer :: completion_flag, ierr, num_to_remove, have_lock
294  character(len=STRING_LENGTH) :: entry_key
295  type(list_type) :: entries_to_remove
296  logical :: destroy_lock
297  type(iterator_type) :: iterator
298  type(mapentry_type) :: mapentry
299  class(*), pointer :: generic
302  if (have_lock==0) then
305  do while (c_has_next(iterator))
306  destroy_lock=.false.
307  mapentry=c_next_mapentry(iterator)
308  specific_broadcast_item_at_index=>retrieve_broadcast_item(mapentry)
309  call check_thread_status(forthread_mutex_lock(specific_broadcast_item_at_index%mutex))
  ! Items holding send requests can only be cleaned once every send has completed
310  if (allocated(specific_broadcast_item_at_index%send_requests)) then
311  call lock_mpi()
312  call mpi_testall(size(specific_broadcast_item_at_index%send_requests), specific_broadcast_item_at_index%send_requests, &
313  completion_flag, mpi_statuses_ignore, ierr)
314  call unlock_mpi()
315  if (completion_flag == 1) then
316  deallocate(specific_broadcast_item_at_index%send_requests)
317  if (allocated(specific_broadcast_item_at_index%send_buffer)) deallocate(specific_broadcast_item_at_index%send_buffer)
318  call c_add_string(entries_to_remove, mapentry%key)
319  destroy_lock=.true.
320  end if
321  else
  ! No sends were issued for this item, so it can be cleaned as soon as it has been handled
322  if (specific_broadcast_item_at_index%handled) then
323  if (allocated(specific_broadcast_item_at_index%cached_values)) then
324  deallocate(specific_broadcast_item_at_index%cached_values)
325  end if
326  call c_add_string(entries_to_remove, mapentry%key)
327  destroy_lock=.true.
328  end if
329  end if
  ! The mutex is released before it is destroyed
330  call check_thread_status(forthread_mutex_unlock(specific_broadcast_item_at_index%mutex))
331  if (destroy_lock) call check_thread_status(forthread_mutex_destroy(specific_broadcast_item_at_index%mutex))
332  end do
  ! Entries are removed in a second pass, using the keys recorded above, rather than during the
  ! iteration over broadcast_statuses
335  if (.not. c_is_empty(entries_to_remove)) then
337  iterator=c_get_iterator(entries_to_remove)
338  do while (c_has_next(iterator))
339  entry_key=c_next_string(iterator)
  ! Fetch the item before removing its entry so that its memory can be released afterwards
340  generic=>c_get_generic(broadcast_statuses, entry_key)
341  call c_remove(broadcast_statuses, entry_key)
342  deallocate(generic)
343  end do
345  end if
346  call c_free(entries_to_remove)
348  end if
349  end subroutine clean_broadcast_progress
!> Returns the broadcast item for a field and timestep, creating and storing a new one when no
!! such item exists. The lookup is attempted twice, first with the read lock flag set and then
!! with it cleared, before a new item is created. A new item starts off unhandled, holds the
!! supplied completion procedure (or a null procedure pointer when none is given) and has its
!! mutex initialised before it is stored under the key "<field name>#<timestep>".
!! @param field_name Name of the field
!! @param timestep The timestep
!! @param completion_procedure Optional completion procedure, only stored on a newly created item
!! @returns The existing or newly created broadcast item
function find_or_add_broadcast_item(field_name, timestep, completion_procedure)
  character(len=*), intent(in) :: field_name
  integer, intent(in) :: timestep
  procedure(handle_completion), optional :: completion_procedure
  type(inter_io_broadcast), pointer :: find_or_add_broadcast_item

  class(*), pointer :: generic

  find_or_add_broadcast_item=>find_broadcast_item(field_name, timestep, .true.)
  if (associated(find_or_add_broadcast_item)) return

  find_or_add_broadcast_item=>find_broadcast_item(field_name, timestep, .false.)
  if (associated(find_or_add_broadcast_item)) return

  ! Neither lookup found an item, so build a new one
  allocate(find_or_add_broadcast_item)
  if (.not. present(completion_procedure)) then
    find_or_add_broadcast_item%completion_procedure=>null()
  else
    find_or_add_broadcast_item%completion_procedure=>completion_procedure
  end if
  find_or_add_broadcast_item%handled=.false.
  call check_thread_status(forthread_mutex_init(find_or_add_broadcast_item%mutex, -1))

  ! The collection stores unlimited polymorphic pointers
  generic=>find_or_add_broadcast_item
  call c_put_generic(broadcast_statuses, trim(field_name)//"#"//conv_to_string(timestep), generic, .false.)
end function find_or_add_broadcast_item
!> Looks up the broadcast item stored for a field and timestep under the key
!! "<field name>#<timestep>".
!! @param field_name Name of the field
!! @param timestep The timestep
!! @param do_read_lock Read lock flag supplied by the caller.
!! NOTE(review): this flag is not referenced by the statements of this body - confirm the intended
!! locking against the callers.
!! @returns The matching broadcast item, or a null pointer when there is no entry or the stored
!! entry is not of type inter_io_broadcast
function find_broadcast_item(field_name, timestep, do_read_lock)
  character(len=*), intent(in) :: field_name
  integer, intent(in) :: timestep
  logical, intent(in) :: do_read_lock
  type(inter_io_broadcast), pointer :: find_broadcast_item

  class(*), pointer :: generic

  ! Start from a well defined result so that callers can always test it with associated()
  find_broadcast_item=>null()

  generic=>c_get_generic(broadcast_statuses, trim(field_name)//"#"//conv_to_string(timestep))

  if (associated(generic)) then
    select type(generic)
    type is (inter_io_broadcast)
      find_broadcast_item=>generic
    end select
  end if
end function find_broadcast_item
!> Extracts the broadcast item held by a map entry.
!! @param mapentry The map entry to read the item from
!! @returns The broadcast item, or a null pointer when the entry holds nothing or holds data that
!! is not of type inter_io_broadcast
function retrieve_broadcast_item(mapentry)
  type(mapentry_type), intent(in) :: mapentry
  type(inter_io_broadcast), pointer :: retrieve_broadcast_item

  class(*), pointer :: generic

  ! Start from a well defined result so that callers can always test it with associated()
  retrieve_broadcast_item=>null()

  generic=>c_get_generic(mapentry)

  if (associated(generic)) then
    select type(generic)
    type is (inter_io_broadcast)
      retrieve_broadcast_item=>generic
    end select
  end if
end function retrieve_broadcast_item
431 end module broadcast_inter_io_mod
