12 use mpi
, only : mpi_comm_world, mpi_source, mpi_int, mpi_byte, mpi_status_size, mpi_request_null, &
13 mpi_status_ignore, mpi_statuses_ignore, mpi_any_source, mpi_thread_multiple, mpi_thread_serialized
28 integer(c_int32_t),
value :: useconds
32 integer ::
command_buffer,& !< Buffer used to receive the command data into when it arrives on that channel
46 integer,
intent(in) :: provided_threading
50 call log_master_log(
log_error,
"You must run MONC in MPI thread serialized or thread multiple mode for the IO server")
75 integer,
intent(inout) :: request
76 integer,
intent(inout),
optional :: status(mpi_status_size)
82 do while (flag .ne. 1)
84 if (
present(status))
then 85 call mpi_test(request, flag, status, ierr)
87 call mpi_test(request, flag, mpi_status_ignore, ierr)
93 if (
present(status))
then 94 call mpi_wait(request, status, ierr)
96 call mpi_wait(request, mpi_status_ignore, ierr)
106 integer,
dimension(:),
intent(inout) :: requests
107 integer,
intent(in) :: count
109 integer :: ierr, flag
113 do while (flag .ne. 1)
115 call mpi_testall(count, requests, flag, mpi_statuses_ignore, ierr)
120 call mpi_waitall(count, requests, mpi_statuses_ignore, ierr)
128 integer,
intent(in) :: io_comm
130 integer :: number, ierr
132 call mpi_comm_size(io_comm, number, ierr)
140 integer,
intent(in) :: io_comm
142 integer :: number, ierr
144 call mpi_comm_rank(io_comm, number, ierr)
165 integer function data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
166 integer,
intent(in) :: mpi_datatype, num_elements, source
167 integer,
intent(in),
optional :: data_dump_id
168 character,
dimension(:),
allocatable,
intent(inout),
optional :: dump_data
170 integer :: ierr, request, status(mpi_status_size), recv_count, tag_to_use
172 if (
present(dump_data))
then 174 if (
present(data_dump_id)) tag_to_use=tag_to_use+data_dump_id
176 call mpi_irecv(dump_data, num_elements, mpi_datatype, source, tag_to_use, mpi_comm_world, request, ierr)
180 call mpi_get_count(status, mpi_datatype, recv_count, ierr)
183 else if (
present(description_data))
then 185 call mpi_irecv(description_data, num_elements, mpi_datatype, source,
data_tag, mpi_comm_world, request, ierr)
189 call mpi_get_count(status, mpi_datatype, recv_count, ierr)
203 integer,
intent(in) :: req
207 if (req .ne. mpi_request_null)
then 209 call mpi_cancel(req, ierr)
219 integer,
intent(out) :: command, source
221 integer :: ierr, status(mpi_status_size), complete
227 if (complete .eq. 1)
then 229 source = status(mpi_source)
243 logical function test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
244 integer,
intent(in) :: number_of_inter_io, io_communicator
245 integer,
intent(out) :: command, source
247 character,
dimension(:),
allocatable,
intent(inout) :: data_buffer
249 integer :: i, ierr, status(mpi_status_size), message_size
250 logical :: message_pending
253 do i=1, number_of_inter_io
254 call mpi_iprobe(mpi_any_source, inter_io_communications(i)%message_tag, io_communicator, message_pending, status, ierr)
255 if (message_pending)
then 256 call mpi_get_count(status, mpi_byte, message_size, ierr)
257 allocate(data_buffer(message_size))
258 call mpi_recv(data_buffer, message_size, mpi_byte, mpi_any_source, inter_io_communications(i)%message_tag, &
259 io_communicator, mpi_status_ignore, ierr)
274 integer,
intent(in) :: the_type
278 call mpi_type_free(the_type, ierr)
291 integer function build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, &
292 field_end_locations, field_dimensions)
295 integer,
intent(out) :: data_size
296 type(
map_type),
intent(out) :: field_start_locations, field_end_locations
297 type(
map_type),
intent(out),
optional :: field_dimensions
299 integer :: type_extents(5), type_counts, i, j, field_start, data_type, field_array_sizes, &
300 temp_size, prev_data_type, old_types(20), offsets(20), block_counts(20), new_type, current_location, ierr, field_ignores
301 logical :: field_found
312 do i=1,data_definition%number_of_data_fields
313 if (data_type == 0)
then 314 prev_data_type=data_type
315 data_type=data_definition%fields(i)%data_type
317 if (data_type .ne. data_definition%fields(i)%data_type)
then 320 type_extents, prev_data_type, type_counts+1, old_types, offsets, block_counts)
324 prev_data_type=data_type
325 data_type=data_definition%fields(i)%data_type
326 type_counts=type_counts+1
329 call c_put_integer(field_start_locations, data_definition%fields(i)%name, current_location)
334 if (.not. field_found .or. field_size_info%dimensions == 0)
then 336 if (.not. data_definition%fields(i)%optional)
then 337 call log_log(
log_error,
"Non optional field `"//trim(data_definition%fields(i)%name)//&
338 "' omitted from MONC IO server registration")
340 field_ignores=field_ignores+1
344 do j=1, field_size_info%dimensions
345 temp_size=temp_size*field_size_info%dim_sizes(j)
347 if (data_definition%fields(i)%field_type .eq.
map_field_type)
then 348 field_array_sizes=(field_array_sizes+temp_size*
string_length*2)-1
351 field_array_sizes=(field_array_sizes+temp_size)-1
352 current_location=current_location+temp_size*type_extents(data_type)
356 if (data_definition%fields(i)%optional)
then 360 current_location=current_location+type_extents(data_type)*
string_length 362 current_location=current_location+type_extents(data_type)
365 field_ignores=field_ignores+1
370 current_location=current_location+type_extents(data_type)*
string_length 372 current_location=current_location+type_extents(data_type)
376 call c_put_integer(field_end_locations, data_definition%fields(i)%name, current_location-1)
377 if (
present(field_dimensions))
then 378 call c_put_integer(field_dimensions, data_definition%fields(i)%name, field_size_info%dimensions)
381 if (field_start .le. i-1)
then 384 type_extents, prev_data_type, type_counts+1, old_types, offsets, block_counts)
385 type_counts=type_counts+1
388 call mpi_type_struct(type_counts, block_counts, offsets, old_types, new_type, ierr)
389 call mpi_type_commit(new_type, ierr)
391 call mpi_type_size(new_type, data_size, ierr)
Configuration of a specific data definition.
Puts an integer key-value pair into the map.
integer function forthread_mutex_unlock(mutex_id)
integer mpi_threading_mode
integer function, public get_number_io_servers(io_comm)
Retrieves the number of IO servers that are running in total.
integer, parameter, public float_data_type
integer function, public get_mpi_datatype_from_internal_representation(type_code)
Gets the MPI datatype from out internal representation of the field data type (as in the configuratio...
logical function, public test_for_command(command, source)
Tests for a command message based upon the request already registered.
integer, parameter, public log_error
Only log ERROR messages.
integer, parameter, public array_field_type
integer function forthread_mutex_destroy(mutex_id)
subroutine, public register_command_receive()
Registers a request for receiving a command from any MONC process on the command channel.
integer, parameter, public boolean_data_type
integer, parameter ms_wait_between_tests
Interface to the C usleep Linux call which allows us to sleep for a specific number of MS...
subroutine cancel_request(req)
Cancels a specific communication request.
integer, parameter, public command_tag
Abstraction layer around MPI, this issues and marshals the lower level communication details...
Contains common definitions for the data and datatypes used by MONC.
subroutine, public log_master_log(level, message)
Will log just from the master process.
subroutine, public free_mpi_type(the_type)
Frees an MPI type, used in clean up.
integer function forthread_mutex_init(mutex_id, attr_id)
integer, parameter, public double_data_type
subroutine, public wait_for_mpi_request(request, status)
Waits for a specific MPI request to complete, either by managing thread safety and interleaving or ju...
integer function, public data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
Awaits some data on the data channel. This is of the type, size from the source provided and can eith...
logical function, public test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
Tests for inter IO server communication.
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
integer, parameter, public inter_io_communication
Field type identifiers.
integer, parameter, public string_data_type
This defines some constants and procedures that are useful to the IO server and clients that call it...
integer, parameter, public integer_data_type
Map data structure that holds string (length 20 maximum) key value pairs.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer function forthread_mutex_lock(mutex_id)
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
integer command_request_handle
Request handle representing the asynchronous P2P command request.
Collection data structures.
subroutine, public initialise_mpi_communication(provided_threading)
Initialises MPI communication.
integer, parameter, public string_length
Default length of strings.
integer function, dimension(5), public populate_mpi_type_extents()
Provides the type extents of the types that we are using in construction of the MPI data type...
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public cancel_requests()
Cancels all outstanding communication requests.
logical function, public get_data_description_from_name(descriptions, name, field_description)
Look up the data description that corresponds to a specific field keyed by its name.
integer, volatile mpi_mutex
subroutine, public append_mpi_datatype(field_start, field_end, field_array_sizes, data_type, type_extents, prev_data_type, type_index, old_types, offsets, block_counts)
Appends the MPI datatype details to the block counts, old types and offsets arrays. This will lump together multiple concurrent fields with the same type.
integer function, public build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, field_end_locations, field_dimensions)
Builds the MPI type that corresponds to the data which will be received from a specific MONC process...
integer command_buffer
Buffer used to receive the command data into when it arrives on that channel.
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
logical manage_mpi_thread_safety
subroutine, public pause_for_mpi_interleaving()
Pauses for a specific number of ms to allow for MPI interleaving, this is to avoid starvation...
integer, parameter, public map_field_type
Field data type identifiers.
integer, parameter, public data_tag
Parses the XML configuration file to produce the io configuration description which contains the data...
integer function, public get_my_io_rank(io_comm)
Retrieves my IO server rank out of the number of IO servers that are running.