18 use mpi
, only : mpi_double_precision, mpi_int, mpi_any_source, mpi_request_null, mpi_status_ignore, mpi_character, mpi_byte
32 real(DEFAULT_PRECISION),
dimension(:),
allocatable :: values
33 character,
dimension(:),
allocatable :: send_buffer
34 character(len=STRING_LENGTH) :: field_name
35 integer :: contributed_moncs, contributed_io_servers, timestep, reduction_operator, async_handle, mutex, root
71 character,
dimension(:),
intent(inout) :: data_buffer
72 integer,
intent(in) :: inter_io_index
75 io_configuration%my_io_rank, data_buffer, io_configuration%number_of_io_servers)
102 if (progress%async_handle /= mpi_request_null)
then 126 root, timestep, completion_procedure)
128 real(kind=DOUBLE_PRECISION),
dimension(:) :: field_values
129 integer,
intent(in) :: field_size, reduction_op, root, timestep
130 character(len=*),
intent(in) :: reduction_field_name
134 logical :: collective_values_new
140 reduction_progress%contributed_moncs=reduction_progress%contributed_moncs+1
142 collective_values_new=.not.
allocated(reduction_progress%values)
143 if (collective_values_new)
allocate(reduction_progress%values(field_size))
146 if (reduction_progress%contributed_moncs == io_configuration%number_of_moncs)
then 147 reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
155 integer,
intent(in) :: myrank
175 integer,
intent(in) :: myrank
177 integer :: i, entries, completed, ierr, num_to_remove, have_lock
182 character(len=STRING_LENGTH) :: entry_key
183 class(*),
pointer :: generic
184 logical :: destroy_lock
188 if (have_lock == 0)
then 195 if (myrank /= specific_reduction_progress%root)
then 197 if (specific_reduction_progress%async_handle /= mpi_request_null)
then 199 if (completed == 1)
then 200 if (
allocated(specific_reduction_progress%send_buffer))
deallocate(specific_reduction_progress%send_buffer)
224 call c_free(entries_to_remove)
237 number_elements, collective_values_empty)
238 integer,
intent(in) :: reduction_op, number_elements
239 logical,
intent(in) :: collective_values_empty
241 real(kind=DOUBLE_PRECISION),
dimension(:),
intent(in) :: single_server_values
245 if (collective_values_empty)
then 246 reduction_progress%values=single_server_values
248 if (reduction_op ==
mean .or. reduction_op ==
sum)
then 249 reduction_progress%values=reduction_progress%values+single_server_values
250 else if (reduction_op ==
min .or. reduction_op ==
max)
then 251 do k=1, number_elements
252 if (reduction_op ==
min)
then 253 if (single_server_values(k) .lt. reduction_progress%values(k)) &
254 reduction_progress%values(k)=single_server_values(k)
255 else if (reduction_op ==
max)
then 256 if (single_server_values(k) .gt. reduction_progress%values(k)) &
257 reduction_progress%values(k)=single_server_values(k)
275 integer :: ierr, inter_io_comm_index
277 if (io_configuration%my_io_rank == reduction_progress%root .and. &
278 reduction_progress%contributed_io_servers == io_configuration%number_of_io_servers)
then 281 if (io_configuration%my_io_rank /= reduction_progress%root)
then 285 reduction_progress%timestep, reduction_progress%values, reduction_progress%reduction_operator)
287 call mpi_isend(reduction_progress%send_buffer,
size(reduction_progress%send_buffer), &
288 mpi_byte, reduction_progress%root, &
289 io_configuration%inter_io_communications(inter_io_comm_index)%message_tag, &
290 io_configuration%io_communicator, reduction_progress%async_handle, ierr)
293 if (
allocated(reduction_progress%values))
deallocate(reduction_progress%values)
306 character,
dimension(:),
intent(inout) :: data_buffer
307 integer,
intent(in) :: number_io_servers, myrank
310 character(len=STRING_LENGTH) :: field_name
311 integer :: timestep, reduction_op
312 real(kind=DEFAULT_PRECISION),
dimension(:),
allocatable :: field_values
313 logical :: collective_values_new
319 collective_values_new=.not.
allocated(reduction_progress%values)
320 if (collective_values_new)
allocate(reduction_progress%values(
size(field_values)))
322 reduction_progress%contributed_io_servers=reduction_progress%contributed_io_servers+1
324 field_values,
size(field_values), collective_values_new)
325 if (reduction_progress%contributed_io_servers == number_io_servers)
then 327 deallocate(field_values)
330 deallocate(field_values)
341 if (reduction_progress%reduction_operator ==
mean)
then 342 reduction_progress%values=reduction_progress%values/io_configuration%number_of_global_moncs
344 call reduction_progress%completion_procedure(io_configuration, reduction_progress%values, &
345 reduction_progress%field_name, reduction_progress%timestep)
348 if (
allocated(reduction_progress%values))
deallocate(reduction_progress%values)
360 integer,
intent(in) :: timestep, reduction_operator, root
362 character(len=*),
intent(in) :: field_name
365 class(*),
pointer :: generic
369 if (.not.
associated(find_or_add_reduction_progress))
then 371 find_or_add_reduction_progress=>
find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock=.false.)
372 if (.not.
associated(find_or_add_reduction_progress))
then 373 allocate(new_progress)
375 new_progress%timestep=timestep
376 new_progress%reduction_operator=reduction_operator
377 new_progress%contributed_moncs=0
378 new_progress%contributed_io_servers=0
379 new_progress%root=root
380 new_progress%async_handle=mpi_request_null
381 new_progress%field_name=field_name
382 if (
present(completion_procedure))
then 383 new_progress%completion_procedure=>completion_procedure
385 new_progress%completion_procedure=>null()
387 generic=>new_progress
390 find_or_add_reduction_progress=>new_progress
394 if (.not.
associated(find_or_add_reduction_progress%completion_procedure) .and.
present(completion_procedure))
then 395 find_or_add_reduction_progress%completion_procedure=>completion_procedure
406 integer,
intent(in) :: timestep, reduction_operator
407 logical,
intent(in),
optional :: issue_read_lock
409 character(len=*),
intent(in) :: field_name
411 class(*),
pointer :: generic
412 logical :: do_read_lock
414 if (
present(issue_read_lock))
then 415 do_read_lock=issue_read_lock
423 if (
associated(generic))
then 426 find_reduction_progress=>generic
429 find_reduction_progress=>null()
438 class(*),
pointer :: generic
439 character(len=STRING_LENGTH) :: specific_key
442 reduction_progress%reduction_operator)
447 if (
associated(generic))
deallocate(generic)
455 character(len=*),
intent(in) :: field_name
456 integer,
intent(in) :: timestep, reduction_operator
468 class(*),
pointer :: generic
472 if (
associated(generic))
then 475 retrieve_reduction_progress=>generic
478 retrieve_reduction_progress=>null()
487 character(len=*),
intent(in) :: op_string
489 if (op_string .eq.
"mean")
then 491 else if (op_string .eq.
"min")
then 493 else if (op_string .eq.
"max")
then 495 else if (op_string .eq.
"sum")
then 498 call log_log(
log_error,
"Reduction operator '"//trim(op_string)//
"' not recognised")
integer, volatile previous_clean_reduction_count
integer, volatile clean_progress_mutex
integer function forthread_rwlock_init(rwlock_id, attr_id)
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
logical, volatile initialised
Returns whether a collection is empty.
integer function forthread_mutex_unlock(mutex_id)
subroutine, public init_reduction_inter_io(io_configuration)
Initialises the reduction action.
subroutine integrate_io_server_collective_values(reduction_op, reduction_progress, single_server_values, number_elements, collective_values_empty)
Integrates the collective values from another IO server into the currently stored values...
Overall IO configuration.
integer, parameter my_inter_io_tag
integer, parameter, public log_error
Only log ERROR messages.
integer function forthread_mutex_destroy(mutex_id)
subroutine handle_process_recv_from_other_io_server(io_configuration, inter_io_comm, myrank, data_buffer, number_io_servers)
Handles the data received from another IO server, locates the correct reduction progress, appends the information and then checks for & deals with the situation where that reduction is completed.
subroutine, public perform_inter_io_reduction(io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
Actually handles the processing for this data wrt the vertical reduction.
character function, dimension(:), allocatable, public package_inter_io_communication_message(field_name, timestep, field_values, other_int)
Packages up fields into an io binary message (allocated here) which is used for sending.
subroutine handle_recv_data_from_io_server(io_configuration, data_buffer, inter_io_index)
Handles the receiving of data from some other IO server. This is issued call back style within a thre...
logical function, public check_reduction_inter_io_for_completion(io_configuration)
Checks this action for completion, when all are completed then the IO server can shutdown as this is ...
subroutine handle_local_moncs_completed_collective(io_configuration, reduction_progress)
Handles the case where the local MONC processes have completed their collective operation for a speci...
integer, volatile reduction_count_mutex
integer function forthread_rwlock_trywrlock(lock_id)
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Abstraction layer around MPI, this issues and marshals the lower level communication details...
integer function forthread_rwlock_rdlock(lock_id)
Contains common definitions for the data and datatypes used by MONC.
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
integer function forthread_mutex_init(mutex_id, attr_id)
Conversion between common inbuilt FORTRAN data types.
subroutine, public wait_for_mpi_request(request, status)
Waits for a specific MPI request to complete, either by managing thread safety and interleaving or ju...
Converts data types to strings.
integer function forthread_rwlock_wrlock(lock_id)
character(len= *), parameter my_inter_io_name
integer function forthread_mutex_trylock(mutex_id)
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
subroutine handle_collective_completed(io_configuration, reduction_progress)
Handles the situation where collective communication for a specific reduction has completed across al...
integer, parameter, public double_precision
Double precision (64 bit) kind.
Map data structure that holds string (length 20 maximum) key value pairs.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer function forthread_mutex_lock(mutex_id)
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
character(len=string_length) function generate_reduction_key(field_name, timestep, reduction_operator)
Generates the lookup key that is used for the map storage of reduction progresses.
type(hashmap_type), volatile reduction_progresses
integer, parameter, public string_length
Default length of strings.
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public unpackage_inter_io_communication_message(data_buffer, field_name, timestep, field_values, other_int)
Unpackages some binary data into its individual fields. The field values are allocated here and the s...
Inter IO server communication specific functionality. This manages all of the communication that migh...
integer, volatile reduction_count
List data structure which implements a doubly linked list. This list will preserve its order...
logical function check_and_clean_progress(myrank)
Checks all the reduction progresses and will remove any that have completed. This is designed to be c...
integer function forthread_rwlock_destroy(rwlock_id)
integer function, public find_inter_io_from_name(io_configuration, name)
Locates a the index of an inter IO entry from the operator name or returns 0 if none is found...
integer, volatile reduction_progress_rwlock
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
subroutine, public finalise_reduction_inter_io(io_configuration)
Finalises the reduction action, waiting for all outstanding requests and then freeing data...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
integer function forthread_rwlock_unlock(lock_id)
subroutine remove_reduction_progress(reduction_progress)
Removes a specific reduction progress.
Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as...
Puts a generic key-value pair into the map.
subroutine, public register_inter_io_communication(io_configuration, message_tag, handling_procedure, name)
Registers an inter IO communication operation.
integer function, public get_reduction_operator(op_string)
Given the map of action attributes this procedure will identify the reduction operator that has been ...
Retrieves the generic value held at the specific map index or null if index > map elements...
type(reduction_progress_type) function, pointer find_reduction_progress(timestep, reduction_operator, field_name, issue_read_lock)
Locates a specific reduction progress based upon the timestep, operator and field name...
type(reduction_progress_type) function, pointer retrieve_reduction_progress(mapentry)
Helper function to retrieve the reduction progress from a mapentry.
Determines whether or not a map contains a specific key.
Adds a string to the end of the list.
subroutine clean_progress(myrank)
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
integer, parameter perform_clean_every
type(reduction_progress_type) function, pointer find_or_add_reduction_progress(timestep, reduction_operator, root, field_name, completion_procedure)
Finds or adds a specific reduction progress based upon the timestep and reduction operator...
integer, volatile inter_io_description_mutex
Removes a specific element from the list or map.