MONC
mpicommunication.F90
Go to the documentation of this file.
1 
3  use datadefn_mod, only : string_length
12  use mpi, only : mpi_comm_world, mpi_source, mpi_int, mpi_byte, mpi_status_size, mpi_request_null, &
13  mpi_status_ignore, mpi_statuses_ignore, mpi_any_source, mpi_thread_multiple, mpi_thread_serialized
14  use iso_c_binding
15  implicit none
16 
17 #ifndef TEST_MODE
18  private
19 #endif
20 
! Delay handed to usleep by pause_for_mpi_interleaving between successive completion tests.
! NOTE(review): the name says "ms" but the value is passed unchanged to usleep, whose argument is named
! useconds (presumably microseconds) - confirm which unit is intended.
21  integer, parameter :: ms_wait_between_tests=100
22 
23 
! Explicit interface to the C routine usleep (bound by its lower-case name via bind(C)) so that it can be
! called from Fortran with its argument passed by value. Any value returned by the C routine is not exposed
! because it is declared here as a subroutine.
24  interface
25  subroutine usleep(useconds) bind(C)
26  use iso_c_binding
27  implicit none
28  integer(c_int32_t), value :: useconds
29  end subroutine usleep
30  end interface
31 
! Module state shared by the procedures below.
! NOTE(review): the declaration that follows ends in a continuation marker and the line it continues onto is
! not present in this listing (the listing numbering jumps from 33 to 35); the procedures below also use
! mpi_threading_mode and manage_mpi_thread_safety, whose declarations are not visible here.
32  integer :: command_buffer,& !< Buffer used to receive the command data into when it arrives on that channel
33  command_request_handle, & !< Request handle representing the asynchronous P2P command request
35  integer, volatile :: mpi_mutex
37 
41 contains
42 
!> Records the MPI threading level that was provided to this process and works out whether this module
!! has to manage MPI thread safety itself. An error is logged through log_master_log when the provided
!! level is neither thread multiple nor thread serialized.
!! @param provided_threading The threading level provided by MPI
subroutine initialise_mpi_communication(provided_threading)
  integer, intent(in) :: provided_threading

  logical :: is_multiple, is_serialized

  is_multiple = provided_threading == mpi_thread_multiple
  is_serialized = provided_threading == mpi_thread_serialized

  mpi_threading_mode = provided_threading
  if (.not. (is_multiple .or. is_serialized)) then
    call log_master_log(log_error, "You must run MONC in MPI thread serialized or thread multiple mode for the IO server")
  end if
  ! Thread safety is only managed explicitly when running in serialized mode
  manage_mpi_thread_safety = is_serialized
end subroutine initialise_mpi_communication
55 
!> Called by the procedures in this module immediately before they issue MPI calls; every such call is
!! paired with a later call to unlock_mpi.
!! NOTE(review): no executable statement is visible between the two lines below in this listing (listing
!! line 58 is absent), so as shown this procedure does nothing. The generated documentation describes it as
!! locking MPI when thread safety is explicitly managed (serialized mode) - confirm against the full source.
57  subroutine lock_mpi()
59  end subroutine lock_mpi
60 
!> Called by the procedures in this module immediately after they have issued MPI calls, pairing with an
!! earlier call to lock_mpi.
!! NOTE(review): no executable statement is visible between the two lines below in this listing (listing
!! line 63 is absent), so as shown this procedure does nothing. The generated documentation describes it as
!! unlocking MPI when thread safety is explicitly managed (serialized mode) - confirm against the full source.
62  subroutine unlock_mpi()
64  end subroutine unlock_mpi
65 
!> Suspends the caller by handing ms_wait_between_tests to usleep. It is called by the polling loops in this
!! module between unsuccessful completion tests.
!! NOTE(review): the constant is passed to usleep unscaled; usleep's argument is named useconds, so the
!! pause is presumably that many microseconds rather than milliseconds - confirm the intended unit.
subroutine pause_for_mpi_interleaving()
  integer(c_int32_t) :: pause_length

  pause_length = int(ms_wait_between_tests, c_int32_t)
  call usleep(pause_length)
end subroutine pause_for_mpi_interleaving
70 
!> Waits for a specific MPI request to complete. When this module is managing MPI thread safety the request
!! is polled with mpi_test, with lock_mpi/unlock_mpi around each test and a pause between unsuccessful
!! tests; otherwise a single call to mpi_wait is made.
!! @param request The MPI request handle to wait for
!! @param status Optional MPI status of the completed request; the status is ignored if this is absent
subroutine wait_for_mpi_request(request, status)
  integer, intent(inout) :: request
  integer, intent(inout), optional :: status(mpi_status_size)

  integer :: ierr
  ! The completion flag of mpi_test is a LOGICAL in the Fortran binding. Declaring it as an integer and
  ! comparing against 1 only works where .true. happens to be stored as 1.
  logical :: completed

  if (manage_mpi_thread_safety) then
    completed=.false.
    do while (.not. completed)
      call lock_mpi()
      if (present(status)) then
        call mpi_test(request, completed, status, ierr)
      else
        call mpi_test(request, completed, mpi_status_ignore, ierr)
      end if
      call unlock_mpi()
      ! Pause outside the lock before testing again
      if (.not. completed) call pause_for_mpi_interleaving()
    end do
  else
    if (present(status)) then
      call mpi_wait(request, status, ierr)
    else
      call mpi_wait(request, mpi_status_ignore, ierr)
    end if
  end if
end subroutine wait_for_mpi_request
100 
!> Waits for all of the supplied MPI requests to complete. When this module is managing MPI thread safety
!! the requests are polled with mpi_testall, with lock_mpi/unlock_mpi around each test and a pause between
!! unsuccessful tests; otherwise a single call to mpi_waitall is made. Statuses are ignored.
!! @param requests The MPI request handles to wait for
!! @param count The number of request handles to wait for
subroutine waitall_for_mpi_requests(requests, count)
  integer, dimension(:), intent(inout) :: requests
  integer, intent(in) :: count

  integer :: ierr
  ! The completion flag of mpi_testall is a LOGICAL in the Fortran binding. Declaring it as an integer and
  ! comparing against 1 only works where .true. happens to be stored as 1.
  logical :: all_completed

  if (manage_mpi_thread_safety) then
    all_completed=.false.
    do while (.not. all_completed)
      call lock_mpi()
      call mpi_testall(count, requests, all_completed, mpi_statuses_ignore, ierr)
      call unlock_mpi()
      ! Pause outside the lock before testing again
      if (.not. all_completed) call pause_for_mpi_interleaving()
    end do
  else
    call mpi_waitall(count, requests, mpi_statuses_ignore, ierr)
  end if
end subroutine waitall_for_mpi_requests
123 
!> Retrieves the size of the supplied communicator, as reported by mpi_comm_size.
!! @param io_comm The communicator to query
!! @returns The number of processes in io_comm
integer function get_number_io_servers(io_comm)
  integer, intent(in) :: io_comm

  integer :: ierror

  ! The size is written straight into the function result
  call mpi_comm_size(io_comm, get_number_io_servers, ierror)
end function get_number_io_servers
135 
!> Retrieves the rank of this process within the supplied communicator, as reported by mpi_comm_rank.
!! @param io_comm The communicator to query
!! @returns The rank of this process in io_comm
integer function get_my_io_rank(io_comm)
  integer, intent(in) :: io_comm

  integer :: ierror

  ! The rank is written straight into the function result
  call mpi_comm_rank(io_comm, get_my_io_rank, ierror)
end function get_my_io_rank
147 
!> Posts a non-blocking receive on mpi_comm_world for one command from any source using command_tag. The
!! command arrives in the module level command_buffer and the request is tracked by command_request_handle.
subroutine register_command_receive()
  integer, parameter :: single_element = 1
  integer :: ierror

  call lock_mpi()
  call mpi_irecv(command_buffer, single_element, mpi_int, mpi_any_source, command_tag, mpi_comm_world, &
       command_request_handle, ierror)
  call unlock_mpi()
end subroutine register_command_receive
157 
!> Receives data from a specific source on mpi_comm_world and waits for the receive to complete. The data
!! is received into dump_data if that is present, otherwise into description_data if that is present.
!! @param mpi_datatype The MPI datatype of the data to receive
!! @param num_elements The maximum number of elements of mpi_datatype to receive
!! @param source The rank to receive from
!! @param dump_data Optional raw buffer to receive into; it is passed directly to mpi_irecv and is not
!!        allocated here
!! @param data_dump_id Optional offset added to data_tag to form the message tag, only used with dump_data
!! @param description_data Optional array of sizing descriptions to receive into, received with data_tag
!! @returns The number of elements of mpi_datatype actually received (from mpi_get_count), or zero if
!!          neither receive buffer was supplied
integer function data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
  integer, intent(in) :: mpi_datatype, num_elements, source
  integer, intent(in), optional :: data_dump_id
  character, dimension(:), allocatable, intent(inout), optional :: dump_data
  type(data_sizing_description_type), dimension(:), intent(inout), optional :: description_data

  integer :: ierr, request, status(mpi_status_size), recv_count, tag_to_use

  ! Give the result a defined value for the case where there is nothing to receive into
  data_receive=0

  if (present(dump_data)) then
    tag_to_use=data_tag
    if (present(data_dump_id)) tag_to_use=tag_to_use+data_dump_id
    call lock_mpi()
    call mpi_irecv(dump_data, num_elements, mpi_datatype, source, tag_to_use, mpi_comm_world, request, ierr)
    call unlock_mpi()
  else if (present(description_data)) then
    call lock_mpi()
    call mpi_irecv(description_data, num_elements, mpi_datatype, source, data_tag, mpi_comm_world, request, ierr)
    call unlock_mpi()
  else
    return
  end if

  ! Completion and counting are identical for both kinds of buffer
  call wait_for_mpi_request(request, status)
  call lock_mpi()
  call mpi_get_count(status, mpi_datatype, recv_count, ierr)
  call unlock_mpi()
  data_receive=recv_count
end function data_receive
194 
!> NOTE(review): no executable statement is visible between the two lines below in this listing (listing
!! line 197 is absent), so as shown this procedure does nothing. The generated documentation describes it as
!! cancelling all outstanding communication requests - confirm against the full source.
196  subroutine cancel_requests()
198  end subroutine cancel_requests
199 
!> Cancels a specific communication request through mpi_cancel. Nothing is done when the handle is
!! mpi_request_null.
!! @param req The request handle to cancel
subroutine cancel_request(req)
  integer, intent(in) :: req

  integer :: ierror

  ! A null handle has nothing to cancel
  if (req .eq. mpi_request_null) return

  call lock_mpi()
  call mpi_cancel(req, ierror)
  call unlock_mpi()
end subroutine cancel_request
213 
!> Tests, without blocking, whether the receive tracked by command_request_handle has completed.
!! @param command On completion, the command that was received into command_buffer; not set otherwise
!! @param source On completion, the rank that sent the command (taken from the MPI status); not set otherwise
!! @returns Whether the command receive has completed
!! NOTE(review): the code visible here does not post a new command receive after one completes.
logical function test_for_command(command, source)
  integer, intent(out) :: command, source

  integer :: ierr, status(mpi_status_size)
  ! The completion flag of mpi_test is a LOGICAL in the Fortran binding. Declaring it as an integer and
  ! comparing against 1 only works where .true. happens to be stored as 1.
  logical :: complete

  call lock_mpi()
  call mpi_test(command_request_handle, complete, status, ierr)
  call unlock_mpi()

  if (complete) then
    command = command_buffer
    source = status(mpi_source)
  end if
  test_for_command = complete
end function test_for_command
236 
!> Tests for a pending message on any of the inter IO communication tags and, if one is found, receives
!! it. The tags are probed in order and only the first pending message is received. All MPI calls are made
!! between lock_mpi and unlock_mpi.
!! @param inter_io_communications The inter IO communication descriptions, whose message_tag is probed
!! @param number_of_inter_io The number of inter IO communication descriptions to probe
!! @param io_communicator The communicator that is probed and received on
!! @param command Set to inter_io_communication if a message was received; not set otherwise
!! @param source Set to the index into inter_io_communications whose tag matched; not set otherwise
!! @param data_buffer (Re)allocated to the size of the message in bytes and filled with it if a message was
!!        received; left untouched otherwise
!! @returns Whether a message was received
logical function test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, &
     source, data_buffer)
  integer, intent(in) :: number_of_inter_io, io_communicator
  integer, intent(out) :: command, source
  type(io_configuration_inter_communication_description), dimension(:), intent(inout) :: inter_io_communications
  character, dimension(:), allocatable, intent(inout) :: data_buffer

  integer :: i, ierr, status(mpi_status_size), message_size
  logical :: message_pending

  call lock_mpi()
  do i=1, number_of_inter_io
    call mpi_iprobe(mpi_any_source, inter_io_communications(i)%message_tag, io_communicator, &
         message_pending, status, ierr)
    if (message_pending) then
      call mpi_get_count(status, mpi_byte, message_size, ierr)
      ! Allocating an already allocated array is an error, so release any buffer that the caller handed in
      if (allocated(data_buffer)) deallocate(data_buffer)
      allocate(data_buffer(message_size))
      ! Receive from the source reported by the probe so that the message whose size was just measured is
      ! the one that is matched
      call mpi_recv(data_buffer, message_size, mpi_byte, status(mpi_source), &
           inter_io_communications(i)%message_tag, io_communicator, mpi_status_ignore, ierr)
      call unlock_mpi()
      command=inter_io_communication
      source=i
      test_for_inter_io=.true.
      return
    end if
  end do
  call unlock_mpi()
  test_for_inter_io=.false.
end function test_for_inter_io
270 
!> Frees an MPI datatype through mpi_type_free. The caller's handle variable is left unchanged.
!! @param the_type The MPI datatype handle to free
!! NOTE(review): unlike most MPI calls in this module this one is not made between lock_mpi and unlock_mpi;
!! confirm whether callers already hold the lock.
subroutine free_mpi_type(the_type)
  integer, intent(in) :: the_type

  integer :: ierr, type_handle

  ! mpi_type_free overwrites the handle it is given (setting it to the null datatype), so it must be handed
  ! a local copy rather than the intent(in) dummy argument
  type_handle=the_type
  call mpi_type_free(type_handle, ierr)
end subroutine free_mpi_type
280 
!> Builds and commits an MPI struct datatype from a data definition and the sizing information supplied
!! for its fields. Consecutive fields that share a data type are combined into a single entry of the
!! struct. The start and end location of every field is recorded as the fields are walked.
!! @param data_definition The data definition whose fields are walked in order
!! @param data_size_info Sizing descriptions which are looked up by field name
!! @param data_size The size of the committed datatype as reported by mpi_type_size
!! @param field_start_locations Map from field name to the location at which the field starts (counting from 1)
!! @param field_end_locations Map from field name to the location at which the field ends
!! @param field_dimensions Optional map from field name to its number of dimensions; zero is stored for
!!        fields for which no sizing information with a non-zero dimension was looked up
!! @returns The handle of the committed MPI datatype
integer function build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, &
     field_end_locations, field_dimensions)
  type(io_configuration_data_definition_type), intent(in) :: data_definition
  type(data_sizing_description_type), dimension(:), intent(in) :: data_size_info
  integer, intent(out) :: data_size
  type(map_type), intent(out) :: field_start_locations, field_end_locations
  type(map_type), intent(out), optional :: field_dimensions

  integer :: type_extents(5), type_counts, i, j, field_start, data_type, field_array_sizes, &
       temp_size, prev_data_type, old_types(20), offsets(20), block_counts(20), new_type, current_location, ierr, &
       field_ignores, number_dimensions
  logical :: field_found, field_sized
  type(data_sizing_description_type) :: field_size_info

  type_extents=populate_mpi_type_extents()

  field_start=1
  data_type=0
  type_counts=0
  field_array_sizes=0
  field_ignores=0
  current_location=1
  do i=1,data_definition%number_of_data_fields
    ! Dimensions recorded for this field; this stays at zero unless usable sizing information is found below,
    ! so that information left over from a previously processed field is never reported
    number_dimensions=0
    if (data_type == 0) then
      prev_data_type=data_type
      data_type=data_definition%fields(i)%data_type
    else
      if (data_type .ne. data_definition%fields(i)%data_type) then
        ! For efficient type packing multiple fields with the same type are combined - therefore when the
        ! type changes the previous run of fields is packed into one entry
        call append_mpi_datatype(field_start, i-1-field_ignores, field_array_sizes, data_type, &
             type_extents, prev_data_type, type_counts+1, old_types, offsets, block_counts)
        field_start=i
        field_array_sizes=0
        field_ignores=0
        prev_data_type=data_type
        data_type=data_definition%fields(i)%data_type
        type_counts=type_counts+1
      end if
    end if
    call c_put_integer(field_start_locations, data_definition%fields(i)%name, current_location)
    if (data_definition%fields(i)%field_type .eq. array_field_type .or. &
         data_definition%fields(i)%field_type .eq. map_field_type) then
      ! Grab the field info based upon the field name
      field_found=get_data_description_from_name(data_size_info, data_definition%fields(i)%name, field_size_info)
      ! Only inspect the sizing information once the lookup has succeeded, as Fortran does not guarantee
      ! short circuit evaluation of a combined logical expression
      field_sized=field_found
      if (field_sized) field_sized=field_size_info%dimensions .ne. 0
      if (.not. field_sized) then
        ! If no field info, or the dimension is 0 then this MONC process is not sending that field - check it is optional
        if (.not. data_definition%fields(i)%optional) then
          call log_log(log_error, "Non optional field `"//trim(data_definition%fields(i)%name)//&
               "' omitted from MONC IO server registration")
        end if
        field_ignores=field_ignores+1
      else
        ! If the field is specified then use the size data to assemble the field size and append to current size
        number_dimensions=field_size_info%dimensions
        temp_size=1
        do j=1, field_size_info%dimensions
          temp_size=temp_size*field_size_info%dim_sizes(j)
        end do
        if (data_definition%fields(i)%field_type .eq. map_field_type) then
          ! Each map entry is counted as two strings of string_length
          field_array_sizes=(field_array_sizes+temp_size*string_length*2)-1
          current_location=current_location+temp_size*string_length*2
        else
          field_array_sizes=(field_array_sizes+temp_size)-1
          current_location=current_location+temp_size*type_extents(data_type)
        end if
      end if
    else
      if (data_definition%fields(i)%optional) then
        ! An optional scalar only contributes if a description has been supplied for it
        if (get_data_description_from_name(data_size_info, data_definition%fields(i)%name)) then
          if (data_type==string_data_type) then
            field_array_sizes=(field_array_sizes+string_length)-1
            current_location=current_location+type_extents(data_type)*string_length
          else
            current_location=current_location+type_extents(data_type)
          end if
        else
          field_ignores=field_ignores+1
        end if
      else
        if (data_type==string_data_type) then
          field_array_sizes=(field_array_sizes+string_length)-1
          current_location=current_location+type_extents(data_type)*string_length
        else
          current_location=current_location+type_extents(data_type)
        end if
      end if
    end if
    call c_put_integer(field_end_locations, data_definition%fields(i)%name, current_location-1)
    if (present(field_dimensions)) then
      call c_put_integer(field_dimensions, data_definition%fields(i)%name, number_dimensions)
    end if
  end do
  if (field_start .le. i-1) then
    ! If there are outstanding fields to process then we do this here
    ! NOTE(review): unlike the call inside the loop this one does not subtract field_ignores from the end
    ! index - confirm whether that difference is intended
    call append_mpi_datatype(field_start, i-1, field_array_sizes, data_type, &
         type_extents, prev_data_type, type_counts+1, old_types, offsets, block_counts)
    type_counts=type_counts+1
  end if
  ! NOTE(review): old_types, offsets and block_counts hold 20 entries and nothing visible here checks
  ! type_counts against that limit. mpi_type_struct was removed from the MPI standard in version 3.0 in
  ! favour of mpi_type_create_struct, which takes address kind displacements.
  call lock_mpi()
  call mpi_type_struct(type_counts, block_counts, offsets, old_types, new_type, ierr)
  call mpi_type_commit(new_type, ierr)
  ! Query the size while the lock is still held, in line with the other MPI calls in this module
  call mpi_type_size(new_type, data_size, ierr)
  call unlock_mpi()
  build_mpi_datatype=new_type
end function build_mpi_datatype
394 end module mpi_communication_mod
Puts an integer key-value pair into the map.
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
integer function, public get_number_io_servers(io_comm)
Retrieves the number of IO servers that are running in total.
integer, parameter, public float_data_type
Definition: ioclient.F90:40
integer function, public get_mpi_datatype_from_internal_representation(type_code)
Gets the MPI datatype from our internal representation of the field data type (as in the configuratio...
Definition: ioclient.F90:203
logical function, public test_for_command(command, source)
Tests for a command message based upon the request already registered.
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
integer, parameter, public array_field_type
Definition: ioclient.F90:38
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
subroutine, public register_command_receive()
Registers a request for receiving a command from any MONC process on the command channel.
integer, parameter, public boolean_data_type
Definition: ioclient.F90:40
Logging utility.
Definition: logging.F90:2
integer, parameter ms_wait_between_tests
Interface to the C usleep call, which allows us to sleep for a specific number of microseconds (the argument is named useconds; note that the constant passed to it is named in ms)...
subroutine cancel_request(req)
Cancels a specific communication request.
integer, parameter, public command_tag
Definition: ioclient.F90:34
Abstraction layer around MPI, this issues and marshals the lower level communication details...
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
subroutine, public log_master_log(level, message)
Will log just from the master process.
Definition: logging.F90:47
subroutine, public free_mpi_type(the_type)
Frees an MPI type, used in clean up.
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
integer, parameter, public double_data_type
Definition: ioclient.F90:40
subroutine, public wait_for_mpi_request(request, status)
Waits for a specific MPI request to complete, either by managing thread safety and interleaving or ju...
integer function, public data_receive(mpi_datatype, num_elements, source, dump_data, data_dump_id, description_data)
Awaits some data on the data channel. This is of the type, size from the source provided and can eith...
logical function, public test_for_inter_io(inter_io_communications, number_of_inter_io, io_communicator, command, source, data_buffer)
Tests for inter IO server communication.
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
integer, parameter, public inter_io_communication
Field type identifiers.
Definition: ioclient.F90:34
integer, parameter, public string_data_type
Definition: ioclient.F90:40
This defines some constants and procedures that are useful to the IO server and clients that call it...
Definition: ioclient.F90:3
integer, parameter, public integer_data_type
Definition: ioclient.F90:40
Map data structure that holds string (length 20 maximum) key value pairs.
Definition: collections.F90:86
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
integer command_request_handle
Request handle representing the asynchronous P2P command request.
Collection data structures.
Definition: collections.F90:7
subroutine, public initialise_mpi_communication(provided_threading)
Initialises MPI communication.
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
integer function, dimension(5), public populate_mpi_type_extents()
Provides the type extents of the types that we are using in construction of the MPI data type...
Definition: ioclient.F90:60
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
subroutine, public cancel_requests()
Cancels all outstanding communication requests.
logical function, public get_data_description_from_name(descriptions, name, field_description)
Look up the data description that corresponds to a specific field keyed by its name.
Definition: ioclient.F90:355
integer, volatile mpi_mutex
subroutine, public append_mpi_datatype(field_start, field_end, field_array_sizes, data_type, type_extents, prev_data_type, type_index, old_types, offsets, block_counts)
Appends the MPI datatype details to the block counts, old types and offsets arrays. This will lump together multiple consecutive fields with the same type.
Definition: ioclient.F90:186
integer function, public build_mpi_datatype(data_definition, data_size_info, data_size, field_start_locations, field_end_locations, field_dimensions)
Builds the MPI type that corresponds to the data which will be received from a specific MONC process...
integer command_buffer
Buffer used to receive the command data into when it arrives on that channel.
subroutine, public waitall_for_mpi_requests(requests, count)
Waits for all MPI requests to complete, either by managing thread safety and interleaving or just a c...
subroutine, public pause_for_mpi_interleaving()
Pauses for a specific number of ms to allow for MPI interleaving, this is to avoid starvation...
integer, parameter, public map_field_type
Field data type identifiers.
Definition: ioclient.F90:38
integer, parameter, public data_tag
Definition: ioclient.F90:34
Parses the XML configuration file to produce the io configuration description which contains the data...
integer function, public get_my_io_rank(io_comm)
Retrieves my IO server rank out of the number of IO servers that are running.