MONC
io_state_reader.F90
Go to the documentation of this file.
1 
3  use iso_c_binding, only: c_int, c_char, c_null_char, c_size_t, c_ptrdiff_t, c_ptr, c_loc, c_sizeof, c_long
5  use netcdf, only : nf90_global, nf90_nowrite, nf90_inquire_attribute, nf90_open, nf90_inq_dimid, nf90_inquire_dimension, &
6  nf90_inq_varid, nf90_get_var, nf90_get_att, nf90_close
12  use mpi, only : mpi_comm_rank, mpi_comm_size
18  implicit none
19 
20 #ifndef TEST_MODE
21  private
22 #endif
23 
interface

  !> ISO C binding for NetCDF inquire dimension, required for 64 bit dimension length.
  !! @param ncid File identifier, passed by value as a C int
  !! @param dimid Dimension identifier, passed by value as a C int
  !! @param name Character buffer supplied by the caller for the dimension name
  !! @param lenp Dimension length, returned as a C size_t
  !! @returns C int status (the callers in this file hand it to check_netcdf_status)
  function nc_inq_dim(ncid, dimid, name, lenp) bind(C, name="nc_inq_dim") result(status)
    use, intrinsic :: iso_c_binding, only: c_int, c_size_t, c_char

    integer(kind=c_int) :: status
    integer(kind=c_int), value :: ncid
    integer(kind=c_int), value :: dimid
    character(kind=c_char), intent(inout) :: name(*)
    integer(kind=c_size_t), intent(out) :: lenp
  end function nc_inq_dim

  !> ISO C binding for NetCDF get long variable data, required for retrieving long variables.
  !! @param ncid File identifier, passed by value as a C int
  !! @param varid Variable identifier, passed by value as a C int
  !! @param startp C pointer to the start index array
  !! @param countp C pointer to the count array
  !! @param ip Buffer of C longs that receives the values
  !! @returns C int status (the callers in this file hand it to check_netcdf_status)
  function nc_get_vara_long(ncid, varid, startp, countp, ip) bind(C, name="nc_get_vara_long") result(status)
    use, intrinsic :: iso_c_binding, only: c_int, c_long, c_ptr

    integer(kind=c_int) :: status
    integer(kind=c_int), value :: ncid, varid
    type(c_ptr), value :: startp, countp
    integer(kind=c_long), intent(out) :: ip(*)
  end function nc_get_vara_long

  !> ISO C binding for NetCDF get text vars, required for 64 bit start, count & stride.
  !! @param ncid File identifier, passed by value as a C int
  !! @param varid Variable identifier, passed by value as a C int
  !! @param startp C pointer to the start index array
  !! @param countp C pointer to the count array
  !! @param stridep C pointer to the stride array
  !! @param ip Character buffer that receives the text
  !! @returns C int status (the callers in this file hand it to check_netcdf_status)
  function nc_get_vars_text(ncid, varid, startp, countp, stridep, ip) bind(C, name="nc_get_vars_text") result(status)
    use, intrinsic :: iso_c_binding, only: c_int, c_ptr, c_char

    integer(kind=c_int) :: status
    integer(kind=c_int), value :: ncid, varid
    type(c_ptr), value :: startp, countp, stridep
    character(kind=c_char), intent(out) :: ip(*)
  end function nc_get_vars_text
end interface
56 
abstract interface
  !> Signature of the callback that rebuilds the writer field manager from serialised bytes. It is the
  !! interface named by the procedure(writer_field_manager_unserialise_state) dummy arguments in this file.
  !! @param byte_data The serialised bytes handed to the callback
  subroutine writer_field_manager_unserialise_state(byte_data)
    character, dimension(:), intent(in) :: byte_data
  end subroutine writer_field_manager_unserialise_state
end interface
62 
64 contains
65 
!> Reads the IO server XML configuration out of a NetCDF checkpoint file.
!! If the file has no "entries_directory_dim" dimension a warning is logged by rank 0 of the supplied
!! communicator, the file is closed and io_xml_configuration is left untouched.
!! @param checkpoint_filename Path of the checkpoint file, opened read only
!! @param io_xml_configuration Allocated and filled by get_io_server_configuration when the data is present
!! @param io_communicator_arg MPI communicator used to obtain this process's rank and the number of IO servers
subroutine read_io_server_configuration(checkpoint_filename, io_xml_configuration, io_communicator_arg)
  character(len=STRING_LENGTH), intent(in) :: checkpoint_filename
  character, dimension(:), allocatable, intent(inout) :: io_xml_configuration
  integer, intent(in) :: io_communicator_arg

  integer :: ncid, number_io_server, my_io_server_rank, ierr
  integer :: dim_id, dim_size
  logical :: found

  call mpi_comm_rank(io_communicator_arg, my_io_server_rank, ierr)
  call mpi_comm_size(io_communicator_arg, number_io_server, ierr)
  call check_netcdf_status(nf90_open(path = checkpoint_filename, mode = nf90_nowrite, ncid = ncid))
  call check_netcdf_status(nf90_inq_dimid(ncid, "entries_directory_dim", dim_id), found)
  if (.not. found) then
    if (my_io_server_rank==0) then
      call log_log(log_warn, "Restarting the IO server fresh as the checkpoint file does not contain IO state")
    end if
    ! The file was opened above, so release the handle before leaving early
    call check_netcdf_status(nf90_close(ncid))
    return
  end if
  call check_netcdf_status(nf90_inquire_dimension(ncid, dim_id, len=dim_size))

  ! The directory dimension length is compared against the number of IO servers in the communicator
  if (dim_size .ne. number_io_server) then
    call log_log(log_error, "Can not restart IO server with a different number of IO servers")
  end if
  call get_io_server_configuration(ncid, io_xml_configuration)
  call check_netcdf_status(nf90_close(ncid))
end subroutine read_io_server_configuration
98 
!> Reactivates the writer federator state from the checkpoint file named by the "checkpoint" option.
!! Each facet (individual writer entries, time averaged manipulation, instantaneous manipulation and the
!! timepoints) is read and unserialised in turn, and its byte buffer is released before the next is read.
!! @param io_configuration IO server configuration, supplies the options database, server count and rank
!! @param writer_entries Writer entries; only those flagged include_in_io_state_write are restored
!! @param time_points Hashmap populated by restart_writer_state_timepoints
subroutine reactivate_writer_federator_state(io_configuration, writer_entries, time_points)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(writer_type), dimension(:) :: writer_entries
  type(hashmap_type), volatile, intent(inout) :: time_points

  integer :: ncid, i
  character, dimension(:), allocatable :: raw_bytes

  call check_netcdf_status(nf90_open(path = options_get_string(io_configuration%options_database, "checkpoint"), &
       mode = nf90_nowrite, ncid = ncid))

  do i=1, size(writer_entries)
    ! Note that the different writer entries are dealt with separately for memory reasons
    if (writer_entries(i)%include_in_io_state_write) then
      call get_io_server_serialised_bytes(ncid, io_configuration%number_of_io_servers, io_configuration%my_io_rank, &
           "serialised_writer_entry_"//trim(conv_to_string(i)), raw_bytes)
      ! The reader returns without allocating raw_bytes when the dimension or variable is not in the file
      if (allocated(raw_bytes)) then
        call unserialise_writer_type(writer_entries(i), raw_bytes)
        deallocate(raw_bytes)
      else
        call log_master_log(log_warn, "On restart no writer state in checkpoint file")
      end if
    end if
  end do

  call get_io_server_serialised_bytes(ncid, io_configuration%number_of_io_servers, io_configuration%my_io_rank, &
       "serialised_timeaveraged_manipulation", raw_bytes)
  ! The restart helpers handle an unallocated buffer themselves by logging a warning
  call restart_timeaveraged_state_from_checkpoint(raw_bytes)
  if (allocated(raw_bytes)) deallocate(raw_bytes)

  call get_io_server_serialised_bytes(ncid, io_configuration%number_of_io_servers, io_configuration%my_io_rank, &
       "serialised_instantaneous_manipulation", raw_bytes)
  call restart_instantaneous_state_from_checkpoint(raw_bytes)
  if (allocated(raw_bytes)) deallocate(raw_bytes)

  call get_io_server_serialised_bytes(ncid, io_configuration%number_of_io_servers, io_configuration%my_io_rank, &
       "serialised_timepoints", raw_bytes)
  call restart_writer_state_timepoints(time_points, raw_bytes)
  if (allocated(raw_bytes)) deallocate(raw_bytes)

  call check_netcdf_status(nf90_close(ncid))
end subroutine reactivate_writer_federator_state
143 
!> Reactivates the writer field manager state from the checkpoint file named by the "checkpoint" option.
!! The file is opened read only, the "serialised_writer_manager" bytes belonging to this IO server are
!! read and passed to the supplied callback, and the file is then closed.
!! @param io_configuration IO server configuration, supplies the options database, server count and rank
!! @param unserialise_writer_field_manager Callback that consumes the serialised bytes
subroutine reactivate_writer_field_manager_state(io_configuration, unserialise_writer_field_manager)
  type(io_configuration_type), intent(inout) :: io_configuration
  procedure(writer_field_manager_unserialise_state) :: unserialise_writer_field_manager

  integer :: ncid
  character, dimension(:), allocatable :: raw_bytes

  call check_netcdf_status(nf90_open(path = options_get_string(io_configuration%options_database, "checkpoint"), &
       mode = nf90_nowrite, ncid = ncid))

  call get_io_server_serialised_bytes(ncid, io_configuration%number_of_io_servers, io_configuration%my_io_rank, &
       "serialised_writer_manager", raw_bytes)
  call restart_writer_field_manager_from_checkpoint(unserialise_writer_field_manager, raw_bytes)
  ! raw_bytes stays unallocated when the state is absent from the file, so only release what was read
  if (allocated(raw_bytes)) deallocate(raw_bytes)

  call check_netcdf_status(nf90_close(ncid))
end subroutine reactivate_writer_field_manager_state
165 
!> Retrieves the IO server XML configuration from the checkpoint file.
!! Nothing is allocated or read unless both the "io_configuration_dim" dimension and the
!! "io_configuration" variable are found in the file.
!! @param ncid Identifier of the already opened NetCDF file
!! @param io_xml_configuration Allocated to the dimension length and filled with the stored characters
subroutine get_io_server_configuration(ncid, io_xml_configuration)
  integer, intent(in) :: ncid
  character, dimension(:), allocatable, intent(inout) :: io_xml_configuration

  integer :: config_dim_id, config_var_id, config_length, status
  logical :: dimension_present, variable_present

  status=nf90_inq_dimid(ncid, "io_configuration_dim", config_dim_id)
  call check_netcdf_status(status, dimension_present)
  if (dimension_present) then
    status=nf90_inquire_dimension(ncid, config_dim_id, len=config_length)
    call check_netcdf_status(status)

    status=nf90_inq_varid(ncid, "io_configuration", config_var_id)
    call check_netcdf_status(status, variable_present)
    if (variable_present) then
      allocate(io_xml_configuration(config_length))
      status=nf90_get_var(ncid, config_var_id, io_xml_configuration, count=[config_length])
      call check_netcdf_status(status)
    end if
  end if
end subroutine get_io_server_configuration
185 
!> Retrieves the serialised bytes belonging to this IO server for one facet of the stored state.
!! Three items are looked up from base_key: the dimension base_key//"_dim", the directory variable
!! base_key//"_directory" and the data variable base_key. If any of them is missing the routine returns
!! with raw_bytes unallocated. The NetCDF C API is called directly so lengths and offsets are 64 bit.
!! @param ncid Identifier of the already opened NetCDF file
!! @param number_io_server Number of IO servers
!! @param my_io_server_rank Rank of this IO server, used directly as the C start index into the directory
!! @param base_key Name of the facet to read
!! @param raw_bytes The bytes read for this IO server; unallocated on return when nothing was found
subroutine get_io_server_serialised_bytes(ncid, number_io_server, my_io_server_rank, base_key, raw_bytes)
  integer, intent(in) :: ncid, number_io_server, my_io_server_rank
  character(len=*), intent(in) :: base_key
  character, dimension(:), allocatable, intent(out) :: raw_bytes

  ! NetCDF names can be NC_MAX_NAME (256) characters and nc_inq_dim also writes the terminating null
  integer, parameter :: nc_name_buffer_length = 257

  integer :: dim_id, var_id
  logical :: found
  integer(kind=8) :: dim_size, number_serialised_entries
  ! Must be a C long to match the ip dummy argument of the nc_get_vara_long binding
  integer(kind=c_long) :: serialised_range(2)

  integer(kind=c_int) :: cncid, cdimid, cvarid
  integer(kind=c_size_t) :: cdlen
  character(len=nc_name_buffer_length) :: tmpname
  integer(kind=c_size_t), target :: cstart(1), ccounts(1)
  integer(kind=c_ptrdiff_t), target :: cstrides(1)
  type(c_ptr) :: cstartptr, ccountsptr, cstridesptr

  cncid=ncid
  cstartptr=c_loc(cstart)
  ccountsptr=c_loc(ccounts)
  cstridesptr=c_loc(cstrides)

  call check_netcdf_status(nf90_inq_dimid(ncid, trim(base_key)//"_dim", dim_id), found)
  if (.not. found) return

  ! Identifiers from the Fortran API are one greater than the ones the C API expects
  cdimid=dim_id-1
  call check_netcdf_status(nc_inq_dim(cncid, cdimid, tmpname, cdlen))
  dim_size=cdlen

  call check_netcdf_status(nf90_inq_varid(ncid, trim(base_key)//"_directory", var_id), found)
  if (.not. found) return
  cvarid=var_id-1
  cstart(1)=my_io_server_rank
  if (my_io_server_rank .lt. number_io_server-1) then
    ! Read this server's directory entry together with the following one
    ccounts(1)=2
    call check_netcdf_status(nc_get_vara_long(cncid, cvarid, cstartptr, ccountsptr, serialised_range))
    if (serialised_range(2) .gt. dim_size) then
      call log_log(log_error, "Serialised entry beyond size in the file")
    end if
  else
    ! The last server has no following entry, so its range runs to the end of the dimension
    ccounts(1)=1
    call check_netcdf_status(nc_get_vara_long(cncid, cvarid, cstartptr, ccountsptr, serialised_range))
    serialised_range(2)=dim_size
  end if
  ! NOTE(review): for non-final ranks this count includes the element at serialised_range(2) itself;
  ! confirm against the state writer whether the directory holds inclusive end points or next start points
  number_serialised_entries=(serialised_range(2)-serialised_range(1)) + 1
  call check_netcdf_status(nf90_inq_varid(ncid, trim(base_key), var_id), found)
  if (.not. found) return
  allocate(raw_bytes(number_serialised_entries))

  cvarid=var_id-1
  ! The directory value has one subtracted to form the zero based C start index
  cstart=serialised_range(1)-1
  ccounts=number_serialised_entries
  cstrides(1)=1
  call check_netcdf_status(nc_get_vars_text(cncid, cvarid, cstartptr, ccountsptr, cstridesptr, raw_bytes))
end subroutine get_io_server_serialised_bytes
247 
!> Restarts the writer state from a specific checkpoint byte data chunk of memory.
!! The data starts with an integer entry count, which must equal the number of configured writer entries.
!! Each entry flagged include_in_io_state_write is then stored as an integer byte size followed by that
!! many bytes, which are handed to unserialise_writer_type.
!! @param writer_entries The configured writer entries to restore
!! @param raw_bytes The serialised data; if unallocated a warning is logged and nothing is restored
subroutine restart_writer_state_from_checkpoint(writer_entries, raw_bytes)
  type(writer_type), dimension(:) :: writer_entries
  character, dimension(:), allocatable :: raw_bytes

  integer :: i, number_entries, current_point, byte_size

  if (.not. allocated(raw_bytes)) then
    call log_master_log(log_warn, "On restart no writer state in checkpoint file")
    return
  end if
  current_point=1
  number_entries=unpack_scalar_integer_from_bytedata(raw_bytes, current_point)
  if (number_entries .ne. size(writer_entries)) then
    call log_log(log_error, "On restart have a different number of configured entries than those in the checkpoint file")
  end if
  do i=1, size(writer_entries)
    if (writer_entries(i)%include_in_io_state_write) then
      byte_size=unpack_scalar_integer_from_bytedata(raw_bytes, current_point)
      call unserialise_writer_type(writer_entries(i), raw_bytes(current_point:current_point+byte_size-1))
      ! Step over the bytes just consumed so the next entry's size is read from the right place
      current_point=current_point+byte_size
    end if
  end do
end subroutine restart_writer_state_from_checkpoint
274 
!> Restarts the writer state timepoints held in the writer federator.
!! The data starts with an integer entry count, followed by that many pairs of an integer timestep key
!! and a double precision real value. Each pair is put into time_points under the key's string form.
!! @param time_points Hashmap that receives the unpacked pairs
!! @param raw_bytes The serialised data; if unallocated a warning is logged and nothing is read
subroutine restart_writer_state_timepoints(time_points, raw_bytes)
  type(hashmap_type), volatile, intent(inout) :: time_points
  character, dimension(:), allocatable :: raw_bytes

  integer :: pair_index, pair_count, read_cursor, step_key
  real(kind=DEFAULT_PRECISION) :: step_value

  if (allocated(raw_bytes)) then
    ! read_cursor is presumably advanced by the unpack helpers - confirm against datautils
    read_cursor=1
    pair_count=unpack_scalar_integer_from_bytedata(raw_bytes, read_cursor)
    do pair_index=1, pair_count
      step_key=unpack_scalar_integer_from_bytedata(raw_bytes, read_cursor)
      step_value=unpack_scalar_dp_real_from_bytedata(raw_bytes, read_cursor)
      call c_put_real(time_points, conv_to_string(step_key), step_value)
    end do
  else
    call log_master_log(log_warn, "On restart no writer state timepoints in checkpoint file")
  end if
end subroutine restart_writer_state_timepoints
297 
!> Will restart the time averaged manipulation state from the checkpoint file.
!! @param raw_bytes The serialised state, passed on to unserialise_time_averaged_state; if unallocated a
!! warning is logged and nothing is restored
subroutine restart_timeaveraged_state_from_checkpoint(raw_bytes)
  character, dimension(:), allocatable :: raw_bytes

  if (.not. allocated(raw_bytes)) then
    call log_master_log(log_warn, "On restart no time averaged state in checkpoint file")
    return
  end if
  call unserialise_time_averaged_state(raw_bytes)
end subroutine restart_timeaveraged_state_from_checkpoint
309 
!> Will restart the instantaneous manipulation state from the checkpoint file.
!! @param raw_bytes The serialised state, passed on to unserialise_instantaneous_state; if unallocated a
!! warning is logged and nothing is restored
subroutine restart_instantaneous_state_from_checkpoint(raw_bytes)
  character, dimension(:), allocatable :: raw_bytes

  if (.not. allocated(raw_bytes)) then
    call log_master_log(log_warn, "On restart no instantaneous state in checkpoint file")
    return
  end if
  call unserialise_instantaneous_state(raw_bytes)
end subroutine restart_instantaneous_state_from_checkpoint
321 
!> Will restart the field manager state from the checkpoint file.
!! @param unserialise_writer_field_manager Callback that is invoked with the serialised bytes
!! @param raw_bytes The serialised state; if unallocated a warning is logged and the callback is not invoked
subroutine restart_writer_field_manager_from_checkpoint(unserialise_writer_field_manager, raw_bytes)
  procedure(writer_field_manager_unserialise_state) :: unserialise_writer_field_manager
  character, dimension(:), allocatable :: raw_bytes

  if (.not. allocated(raw_bytes)) then
    call log_master_log(log_warn, "On restart no writer field manager state in checkpoint file")
    return
  end if
  call unserialise_writer_field_manager(raw_bytes)
end subroutine restart_writer_field_manager_from_checkpoint
Performs time averaged, time manipulation and only returns a value if the output frequency determines...
subroutine, public unserialise_writer_type(writer_to_unserialise, byte_data)
Unserialises some byte data into the writer in order to recreate the state of the writer...
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
subroutine, public unserialise_instantaneous_state(byte_data)
Unpacks some serialised byte data to initialise this manipulator to some previous state...
character(len=string_length) function, public options_get_string(options_database, key, index)
Retrieves a string value from the database that matches the provided key.
Reads the IO server state that was stored in a NetCDF checkpoint file.
ISO C binding for NetCDF get long scalar variable, required for retrieving long variables.
Contains functionality for managing and extracting data from the raw data dumps that the IO server re...
Definition: datautils.F90:3
Performs instantaneous time manipulation and only returns a value if the output frequency determines ...
Logging utility.
Definition: logging.F90:2
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
subroutine, public log_master_log(level, message)
Will log just from the master process.
Definition: logging.F90:47
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
Converts data types to strings.
Definition: conversions.F90:36
subroutine, public unserialise_time_averaged_state(byte_data)
Unserialises some byte data to initialise the state from some previous version.
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
Writer types which are shared across writing functionality. Also includes serialisation functionality...
Definition: writer_types.F90:2
subroutine, public check_netcdf_status(status, found_flag)
Will check a NetCDF status and write to log_log error any decoded statuses. Can be used to decode whe...
Definition: netcdf_misc.F90:19
ISO C binding for NetCDF inquire dimension, required for 64 bit dimension length. ...
real(kind=double_precision) function, public unpack_scalar_dp_real_from_bytedata(data, start_point)
Unpacks a double precision scalar real from some byte data, this is a very simple unpack routine wrap...
Definition: datautils.F90:89
subroutine restart_timeaveraged_state_from_checkpoint(raw_bytes)
Will restart the time averaged manipulation state from the checkpoint file.
Collection data structures.
Definition: collections.F90:7
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
subroutine restart_writer_state_from_checkpoint(writer_entries, raw_bytes)
Restarts the writer state from a specific checkpoint byte data chunk of memory.
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
integer function, public unpack_scalar_integer_from_bytedata(data, start_point)
Unpacks a scalar integer from some byte data, this is a very simple unpack routine wrapping the trans...
Definition: datautils.F90:34
subroutine get_io_server_serialised_bytes(ncid, number_io_server, my_io_server_rank, base_key, raw_bytes)
Retrieves some IO server serialised bytes which will make up the state of a specific facet...
subroutine, public read_io_server_configuration(checkpoint_filename, io_xml_configuration, io_communicator_arg)
Reads the IO server configuration, which is the XML configuration initially run with and stored in th...
Manages the options database. Contains administration functions and deduce runtime options from the c...
subroutine, public reactivate_writer_federator_state(io_configuration, writer_entries, time_points)
Reactivates the writer federator and everything beneath it (i.e. just not the writer field manager...
subroutine restart_instantaneous_state_from_checkpoint(raw_bytes)
Will restart the instantaneous manipulation state from the checkpoint file.
subroutine restart_writer_state_timepoints(time_points, raw_bytes)
Restarts the writer state timepoints held in the writer federator.
subroutine restart_writer_field_manager_from_checkpoint(unserialise_writer_field_manager, raw_bytes)
Will restart the field manager state from the checkpoint file.
NetCDF misc functionality which can be shared between modules that work with NetCDF files...
Definition: netcdf_misc.F90:2
ISO C binding for NetCDF get text vars, required for 64 bit start, count & stride.
subroutine, public reactivate_writer_field_manager_state(io_configuration, unserialise_writer_field_manager)
Reactivates the writer field manager state from the checkpoint file, for memory reasons this will ope...
Parses the XML configuration file to produce the io configuration description which contains the data...
subroutine get_io_server_configuration(ncid, io_xml_configuration)
Retrieves the IO server XML configuration from the checkpoint file.
Puts a double precision real key-value pair into the map.