MONC
io_state_writer.F90
Go to the documentation of this file.
1 
3  use iso_c_binding, only: c_int, c_char, c_null_char, c_size_t, c_ptrdiff_t, c_ptr, c_loc, c_sizeof, c_long
5  use netcdf, only : nf90_char, nf90_byte, nf90_int, nf90_def_var, nf90_put_var, nf90_def_dim
13  use mpi, only : mpi_in_place, mpi_int, mpi_long_long_int, mpi_sum, mpi_status_ignore, mpi_wait
18  implicit none
19 
20 #ifndef TEST_MODE
21  private
22 #endif
23 
24  interface
25 
!> ISO C binding interface for the NetCDF C routine nc_def_dim. Calling the C routine directly allows the
!! dimension length to be given as a size_t rather than a default integer.
!! @param ncid Id of the NetCDF file, passed by value
!! @param name Dimension name, callers are expected to append c_null_char
!! @param nlen Length of the dimension, passed by value
!! @param idp Receives the (C numbered) id of the dimension that has been defined
!! @returns The NetCDF status code
function nc_def_dim(ncid, name, nlen, idp) bind(C)
  use iso_c_binding, only: c_int, c_size_t, c_char

  integer(kind=c_int), value :: ncid
  character(kind=c_char), intent(in) :: name(*)
  integer(kind=c_size_t), value :: nlen
  ! Pure output of the C routine (callers pass an undefined variable), declared the same way as varidp in nc_def_var
  integer(kind=c_int), intent(out) :: idp
  integer(kind=c_int) :: nc_def_dim
end function nc_def_dim
35 
!> ISO C binding interface for the NetCDF C routine nc_put_vars_text. The start, count and stride vectors
!! are handed over as C pointers so that the caller can hold them in size_t / ptrdiff_t sized integers.
!! @param ncid Id of the NetCDF file, passed by value
!! @param varid (C numbered) id of the variable to write to, passed by value
!! @param startp C pointer to the start index vector
!! @param countp C pointer to the count vector
!! @param stridep C pointer to the stride vector
!! @param op The characters to write
!! @returns The NetCDF status code
function nc_put_vars_text(ncid, varid, startp, countp, stridep, op) bind(C) result(status)
  use iso_c_binding, only: c_char, c_int, c_ptr

  integer(kind=c_int), value :: ncid
  integer(kind=c_int), value :: varid
  type(c_ptr), value :: startp
  type(c_ptr), value :: countp
  type(c_ptr), value :: stridep
  character(kind=c_char), dimension(*), intent(in) :: op
  integer(kind=c_int) :: status
end function nc_put_vars_text
45 
!> ISO C binding interface for the NetCDF C routine nc_put_var1_long, which writes a single C long value
!! at one index of a variable.
!! NOTE(review): the width of c_long is platform dependent, confirm it is 64 bit wherever 64 bit values
!! are passed through this interface.
!! @param ncid Id of the NetCDF file, passed by value
!! @param varid (C numbered) id of the variable to write to, passed by value
!! @param indexp C pointer to the index vector locating the element to write
!! @param op The value to write, passed by reference
!! @returns The NetCDF status code
function nc_put_var1_long(ncid, varid, indexp, op) bind(C) result(status)
  use iso_c_binding, only: c_int, c_long, c_ptr

  integer(kind=c_int), value :: ncid
  integer(kind=c_int), value :: varid
  type(c_ptr), value :: indexp
  integer(kind=c_long), intent(in) :: op
  integer(kind=c_int) :: status
end function nc_put_var1_long
55 
!> ISO C binding interface for the NetCDF C routine nc_def_var, used so that the external type can be
!! given directly as a C type code.
!! @param ncid Id of the NetCDF file, passed by value
!! @param name Variable name, callers are expected to append c_null_char
!! @param xtype External NetCDF type code of the variable, passed by value
!! @param ndims Number of dimensions of the variable, passed by value
!! @param dimidsp The (C numbered) dimension ids of the variable
!! @param varidp Receives the (C numbered) id of the variable that has been defined
!! @returns The NetCDF status code
function nc_def_var(ncid, name, xtype, ndims, dimidsp, varidp) bind(C) result(status)
  use iso_c_binding, only: c_char, c_int

  integer(kind=c_int), value :: ncid
  character(kind=c_char), dimension(*), intent(in) :: name
  integer(kind=c_int), value :: xtype, ndims
  integer(kind=c_int), dimension(*), intent(in) :: dimidsp
  integer(kind=c_int), intent(out) :: varidp
  integer(kind=c_int) :: status
end function nc_def_var
68  end interface
69 
70  abstract interface
73 
75  character, dimension(:), allocatable, intent(inout) :: byte_data
77 
79  integer, intent(in) :: timestep
81  end interface
82 
83  character, dimension(:), allocatable :: serialised_writer_entries, serialised_timeaveraged_manipulation_state, &
85  integer(kind=8), dimension(:), allocatable :: global_writer_entry_byte_size, my_writer_entry_start_point, &
91 
94 contains
95 
!> Registers the procedures used to serialise the writer field manager state, to prepare for that
!! serialisation and to determine whether the field manager is up to date. The three arguments are stored
!! in procedure pointers of the same names (without the _arg suffix). A warning is also logged when the
!! C size_t type is smaller than 8 bytes.
!! NOTE(review): the end statement of this subroutine is not visible in this listing.
!! @param serialise_writer_field_manager_state_arg Procedure which serialises the writer field manager state
!! @param prepare_to_serialise_writer_field_manager_state_arg Procedure which prepares that serialisation
!! @param is_write_field_manager_up_to_date_arg Procedure which determines if the field manager is up to date
101  subroutine set_serialise_write_field_manager_state(serialise_writer_field_manager_state_arg, &
102  prepare_to_serialise_writer_field_manager_state_arg, is_write_field_manager_up_to_date_arg)
103  procedure(writer_field_manager_serialise_state) :: serialise_writer_field_manager_state_arg
104  procedure(writer_field_manager_prepare_to_serialise_state) :: prepare_to_serialise_writer_field_manager_state_arg
105  procedure(write_field_manager_determine_if_up_to_date) :: is_write_field_manager_up_to_date_arg
106 
! Only declared so that c_sizeof can report the storage size of a size_t on this system
107  integer(kind=c_size_t) :: size_t_test
108 
! Point the procedure pointers at the supplied procedures so they can be called later on
109  serialise_writer_field_manager_state=>serialise_writer_field_manager_state_arg
110  prepare_to_serialise_writer_field_manager_state=>prepare_to_serialise_writer_field_manager_state_arg
111  is_write_field_manager_up_to_date=>is_write_field_manager_up_to_date_arg
112 
! c_sizeof is in bytes, so anything below 8 means size_t is narrower than 64 bits
113  if (c_sizeof(size_t_test) .lt. 8) then
114  call log_master_log(log_warn, &
115  "Your system's size_t is not 64 bit, this will limit the size of IO server state storage to 4GB")
116  end if
118 
!> Determines whether the IO server state writer is ready for the provided timestep.
!! NOTE(review): the statement that assigns the function result is not visible in this listing, presumably
!! it delegates to the is_write_field_manager_up_to_date procedure pointer - confirm against the full file.
!! @param timestep The timestep to check readiness for
!! @returns Whether the state writer is ready
122  logical function is_io_server_state_writer_ready(timestep)
123  integer, intent(in) :: timestep
124 
126  end function is_io_server_state_writer_ready
127 
!> Determines the local serialised size of each facet of the IO server state and starts the non-blocking
!! collectives over the IO communicator: a sum all-reduce on global_writer_entry_byte_size and a sum scan
!! on my_writer_entry_start_point. Both are issued in place and neither is waited on here, the requests are
!! left in global_writer_entry_byte_size_request and my_writer_entry_start_request.
!! NOTE(review): the statements between the sizing loop and the collectives (which presumably size the four
!! trailing facets and initialise the two in-place buffers from the local sizes) are not visible in this
!! listing. In the visible code time_points, current_data_point, dvt_byte_data and temp are not referenced.
!! @param io_configuration IO server configuration, provides the IO communicator
!! @param writer_entries The writer entries, only those flagged include_in_io_state_write are sized
!! @param time_points The time points map
133  subroutine prepare_io_server_state_storage(io_configuration, writer_entries, time_points)
134  type(io_configuration_type), intent(inout) :: io_configuration
135  type(writer_type), volatile, dimension(:), intent(inout) :: writer_entries
136  type(hashmap_type), volatile, intent(inout) :: time_points
137 
138  integer :: i, current_data_point, number_writer_entries_included, ierr, current_index
139  character, dimension(:), allocatable :: dvt_byte_data, temp
140 
! First pass, count how many writer entries take part in the state write
141  number_writer_entries_included=0
142  do i=1, size(writer_entries)
143  if (writer_entries(i)%include_in_io_state_write) number_writer_entries_included=number_writer_entries_included+1
144  end do
145 
! One slot per included writer entry plus four extra slots, which the define and write routines index as
! current_index to current_index+3 for the time averaged, instantaneous, writer manager and timepoints facets
146  allocate(local_writer_entry_byte_size(number_writer_entries_included+4), &
147  global_writer_entry_byte_size(number_writer_entries_included+4), &
148  my_writer_entry_start_point(number_writer_entries_included+4))
149 
! Second pass, record the serialised size of each included writer entry in consecutive slots
150  current_index=1
151  do i=1, size(writer_entries)
152  if (writer_entries(i)%include_in_io_state_write) then
153  local_writer_entry_byte_size(current_index)=prepare_to_serialise_writer_type(writer_entries(i))
154  current_index=current_index+1
155  end if
156  end do
157 
162 
! The MPI calls are wrapped in lock_mpi/unlock_mpi, the returned ierr is not checked
165  call lock_mpi()
166  call mpi_iallreduce(mpi_in_place, global_writer_entry_byte_size, size(global_writer_entry_byte_size), mpi_long_long_int, &
167  mpi_sum, io_configuration%io_communicator, global_writer_entry_byte_size_request, ierr)
168  call mpi_iscan(mpi_in_place, my_writer_entry_start_point, size(my_writer_entry_start_point), mpi_long_long_int, &
169  mpi_sum, io_configuration%io_communicator, my_writer_entry_start_request, ierr)
170  call unlock_mpi()
171  end subroutine prepare_io_server_state_storage
172 
!> Prepares to serialise the writer entry time points by working out the storage they need. The visible
!! part of the expression contributes, for every entry in the map, the kind value of the function result
!! plus the kind value of a DEFAULT_PRECISION real.
!! NOTE(review): the start of the assignment and the end statement are not visible in this listing. The
!! expression uses kind() values as if they were byte sizes - confirm this holds for the target compilers.
!! @param time_points The time points map that will be serialised
!! @returns The storage size required, as a 64 bit integer
176  integer(kind=8) function prepare_to_serialise_writer_entries_time_points(time_points)
177  type(hashmap_type), volatile, intent(inout) :: time_points
178 
179  real(kind=DEFAULT_PRECISION) :: r_value
180 
182  ((kind(prepare_to_serialise_writer_entries_time_points) + kind(r_value)) * c_size(time_points))
184 
!> Serialises the writer entry time points, which are held in a hashmap, into byte data. The number of
!! entries is packed first, followed by the integer key and the real value of each entry in iteration order.
!! NOTE(review): the end statement of this subroutine is not visible in this listing.
!! @param time_points The time points map to serialise
!! @param byte_data The buffer that the packed data is written into
188  subroutine serialise_writer_entries_time_points(time_points, byte_data)
189  type(hashmap_type), volatile, intent(inout) :: time_points
190  character, dimension(:), allocatable, intent(inout) :: byte_data
191 
192  integer :: key, current_data_point
193  type(mapentry_type) :: map_entry
194  type(iterator_type) :: iterator
195  real(kind=DEFAULT_PRECISION) :: r_value
196 
! current_data_point is the running position in byte_data, each pack_scalar_field call returns the next one
197  current_data_point=1
198  current_data_point=pack_scalar_field(byte_data, current_data_point, c_size(time_points))
199  iterator=c_get_iterator(time_points)
200  do while (c_has_next(iterator))
201  map_entry=c_next_mapentry(iterator)
! The map key is converted to an integer before being packed
202  key=conv_to_integer(map_entry%key)
203  r_value=c_get_real(map_entry)
204  current_data_point=pack_scalar_field(byte_data, current_data_point, key)
205  current_data_point=pack_scalar_field(byte_data, current_data_point, double_real_value=r_value)
206  end do
208 
!> Defines the dimensions and variables in a NetCDF file that hold the IO server state. It first calls
!! prepare_io_server_state_storage, then defines the io_configuration dimension and variable and the
!! entries_directory_dim dimension, and finally calls define_state_storage once per included writer entry
!! and once for each of the four further facets.
!! NOTE(review): global_writer_entry_byte_size is the buffer of the non-blocking all-reduce started in
!! prepare_io_server_state_storage; the wait on that request and the end statement of this subroutine are
!! not visible in this listing - confirm against the full file. The local variable byte is not referenced
!! in the visible code.
!! @param io_configuration IO server configuration, provides the text configuration and number of IO servers
!! @param writer_entries The writer entries, only those flagged include_in_io_state_write get storage
!! @param time_points The time points map, passed on to prepare_io_server_state_storage
!! @param netcdf_file The NetCDF file state, its dimension_to_id and variable_to_id maps are updated
213  subroutine define_io_server_state_contributions(io_configuration, writer_entries, time_points, netcdf_file)
214  type(io_configuration_type), intent(inout) :: io_configuration
215  type(writer_type), volatile, dimension(:), intent(inout) :: writer_entries
216  type(hashmap_type), volatile, intent(inout) :: time_points
217  type(netcdf_diagnostics_type), intent(inout) :: netcdf_file
218 
219  integer :: ncdf_dimid, ncdf_varid, current_index, i
220  character, dimension(:), allocatable :: byte
221 
222  call prepare_io_server_state_storage(io_configuration, writer_entries, time_points)
223 
! The NetCDF definition calls are made between lock_mpi and unlock_mpi
224  call lock_mpi()
225  call check_netcdf_status(nf90_def_dim(netcdf_file%ncid, "io_configuration_dim", &
226  size(io_configuration%text_configuration), ncdf_dimid))
227  call c_put_integer(netcdf_file%dimension_to_id, "io_configuration_dim", ncdf_dimid)
228 
229  call check_netcdf_status(nf90_def_var(netcdf_file%ncid, "io_configuration", nf90_char, ncdf_dimid, ncdf_varid))
230  call c_put_integer(netcdf_file%variable_to_id, "io_configuration", ncdf_varid)
231 
! The directory dimension has one element per IO server; ncdf_dimid keeps this id for the calls below
232  call check_netcdf_status(nf90_def_dim(netcdf_file%ncid, "entries_directory_dim", io_configuration%number_of_io_servers, &
233  ncdf_dimid))
234  call unlock_mpi()
235  call c_put_integer(netcdf_file%dimension_to_id, "entries_directory_dim", ncdf_dimid)
236 
238 
! The storage key uses the writer entry index i, the size is looked up with the compacted current_index
239  current_index=1
240  do i=1, size(writer_entries)
241  if (writer_entries(i)%include_in_io_state_write) then
242  call define_state_storage(netcdf_file, ncdf_dimid, "serialised_writer_entry_"//trim(conv_to_string(i)), &
243  global_writer_entry_byte_size(current_index))
244  current_index=current_index+1
245  end if
246  end do
247 
! After the loop current_index addresses the first of the four trailing facet slots
248  call define_state_storage(netcdf_file, ncdf_dimid, "serialised_timeaveraged_manipulation", &
249  global_writer_entry_byte_size(current_index))
250  call define_state_storage(netcdf_file, ncdf_dimid, "serialised_instantaneous_manipulation", &
251  global_writer_entry_byte_size(current_index+1))
252  call define_state_storage(netcdf_file, ncdf_dimid, "serialised_writer_manager", global_writer_entry_byte_size(current_index+2))
253  call define_state_storage(netcdf_file, ncdf_dimid, "serialised_timepoints", global_writer_entry_byte_size(current_index+3))
255 
!> Defines the NetCDF storage for one facet of the IO server state. Three entities are created and their
!! ids recorded in the file's lookup maps: a 64 bit integer variable "<base_key>_directory" over the entries
!! directory dimension, a dimension "<base_key>_dim" of the expected global length, and a character variable
!! "<base_key>" over that new dimension. The directory variable and the dimension go through the ISO C
!! binding interfaces, hence the conversion between Fortran (one based) and C (zero based) ids.
!! @param netcdf_file The NetCDF file state, its dimension_to_id and variable_to_id maps are updated
!! @param entries_directory_dim_id Fortran numbered id of the entries directory dimension
!! @param base_key Name of the facet, used as the stem of all the names that are defined
!! @param expected_global_entries Length to give the facet's dimension
subroutine define_state_storage(netcdf_file, entries_directory_dim_id, base_key, expected_global_entries)
  type(netcdf_diagnostics_type), intent(inout) :: netcdf_file
  integer, intent(in) :: entries_directory_dim_id
  integer(kind=8), intent(in) :: expected_global_entries
  character(len=*), intent(in) :: base_key

  ! NC_INT64 type, note that this is signed as Fortran doesn't really like unsigned integers
  integer(kind=c_int), parameter :: nc_int64_xtype = 10
  integer(kind=c_int), parameter :: directory_rank = 1

  integer :: fortran_var_id, fortran_dim_id
  integer(kind=c_int) :: c_file_id, c_dim_id, c_var_id
  integer(kind=c_int) :: directory_dim_ids(1)

  c_file_id = netcdf_file%ncid
  ! The C API numbers dimensions from zero, the Fortran id handed in is one larger
  directory_dim_ids(1) = entries_directory_dim_id - 1

  call lock_mpi()

  ! Directory variable, holds one 64 bit entry per element of the entries directory dimension
  call check_netcdf_status(nc_def_var(c_file_id, trim(base_key)//"_directory"//c_null_char, nc_int64_xtype, &
       directory_rank, directory_dim_ids, c_var_id))
  fortran_var_id = c_var_id + 1
  call c_put_integer(netcdf_file%variable_to_id, trim(base_key)//"_directory", fortran_var_id)

  ! Dimension for the facet's bytes, defined through the C API so the length can be a size_t
  call check_netcdf_status(nc_def_dim(c_file_id, trim(base_key)//"_dim"//c_null_char, &
       int(expected_global_entries, c_size_t), c_dim_id))
  fortran_dim_id = c_dim_id + 1
  call c_put_integer(netcdf_file%dimension_to_id, trim(base_key)//"_dim", fortran_dim_id)

  ! Character variable which stores the serialised bytes themselves
  call check_netcdf_status(nf90_def_var(netcdf_file%ncid, trim(base_key), nf90_char, fortran_dim_id, fortran_var_id))
  call c_put_integer(netcdf_file%variable_to_id, trim(base_key), fortran_var_id)

  call unlock_mpi()
end subroutine define_state_storage
294 
!> Packages up and writes the IO server state into the NetCDF file. IO rank zero writes the text
!! configuration, then every included writer entry and the four further facets (time averaged manipulation,
!! instantaneous manipulation, writer field manager and time points) are serialised into a temporary
!! buffer and written with write_state_storage.
!! NOTE(review): my_writer_entry_start_point is the buffer of the non-blocking scan started in
!! prepare_io_server_state_storage; the wait on that request and any clean up before the end statement are
!! not visible in this listing - confirm against the full file. The local variable var_id is not referenced
!! in the visible code.
!! @param io_configuration IO server configuration, provides the text configuration and this IO rank
!! @param writer_entries The writer entries, only those flagged include_in_io_state_write are written
!! @param time_points The time points map to serialise
!! @param netcdf_file The NetCDF file state, provides the file id and the variable id lookup
302  subroutine write_io_server_state(io_configuration, writer_entries, time_points, netcdf_file)
303  type(io_configuration_type), intent(inout) :: io_configuration
304  type(writer_type), volatile, dimension(:), intent(inout) :: writer_entries
305  type(hashmap_type), volatile, intent(inout) :: time_points
306  type(netcdf_diagnostics_type), intent(inout) :: netcdf_file
307 
308  integer :: field_id, var_id, current_index, i
309  character, dimension(:), allocatable :: serialised_bytes
310 
311  field_id=c_get_integer(netcdf_file%variable_to_id, "io_configuration")
312 
! Only IO rank zero writes the text configuration
313  if (io_configuration%my_io_rank == 0) then
314  call lock_mpi()
315  call check_netcdf_status(nf90_put_var(netcdf_file%ncid, field_id, io_configuration%text_configuration, &
316  count=(/size(io_configuration%text_configuration)/)))
317  call unlock_mpi()
318  end if
320 
! The buffer is sized from the local byte size recorded for each entry and released after each write.
! The storage key uses the writer entry index i, the size and start point use the compacted current_index
321  current_index=1
322  do i=1, size(writer_entries)
323  if (writer_entries(i)%include_in_io_state_write) then
324  allocate(serialised_bytes(local_writer_entry_byte_size(current_index)))
325  call serialise_writer_type(writer_entries(i), serialised_bytes)
326  call write_state_storage(netcdf_file, my_writer_entry_start_point(current_index), io_configuration%my_io_rank, &
327  "serialised_writer_entry_"//trim(conv_to_string(i)), serialised_bytes)
328  deallocate(serialised_bytes)
329  current_index=current_index+1
330  end if
331  end do
332 
! After the loop current_index addresses the first of the four trailing facet slots
333  allocate(serialised_bytes(local_writer_entry_byte_size(current_index)))
334  call serialise_time_averaged_state(serialised_bytes)
335  call write_state_storage(netcdf_file, my_writer_entry_start_point(current_index), io_configuration%my_io_rank, &
336  "serialised_timeaveraged_manipulation", serialised_bytes)
337  deallocate(serialised_bytes)
338 
339  allocate(serialised_bytes(local_writer_entry_byte_size(current_index+1)))
340  call serialise_instantaneous_state(serialised_bytes)
341  call write_state_storage(netcdf_file, my_writer_entry_start_point(current_index+1), io_configuration%my_io_rank, &
342  "serialised_instantaneous_manipulation", serialised_bytes)
343  deallocate(serialised_bytes)
344 
! The writer field manager state is serialised through the registered procedure pointer
345  allocate(serialised_bytes(local_writer_entry_byte_size(current_index+2)))
346  call serialise_writer_field_manager_state(serialised_bytes)
347  call write_state_storage(netcdf_file, my_writer_entry_start_point(current_index+2), io_configuration%my_io_rank, &
348  "serialised_writer_manager", serialised_bytes)
349  deallocate(serialised_bytes)
350 
351  allocate(serialised_bytes(local_writer_entry_byte_size(current_index+3)))
352  call serialise_writer_entries_time_points(time_points, serialised_bytes)
353  call write_state_storage(netcdf_file, my_writer_entry_start_point(current_index+3), io_configuration%my_io_rank, &
354  "serialised_timepoints", serialised_bytes)
355  deallocate(serialised_bytes)
356 
358  end subroutine write_io_server_state
359 
!> Writes the state of one facet into the NetCDF file through the ISO C binding interfaces, so that starts
!! and counts are size_t sized. Two writes are made: this rank's one based start location goes into element
!! my_io_rank of "<base_key>_directory", and the serialised bytes go into "<base_key>" from that location.
!! @param netcdf_file The NetCDF file state, provides the file id and the variable id lookup
!! @param writer_entry_start_point Position of the last byte that this rank contributes (presumably the
!!        result of the inclusive sum scan over the local sizes - confirm against the caller)
!! @param my_io_rank Rank of this IO server, used as the zero based index into the directory variable
!! @param base_key Name of the facet, as used when its storage was defined
!! @param raw_byte_code The serialised bytes to write
subroutine write_state_storage(netcdf_file, writer_entry_start_point, my_io_rank, base_key, raw_byte_code)
  type(netcdf_diagnostics_type), intent(inout) :: netcdf_file
  integer, intent(in) :: my_io_rank
  integer(kind=8), intent(in) :: writer_entry_start_point
  character(len=*), intent(in) :: base_key
  character, dimension(:), intent(in) :: raw_byte_code

  integer(kind=c_int) :: cncid, cvarid
  integer(kind=c_size_t) :: number_of_bytes
  integer(kind=c_size_t), target :: cstart(1), ccounts(1)
  integer(kind=c_ptrdiff_t), target :: cstrides(1)
  integer(kind=c_long) :: c_writer_corrected_start_point
  type(c_ptr) :: cstartptr, ccountsptr, cstridesptr

  ! Ask for the size in a size_t sized integer, the default integer result of size() cannot represent
  ! the element count of buffers beyond the default integer range
  number_of_bytes=size(raw_byte_code, kind=c_size_t)

  cncid=netcdf_file%ncid
  ! Ids held in variable_to_id are Fortran (one based) ids, the C API numbers from zero
  cvarid=c_get_integer(netcdf_file%variable_to_id, trim(base_key)//"_directory")-1
  ! Step back over this rank's own bytes to reach its first byte, expressed as a one based location
  ! NOTE(review): c_long is narrower than 64 bits on some platforms, which would truncate large locations
  c_writer_corrected_start_point=(writer_entry_start_point-number_of_bytes)+1

  cstart(1)=my_io_rank
  ccounts(1)=number_of_bytes
  cstrides(1)=1

  cstartptr=c_loc(cstart)
  ccountsptr=c_loc(ccounts)
  cstridesptr=c_loc(cstrides)

  call lock_mpi()
  ! Directory entry first, cstart currently holds this rank's index
  call check_netcdf_status(nc_put_var1_long(cncid, cvarid, cstartptr, c_writer_corrected_start_point))
  cvarid=c_get_integer(netcdf_file%variable_to_id, trim(base_key))-1
  ! cstart is reused for the data write, converted back to the zero based start that the C API expects
  cstart(1)=c_writer_corrected_start_point-1
  call check_netcdf_status(nc_put_vars_text(cncid, cvarid, cstartptr, ccountsptr, cstridesptr, raw_byte_code))
  call unlock_mpi()
end subroutine write_state_storage
ISO C binding interface for NetCDF put long scalar, needed to support putting longs into file...
Performs time averaged, time manipulation and only returns a value if the output frequency determines...
integer(kind=8), dimension(:), allocatable global_writer_entry_byte_size
subroutine, public set_serialise_write_field_manager_state(serialise_writer_field_manager_state_arg, prepare_to_serialise_writer_field_manager_state_arg, is_write_field_manager_up_to_date_arg)
Sets the procedure to call to serialise the field manager state; this is handled in this manner due...
character, dimension(:), allocatable serialised_writer_field_manager_state
logical function, public is_io_server_state_writer_ready(timestep)
Determines whether the IO server state writer is ready (i.e. state is at a specific level for the tim...
Puts an integer key-value pair into the map.
integer(kind=8), dimension(:), allocatable local_writer_entry_byte_size
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
Performs instantaneous time manipulation and only returns a value if the output frequency determines ...
character, dimension(:), allocatable serialised_instantaneous_manipulation_state
Logging utility.
Definition: logging.F90:2
integer(kind=8) function, public prepare_to_serialise_writer_type(writer_to_serialise)
Prepares to serialise the writer type by issuing locks and determining the size of serialised bytes n...
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
ISO C binding interface for NetCDF dimension definition, needed to support 64 bit lengths...
Abstraction layer around MPI, this issues and marshals the lower level communication details...
ISO C binding interface for NetCDF put variable text, needed to support 64 bit starts, counts and strides.
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
subroutine, public log_master_log(level, message)
Will log just from the master process.
Definition: logging.F90:47
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
Gets a specific integer element out of the list, stack, queue or map with the corresponding key...
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
subroutine, public wait_for_mpi_request(request, status)
Waits for a specific MPI request to complete, either by managing thread safety and interleaving or ju...
Converts data types to strings.
Definition: conversions.F90:36
procedure(write_field_manager_determine_if_up_to_date), pointer is_write_field_manager_up_to_date
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
Writer types which are shared across writing functionality. Also includes serialisation functionality...
Definition: writer_types.F90:2
subroutine, public check_netcdf_status(status, found_flag)
Will check a NetCDF status and write to log_log error any decoded statuses. Can be used to decode whe...
Definition: netcdf_misc.F90:19
subroutine, public write_io_server_state(io_configuration, writer_entries, time_points, netcdf_file)
Packages up and writes the actual IO server state into the NetCDF file. The act of serialisation will ...
This defines some constants and procedures that are useful to the IO server and clients that call it...
Definition: ioclient.F90:3
subroutine serialise_writer_entries_time_points(time_points, byte_data)
Serialises the writer entry time points which are held in a hashmap.
integer(kind=8), dimension(:), allocatable my_writer_entry_start_point
integer(kind=8) function, public prepare_to_serialise_instantaneous_state()
Prepares to serialise the instantaneous state, both determines the byte storage size and issues any l...
procedure(writer_field_manager_serialise_state), pointer serialise_writer_field_manager_state
Returns the number of elements in the collection.
integer(kind=8) function prepare_to_serialise_writer_entries_time_points(time_points)
Prepares to serialise the writer entry time points.
character, dimension(:), allocatable serialised_writer_entries_time_points
procedure(writer_field_manager_prepare_to_serialise_state), pointer prepare_to_serialise_writer_field_manager_state
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
Definition: collections.F90:7
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
subroutine, public serialise_time_averaged_state(byte_data)
Serialises the state of this manipulator so that it can be restarted later on. Releases any locks iss...
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
character, dimension(:), allocatable serialised_timeaveraged_manipulation_state
subroutine, public serialise_instantaneous_state(byte_data)
Will serialise the state of this manipulator so that it can be later restarted. Any locks issued duri...
subroutine define_state_storage(netcdf_file, entries_directory_dim_id, base_key, expected_global_entries)
Defines some state storage for a specific facet of the IO server. This creates the directory (locatio...
subroutine, public serialise_writer_type(writer_to_serialise, byte_data)
Serialises a specific writer type into byte data (for storage or transmission.) Releases any locks is...
subroutine, public define_io_server_state_contributions(io_configuration, writer_entries, time_points, netcdf_file)
Defines the dimensions and variables in a NetCDF file that constitute the IO server current state...
subroutine prepare_io_server_state_storage(io_configuration, writer_entries, time_points)
Will determine the size of the package for different facets of the IO server state and kick off non-b...
NetCDF misc functionality which can be shared between modules that work with NetCDF files...
Definition: netcdf_misc.F90:2
integer(kind=8) function, public prepare_to_serialise_time_averaged_state()
Prepares to serialise the time averaged state values. Both determines the storage size required and a...
Converts data types to integers.
Definition: conversions.F90:47
subroutine write_state_storage(netcdf_file, writer_entry_start_point, my_io_rank, base_key, raw_byte_code)
Writes out the state for a specific facet into the NetCDF file. Note that this uses the ISO C binding...
Gets a specific double precision real element out of the list, stack, queue or map with the correspon...
Parses the XML configuration file to produce the io configuration description which contains the data...
ISO C binding interface for NetCDF define variable, needed to support defining a long scalar variable...
The IO server state module which will write out the current state of the IO server to a NetCDF file...
integer function, public pack_scalar_field(buffer, start_offset, int_value, real_value, single_real_value, double_real_value, string_value, logical_value)
Packs the data of a scalar field into a buffer.
Definition: ioclient.F90:312
character, dimension(:), allocatable serialised_writer_entries