17 use mpi
, only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_statuses_ignore, mpi_character, mpi_byte
32 integer,
dimension(:),
allocatable :: send_requests
33 character,
dimension(:),
allocatable :: send_buffer
34 real(kind=DEFAULT_PRECISION),
dimension(:),
allocatable :: cached_values
41 character(len=STRING_LENGTH) :: field_name
42 real(DEFAULT_PRECISION),
dimension(:),
allocatable :: values
87 if (
allocated(broadcast_item%send_requests))
then 89 deallocate(broadcast_item%send_requests)
90 if (
allocated(broadcast_item%send_buffer))
deallocate(broadcast_item%send_buffer)
121 if (.not. broadcast_item%handled)
then 138 character,
dimension(:),
intent(inout) :: data_buffer
139 integer,
intent(in) :: inter_io_index
142 real(kind=DEFAULT_PRECISION),
dimension(:),
allocatable :: data_values
143 character(len=STRING_LENGTH) :: field_name
150 if (
associated(broadcast_item%completion_procedure))
then 152 broadcast_item%handled=.true.
157 allocate(broadcast_item%cached_values(
size(data_values)), source=data_values)
158 broadcast_item%cached_values=data_values
161 if (
allocated(data_values))
deallocate(data_values)
175 timestep, completion_procedure)
177 real(kind=DOUBLE_PRECISION),
dimension(:) :: field_values
178 integer,
intent(in) :: field_size, root, timestep
179 character(len=*),
intent(in) :: field_name
183 integer :: inter_io_comm_index, i, ierr
190 if (io_configuration%my_io_rank == root .and. .not. broadcast_item%handled)
then 191 broadcast_item%handled=.true.
193 allocate(broadcast_item%send_requests(io_configuration%number_of_io_servers))
196 do i=0, io_configuration%number_of_io_servers-1
197 if (i .ne. io_configuration%my_io_rank)
then 199 call mpi_isend(broadcast_item%send_buffer,
size(broadcast_item%send_buffer), mpi_byte, i, &
200 io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
201 io_configuration%io_communicator, broadcast_item%send_requests(i+1), ierr)
204 broadcast_item%send_requests(i+1)=mpi_request_null
210 if (
allocated(broadcast_item%cached_values) .and. .not. broadcast_item%handled)
then 211 broadcast_item%handled=.true.
213 if (
allocated(broadcast_item%cached_values))
deallocate(broadcast_item%cached_values)
226 integer,
intent(in) :: timestep
227 character(len=*),
intent(in) :: field_name
228 real(DEFAULT_PRECISION),
dimension(:),
intent(in) :: values
232 class(*),
pointer :: generic
234 allocate(threaded_callback_state)
236 threaded_callback_state%field_name=field_name
237 threaded_callback_state%timestep=timestep
238 allocate(threaded_callback_state%values(
size(values)), source=values)
239 threaded_callback_state%completion_procedure=>completion_procedure
242 generic=>threaded_callback_state
254 integer,
dimension(:),
intent(in) :: arguments
255 character,
dimension(:),
allocatable,
intent(inout),
optional :: data_buffer
257 class(*),
pointer :: generic
264 threaded_callback_state=>generic
269 if (
associated(threaded_callback_state))
then 271 threaded_callback_state%field_name, threaded_callback_state%timestep)
272 deallocate(threaded_callback_state%values)
273 deallocate(threaded_callback_state)
293 integer :: completion_flag, ierr, num_to_remove, have_lock
294 character(len=STRING_LENGTH) :: entry_key
296 logical :: destroy_lock
299 class(*),
pointer :: generic
302 if (have_lock==0)
then 310 if (
allocated(specific_broadcast_item_at_index%send_requests))
then 312 call mpi_testall(
size(specific_broadcast_item_at_index%send_requests), specific_broadcast_item_at_index%send_requests, &
313 completion_flag, mpi_statuses_ignore, ierr)
315 if (completion_flag == 1)
then 316 deallocate(specific_broadcast_item_at_index%send_requests)
317 if (
allocated(specific_broadcast_item_at_index%send_buffer))
deallocate(specific_broadcast_item_at_index%send_buffer)
322 if (specific_broadcast_item_at_index%handled)
then 323 if (
allocated(specific_broadcast_item_at_index%cached_values))
then 324 deallocate(specific_broadcast_item_at_index%cached_values)
346 call c_free(entries_to_remove)
357 character(len=*),
intent(in) :: field_name
358 integer,
intent(in) :: timestep
362 class(*),
pointer :: generic
365 if (.not.
associated(find_or_add_broadcast_item))
then 368 if (.not.
associated(find_or_add_broadcast_item))
then 369 allocate(find_or_add_broadcast_item)
370 if (
present(completion_procedure))
then 371 find_or_add_broadcast_item%completion_procedure=>completion_procedure
373 find_or_add_broadcast_item%completion_procedure=>null()
375 find_or_add_broadcast_item%handled=.false.
377 generic=>find_or_add_broadcast_item
390 character(len=*),
intent(in) :: field_name
391 integer,
intent(in) :: timestep
392 logical,
intent(in) :: do_read_lock
395 class(*),
pointer :: generic
401 if (
associated(generic))
then 404 find_broadcast_item=>generic
407 find_broadcast_item=>null()
418 class(*),
pointer :: generic
422 if (
associated(generic))
then 425 retrieve_broadcast_item=>generic
428 retrieve_broadcast_item=>null()
type(hashmap_type), volatile broadcast_statuses
integer, volatile inter_io_description_mutex
logical function, public check_broadcast_inter_io_for_completion(io_configuration)
Checks the statuses for broadcast completion and returns whether they are all finished or not...
integer, volatile broadcast_statuses_rwlock
integer, volatile bcast_count
integer function forthread_rwlock_init(rwlock_id, attr_id)
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
Returns whether a collection is empty.
integer function forthread_mutex_unlock(mutex_id)
integer, parameter perform_clean_every
Overall IO configuration.
integer, volatile bcast_count_mutex
integer function forthread_mutex_destroy(mutex_id)
character function, dimension(:), allocatable, public package_inter_io_communication_message(field_name, timestep, field_values, other_int)
Packages up fields into an io binary message (allocated here) which is used for sending.
subroutine issue_thread_call_to_completion(field_name, timestep, values, completion_procedure)
Issues the call into the thread pool to call the completion procedure, this runs in a separate thread...
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
integer, parameter my_inter_io_tag
Abstraction layer around MPI, this issues and marshals the lower level communication details...
integer function forthread_rwlock_rdlock(lock_id)
Contains common definitions for the data and datatypes used by MONC.
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
subroutine, public finalise_broadcast_inter_io()
Finalises the broadcast inter IO functionality.
integer function forthread_mutex_init(mutex_id, attr_id)
Conversion between common inbuilt FORTRAN data types.
Converts data types to strings.
integer function forthread_rwlock_wrlock(lock_id)
integer function forthread_mutex_trylock(mutex_id)
integer, volatile thread_callback_params_mutex
integer, parameter, public double_precision
Double precision (64 bit) kind.
Broadcast inter IO communication which sends a value from one IO server to all others. This tracks field name and timestep and only issues one call (and one results call to completion) for that combination.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer, volatile thread_callback_params_id
integer function forthread_mutex_lock(mutex_id)
subroutine thread_call_to_completion(arguments, data_buffer)
Called by the thread pool, this will call onto the completion procedure before cleaning up Integer a...
subroutine, public perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, timestep, completion_procedure)
Performs an inter IO broadcast of data from the root to all other IO servers. Note that this is on th...
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
type(inter_io_broadcast) function, pointer find_or_add_broadcast_item(field_name, timestep, completion_procedure)
Locates and returns or adds and returns a specific broadcast item representing a timestep and field...
subroutine, public init_broadcast_inter_io(io_configuration)
Initialises the broadcast inter IO functionality.
integer, parameter, public string_length
Default length of strings.
subroutine clean_broadcast_progress_if_needed()
Calls out to do a broadcast progress clean if needed (i.e. every n steps.)
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
type(hashmap_type), volatile thread_callback_params
subroutine clean_broadcast_progress()
Performs a clean of the broadcast progresses that no longer need to be stored.
integer, volatile clean_progress_mutex
type(inter_io_broadcast) function, pointer find_broadcast_item(field_name, timestep, do_read_lock)
Finds a specific broadcast item or null if none is found.
subroutine, public unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, other_int)
Unpackages some binary data into its individual fields. The field values are allocated here and the s...
Inter IO server communication specific functionality. This manages all of the communication that migh...
List data structure which implements a doubly linked list. This list will preserve its order...
character(len= *), parameter my_inter_io_name
Type keeping track of broadcast statuses.
type(io_configuration_type), pointer stored_io_configuration
integer function forthread_rwlock_destroy(rwlock_id)
integer function, public find_inter_io_from_name(io_configuration, name)
Locates the index of an inter IO entry from the operator name or returns 0 if none is found...
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
integer function forthread_rwlock_unlock(lock_id)
logical, volatile initialised
Puts a generic key-value pair into the map.
subroutine, public register_inter_io_communication(io_configuration, message_tag, handling_procedure, name)
Registers an inter IO communication operation.
Retrieves the generic value held at the specific map index or null if index > map elements...
type(inter_io_broadcast) function, pointer retrieve_broadcast_item(mapentry)
Locates a broadcast item within a mapentry or null if none exists.
integer, volatile bcast_clean_reduction_count
Adds a string to the end of the list.
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
Handles receiving data from another IO server and processing this. If the request has already been re...
Removes a specific element from the list or map.