MONC
broadcast-inter-io.F90
Go to the documentation of this file.
1 
17  use mpi, only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_statuses_ignore, mpi_character, mpi_byte
19  implicit none
20 
21 #ifndef TEST_MODE
22  private
23 #endif
24 
25  integer, parameter :: my_inter_io_tag=2, perform_clean_every=200
26  character(len=*), parameter :: my_inter_io_name="bcastinterio"
27 
28 
30  logical :: handled
31  integer :: mutex
32  integer, dimension(:), allocatable :: send_requests
33  character, dimension(:), allocatable :: send_buffer
34  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: cached_values
35  procedure(handle_completion), pointer, nopass :: completion_procedure
36  end type inter_io_broadcast
37 
38 
40  integer :: timestep
41  character(len=STRING_LENGTH) :: field_name
42  real(DEFAULT_PRECISION), dimension(:), allocatable :: values
43  procedure(handle_completion), pointer, nopass :: completion_procedure
45 
50  logical, volatile :: initialised=.false.
51 
53 contains
54 
57  subroutine init_broadcast_inter_io(io_configuration)
58  type(io_configuration_type), intent(inout), target :: io_configuration
59 
60  if (.not. initialised) then
61  stored_io_configuration=>io_configuration
62  initialised=.true.
63  bcast_count=0
72  end if
73  end subroutine init_broadcast_inter_io
74 
76  subroutine finalise_broadcast_inter_io()
77  type(inter_io_broadcast), pointer :: broadcast_item
78  type(iterator_type) :: iterator
79 
80  if (initialised) then
82  if (.not. c_is_empty(broadcast_statuses)) then
84  do while (c_has_next(iterator))
85  broadcast_item=>retrieve_broadcast_item(c_next_mapentry(iterator))
86  call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
87  if (allocated(broadcast_item%send_requests)) then
88  call waitall_for_mpi_requests(broadcast_item%send_requests, size(broadcast_item%send_requests))
89  deallocate(broadcast_item%send_requests)
90  if (allocated(broadcast_item%send_buffer)) deallocate(broadcast_item%send_buffer)
91  end if
92  call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
93  call check_thread_status(forthread_mutex_destroy(broadcast_item%mutex))
94  end do
95  end if
102  initialised=.false.
103  end if
104  end subroutine finalise_broadcast_inter_io
105 
109  logical function check_broadcast_inter_io_for_completion(io_configuration)
110  type(io_configuration_type), intent(inout) :: io_configuration
111 
112  type(inter_io_broadcast), pointer :: broadcast_item
113  type(iterator_type) :: iterator
114 
117  if (.not. c_is_empty(broadcast_statuses)) then
119  do while (c_has_next(iterator))
120  broadcast_item=>retrieve_broadcast_item(c_next_mapentry(iterator))
121  if (.not. broadcast_item%handled) then
123  exit
124  end if
125  end do
126  end if
129 
!> Handles a broadcast message received from another IO server. The message is unpacked and matched
!! (or added) to the broadcast item for its field name and timestep. If that item already carries a
!! completion procedure it is marked as handled and the completion call is issued with the received
!! values, otherwise the values are cached on the item for perform_inter_io_broadcast to pick up.
!! @param io_configuration Configuration of the IO server (not referenced in this routine)
!! @param data_buffer Raw message bytes, handed to unpackage_inter_io_communication_message
!! @param inter_io_index Index of the inter IO communication (not referenced in this routine)
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
  type(io_configuration_type), intent(inout) :: io_configuration
  character, dimension(:), intent(inout) :: data_buffer
  integer, intent(in) :: inter_io_index

  type(inter_io_broadcast), pointer :: broadcast_item
  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: data_values
  character(len=STRING_LENGTH) :: field_name
  integer :: timestep

  call unpackage_inter_io_communication_message(data_buffer, field_name, timestep, data_values)

  broadcast_item=>find_or_add_broadcast_item(field_name, timestep)

  if (associated(broadcast_item%completion_procedure)) then
    ! The handled flag is only ever changed whilst holding the item's mutex
    call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
    broadcast_item%handled=.true.
    call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
    call issue_thread_call_to_completion(field_name, timestep, data_values, broadcast_item%completion_procedure)
  else
    call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
    ! Drop any values cached by an earlier message for this field and timestep, as allocating an
    ! already allocated array is an error
    if (allocated(broadcast_item%cached_values)) deallocate(broadcast_item%cached_values)
    ! source= both sizes the array and copies the data, so no further assignment is required
    allocate(broadcast_item%cached_values(size(data_values)), source=data_values)
    call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
  end if
  if (allocated(data_values)) deallocate(data_values)
end subroutine handle_recv_data_from_io_server
163 
174  subroutine perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, &
175  timestep, completion_procedure)
176  type(io_configuration_type), intent(inout) :: io_configuration
177  real(kind=DOUBLE_PRECISION), dimension(:) :: field_values
178  integer, intent(in) :: field_size, root, timestep
179  character(len=*), intent(in) :: field_name
180  procedure(handle_completion) :: completion_procedure
181 
182  type(inter_io_broadcast), pointer :: broadcast_item
183  integer :: inter_io_comm_index, i, ierr
184 
186  inter_io_comm_index=find_inter_io_from_name(io_configuration, my_inter_io_name)
187  broadcast_item=>find_or_add_broadcast_item(field_name, timestep, completion_procedure)
188 
189  call check_thread_status(forthread_mutex_lock(broadcast_item%mutex))
190  if (io_configuration%my_io_rank == root .and. .not. broadcast_item%handled) then
191  broadcast_item%handled=.true.
192 
193  allocate(broadcast_item%send_requests(io_configuration%number_of_io_servers))
194  broadcast_item%send_buffer=package_inter_io_communication_message(field_name, timestep, field_values)
195 
196  do i=0, io_configuration%number_of_io_servers-1
197  if (i .ne. io_configuration%my_io_rank) then
198  call lock_mpi()
199  call mpi_isend(broadcast_item%send_buffer, size(broadcast_item%send_buffer), mpi_byte, i, &
200  io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
201  io_configuration%io_communicator, broadcast_item%send_requests(i+1), ierr)
202  call unlock_mpi()
203  else
204  broadcast_item%send_requests(i+1)=mpi_request_null
205  end if
206  end do
207  ! Still call the completion procedure on the root
208  call issue_thread_call_to_completion(field_name, timestep, field_values, completion_procedure)
209  else
210  if (allocated(broadcast_item%cached_values) .and. .not. broadcast_item%handled) then
211  broadcast_item%handled=.true.
212  call issue_thread_call_to_completion(field_name, timestep, broadcast_item%cached_values, completion_procedure)
213  if (allocated(broadcast_item%cached_values)) deallocate(broadcast_item%cached_values)
214  end if
215  end if
216  call check_thread_status(forthread_mutex_unlock(broadcast_item%mutex))
217  end subroutine perform_inter_io_broadcast
218 
225  subroutine issue_thread_call_to_completion(field_name, timestep, values, completion_procedure)
226  integer, intent(in) :: timestep
227  character(len=*), intent(in) :: field_name
228  real(DEFAULT_PRECISION), dimension(:), intent(in) :: values
229  procedure(handle_completion) :: completion_procedure
230 
231  type(threaded_callback_parameters_type), pointer :: threaded_callback_state
232  class(*), pointer :: generic
233 
234  allocate(threaded_callback_state)
235 
236  threaded_callback_state%field_name=field_name
237  threaded_callback_state%timestep=timestep
238  allocate(threaded_callback_state%values(size(values)), source=values)
239  threaded_callback_state%completion_procedure=>completion_procedure
240 
242  generic=>threaded_callback_state
246 
247  call threadpool_start_thread(thread_call_to_completion, (/ thread_callback_params_id-1 /))
248  end subroutine issue_thread_call_to_completion
249 
253  subroutine thread_call_to_completion(arguments, data_buffer)
254  integer, dimension(:), intent(in) :: arguments
255  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
256 
257  class(*), pointer :: generic
258  type(threaded_callback_parameters_type), pointer :: threaded_callback_state
259 
261  generic=>c_get_generic(thread_callback_params, trim(conv_to_string(arguments(1))))
262  select type(generic)
264  threaded_callback_state=>generic
265  call c_remove(thread_callback_params, trim(conv_to_string(arguments(1))))
266  end select
268 
269  if (associated(threaded_callback_state)) then
270  call threaded_callback_state%completion_procedure(stored_io_configuration, threaded_callback_state%values, &
271  threaded_callback_state%field_name, threaded_callback_state%timestep)
272  deallocate(threaded_callback_state%values)
273  deallocate(threaded_callback_state)
274  end if
275  end subroutine thread_call_to_completion
276 
285  else
287  end if
288  end subroutine clean_broadcast_progress_if_needed
289 
291  subroutine clean_broadcast_progress()
292  type(inter_io_broadcast), pointer :: specific_broadcast_item_at_index
293  integer :: completion_flag, ierr, num_to_remove, have_lock
294  character(len=STRING_LENGTH) :: entry_key
295  type(list_type) :: entries_to_remove
296  logical :: destroy_lock
297  type(iterator_type) :: iterator
298  type(mapentry_type) :: mapentry
299  class(*), pointer :: generic
300 
302  if (have_lock==0) then
305  do while (c_has_next(iterator))
306  destroy_lock=.false.
307  mapentry=c_next_mapentry(iterator)
308  specific_broadcast_item_at_index=>retrieve_broadcast_item(mapentry)
309  call check_thread_status(forthread_mutex_lock(specific_broadcast_item_at_index%mutex))
310  if (allocated(specific_broadcast_item_at_index%send_requests)) then
311  call lock_mpi()
312  call mpi_testall(size(specific_broadcast_item_at_index%send_requests), specific_broadcast_item_at_index%send_requests, &
313  completion_flag, mpi_statuses_ignore, ierr)
314  call unlock_mpi()
315  if (completion_flag == 1) then
316  deallocate(specific_broadcast_item_at_index%send_requests)
317  if (allocated(specific_broadcast_item_at_index%send_buffer)) deallocate(specific_broadcast_item_at_index%send_buffer)
318  call c_add_string(entries_to_remove, mapentry%key)
319  destroy_lock=.true.
320  end if
321  else
322  if (specific_broadcast_item_at_index%handled) then
323  if (allocated(specific_broadcast_item_at_index%cached_values)) then
324  deallocate(specific_broadcast_item_at_index%cached_values)
325  end if
326  call c_add_string(entries_to_remove, mapentry%key)
327  destroy_lock=.true.
328  end if
329  end if
330  call check_thread_status(forthread_mutex_unlock(specific_broadcast_item_at_index%mutex))
331  if (destroy_lock) call check_thread_status(forthread_mutex_destroy(specific_broadcast_item_at_index%mutex))
332  end do
334 
335  if (.not. c_is_empty(entries_to_remove)) then
337  iterator=c_get_iterator(entries_to_remove)
338  do while (c_has_next(iterator))
339  entry_key=c_next_string(iterator)
340  generic=>c_get_generic(broadcast_statuses, entry_key)
341  call c_remove(broadcast_statuses, entry_key)
342  deallocate(generic)
343  end do
345  end if
346  call c_free(entries_to_remove)
348  end if
349  end subroutine clean_broadcast_progress
350 
356  function find_or_add_broadcast_item(field_name, timestep, completion_procedure)
357  character(len=*), intent(in) :: field_name
358  integer, intent(in) :: timestep
359  procedure(handle_completion), optional :: completion_procedure
360  type(inter_io_broadcast), pointer :: find_or_add_broadcast_item
361 
362  class(*), pointer :: generic
363 
364  find_or_add_broadcast_item=>find_broadcast_item(field_name, timestep, .true.)
365  if (.not. associated(find_or_add_broadcast_item)) then
367  find_or_add_broadcast_item=>find_broadcast_item(field_name, timestep, .false.)
368  if (.not. associated(find_or_add_broadcast_item)) then
369  allocate(find_or_add_broadcast_item)
370  if (present(completion_procedure)) then
371  find_or_add_broadcast_item%completion_procedure=>completion_procedure
372  else
373  find_or_add_broadcast_item%completion_procedure=>null()
374  end if
375  find_or_add_broadcast_item%handled=.false.
376  call check_thread_status(forthread_mutex_init(find_or_add_broadcast_item%mutex, -1))
377  generic=>find_or_add_broadcast_item
378  call c_put_generic(broadcast_statuses, trim(field_name)//"#"//conv_to_string(timestep), generic, .false.)
379  end if
381  end if
382  end function find_or_add_broadcast_item
383 
389  function find_broadcast_item(field_name, timestep, do_read_lock)
390  character(len=*), intent(in) :: field_name
391  integer, intent(in) :: timestep
392  logical, intent(in) :: do_read_lock
393  type(inter_io_broadcast), pointer :: find_broadcast_item
394 
395  class(*), pointer :: generic
396 
398  generic=>c_get_generic(broadcast_statuses, trim(field_name)//"#"//conv_to_string(timestep))
400 
401  if (associated(generic)) then
402  select type(generic)
403  type is (inter_io_broadcast)
404  find_broadcast_item=>generic
405  end select
406  else
407  find_broadcast_item=>null()
408  end if
409  end function find_broadcast_item
410 
!> Locates a broadcast item within a mapentry or null if none exists
!! @param mapentry The map entry whose generic value is inspected
!! @returns Pointer to the broadcast item held by the entry, or null if the entry holds no value or
!! a value that is not of type inter_io_broadcast
function retrieve_broadcast_item(mapentry)
  type(mapentry_type), intent(in) :: mapentry
  type(inter_io_broadcast), pointer :: retrieve_broadcast_item

  class(*), pointer :: generic

  ! Give the result a defined (null) status up front, otherwise it would be left undefined if the
  ! entry's value is associated but is not an inter_io_broadcast
  retrieve_broadcast_item=>null()

  generic=>c_get_generic(mapentry)
  if (associated(generic)) then
    select type(generic)
    type is (inter_io_broadcast)
      retrieve_broadcast_item=>generic
    end select
  end if
end function retrieve_broadcast_item
431 end module broadcast_inter_io_mod
type(hashmap_type), volatile broadcast_statuses
integer, volatile inter_io_description_mutex
logical function, public check_broadcast_inter_io_for_completion(io_configuration)
Checks the statuses for broadcast completion and returns whether they are all finished or not...
integer, volatile broadcast_statuses_rwlock
integer, volatile bcast_count
integer function forthread_rwlock_init(rwlock_id, attr_id)
Definition: forthread.F90:504
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
Returns whether a collection is empty.
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
integer, parameter perform_clean_every
integer, volatile bcast_count_mutex
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
character function, dimension(:), allocatable, public package_inter_io_communication_message(field_name, timestep, field_values, other_int)
Packages up fields into an io binary message (allocated here) which is used for sending.
subroutine issue_thread_call_to_completion(field_name, timestep, values, completion_procedure)
Issues the call into the thread pool to call the completion procedure, this runs in a separate thread...
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
integer, parameter my_inter_io_tag
Abstraction layer around MPI, this issues and marshals the lower level communication details...
integer function forthread_rwlock_rdlock(lock_id)
Definition: forthread.F90:514
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
subroutine, public finalise_broadcast_inter_io()
Finalises the broadcast inter IO functionality.
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
Converts data types to strings.
Definition: conversions.F90:36
integer function forthread_rwlock_wrlock(lock_id)
Definition: forthread.F90:532
integer function forthread_mutex_trylock(mutex_id)
Definition: forthread.F90:293
integer, volatile thread_callback_params_mutex
integer, parameter, public double_precision
Double precision (64 bit) kind.
Definition: datadefn.F90:14
Broadcast inter IO communication which sends a value from one IO server to all others. This tracks field name and timestep and only issues one call (and one results call to completion) for that combination.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer, volatile thread_callback_params_id
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine thread_call_to_completion(arguments, data_buffer)
Called by the thread pool, this will call onto the completion procedure before cleaning up Integer a...
subroutine, public perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, timestep, completion_procedure)
Performs an inter IO broadcast of data from the root to all other IO servers. Note that this is on th...
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
Definition: collections.F90:7
type(inter_io_broadcast) function, pointer find_or_add_broadcast_item(field_name, timestep, completion_procedure)
Locates and returns or adds and returns a specific broadcast item representing a timestep and field...
subroutine, public init_broadcast_inter_io(io_configuration)
Initialises the broadcast inter IO functionality.
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
subroutine clean_broadcast_progress_if_needed()
Calls out to do a broadcast progress clean if needed (i.e. every n steps.)
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
type(hashmap_type), volatile thread_callback_params
subroutine clean_broadcast_progress()
Performs a clean of the broadcast progresses that no longer need to be stored.
integer, volatile clean_progress_mutex
type(inter_io_broadcast) function, pointer find_broadcast_item(field_name, timestep, do_read_lock)
Finds a specific broadcast item or null if none is found.
subroutine, public unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, other_int)
Unpackages some binary data into its individual fields. The field values are allocated here and the s...
Inter IO server communication specific functionality. This manages all of the communication that migh...
List data structure which implements a doubly linked list. This list will preserve its order...
Definition: collections.F90:60
character(len= *), parameter my_inter_io_name
Type keeping track of broadcast statuses.
type(io_configuration_type), pointer stored_io_configuration
integer function forthread_rwlock_destroy(rwlock_id)
Definition: forthread.F90:495
integer function, public find_inter_io_from_name(io_configuration, name)
Locates the index of an inter IO entry from the operator name or returns 0 if none is found...
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
Definition: threadpool.F90:102
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
integer function forthread_rwlock_unlock(lock_id)
Definition: forthread.F90:550
logical, volatile initialised
Puts a generic key-value pair into the map.
subroutine, public register_inter_io_communication(io_configuration, message_tag, handling_procedure, name)
Registers an inter IO communication operation.
Retrieves the generic value held at the specific map index or null if index > map elements...
type(inter_io_broadcast) function, pointer retrieve_broadcast_item(mapentry)
Locates a broadcast item within a mapentry or null if none exists.
integer, volatile bcast_clean_reduction_count
Adds a string to the end of the list.
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
Handles receiving data from another IO server and processing this. If the request has already been re...
Removes a specific element from the list or map.