MONC
writer_federator.F90
Go to the documentation of this file.
1 
34  use grids_mod, only : z_index, y_index, x_index
35  use mpi, only : mpi_int, mpi_max
37  implicit none
38 
39 #ifndef TEST_MODE
40  private
41 #endif
42 
!> State of each file writer, one element per configured writer; allocated and filled in
!! initialise_writer_federator and indexed by writer index throughout this module
43  type(writer_type), volatile, dimension(:), allocatable :: writer_entries
46 
!> Set to .false. on initialisation and to .true. by check_writer_trigger just before it issues a write; while
!! it is .true. check_writer_trigger registers further triggered writes as pending instead of issuing them
48  logical, volatile :: currently_writing
49 
53 contains
54 
!> Initialises the writer federator: clears the currently_writing flag and fills one writer_entries element per
!! configured file writer (file name, title, write triggers, mutexes and field contents).
!! NOTE(review): original lines 65-67, 71-73 and 117 are absent from this listing, so any statements on those
!! lines (including the body of the continuation_run branch) are not visible here.
!! @param io_configuration The IO server configuration, source of the file writer definitions
!! @param diagnostic_generation_frequency Handed on unchanged to the routines that add fields and groups
!! @param continuation_run Whether this is a continuation run; the branch testing it has no visible body
57  subroutine initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
58  type(io_configuration_type), intent(inout) :: io_configuration
59  type(hashmap_type), intent(inout) :: diagnostic_generation_frequency
60  logical, intent(in) :: continuation_run
61 
62  integer :: i, j, number_contents, current_field_index
63  type(hashset_type) :: writer_field_names, duplicate_field_names
64 
68 
69  currently_writing=.false.
70 
74 
75  allocate(writer_entries(io_configuration%number_of_writers))
76  do i=1, io_configuration%number_of_writers
! Running field index for this writer, advanced from the results of the add_* functions in the loop below
77  current_field_index=0
78  number_contents=io_configuration%file_writers(i)%number_of_contents
79  allocate(writer_entries(i)%contents(get_total_number_writer_fields(io_configuration, i)))
80  writer_entries(i)%filename=io_configuration%file_writers(i)%file_name
81  writer_entries(i)%title=io_configuration%file_writers(i)%title
82  writer_entries(i)%write_on_terminate=io_configuration%file_writers(i)%write_on_terminate
83  writer_entries(i)%include_in_io_state_write=io_configuration%file_writers(i)%include_in_io_state_write
84  call check_thread_status(forthread_mutex_init(writer_entries(i)%trigger_and_write_mutex, -1))
85  call check_thread_status(forthread_mutex_init(writer_entries(i)%num_fields_to_write_mutex, -1))
86  call check_thread_status(forthread_mutex_init(writer_entries(i)%pending_writes_mutex, -1))
87  writer_entries(i)%write_on_model_time=io_configuration%file_writers(i)%write_on_model_time
! Only the frequency matching the trigger mode is copied from the configuration, the other one is zeroed
88  if (writer_entries(i)%write_on_model_time) then
89  writer_entries(i)%write_timestep_frequency=0
90  writer_entries(i)%write_time_frequency=io_configuration%file_writers(i)%write_time_frequency
91  else
92  writer_entries(i)%write_time_frequency=0
93  writer_entries(i)%write_timestep_frequency=io_configuration%file_writers(i)%write_timestep_frequency
94  end if
95  writer_entries(i)%previous_write_time=0
! defined_write_time takes the configured time frequency in both trigger modes
96  writer_entries(i)%defined_write_time=io_configuration%file_writers(i)%write_time_frequency
97  writer_entries(i)%latest_pending_write_time=0
98  writer_entries(i)%latest_pending_write_timestep=0
99  writer_entries(i)%contains_io_status_dump=.false.
! Walk the configured contents of this writer: groups and single fields are added as fields (the group function
! returns the new index, the field function returns an increment), an IO state facet only sets a flag
100  do j=1, number_contents
101  if (io_configuration%file_writers(i)%contents(j)%facet_type == group_type) then
102  current_field_index=add_group_of_fields_to_writer_entry(io_configuration, i, j, current_field_index, &
103  writer_field_names, duplicate_field_names, diagnostic_generation_frequency)
104  else if (io_configuration%file_writers(i)%contents(j)%facet_type == field_type) then
105  current_field_index=current_field_index+add_field_to_writer_entry(io_configuration, &
106  i, j, current_field_index, io_configuration%file_writers(i)%contents(j)%facet_name, "", writer_field_names, &
107  duplicate_field_names, diagnostic_generation_frequency)
108  else if (io_configuration%file_writers(i)%contents(j)%facet_type == io_state_type) then
109  writer_entries(i)%contains_io_status_dump=.true.
110  end if
111  end do
! Duplicates are handled per writer, then both name sets are freed before the next writer is processed
112  if (.not. c_is_empty(duplicate_field_names)) call handle_duplicate_field_names(writer_entries(i), duplicate_field_names)
113  call c_free(writer_field_names)
114  call c_free(duplicate_field_names)
115  end do
116  if (continuation_run) then
118  end if
119  end subroutine initialise_writer_federator
120 
!> Finalises the writer federator.
!! NOTE(review): the body of this routine (original lines 123-128) is absent from this listing, so what it
!! releases or waits for cannot be stated from here - consult the full source
122  subroutine finalise_writer_federator()
129  end subroutine finalise_writer_federator
130 
!> Records the model time of a timestep in time_points, keyed by the timestep in string form, when the data from
!! a MONC contains both a "time" and a "timestep" field. An existing entry for the timestep is left unchanged.
!! NOTE(review): original lines 147, 149-150, 155 and 157 are absent from this listing; the repeated containment
!! test below presumably brackets a lock change on those lines, and the closing end subroutine is also missing
!! @param io_configuration Configuration of the IO server
!! @param source Identifier of the MONC that sent the data
!! @param data_id Identifier of the data definition the dump conforms to
!! @param data_dump The raw data received from the MONC
131  subroutine inform_writer_federator_time_point(io_configuration, source, data_id, data_dump)
132  type(io_configuration_type), intent(inout) :: io_configuration
133  integer, intent(in) :: source, data_id
134  character, dimension(:), allocatable, intent(in) :: data_dump
135 
136  real(kind=DEFAULT_PRECISION) :: time
137  integer :: timestep
138  character(len=STRING_LENGTH) :: timestep_key
139 
140  if (is_field_present(io_configuration, source, data_id, "time") .and. &
141  is_field_present(io_configuration, source, data_id, "timestep")) then
142  time=get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, "time")
143  timestep=get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, "timestep")
144 
145  timestep_key=conv_to_string(timestep)
146 
! The same test is made twice in a row; only the inner one guards the insertion
148  if (.not. c_contains(time_points, timestep_key)) then
151  if (.not. c_contains(time_points, timestep_key)) then
152  call c_put_real(time_points, timestep_key, time)
153  end if
154  end if
156  end if
158 
!> Enables, in the writer entries, those fields that are reported as present. Either a set of prognostic field
!! names or a map of diagnostic field names to an integer (compared against my_io_rank) can be supplied; if
!! both are supplied field_names takes precedence, if neither is supplied nothing is enabled.
!! NOTE(review): original line 171 is absent from this listing, so the initialisation of the iterator used by
!! the first loop is not visible (presumably over used_field_names - confirm); the closing end subroutine
!! is also missing
!! @param io_configuration Configuration of the IO server
!! @param field_names Optional set of field names that are present, handled in non diagnostics mode
!! @param diag_field_names_and_roots Optional map of diagnostic field name to integer, handled in diagnostics mode
161  subroutine inform_writer_federator_fields_present(io_configuration, field_names, diag_field_names_and_roots)
162  type(io_configuration_type), intent(inout) :: io_configuration
163  type(hashset_type), intent(inout), optional :: field_names
164  type(hashmap_type), intent(inout), optional :: diag_field_names_and_roots
165 
166  type(iterator_type) :: iterator
167  character(len=STRING_LENGTH) :: specific_name
168  integer :: i, number_q_fields, expected_io
169  logical :: field_found, expected_here, diagnostics_mode
170 
! While expected_io stays at -1 every found field is flagged as expected here, whatever the rank
172  expected_io=-1
! First pass: names are looked up and enabled exactly as they come from the iterator
173  do while (c_has_next(iterator))
174  specific_name=c_next_string(iterator)
175  if (present(field_names)) then
176  field_found=c_contains(field_names, specific_name)
177  diagnostics_mode=.false.
178  else if (present(diag_field_names_and_roots)) then
179  field_found=c_contains(diag_field_names_and_roots, specific_name)
180  if (field_found) expected_io=c_get_integer(diag_field_names_and_roots, specific_name)
181  diagnostics_mode=.true.
182  else
183  field_found=.false.
184  end if
185  if (field_found) then
186  expected_here=expected_io == -1 .or. expected_io == io_configuration%my_io_rank
187  call enable_specific_field_by_name(specific_name, diagnostics_mode, expected_here)
188  end if
189  end do
! Second pass: each name in q_field_names is expanded into one name per Q field before being enabled
190  iterator=c_get_iterator(q_field_names)
191  do while (c_has_next(iterator))
192  specific_name=c_next_string(iterator)
193  if (present(field_names)) then
194  field_found=c_contains(field_names, specific_name)
195  diagnostics_mode=.false.
196  else if (present(diag_field_names_and_roots)) then
197  field_found=c_contains(diag_field_names_and_roots, specific_name)
198  if (field_found) expected_io=c_get_integer(diag_field_names_and_roots, specific_name)
199  diagnostics_mode=.true.
200  else
201  field_found=.false.
202  end if
203  if (field_found) then
204  expected_here=expected_io == -1 .or. expected_io == io_configuration%my_io_rank
205  number_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
206  do i=1, number_q_fields
! Use the configured Q field name when one exists for index i, otherwise the placeholder suffix _udef<i>
207  if (c_size(io_configuration%q_field_names) .ge. i) then
208  call enable_specific_field_by_name(trim(specific_name)//"_"//trim(c_get_string(io_configuration%q_field_names, i)), &
209  diagnostics_mode, expected_here)
210  else
211  call enable_specific_field_by_name(trim(specific_name)//"_udef"//trim(conv_to_string(i)), &
212  diagnostics_mode, expected_here)
213  end if
214  end do
215  end if
216  end do
218 
!> Determines whether a field is used by the writer federator: when the name is in used_field_names the result
!! is whether get_next_applicable_writer_entry finds an entry for the name and namespace, searching from
!! writer index 1 and contents index 1.
!! NOTE(review): original lines 232 and 234 are absent from this listing, so the result assigned in the else
!! branch (presumably .false. - confirm) and the closing end function are not visible
!! @param field_name Name of the field to look up
!! @param field_namespace Namespace of the field to look up
222  logical function is_field_used_by_writer_federator(field_name, field_namespace)
223  character(len=*), intent(in) :: field_name, field_namespace
224 
225  integer :: writer_index, contents_index
226 
227  writer_index=1
228  contents_index=1
229  if (c_contains(used_field_names, field_name)) then
230  is_field_used_by_writer_federator=get_next_applicable_writer_entry(field_name, field_namespace, writer_index, contents_index)
231  else
233  end if
235 
!> Reports whether a field is split on Q.
!! NOTE(review): the only executable line of this function (original line 242) is absent from this listing;
!! presumably it tests membership of field_name in q_field_names - confirm against the full source
!! @param field_name Name of the field to test
239  logical function is_field_split_on_q(field_name)
240  character(len=*), intent(in) :: field_name
241 
243  end function is_field_split_on_q
244 
!> Enables every writer field entry carrying the given name whose kind agrees with the requested mode: diagnostic
!! entries in diagnostics mode, prognostic entries otherwise. Entries of the other kind are left untouched.
!! @param field_name Name of the field to enable
!! @param diagnostics_mode Whether diagnostic (.true.) or prognostic (.false.) entries are to be enabled
!! @param expected_here Optional flag copied onto each enabled entry when supplied
subroutine enable_specific_field_by_name(field_name, diagnostics_mode, expected_here)
  character(len=*), intent(in) :: field_name
  logical, intent(in) :: diagnostics_mode
  logical, intent(in), optional :: expected_here

  integer :: writer_idx, content_idx
  logical :: kind_matches

  writer_idx=1
  content_idx=0
  search_entries: do
    ! Step past the previous hit (or onto the first slot) and look for the next entry with this name
    content_idx=content_idx+1
    if (.not. get_next_applicable_writer_entry(field_name, writer_index_point=writer_idx, &
         contents_index_point=content_idx)) exit search_entries

    if (diagnostics_mode) then
      kind_matches=writer_entries(writer_idx)%contents(content_idx)%diagnostic_field
    else
      kind_matches=writer_entries(writer_idx)%contents(content_idx)%prognostic_field
    end if
    if (.not. kind_matches) cycle search_entries

    writer_entries(writer_idx)%contents(content_idx)%enabled=.true.
    if (present(expected_here)) writer_entries(writer_idx)%contents(content_idx)%expected_here=expected_here
  end do search_entries
end subroutine enable_specific_field_by_name
273 
!> Replaces the placeholder names of Q split fields with their real names. For every name in q_field_names and
!! every provided Q field name (taken in iteration order, numbered from 1), writer field entries named
!! <field>_udef<i> are renamed to <field>_<q name>, and used_field_names is updated to match.
!! NOTE(review): the closing end subroutine (original line 309) is absent from this listing
!! @param q_provided_field_names List of the Q field names, in the order matching the placeholder numbering
277  subroutine provide_q_field_names_to_writer_federator(q_provided_field_names)
278  type(list_type), intent(inout) :: q_provided_field_names
279 
280  type(iterator_type) :: iterator, q_field_iterator
281  logical :: continue_search
282  integer :: writer_index, contents_index, i
283  character(len=STRING_LENGTH) :: search_field, field_name, specific_name
284 
285  iterator=c_get_iterator(q_field_names)
286  do while (c_has_next(iterator))
287  specific_name=c_next_string(iterator)
288  q_field_iterator=c_get_iterator(q_provided_field_names)
289  i=1
290  do while (c_has_next(q_field_iterator))
! search_field is the placeholder currently held by the entries, field_name is what it becomes
291  search_field=trim(specific_name)//"_udef"//trim(conv_to_string(i))
292  field_name=trim(specific_name)//"_"//trim(c_next_string(q_field_iterator))
293  continue_search=.true.
294  writer_index=1
295  contents_index=0
296  do while (continue_search)
297  contents_index=contents_index+1
298  continue_search=get_next_applicable_writer_entry(search_field, writer_index_point=writer_index, &
299  contents_index_point=contents_index)
300  if (continue_search) then
301  writer_entries(writer_index)%contents(contents_index)%field_name=field_name
302  end if
303  end do
304  i=i+1
! The used names set is updated whether or not any entry was renamed above
305  call c_add_string(used_field_names, field_name)
306  call c_remove(used_field_names, search_field)
307  end do
308  end do
310 
!> Provides a field, in either real or string form, to the writer federator. Real valued data is delegated to
!! provide_ordered_field_to_writer_federator_real_values. String valued data is copied once, on the first
!! applicable writer entry found, and that single copy is stored under the time key in every applicable entry.
!! Values of any other data type are ignored.
!!
!! The copy used to be taken before checking whether any writer uses the field, which leaked it whenever the
!! field was unused or no entry matched the namespace; it is now taken only when there is somewhere to store it.
!! @param io_configuration Configuration of the IO server
!! @param field_name Name of the field being provided
!! @param field_namespace Namespace used when searching for applicable writer entries
!! @param field_values The values to store, data_type selects how they are handled
!! @param timestep Timestep that the values correspond to
!! @param time Model time that the values correspond to, its string form is the storage key
!! @param source Source identifier, only forwarded for real valued data
subroutine provide_ordered_field_to_writer_federator(io_configuration, field_name, field_namespace, field_values, &
     timestep, time, source)
  type(io_configuration_type), intent(inout) :: io_configuration
  character(len=*), intent(in) :: field_name, field_namespace
  integer, intent(in) :: timestep, source
  type(data_values_type), target :: field_values
  real(kind=DEFAULT_PRECISION), intent(in) :: time

  integer :: writer_index, contents_index
  logical :: continue_search
  type(data_values_type), pointer :: result_values
  class(*), pointer :: generic

  if (field_values%data_type == double_data_type) then
    call provide_ordered_field_to_writer_federator_real_values(io_configuration, field_name, field_namespace, &
         field_values%values, timestep, time, source)
  else if (field_values%data_type == string_data_type) then
    ! Nothing is stored for a field that no writer uses, so there is nothing to copy either
    if (.not. c_contains(used_field_names, field_name)) return

    nullify(result_values)
    nullify(generic)
    continue_search=.true.
    writer_index=1
    contents_index=0
    do while (continue_search)
      contents_index=contents_index+1
      continue_search=get_next_applicable_writer_entry(field_name, field_namespace, writer_index, contents_index)
      if (continue_search) then
        ! Take the copy on the first match only; every later match shares the same stored object
        if (.not. associated(result_values)) then
          allocate(result_values, source=field_values)
          generic=>result_values
        end if
        if (.not. writer_entries(writer_index)%contents(contents_index)%enabled) then
          call log_log(log_warn, "Received data for previously un-enabled field '"//&
               writer_entries(writer_index)%contents(contents_index)%field_name//"'")
        end if
        writer_entries(writer_index)%contents(contents_index)%enabled=.true.
        writer_entries(writer_index)%contents(contents_index)%latest_timestep_values=timestep
        if (log_get_logging_level() .ge. log_debug) then
          call log_log(log_debug, "[WRITE FED VALUE STORE] Storing value for field "//trim(field_name)//" ts="//&
               trim(conv_to_string(timestep))// " t="//trim(conv_to_string(time)))
        end if
        ! The values map of the entry is only touched while its mutex is held
        call check_thread_status(forthread_mutex_lock(writer_entries(writer_index)%contents(contents_index)%values_mutex))
        call c_put_generic(writer_entries(writer_index)%contents(contents_index)%values_to_write, conv_to_string(time), &
             generic, .false.)
        call check_thread_status(forthread_mutex_unlock(writer_entries(writer_index)%contents(contents_index)%values_mutex))
        ! A write may already be waiting on this field, in which case the new data might complete it
        if (writer_entries(writer_index)%contents(contents_index)%pending_to_write) then
          call determine_if_outstanding_field_can_be_written(io_configuration, writer_entries(writer_index), &
               writer_entries(writer_index)%contents(contents_index))
        end if
      end if
    end do
  end if
end subroutine provide_ordered_field_to_writer_federator
362 
!> Provides real valued field data to the writer federator. A name in used_field_names is passed on whole as a
!! single field. Otherwise a name in q_field_names is cut into consecutive slices of equal size, one per entry
!! of io_configuration%q_field_names, and each slice is passed on as the field <field_name>_<q name>. A name
!! in neither collection is ignored.
!! NOTE(review): the closing end subroutine (original line 403) is absent from this listing
!! @param io_configuration Configuration of the IO server
!! @param field_name Name of the field, or of the Q field to be split
!! @param field_namespace Namespace passed through with each field
!! @param field_values The values, for a Q field the slices are laid out one after another
!! @param timestep Timestep that the values correspond to
!! @param time Model time that the values correspond to
!! @param source Source identifier; must be greater than -1 for the per source slice size lookup to be used
371  subroutine provide_ordered_field_to_writer_federator_real_values(io_configuration, field_name, field_namespace, field_values, &
372  timestep, time, source)
373  type(io_configuration_type), intent(inout) :: io_configuration
374  character(len=*), intent(in) :: field_name, field_namespace
375  integer, intent(in) :: timestep, source
376  real(kind=DEFAULT_PRECISION), dimension(:), intent(in) :: field_values
377  real(kind=DEFAULT_PRECISION), intent(in) :: time
378 
379  type(iterator_type) :: iterator
380  integer :: individual_size, index
381 
382  if (c_contains(used_field_names, field_name)) then
383  call provide_ordered_single_field_to_writer_federator(io_configuration, field_name, field_namespace, field_values, &
384  timestep, time, source)
385  else if (c_contains(q_field_names, field_name)) then
! Slice size: a recorded split size takes priority, otherwise it is derived for the source, otherwise give up
386  if (c_contains(q_field_splits, field_name)) then
387  individual_size=c_get_integer(q_field_splits, field_name)
388  else if (source .gt. -1) then
389  individual_size=get_size_of_collective_q(io_configuration, field_name, source)
390  else
391  call log_log(log_warn, "Can not find Q split field in Q field names or collective field names with source, ignoring")
392  return
393  end if
394  iterator=c_get_iterator(io_configuration%q_field_names)
395  index=1
! NOTE(review): nothing here checks that field_values is long enough for every slice taken below
396  do while (c_has_next(iterator))
397  call provide_ordered_single_field_to_writer_federator(io_configuration, &
398  trim(field_name)//"_"//trim(c_next_string(iterator)), field_namespace, field_values(index:index+individual_size-1), &
399  timestep, time, source)
400  index=index+individual_size
401  end do
402  end if
404 
!> Determines the size of one Q field of a collective field for a specific source MONC. The visible loop
!! multiplies in the source MONC's local_dim_sizes for each dimension listed in the definition stored under
!! field_name in collective_q_field_dims.
!! NOTE(review): original lines 418, 422 and 424 are absent from this listing; presumably they initialise the
!! result, open the type guard of the select type, and begin the continued assignment that line 425
!! completes - confirm against the full source
!! @param io_configuration Configuration of the IO server
!! @param field_name Name of the collective Q field
!! @param source Identifier of the MONC whose local dimension sizes are used
410  integer function get_size_of_collective_q(io_configuration, field_name, source)
411  type(io_configuration_type), intent(inout) :: io_configuration
412  character(len=*), intent(in) :: field_name
413  integer, intent(in) :: source
414 
415  class(*), pointer :: generic
416  integer :: i, monc_index
417 
419  monc_index=get_monc_location(io_configuration, source)
420  generic=>c_get_generic(collective_q_field_dims, field_name)
421  select type(generic)
423  do i=1, size(generic%dimensions)
425  get_size_of_collective_q*io_configuration%registered_moncs(monc_index)%local_dim_sizes(generic%dimensions(i))
426  end do
427  end select
428  end function get_size_of_collective_q
429 
!> Provides a single real valued field to every applicable writer entry. The values are first passed through the
!! time manipulation of the entry; the result is computed once per time manipulation type within a call and
!! reused by later entries of the same type. When the manipulation yields values they are stored under the
!! time key (per source MONC for collective writes) and any write waiting on the field is re-examined.
!!
!! Fix: when a cached result is reused the generic pointer is now re-pointed at it. Before, it kept pointing at
!! the most recently computed result, so with writer entries of differing time manipulation types the
!! non-collective store could save another type's values against the field.
!! @param io_configuration Configuration of the IO server
!! @param field_name Name of the field being provided
!! @param field_namespace Namespace used when searching for applicable writer entries
!! @param field_values The raw values of the field
!! @param timestep Timestep that the values correspond to
!! @param time Model time that the values correspond to, its string form is the storage key
!! @param source Source MONC identifier; greater than -1 selects per source handling of collective fields
subroutine provide_ordered_single_field_to_writer_federator(io_configuration, field_name, field_namespace, field_values, &
     timestep, time, source)
  type(io_configuration_type), intent(inout) :: io_configuration
  character(len=*), intent(in) :: field_name, field_namespace
  integer, intent(in) :: timestep, source
  real(kind=DEFAULT_PRECISION), dimension(:), intent(in) :: field_values
  real(kind=DEFAULT_PRECISION), intent(in) :: time

  integer :: writer_index, contents_index
  logical :: continue_search
  type(data_values_type), pointer :: result_values
  type(hashmap_type) :: typed_result_values
  class(*), pointer :: generic

  continue_search=.true.
  writer_index=1
  contents_index=0
  if (c_contains(used_field_names, field_name)) then
    do while (continue_search)
      contents_index=contents_index+1
      continue_search=get_next_applicable_writer_entry(field_name, field_namespace, writer_index, contents_index)
      if (continue_search) then
        if (.not. writer_entries(writer_index)%contents(contents_index)%enabled) then
          call log_log(log_warn, "Received data for previously un-enabled field '"//&
               writer_entries(writer_index)%contents(contents_index)%field_name//"'")
        end if
        writer_entries(writer_index)%contents(contents_index)%enabled=.true.
        if (.not. c_contains(typed_result_values, conv_to_string(&
             writer_entries(writer_index)%contents(contents_index)%time_manipulation_type))) then
          ! First entry of this time manipulation type in this call: compute and remember the result
          allocate(result_values)
          if (writer_entries(writer_index)%contents(contents_index)%collective_write .and. source .gt. -1) then
            ! Collective data is manipulated per source, so the source is made part of the lookup name
            result_values=writer_entries(writer_index)%contents(contents_index)%time_manipulation(field_values, &
                 writer_entries(writer_index)%contents(contents_index)%output_frequency, &
                 trim(field_name)//"#"//conv_to_string(source), timestep, time)
          else
            result_values=writer_entries(writer_index)%contents(contents_index)%time_manipulation(field_values, &
                 writer_entries(writer_index)%contents(contents_index)%output_frequency, &
                 field_name, timestep, time)
          end if
          generic=>result_values
          call c_put_generic(typed_result_values, conv_to_string(&
               writer_entries(writer_index)%contents(contents_index)%time_manipulation_type), generic, .false.)
        else
          result_values=>get_data_value_by_field_name(typed_result_values, conv_to_string(&
               writer_entries(writer_index)%contents(contents_index)%time_manipulation_type))
          ! Keep the generic pointer in step with the reused result, it is what gets stored below
          generic=>result_values
        end if
        ! No values allocated in the result means there is nothing to store for this entry
        if (allocated(result_values%values)) then
          writer_entries(writer_index)%contents(contents_index)%latest_timestep_values=timestep
          if (log_get_logging_level() .ge. log_debug) then
            call log_log(log_debug, "[WRITE FED VALUE STORE] Storing value for field "//trim(field_name)//" ts="//&
                 trim(conv_to_string(timestep))// " t="//trim(conv_to_string(time)))
          end if
          call check_thread_status(forthread_mutex_lock(writer_entries(writer_index)%contents(contents_index)%values_mutex))
          if (writer_entries(writer_index)%contents(contents_index)%collective_write .and. source .gt. -1) then
            call write_collective_write_value(result_values, writer_index, contents_index, source, conv_to_string(time))
          else
            call c_put_generic(writer_entries(writer_index)%contents(contents_index)%values_to_write, conv_to_string(time), &
                 generic, .false.)
          end if
          call check_thread_status(forthread_mutex_unlock(writer_entries(writer_index)%contents(contents_index)%values_mutex))
          ! A write may already be waiting on this field, in which case the new data might complete it
          if (writer_entries(writer_index)%contents(contents_index)%pending_to_write) then
            call determine_if_outstanding_field_can_be_written(io_configuration, writer_entries(writer_index), &
                 writer_entries(writer_index)%contents(contents_index))
          end if
        end if
      end if
    end do
  end if
  call c_free(typed_result_values)
end subroutine provide_ordered_single_field_to_writer_federator
507 
!> Stores the values from one source MONC for a collective field. The entry under lookup_key in the field's
!! values_to_write holds a map of source (in string form) to values; that entry is created when lookup_key has
!! none yet, then result_values is stored in it under the source. No locking is done in this routine.
!! NOTE(review): the type guard of the select type (original line 526) is absent from this listing
!! @param result_values The values to store for this source
!! @param writer_index Index of the writer entry holding the field
!! @param contents_index Index of the field within the writer entry's contents
!! @param source Identifier of the source MONC, its string form is the key within the per time entry
!! @param lookup_key Key under which the per time entry is held in values_to_write
515  subroutine write_collective_write_value(result_values, writer_index, contents_index, source, lookup_key)
516  integer, intent(in) :: writer_index, contents_index, source
517  type(data_values_type), pointer :: result_values
518  character(len=*), intent(in) :: lookup_key
519 
520  class(*), pointer :: generic
521  type(write_field_collective_values_type), pointer :: stored_monc_values
522 
523  if (c_contains(writer_entries(writer_index)%contents(contents_index)%values_to_write, lookup_key)) then
524  generic=>c_get_generic(writer_entries(writer_index)%contents(contents_index)%values_to_write, lookup_key)
525  select type(generic)
527  stored_monc_values=>generic
528  end select
529  else
530  allocate(stored_monc_values)
531  generic=>stored_monc_values
532  call c_put_generic(writer_entries(writer_index)%contents(contents_index)%values_to_write, lookup_key, generic, .false.)
533  end if
534  generic=>result_values
535  call c_put_generic(stored_monc_values%monc_values, conv_to_string(source), generic, .false.)
536  end subroutine write_collective_write_value
537 
!> Re-examines a field that is flagged as pending to write, using the write time and timestep recorded on the
!! writer entry. If the field can now be written the count of fields still to write is decremented under its
!! mutex, and the diagnostics file is closed when that count reaches zero. Does nothing for a field that is
!! not pending.
!! NOTE(review): the closing end subroutine (original line 565) is absent from this listing
!! @param io_configuration Configuration of the IO server
!! @param writer_entry The writer entry that the field belongs to
!! @param specific_field The field to re-examine
541  subroutine determine_if_outstanding_field_can_be_written(io_configuration, writer_entry, specific_field)
542  type(io_configuration_type), intent(inout) :: io_configuration
543  type(writer_type), intent(inout) :: writer_entry
544  type(writer_field_type), intent(inout) :: specific_field
545 
546  logical :: field_write_success, do_close_num_fields
547 
548  if (specific_field%pending_to_write) then
549  call determine_if_field_can_be_written(io_configuration, writer_entry, specific_field, writer_entry%write_timestep, &
550  writer_entry%previous_write_timestep, writer_entry%write_time, writer_entry%previous_write_time, field_write_success)
551  if (field_write_success) then
552  if (log_get_logging_level() .ge. log_debug) then
553  call log_log(log_debug, "Flushed outstanding field ts="//conv_to_string(writer_entry%write_timestep)//&
554  " write time="//conv_to_string(writer_entry%write_time))
555  end if
! The decision to close is taken while the mutex is held, the close itself happens after it is released
556  call check_thread_status(forthread_mutex_lock(writer_entry%num_fields_to_write_mutex))
557  writer_entry%num_fields_to_write=writer_entry%num_fields_to_write-1
558  do_close_num_fields=writer_entry%num_fields_to_write == 0
559  call check_thread_status(forthread_mutex_unlock(writer_entry%num_fields_to_write_mutex))
560  if (do_close_num_fields) then
561  call close_diagnostics_file(io_configuration, writer_entry, writer_entry%write_timestep, writer_entry%write_time)
562  end if
563  end if
564  end if
566 
!> Determines whether a field can be written for the current write and, if so, writes it. Stored values whose
!! time key lies after previous_write_time and no later than write_time are counted, and the largest such time
!! is noted; for collective fields a stored time only counts once it holds values from number_of_moncs
!! sources. The field is written (and its pending flag cleared) when at least one time matched and the field's
!! ready_to_write procedure agrees, otherwise the field is flagged as pending. The field's values_mutex is held
!! for the whole of the check and write.
!! NOTE(review): the type guard of the select type (original line 600) is absent from this listing; also
!! previous_write_timestep is not referenced anywhere in the visible body
!! @param io_configuration Configuration of the IO server
!! @param writer_entry The writer entry that the field belongs to
!! @param specific_field The field being examined
!! @param timestep Timestep of the write
!! @param previous_write_timestep Timestep of the previous write
!! @param write_time Model time of the write, upper bound (inclusive) of the matching window
!! @param previous_write_time Lower bound (exclusive) of the matching window
!! @param field_written Optional flag set to whether the field was handled as written by this call
573  subroutine determine_if_field_can_be_written(io_configuration, writer_entry, specific_field, &
574  timestep, previous_write_timestep, write_time, previous_write_time, field_written)
575  type(io_configuration_type), intent(inout) :: io_configuration
576  type(writer_type), intent(inout) :: writer_entry
577  type(writer_field_type), intent(inout) :: specific_field
578  integer, intent(in) :: timestep, previous_write_timestep
579  real, intent(in) :: write_time, previous_write_time
580  logical, intent(out), optional :: field_written
581 
582  real :: value_to_test, largest_value_found
583  integer :: num_matching
584  type(iterator_type) :: iterator
585  type(mapentry_type) :: map_entry
586  type(write_field_collective_values_type), pointer :: multi_monc_entries
587  class(*), pointer :: generic
588 
589  num_matching=0
590  largest_value_found=0.0
591  call check_thread_status(forthread_mutex_lock(specific_field%values_mutex))
592  if (.not. c_is_empty(specific_field%values_to_write)) then
593  iterator=c_get_iterator(specific_field%values_to_write)
594  do while (c_has_next(iterator))
595  map_entry=c_next_mapentry(iterator)
! The map key is the model time of the stored values in string form
596  value_to_test=conv_to_real(map_entry%key)
597  if (specific_field%collective_write) then
598  generic=>c_get_generic(map_entry)
599  select type(generic)
601  multi_monc_entries=>generic
602  end select
! Skip times for which some MONC has not yet contributed its values
603  if (c_size(multi_monc_entries%monc_values) .ne. io_configuration%number_of_moncs) cycle
604  end if
605  if (value_to_test .le. write_time .and. value_to_test .gt. previous_write_time) then
606  num_matching=num_matching+1
607  if (largest_value_found .lt. value_to_test) largest_value_found=value_to_test
608  end if
609  end do
610  end if
611 
612  if (num_matching .gt. 0 .and. specific_field%ready_to_write(largest_value_found, specific_field%output_frequency, write_time, &
613  specific_field%latest_timestep_values, timestep)) then
! Collective fields using the contiguous optimisation are not written here, only marked as no longer pending
614  if (.not. specific_field%collective_write .or. .not. specific_field%collective_contiguous_optimisation) then
615  if (specific_field%issue_write) then
616  call write_variable(io_configuration, specific_field, writer_entry%filename, timestep, write_time)
617  end if
618  specific_field%previous_write_time=writer_entry%write_time
619  end if
620  specific_field%pending_to_write=.false.
621  if (present(field_written)) field_written=.true.
622  else
623  if (log_get_logging_level() .ge. log_debug) then
624  call log_log(log_debug, "Setting outstanding field ts="//conv_to_string(writer_entry%write_timestep)//&
625  " write time="//conv_to_string(writer_entry%write_time)//" prev="//conv_to_string(previous_write_time)//&
626  " largest entry="//conv_to_string(largest_value_found)//" num matching="//conv_to_string(num_matching))
627  end if
628  specific_field%pending_to_write=.true.
629  if (present(field_written)) field_written=.false.
630  end if
631  call check_thread_status(forthread_mutex_unlock(specific_field%values_mutex))
632  end subroutine determine_if_field_can_be_written
633 
!> Checks every writer for a write trigger, using the timestep and time carried in the data from a MONC. Data
!! without both a "timestep" and a "time" field is ignored. The optional "terminated" field is treated as
!! .false. when it is absent.
!!
!! The time is narrowed with real() and no kind argument, which yields default real by definition and so always
!! matches the default real dummy of check_writer_trigger; the previous kind=4 relied on a compiler specific
!! kind number.
!! @param io_configuration Configuration of the IO server
!! @param source Identifier of the MONC that sent the data
!! @param data_id Identifier of the data definition the dump conforms to
!! @param data_dump The raw data received from the MONC
subroutine check_writer_for_trigger(io_configuration, source, data_id, data_dump)
  type(io_configuration_type), intent(inout) :: io_configuration
  integer, intent(in) :: source, data_id
  character, dimension(:), allocatable, intent(in) :: data_dump

  integer :: i, timestep
  real(kind=DEFAULT_PRECISION) :: time
  logical :: terminated

  if (is_field_present(io_configuration, source, data_id, "timestep") .and. &
       is_field_present(io_configuration, source, data_id, "time")) then
    timestep=get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, "timestep")
    time=get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, "time")

    if (is_field_present(io_configuration, source, data_id, "terminated")) then
      terminated=get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump, "terminated")
    else
      terminated=.false.
    end if
    do i=1, size(writer_entries)
      call check_writer_trigger(io_configuration, i, timestep, real(time), terminated)
    end do
  end if
end subroutine check_writer_for_trigger
663 
!> Checks whether a specific writer should write at this time and timestep and, if so, either issues the write
!! or registers it as pending when currently_writing is set. In model time mode a write is due once the time
!! since the latest pending write reaches write_time_frequency; in timestep mode it is due on a new timestep
!! that is a multiple of a positive write_timestep_frequency. A terminated write is due when the writer has
!! write_on_terminate set and terminated is true (in timestep mode only on a new timestep). The writer's
!! trigger_and_write_mutex is held while deciding and is released on every path before writing or queueing.
!! NOTE(review): original lines 701, 704 and 710 are absent from this listing, so any locking around the
!! currently_writing flag cannot be confirmed from here
!! @param io_configuration Configuration of the IO server
!! @param writer_entry_index Index into writer_entries of the writer to check
!! @param timestep The current timestep
!! @param time The current model time
!! @param terminated Whether the model has terminated
671  subroutine check_writer_trigger(io_configuration, writer_entry_index, timestep, time, terminated)
672  type(io_configuration_type), intent(inout) :: io_configuration
673  integer, intent(in) :: writer_entry_index, timestep
674  real, intent(in) :: time
675  logical, intent(in) :: terminated
676 
677  real :: time_difference
678  integer :: i
679  logical :: issue_write, issue_terminated_write
680 
681  call check_thread_status(forthread_mutex_lock(writer_entries(writer_entry_index)%trigger_and_write_mutex))
682  issue_terminated_write=writer_entries(writer_entry_index)%write_on_terminate .and. terminated
683  if (writer_entries(writer_entry_index)%write_on_model_time) then
684  time_difference=time-writer_entries(writer_entry_index)%latest_pending_write_time
685  issue_write=time_difference .ge. writer_entries(writer_entry_index)%write_time_frequency
686  else
687  if (writer_entries(writer_entry_index)%write_timestep_frequency .gt. 0) then
688  issue_write=writer_entries(writer_entry_index)%latest_pending_write_timestep .ne. timestep .and. &
689  mod(timestep, writer_entries(writer_entry_index)%write_timestep_frequency) == 0
690  else
691  issue_write=.false.
692  end if
! Only in timestep mode is the terminated write suppressed when this timestep already triggered a write
693  issue_terminated_write=issue_terminated_write .and. &
694  writer_entries(writer_entry_index)%latest_pending_write_timestep .ne. timestep
695  end if
696 
697  if (issue_write .or. issue_terminated_write) then
! Recorded before the write is issued or queued so that later calls measure from this trigger
698  writer_entries(writer_entry_index)%latest_pending_write_time=time
699  writer_entries(writer_entry_index)%latest_pending_write_timestep=timestep
700 
702 
703  if (currently_writing) then
705  call check_thread_status(forthread_mutex_unlock(writer_entries(writer_entry_index)%trigger_and_write_mutex))
706  call register_pending_file_write(writer_entry_index, timestep, time, &
707  writer_entries(writer_entry_index)%write_on_terminate .and. terminated)
708  else
709  currently_writing=.true.
711  call check_thread_status(forthread_mutex_unlock(writer_entries(writer_entry_index)%trigger_and_write_mutex))
712  call issue_actual_write(io_configuration, writer_entries(writer_entry_index), timestep, time, &
713  writer_entries(writer_entry_index)%write_on_terminate .and. terminated)
714  end if
715  else
716  call check_thread_status(forthread_mutex_unlock(writer_entries(writer_entry_index)%trigger_and_write_mutex))
717  end if
718  end subroutine check_writer_trigger
719 
!> Starts the write of a file: initialises any enabled collective field not yet initialised, records the write
!! time and timestep on the writer entry, defines the NetCDF file with the applicable time points, and then
!! tries to write every field that is enabled and expected here. Fields that could not be written are counted
!! as outstanding; the file is closed straight away when none are outstanding.
!! NOTE(review): original lines 736 and 744 are absent from this listing (presumably a lock and unlock around
!! the collective initialisation loop - confirm against the full source)
!! @param io_configuration Configuration of the IO server
!! @param writer_entry The writer entry to write
!! @param timestep Timestep of the write
!! @param time Model time of the write
!! @param terminated_write Whether this write was triggered by model termination, passed to define_netcdf_file
725  subroutine issue_actual_write(io_configuration, writer_entry, timestep, time, terminated_write)
726  type(io_configuration_type), intent(inout) :: io_configuration
727  type(writer_type), intent(inout) :: writer_entry
728  integer, intent(in) :: timestep
729  real, intent(in) :: time
730  logical, intent(in) :: terminated_write
731 
732  integer :: i, j, total_outstanding, num_written, total_flds
733  logical :: field_written
734  type(map_type) :: applicable_time_points
735 
737  do i=1, size(writer_entry%contents)
738  if (writer_entry%contents(i)%enabled .and. writer_entry%contents(i)%collective_write) then
739  if (.not. writer_entry%contents(i)%collective_initialised) then
740  call determine_collective_type_and_optimise_if_possible(io_configuration, writer_entry%contents(i))
741  end if
742  end if
743  end do
745 
746  writer_entry%write_time=time
747  writer_entry%write_timestep=timestep
! Time points after the previous write time, up to and including this write time
748  applicable_time_points=extract_applicable_time_points(writer_entry%previous_write_time, time)
749  call define_netcdf_file(io_configuration, writer_entry, timestep, time, applicable_time_points, terminated_write)
750  call c_free(applicable_time_points)
751  total_outstanding=0
752  total_flds=0
753  num_written=0
! The mutex is held for the whole loop and until num_fields_to_write has been set from the outstanding count
754  call check_thread_status(forthread_mutex_lock(writer_entry%num_fields_to_write_mutex))
755  do j=1, size(writer_entry%contents)
756  if (writer_entry%contents(j)%enabled .and. writer_entry%contents(j)%expected_here) then
757  total_flds=total_flds+1
! Each field is tested against its own previous write time rather than that of the writer entry
758  call determine_if_field_can_be_written(io_configuration, writer_entry, writer_entry%contents(j), timestep, &
759  writer_entry%previous_write_timestep, time, writer_entry%contents(j)%previous_write_time, field_written)
760  if (.not. field_written) then
761  total_outstanding=total_outstanding+1
762  else
763  num_written=num_written+1
764  end if
765  end if
766  end do
767  writer_entry%num_fields_to_write=total_outstanding
768  call check_thread_status(forthread_mutex_unlock(writer_entry%num_fields_to_write_mutex))
769  if (log_get_logging_level() .ge. log_debug) then
770  call log_log(log_debug, "Started write for NetCDF file, timestep= "//trim(conv_to_string(timestep))&
771  //" total="//trim(conv_to_string(total_flds))//" written="//trim(conv_to_string(num_written))//&
772  " outstanding="//trim(conv_to_string(total_outstanding)))
773  end if
774  if (total_outstanding == 0) then
775  call close_diagnostics_file(io_configuration, writer_entry, timestep, time)
776  end if
777  end subroutine issue_actual_write
778 
!> Extracts the entries of time_points whose time lies after start_time and no later than end_time, and returns
!! them ordered by sort_applicable_time_points. Each time is narrowed to default real for the comparison and
!! it is that narrowed value, widened again, which is stored in the result.
!! NOTE(review): original lines 790 and 799 are absent from this listing (presumably a lock and unlock around
!! the read of time_points - confirm against the full source)
!! @param start_time Lower bound of the window, exclusive
!! @param end_time Upper bound of the window, inclusive
!! @returns Map of timestep key to time holding the applicable entries
783  type(map_type) function extract_applicable_time_points(start_time, end_time)
784  real, intent(in) :: start_time, end_time
785 
786  real :: time_entry
787  type(iterator_type) :: iterator
788  type(mapentry_type) :: map_entry
789 
791  iterator=c_get_iterator(time_points)
792  do while (c_has_next(iterator))
793  map_entry=c_next_mapentry(iterator)
794  time_entry=real(c_get_real(map_entry))
795  if (time_entry .gt. start_time .and. time_entry .le. end_time) then
796  call c_put_real(extract_applicable_time_points, map_entry%key, conv_single_real_to_double(time_entry))
797  end if
798  end do
! The function result is both the argument that the sort empties and frees, and the target of the assignment
800  extract_applicable_time_points=sort_applicable_time_points(extract_applicable_time_points)
801  end function extract_applicable_time_points
802 
!> Builds a map holding the entries of the supplied map in ascending order of their key read as an integer
!! timestep. This is a selection sort: on each pass the remaining entry with the smallest timestep is moved
!! across. The supplied map is emptied entry by entry and freed before returning.
!! @param unsorted_timepoints Map of timestep key to time; consumed by this call
!! @returns Map with the same entries, inserted smallest timestep first
function sort_applicable_time_points(unsorted_timepoints) result(sorted_timepoints)
  type(map_type), intent(inout) :: unsorted_timepoints
  type(map_type) :: sorted_timepoints

  integer :: pass, number_of_points, candidate_ts, lowest_ts
  character(len=STRING_LENGTH) :: lowest_key
  real(kind=DEFAULT_PRECISION) :: lowest_time
  type(iterator_type) :: point_iterator
  type(mapentry_type) :: point

  number_of_points=c_size(unsorted_timepoints)
  do pass=1, number_of_points
    ! A blank key marks that no candidate has been picked yet on this pass
    lowest_key=""
    point_iterator=c_get_iterator(unsorted_timepoints)
    scan_remaining: do
      if (.not. c_has_next(point_iterator)) exit scan_remaining
      point=c_next_mapentry(point_iterator)
      candidate_ts=conv_to_integer(point%key)
      if (len_trim(lowest_key) /= 0) then
        ! Keep the current candidate unless this entry is strictly earlier
        if (lowest_ts .le. candidate_ts) cycle scan_remaining
      end if
      lowest_ts=candidate_ts
      lowest_key=point%key
      lowest_time=c_get_real(point)
    end do scan_remaining
    call c_put_real(sorted_timepoints, lowest_key, lowest_time)
    call c_remove(unsorted_timepoints, lowest_key)
  end do
  call c_free(unsorted_timepoints)
end function sort_applicable_time_points
834 
!> Requests that the diagnostics file of a writer is closed. The close itself is not done here: a global
!! callback keyed on the writer's file name and the timestep is issued, and
!! handle_close_diagnostics_globalcallback is registered as the procedure to run for it.
!! @param io_configuration The IO server configuration
!! @param writer_entry The writer whose file is to be closed
!! @param timestep The write timestep
!! @param time The write time (not used by this routine)
subroutine close_diagnostics_file(io_configuration, writer_entry, timestep, time)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(writer_type), intent(inout) :: writer_entry
  integer, intent(in) :: timestep
  real, intent(in) :: time

  logical :: debug_enabled

  debug_enabled=log_get_logging_level() .ge. log_debug
  if (debug_enabled) call log_log(log_debug, &
       "Issue close for NetCDF file at timestep "//trim(conv_to_string(timestep)))

  call perform_global_callback(io_configuration, writer_entry%filename, timestep, &
       handle_close_diagnostics_globalcallback)
end subroutine close_diagnostics_file
852 
!> Global callback which completes the write of a diagnostics file. In order it:
!!  - writes every enabled collective field that uses the contiguous optimisation,
!!  - advances the writer's previous/defined write bookkeeping,
!!  - stores the IO server state if this writer carries the IO status dump,
!!  - closes the NetCDF file,
!!  - chains on to the next pending write of another writer, or failing that of this writer,
!!  - clears currently_writing when there was nothing to chain to.
!! @param io_configuration The IO server configuration
!! @param values Callback payload (not used by this routine)
!! @param field_name Callback key; here it is the file name of the writer being closed
!! @param timestep The timestep the write was issued for
subroutine handle_close_diagnostics_globalcallback(io_configuration, values, field_name, timestep)
  type(io_configuration_type), intent(inout) :: io_configuration
  real(DEFAULT_PRECISION), dimension(:) :: values
  character(len=STRING_LENGTH) :: field_name
  integer :: timestep

  type(writer_type), pointer :: writer_entry
  integer :: i
  logical :: terminated, done_chain_run

  writer_entry=>get_writer_entry_from_netcdf(field_name, timestep, terminated)

  ! Contiguous collective fields are written here, each under its own values mutex
  do i=1, size(writer_entry%contents)
    if (writer_entry%contents(i)%enabled .and. writer_entry%contents(i)%collective_write .and. &
         writer_entry%contents(i)%collective_contiguous_optimisation) then
      call check_thread_status(forthread_mutex_lock(writer_entry%contents(i)%values_mutex))
      call write_variable(io_configuration, writer_entry%contents(i), writer_entry%filename, timestep, &
           writer_entry%write_time)
      writer_entry%contents(i)%previous_write_time=writer_entry%write_time
      call check_thread_status(forthread_mutex_unlock(writer_entry%contents(i)%values_mutex))
    end if
  end do

  writer_entry%previous_write_time=writer_entry%write_time
  writer_entry%previous_write_timestep=writer_entry%write_timestep
  writer_entry%defined_write_time=writer_entry%defined_write_time+writer_entry%write_time_frequency

  if (writer_entry%contains_io_status_dump) then
    if (.not. terminated) then
      ! Spin until the IO server state writer reports it is ready for this timestep
      do while (.not. is_io_server_state_writer_ready(timestep))
      end do
    end if
    call check_thread_status(forthread_rwlock_rdlock(time_points_rwlock))
    call store_io_server_state(io_configuration, writer_entries, time_points, writer_entry, timestep)
    call check_thread_status(forthread_rwlock_unlock(time_points_rwlock))
  end if

  call close_netcdf_file(io_configuration, field_name, timestep)

  ! Other writers get the first chance to chain a pending write, this writer only if none of them did
  done_chain_run=.false.
  do i=1, size(writer_entries)
    if (writer_entries(i)%filename .ne. writer_entry%filename) then
      done_chain_run=check_for_and_issue_chain_write(io_configuration, writer_entries(i))
      if (done_chain_run) exit
    end if
  end do
  if (.not. done_chain_run) done_chain_run=check_for_and_issue_chain_write(io_configuration, writer_entry)

  if (.not. done_chain_run) then
    call check_thread_status(forthread_mutex_lock(currently_writing_mutex))
    currently_writing=.false.
    call check_thread_status(forthread_mutex_unlock(currently_writing_mutex))
    if (log_get_logging_level() .ge. log_debug) then
      call log_log(log_debug, "No more pending entries to chain to at ts= "//trim(conv_to_string(timestep)))
    end if
  end if
end subroutine handle_close_diagnostics_globalcallback
916 
!> Pops the next pending write (if any) of a writer and issues it.
!! The pending_writes_mutex is only held while the queue is inspected and popped; it is released before
!! the write itself is issued.
!! @param io_configuration The IO server configuration
!! @param writer_entry The writer whose pending write queue is examined
!! @returns .true. if a pending entry was taken from the queue, .false. if the queue was empty
logical function check_for_and_issue_chain_write(io_configuration, writer_entry)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(writer_type), intent(inout) :: writer_entry

  class(*), pointer :: generic

  call check_thread_status(forthread_mutex_lock(writer_entry%pending_writes_mutex))
  if (.not. c_is_empty(writer_entry%pending_writes)) then
    check_for_and_issue_chain_write=.true.
    generic=>c_pop_generic(writer_entry%pending_writes)
    call check_thread_status(forthread_mutex_unlock(writer_entry%pending_writes_mutex))
    select type(generic)
    type is (pending_write_type)
      if (log_get_logging_level() .ge. log_debug) then
        call log_log(log_debug, "Chain to next pending entry ts= "//trim(conv_to_string(generic%timestep)))
      end if
      call issue_actual_write(io_configuration, writer_entry, generic%timestep, &
           generic%write_time, generic%terminated_write)
      ! The queue does not own the entry (see register_pending_file_write), so it is released here
      deallocate(generic)
    end select
  else
    check_for_and_issue_chain_write=.false.
    call check_thread_status(forthread_mutex_unlock(writer_entry%pending_writes_mutex))
  end if
end function check_for_and_issue_chain_write
946 
!> Queues a write request on a writer so that it can be issued later. A pending_write_type record is
!! allocated, filled in and pushed onto the writer's pending_writes queue under pending_writes_mutex.
!! @param writer_entry_index Index into writer_entries of the writer to queue the request on
!! @param timestep Timestep of the write
!! @param time Model time of the write
!! @param terminated_write Whether this write is issued as part of termination
subroutine register_pending_file_write(writer_entry_index, timestep, time, terminated_write)
  integer, intent(in) :: writer_entry_index, timestep
  real, intent(in) :: time
  logical, intent(in) :: terminated_write

  type(pending_write_type), pointer :: queued_write
  class(*), pointer :: queued_generic

  allocate(queued_write)
  queued_write%timestep=timestep
  queued_write%write_time=time
  queued_write%terminated_write=terminated_write
  queued_generic=>queued_write

  associate (target_writer => writer_entries(writer_entry_index))
    call check_thread_status(forthread_mutex_lock(target_writer%pending_writes_mutex))
    call c_push_generic(target_writer%pending_writes, queued_generic, .false.)
    call check_thread_status(forthread_mutex_unlock(target_writer%pending_writes_mutex))
  end associate
end subroutine register_pending_file_write
969 
!> Searches the writers for the next field entry matching a field name (and optionally a namespace).
!! The search starts at writer writer_index_point, contents position contents_index_point, and carries
!! on through the following writers from their first field onwards.
!! @param field_name Field name to look for
!! @param field_namespace Optional namespace which, when present, must also match
!! @param writer_index_point On entry the writer to start from; on a match the writer that was found
!! @param contents_index_point On entry the contents position to start from; on a match the position
!!        found. When a writer is searched without a match this is reset to 1.
!! @returns .true. if a matching entry was found, .false. otherwise
logical function get_next_applicable_writer_entry(field_name, field_namespace, writer_index_point, contents_index_point)
  character(len=*), intent(in) :: field_name
  character(len=*), intent(in), optional :: field_namespace
  integer, intent(inout) :: writer_index_point, contents_index_point

  integer :: i, j

  get_next_applicable_writer_entry=.false.
  ! Both loops are zero trip when the start point lies beyond the end, so no extra range guards are needed
  do i=writer_index_point, size(writer_entries)
    do j=contents_index_point, size(writer_entries(i)%contents)
      if (writer_entries(i)%contents(j)%field_name==field_name) then
        if (present(field_namespace)) then
          if (writer_entries(i)%contents(j)%field_namespace .ne. field_namespace) cycle
        end if
        writer_index_point=i
        contents_index_point=j
        get_next_applicable_writer_entry=.true.
        return
      end if
    end do
    contents_index_point=1
  end do
end function get_next_applicable_writer_entry
1003 
!> Counts the field entries one writer needs, summing over each configured facet: a group contributes
!! the count of all of its members, a single field contributes its own count (a q field expands to one
!! entry per q field).
!! @param io_configuration The IO server configuration
!! @param writer_entry_index Index of the writer in io_configuration%file_writers
!! @returns Total number of field entries required by the writer
integer function get_total_number_writer_fields(io_configuration, writer_entry_index)
  type(io_configuration_type), intent(inout) :: io_configuration
  integer, intent(in) :: writer_entry_index

  integer :: i, number_contents, group_index, number_q_fields
  character(len=STRING_LENGTH) :: no_namespace

  get_total_number_writer_fields=0
  ! A blank of full length is passed instead of a zero length literal
  no_namespace=""
  number_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")

  number_contents=io_configuration%file_writers(writer_entry_index)%number_of_contents
  do i=1, number_contents
    if (io_configuration%file_writers(writer_entry_index)%contents(i)%facet_type == group_type) then
      group_index=get_index_of_group(io_configuration, &
           io_configuration%file_writers(writer_entry_index)%contents(i)%facet_name)
      if (group_index == 0) then
        ! An unknown group is reported and contributes nothing, rather than indexing groups(0)
        call log_log(log_error, "Can not find group '"//trim(&
             io_configuration%file_writers(writer_entry_index)%contents(i)%facet_name)//"'")
      else
        get_total_number_writer_fields=get_total_number_writer_fields+ &
             get_group_number_of_fields(io_configuration, io_configuration%groups(group_index)%members, number_q_fields, &
             io_configuration%groups(group_index)%namespace)
      end if
    else if (io_configuration%file_writers(writer_entry_index)%contents(i)%facet_type == field_type) then
      ! NSE
      get_total_number_writer_fields=get_total_number_writer_fields+get_field_number_of_fields(io_configuration, &
           io_configuration%file_writers(writer_entry_index)%contents(i)%facet_name, no_namespace, number_q_fields)
    end if
  end do
end function get_total_number_writer_fields
1035 
!> Counts the field entries needed by all members of a group by summing get_field_number_of_fields
!! over the member names.
!! @param io_configuration The IO server configuration
!! @param group_members List of the field names that make up the group
!! @param num_q_fields Number of q fields, which is the count a q field member expands to
!! @param namespace Namespace the group's fields are looked up in
!! @returns Total number of field entries for the group (0 for an empty group)
integer function get_group_number_of_fields(io_configuration, group_members, num_q_fields, namespace)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(list_type) :: group_members
  integer, intent(in) :: num_q_fields
  character(len=STRING_LENGTH), intent(in) :: namespace

  type(iterator_type) :: iterator
  character(len=STRING_LENGTH) :: field_name

  get_group_number_of_fields=0
  iterator=c_get_iterator(group_members)
  do while (c_has_next(iterator))
    field_name=c_next_string(iterator)
    get_group_number_of_fields=get_group_number_of_fields+ &
         get_field_number_of_fields(io_configuration, field_name, namespace, num_q_fields)
  end do
end function get_group_number_of_fields
1058 
!> Determines how many writer field entries a single configured field needs. The field is looked up
!! first as a diagnostic and then as a prognostic. An array field whose last dimension is "qfields" needs
!! one entry per q field, any other known field needs one.
!! The character arguments are assumed length so that callers may pass a literal such as "".
!! @param io_configuration The IO server configuration
!! @param field_name Name of the field
!! @param field_namespace Namespace of the field
!! @param num_q_fields Number of q fields
!! @returns num_q_fields for a q field, 1 for any other known field, 0 if the field has neither a
!!          diagnostic nor a prognostic definition (add_field_to_writer_entry reports that case)
integer function get_field_number_of_fields(io_configuration, field_name, field_namespace, num_q_fields)
  type(io_configuration_type), intent(inout) :: io_configuration
  character(len=*), intent(in) :: field_name, field_namespace
  integer, intent(in) :: num_q_fields

  type(io_configuration_field_type) :: prognostic_field_configuration
  type(io_configuration_data_definition_type) :: prognostic_containing_data_defn
  type(io_configuration_diagnostic_field_type) :: diagnostic_field_configuration

  get_field_number_of_fields=0
  if (get_diagnostic_field_configuration(io_configuration, field_name, field_namespace, diagnostic_field_configuration)) then
    get_field_number_of_fields=1
    ! The last dimension is only inspected for array fields, as in that case it is known to exist
    if (diagnostic_field_configuration%field_type == array_field_type) then
      if (diagnostic_field_configuration%dim_size_defns(diagnostic_field_configuration%dimensions) .eq. "qfields") then
        get_field_number_of_fields=num_q_fields
      end if
    end if
  else if (get_prognostic_field_configuration(io_configuration, field_name, field_namespace, &
       prognostic_field_configuration, prognostic_containing_data_defn)) then
    get_field_number_of_fields=1
    if (prognostic_field_configuration%field_type == array_field_type) then
      if (prognostic_field_configuration%dim_size_defns(prognostic_field_configuration%dimensions) .eq. "qfields") then
        get_field_number_of_fields=num_q_fields
      end if
    end if
  end if
end function get_field_number_of_fields
1093 
!> Adds every member field of a configured group to a writer. Each member is added through
!! add_field_to_writer_entry, which places its entries after the running field index and returns how
!! many entries it added.
!! @param io_configuration The IO server configuration
!! @param writer_entry_index Index of the writer
!! @param facet_index Index of the group facet in the writer's configured contents
!! @param current_field_index Index of the last field entry already filled in for the writer
!! @param writer_field_names Names of the fields added to the writer so far
!! @param duplicate_field_names Names that have been added more than once
!! @param diagnostic_generation_frequency Map from diagnostic field name to its generation frequency
!! @returns The field index after all group members have been added (current_field_index if the group
!!          cannot be found)
integer function add_group_of_fields_to_writer_entry(io_configuration, writer_entry_index, facet_index, current_field_index, &
     writer_field_names, duplicate_field_names, diagnostic_generation_frequency)
  type(io_configuration_type), intent(inout) :: io_configuration
  integer, intent(in) :: writer_entry_index, facet_index, current_field_index
  type(hashset_type), intent(inout) :: writer_field_names, duplicate_field_names
  type(hashmap_type), intent(inout) :: diagnostic_generation_frequency

  integer :: group_index, next_field_index
  character(len=STRING_LENGTH) :: field_name
  type(iterator_type) :: iterator

  add_group_of_fields_to_writer_entry=current_field_index
  group_index=get_index_of_group(io_configuration, &
       io_configuration%file_writers(writer_entry_index)%contents(facet_index)%facet_name)
  if (group_index == 0) then
    call log_log(log_error, "Can not find group '"//&
         trim(io_configuration%file_writers(writer_entry_index)%contents(facet_index)%facet_name)//"' in the configuration")
    ! Never index groups(0), even if the error log call returns
    return
  end if

  ! A local running index is used so the function result is not passed as an argument while being updated
  next_field_index=current_field_index
  iterator=c_get_iterator(io_configuration%groups(group_index)%members)
  do while (c_has_next(iterator))
    field_name=c_next_string(iterator)
    next_field_index=next_field_index+add_field_to_writer_entry(io_configuration, &
         writer_entry_index, facet_index, next_field_index, field_name, &
         io_configuration%groups(group_index)%namespace, writer_field_names, duplicate_field_names, &
         diagnostic_generation_frequency)
  end do
  add_group_of_fields_to_writer_entry=next_field_index
end function add_group_of_fields_to_writer_entry
1128 
!> Adds one configured field to a writer, looking it up first as a diagnostic and then as a prognostic.
!! An array field whose last dimension is "qfields" is expanded into one entry per q field, named
!! <field_name>_udef<n>, placed at positions my_facet_index+1 .. my_facet_index+number of q fields.
!! For such fields the split information is also recorded: the element count of one entry goes into
!! q_field_splits, or, for collective prognostics, the z/y/x identifiers of its dimensions go into
!! collective_q_field_dims. Any other field is added as a single entry at my_facet_index+1.
!! @param io_configuration The IO server configuration
!! @param writer_entry_index Index of the writer
!! @param io_config_facet_index Index of the facet in the writer's configured contents
!! @param my_facet_index Index of the last field entry already filled in for the writer
!! @param field_name Name of the field to add
!! @param field_namespace Namespace of the field
!! @param writer_field_names Names of the fields added to the writer so far
!! @param duplicate_field_names Names that have been added more than once
!! @param diagnostic_generation_frequency Map from diagnostic field name to its generation frequency
!! @returns Number of entries added: the number of q fields for a q field, 1 for any other known field
!!          and 0 when the field is unknown (which is also reported as an error)
integer function add_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
     my_facet_index, field_name, field_namespace, writer_field_names, duplicate_field_names, diagnostic_generation_frequency)
  type(io_configuration_type), intent(inout) :: io_configuration
  integer, intent(in) :: writer_entry_index, io_config_facet_index, my_facet_index
  character(len=*), intent(in) :: field_name, field_namespace
  type(hashset_type), intent(inout) :: writer_field_names, duplicate_field_names
  type(hashmap_type), intent(inout) :: diagnostic_generation_frequency

  integer :: i, number_q_fields, tot_size, last_q_index
  logical :: is_q_field
  type(io_configuration_field_type) :: prognostic_field_configuration
  type(io_configuration_data_definition_type) :: prognostic_containing_data_defn
  type(io_configuration_diagnostic_field_type) :: diagnostic_field_configuration
  type(collective_q_field_representation_type), pointer :: collective_q_field
  class(*), pointer :: generic

  add_field_to_writer_entry=0

  if (get_diagnostic_field_configuration(io_configuration, field_name, field_namespace, diagnostic_field_configuration)) then
    ! The last dimension is only inspected for array fields
    is_q_field=.false.
    if (diagnostic_field_configuration%field_type == array_field_type) then
      is_q_field=diagnostic_field_configuration%dim_size_defns(diagnostic_field_configuration%dimensions) .eq. "qfields"
    end if

    if (is_q_field) then
      number_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
      do i=1, number_q_fields
        call add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
             my_facet_index+i, trim(field_name)//"_udef"//trim(conv_to_string(i)), field_namespace, writer_field_names, &
             duplicate_field_names, c_get_integer(diagnostic_generation_frequency, field_name), &
             diagnostic_field_configuration)
      end do
      ! The last entry added is used as the representative for the size of one q field entry
      last_q_index=my_facet_index+number_q_fields
      tot_size=1
      do i=1, writer_entries(writer_entry_index)%contents(last_q_index)%dimensions
        tot_size=tot_size*writer_entries(writer_entry_index)%contents(last_q_index)%actual_dim_size(i)
      end do
      call c_put_integer(q_field_splits, field_name, tot_size)
      add_field_to_writer_entry=number_q_fields
    else
      call add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
           my_facet_index+1, field_name, field_namespace, writer_field_names, duplicate_field_names, &
           c_get_integer(diagnostic_generation_frequency, field_name), diagnostic_field_configuration)
      add_field_to_writer_entry=1
    end if
  else if (get_prognostic_field_configuration(io_configuration, field_name, field_namespace, &
       prognostic_field_configuration, prognostic_containing_data_defn)) then
    is_q_field=.false.
    if (prognostic_field_configuration%field_type == array_field_type) then
      is_q_field=prognostic_field_configuration%dim_size_defns(prognostic_field_configuration%dimensions) .eq. "qfields"
    end if

    if (is_q_field) then
      number_q_fields=c_get_integer(io_configuration%dimension_sizing, "qfields")
      do i=1, number_q_fields
        call add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
             my_facet_index+i, trim(field_name)//"_udef"//trim(conv_to_string(i)), field_namespace, writer_field_names, &
             duplicate_field_names, prognostic_containing_data_defn%frequency, &
             prognostic_field_configuration=prognostic_field_configuration)
      end do
      last_q_index=my_facet_index+number_q_fields
      if (prognostic_field_configuration%collective) then
        ! Record which of z, y, x each dimension of the entry is (1, 2 and 3 respectively); a dimension
        ! with any other name is left unset, as it always has been
        allocate(collective_q_field)
        allocate(collective_q_field%dimensions(writer_entries(writer_entry_index)%contents(last_q_index)%dimensions))
        do i=1, writer_entries(writer_entry_index)%contents(last_q_index)%dimensions
          select case (trim(writer_entries(writer_entry_index)%contents(last_q_index)%dim_size_defns(i)))
          case ("z")
            collective_q_field%dimensions(i)=1
          case ("y")
            collective_q_field%dimensions(i)=2
          case ("x")
            collective_q_field%dimensions(i)=3
          end select
        end do
        generic=>collective_q_field
        call c_put_generic(collective_q_field_dims, field_name, generic, .false.)
      else
        tot_size=1
        do i=1, writer_entries(writer_entry_index)%contents(last_q_index)%dimensions
          tot_size=tot_size*writer_entries(writer_entry_index)%contents(last_q_index)%actual_dim_size(i)
        end do
        call c_put_integer(q_field_splits, field_name, tot_size)
      end if
      call c_add_string(q_field_names, field_name)
      add_field_to_writer_entry=number_q_fields
    else
      call add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
           my_facet_index+1, field_name, field_namespace, writer_field_names, duplicate_field_names, &
           prognostic_containing_data_defn%frequency, prognostic_field_configuration=prognostic_field_configuration)
      add_field_to_writer_entry=1
    end if
  else
    call log_log(log_error, "Field '"//trim(field_name)//&
         "' configured for file write but can not find this as a prognostic or diagnostic definition")
  end if
end function add_field_to_writer_entry
1226 
!> Fills in one field entry of a writer (writer_entries(writer_entry_index)%contents(my_facet_index)).
!! The name is recorded in the used/writer/duplicate name sets, the time manipulation procedures are
!! selected from the facet's configured manipulation type, the bookkeeping flags are reset, and the
!! type, units, dimension and write information is copied from whichever of the diagnostic or prognostic
!! configurations was supplied (the diagnostic one wins if both are present). A trailing "qfields"
!! dimension is dropped from the entry and the actual size of each remaining dimension is looked up.
!! Finally the entry's values mutex is initialised.
!! @param io_configuration The IO server configuration
!! @param writer_entry_index Index of the writer
!! @param io_config_facet_index Index of the facet in the writer's configured contents
!! @param my_facet_index Index of the field entry to fill in
!! @param field_name Name to give the field entry
!! @param field_namespace Namespace of the field
!! @param writer_field_names Names of the fields added to the writer so far
!! @param duplicate_field_names Names that have been added more than once
!! @param timestep_frequency Timestep frequency stored on the entry
!! @param diagnostic_field_configuration Optional diagnostic configuration to take the details from
!! @param prognostic_field_configuration Optional prognostic configuration to take the details from
subroutine add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, &
     my_facet_index, field_name, field_namespace, writer_field_names, duplicate_field_names, timestep_frequency, &
     diagnostic_field_configuration, prognostic_field_configuration)
  type(io_configuration_type), intent(inout) :: io_configuration
  integer, intent(in) :: writer_entry_index, io_config_facet_index, my_facet_index, timestep_frequency
  character(len=*), intent(in) :: field_name, field_namespace
  type(hashset_type), intent(inout) :: writer_field_names, duplicate_field_names
  type(io_configuration_diagnostic_field_type), intent(inout), optional :: diagnostic_field_configuration
  type(io_configuration_field_type), intent(inout), optional :: prognostic_field_configuration

  integer :: dim_idx

  associate (field => writer_entries(writer_entry_index)%contents(my_facet_index), &
       facet => io_configuration%file_writers(writer_entry_index)%contents(io_config_facet_index))

    field%field_name=field_name
    field%field_namespace=field_namespace

    call c_add_string(used_field_names, field_name)

    ! First sighting goes in the writer's name set, any repeat is noted as a duplicate
    if (c_contains(writer_field_names, field_name)) then
      call c_add_string(duplicate_field_names, field%field_name)
    else
      call c_add_string(writer_field_names, field%field_name)
    end if

    ! Select the time manipulation procedures matching the configured manipulation type
    if (facet%time_manipulation_type == instantaneous_type) then
      field%time_manipulation=>perform_instantaneous_time_manipulation
      field%ready_to_write=>is_instantaneous_time_manipulation_ready_to_write
    else if (facet%time_manipulation_type == time_averaged_type) then
      field%time_manipulation=>perform_timeaveraged_time_manipulation
      field%ready_to_write=>is_time_averaged_time_manipulation_ready_to_write
    else if (facet%time_manipulation_type == none_type) then
      field%time_manipulation=>perform_none_time_manipulation
      field%ready_to_write=>is_none_time_manipulation_ready_to_write
    end if

    field%time_manipulation_type=facet%time_manipulation_type
    field%output_frequency=facet%output_time_frequency
    field%previous_write_time=0.0
    field%previous_tracked_write_point=0.0
    field%duplicate_field_name=.false.
    field%pending_to_write=.false.
    field%enabled=.false.
    field%expected_here=.true.
    field%prognostic_field=.false.
    field%diagnostic_field=.false.

    if (present(diagnostic_field_configuration)) then
      field%timestep_frequency=timestep_frequency
      field%dimensions=diagnostic_field_configuration%dimensions
      field%data_type=diagnostic_field_configuration%data_type
      field%field_type=diagnostic_field_configuration%field_type
      field%dim_size_defns=diagnostic_field_configuration%dim_size_defns
      field%units=diagnostic_field_configuration%units
      field%collective_write=diagnostic_field_configuration%collective
      field%collective_initialised=.false.
      field%issue_write=.true.
      field%diagnostic_field=.true.
    else if (present(prognostic_field_configuration)) then
      field%timestep_frequency=timestep_frequency
      field%data_type=prognostic_field_configuration%data_type
      field%field_type=prognostic_field_configuration%field_type
      field%units=prognostic_field_configuration%units
      field%dimensions=prognostic_field_configuration%dimensions
      field%collective_write=prognostic_field_configuration%collective
      field%collective_initialised=.false.
      field%prognostic_field=.true.
      ! Collective prognostics are written by every IO server, the others only by IO rank 0
      if (prognostic_field_configuration%collective) then
        field%issue_write=.true.
      else
        field%issue_write=io_configuration%my_io_rank==0
      end if
      if (prognostic_field_configuration%field_type == array_field_type .or. &
           prognostic_field_configuration%field_type == map_field_type) then
        if (prognostic_field_configuration%dimensions .gt. 0) then
          field%dimensions=prognostic_field_configuration%dimensions
          field%dim_size_defns=prognostic_field_configuration%dim_size_defns
        else
          call log_log(log_error, "The writing prognostic field '"//trim(field_name)//"' configuration must have dimensions")
        end if
      end if
    else
      call log_log(log_error, "A diagnostic or prognostic configuration for the field '"//trim(field_name)//"' was not found")
    end if

    if (field%dimensions .gt. 0) then
      ! A trailing qfields dimension is not part of an individual entry
      if (field%dim_size_defns(field%dimensions) .eq. "qfields") field%dimensions=field%dimensions-1
      do dim_idx=1, field%dimensions
        field%actual_dim_size(dim_idx)=c_get_integer(io_configuration%dimension_sizing, field%dim_size_defns(dim_idx))
      end do
    end if
    call check_thread_status(forthread_mutex_init(field%values_mutex, -1))
  end associate
end subroutine add_specific_field_to_writer_entry
1338 
!> Flags every field of a writer whose name appears in the set of duplicate field names. Fields whose
!! name is not in the set are left untouched.
!! @param writer_entry The writer whose fields are checked
!! @param duplicate_field_names Set of field names that were added more than once
subroutine handle_duplicate_field_names(writer_entry, duplicate_field_names)
  type(writer_type), intent(inout) :: writer_entry
  type(hashset_type), intent(inout) :: duplicate_field_names

  integer :: field_idx, number_fields

  number_fields=size(writer_entry%contents)
  do field_idx=1, number_fields
    if (.not. c_contains(duplicate_field_names, writer_entry%contents(field_idx)%field_name)) cycle
    writer_entry%contents(field_idx)%duplicate_field_name=.true.
  end do
end subroutine handle_duplicate_field_names
1355 
!> Looks up a group by name in the IO configuration.
!! @param io_configuration The IO server configuration
!! @param group_name Name of the group to find
!! @returns Index of the first group with this name in io_configuration%groups, or 0 if there is none
!!          (callers in this file test for 0 to detect a missing group)
integer function get_index_of_group(io_configuration, group_name)
  type(io_configuration_type), intent(inout) :: io_configuration
  character(len=*), intent(in) :: group_name

  integer :: i, entries

  get_index_of_group=0
  entries=io_configuration%number_of_groups
  do i=1, entries
    if (io_configuration%groups(i)%name == group_name) then
      get_index_of_group=i
      return
    end if
  end do
end function get_index_of_group
1376 
!> Decides whether a collective field can use the contiguous write optimisation and, if so, sets it up.
!! The optimisation is used only for three dimensional fields whose dimensions are z, y, x in that order.
!! In every case the field is marked as collective_initialised on return.
!! @param io_configuration The IO server configuration
!! @param field_to_write_information The field to examine and update
subroutine determine_collective_type_and_optimise_if_possible(io_configuration, field_to_write_information)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(writer_field_type), intent(inout) :: field_to_write_information

  logical :: zyx_ordered

  ! The dimension names are only examined once the field is known to have three of them, as Fortran
  ! does not guarantee that the operands of .and. are evaluated left to right with short circuiting
  zyx_ordered=.false.
  if (field_to_write_information%dimensions .eq. 3) then
    zyx_ordered=get_dimension_identifier(field_to_write_information%dim_size_defns(1)) == z_index .and. &
         get_dimension_identifier(field_to_write_information%dim_size_defns(2)) == y_index .and. &
         get_dimension_identifier(field_to_write_information%dim_size_defns(3)) == x_index
  end if

  field_to_write_information%collective_contiguous_optimisation=zyx_ordered
  if (zyx_ordered) call initialise_contiguous_data_regions(io_configuration, field_to_write_information)
  field_to_write_information%collective_initialised=.true.
end subroutine determine_collective_type_and_optimise_if_possible
1397 
!> Works out the contiguous regions that this IO server can write in one go for a collective field.
!! The local start and size of every registered MONC are gathered per field dimension. MONCs that share
!! a start in one horizontal dimension (the active dimension), have the same size in it, and are
!! adjacent in the other horizontal dimension are merged into one block. Each block becomes a
!! collective descriptor (absolute start, count, split dimension) holding one record per contributing
!! MONC, and is added to the field's collective_descriptors.
!! Finally a non-blocking MPI max reduction of the number of blocks over the IO communicator is started;
!! its request handle is stored on the field for completion elsewhere.
!! NOTE(review): start and count are indexed by y_index/x_index, which assumes the field's dimension
!! order matches the dimension identifiers - this holds for the z, y, x check made by
!! determine_collective_type_and_optimise_if_possible; confirm for any other caller.
!! @param io_configuration The IO server configuration
!! @param field_to_write_information The collective field to set up
subroutine initialise_contiguous_data_regions(io_configuration, field_to_write_information)
  type(io_configuration_type), intent(inout) :: io_configuration
  type(writer_field_type), intent(inout) :: field_to_write_information

  integer :: start(field_to_write_information%dimensions, io_configuration%number_of_moncs), &
       count(field_to_write_information%dimensions, io_configuration%number_of_moncs)
  integer :: common_starters(io_configuration%number_of_moncs), start_blocks(io_configuration%number_of_moncs), &
       count_blocks(io_configuration%number_of_moncs), current_contents(io_configuration%number_of_moncs)
  integer :: num_common, num_current_contents, active_dim, other_dim, j, k, i, candidate, dim_identifier, &
       number_distinct_writes, ierr
  logical :: processed(io_configuration%number_of_moncs)

  type(write_field_collective_descriptor_type), pointer :: collective_descriptor
  type(write_field_collective_monc_info_type), pointer :: specific_monc_collective
  class(*), pointer :: generic

  processed=.false.
  number_distinct_writes=0
  ! Gather the local start and size of every MONC in each of the field's dimensions
  do j=1, io_configuration%number_of_moncs
    do k=1, field_to_write_information%dimensions
      dim_identifier=get_dimension_identifier(field_to_write_information%dim_size_defns(k))
      start(k, j)=io_configuration%registered_moncs(j)%local_dim_starts(dim_identifier)
      count(k, j)=io_configuration%registered_moncs(j)%local_dim_sizes(dim_identifier)
    end do
  end do

  do j=1, io_configuration%number_of_moncs
    if (.not. processed(j)) then
      ! Prefer MONCs sharing a start in y; fall back to those sharing a start in x
      call get_common_starts(y_index, start(y_index, j), start, common_starters, num_common)
      if (num_common == 0) then
        call get_common_starts(x_index, start(x_index, j), start, common_starters, num_common)
        if (num_common .gt. 0) then
          active_dim=x_index
          other_dim=y_index
        end if
      else
        active_dim=y_index
        other_dim=x_index
      end if
      number_distinct_writes=number_distinct_writes+1
      allocate(collective_descriptor)
      allocate(collective_descriptor%absolute_start(field_to_write_information%dimensions), &
           collective_descriptor%count(field_to_write_information%dimensions))
      ! The block starts out as MONC j alone
      start_blocks(number_distinct_writes)=start(other_dim, j)
      count_blocks(number_distinct_writes)=count(other_dim, j)
      num_current_contents=1
      current_contents(num_current_contents)=j
      processed(j)=.true.
      if (num_common .gt. 0) then
        ! Repeated sweeps, as one MONC joining the block can make a further one adjacent to it
        do k=1, num_common
          do i=1, num_common
            candidate=common_starters(i)
            ! The size comparison must use the candidate MONC itself, not its position in common_starters
            if (.not. processed(candidate) .and. count(active_dim, j) == count(active_dim, candidate)) then
              if (start(other_dim, candidate) .lt. start_blocks(number_distinct_writes) .and. &
                   start(other_dim, candidate) + count(other_dim, candidate) &
                   == start_blocks(number_distinct_writes)) then
                ! Candidate ends exactly where the block starts: grow the block downwards
                start_blocks(number_distinct_writes)=start(other_dim, candidate)
                count_blocks(number_distinct_writes)=count_blocks(number_distinct_writes)+count(other_dim, candidate)
                processed(candidate)=.true.
                num_current_contents=num_current_contents+1
                current_contents(num_current_contents)=candidate
              else if (start(other_dim, candidate) .gt. start_blocks(number_distinct_writes) .and. &
                   start_blocks(number_distinct_writes) + count_blocks(number_distinct_writes) &
                   == start(other_dim, candidate)) then
                ! Candidate starts exactly where the block ends: grow the block upwards
                count_blocks(number_distinct_writes)=count_blocks(number_distinct_writes)+count(other_dim, candidate)
                processed(candidate)=.true.
                num_current_contents=num_current_contents+1
                current_contents(num_current_contents)=candidate
              end if
            end if
          end do
        end do
      end if
      collective_descriptor%absolute_start=start(:,j)
      collective_descriptor%count=count(:,j)
      collective_descriptor%absolute_start(other_dim)=start_blocks(number_distinct_writes)
      collective_descriptor%count(other_dim)=count_blocks(number_distinct_writes)
      collective_descriptor%split_dim=other_dim
      if (num_current_contents .gt. 0) then
        do k=1, num_current_contents
          allocate(specific_monc_collective)
          ! One based offset of this MONC's data inside the block along the split dimension
          specific_monc_collective%relative_dimension_start=(start(other_dim,current_contents(k))-&
               start_blocks(number_distinct_writes)) + 1
          specific_monc_collective%counts=count(:, current_contents(k))
          specific_monc_collective%monc_location=current_contents(k)
          specific_monc_collective%monc_source=io_configuration%registered_moncs(current_contents(k))%source_id
          generic=>specific_monc_collective
          call c_add_generic(collective_descriptor%specific_monc_info, generic, .false.)
        end do
      end if
      generic=>collective_descriptor
      call c_add_generic(field_to_write_information%collective_descriptors, generic, .false.)
    end if
  end do
  call lock_mpi()
  call mpi_iallreduce(number_distinct_writes, field_to_write_information%max_num_collective_writes, 1, mpi_int, mpi_max, &
       io_configuration%io_communicator, field_to_write_information%max_num_collective_writes_request_handle, ierr)
  call unlock_mpi()
end subroutine initialise_contiguous_data_regions
1501 
!> Finds every column of vals whose entry in row dim equals val and records the
!! column indices, in ascending order, at the front of common_starters.
!! @param dim Row (first index) of vals that is compared against val
!! @param val The value to look for
!! @param vals Two dimensional table of values, searched along its second index
!! @param common_starters Receives the matching column indices in positions 1..num_common;
!!        entries beyond num_common are not assigned by this routine
!! @param num_common Number of matching columns that were found
subroutine get_common_starts(dim, val, vals, common_starters, num_common)
  integer, intent(in) :: dim, val
  integer, dimension(:,:), intent(in) :: vals
  integer, dimension(:), intent(out) :: common_starters
  integer, intent(out) :: num_common

  integer :: column

  num_common=0
  do column=1, size(vals, 2)
    ! Skip columns that do not match, anything reaching past here is a hit
    if (vals(dim, column) /= val) cycle
    num_common=num_common+1
    common_starters(num_common)=column
  end do
end subroutine get_common_starts
1524 
!> Translates a dimension name to its corresponding numeric identifier. Only the part of
!! the name before the first underscore is matched, so a name with a suffix after an
!! underscore resolves to the same identifier as the bare name.
!! @param dim_name The name of the dimension
!! @param is_auto_dimension Optional, set to true when dim_name contains an underscore
!! @returns z_index for "z" or "zn", y_index for "y", x_index for "x" and -1 for any other name
integer function get_dimension_identifier(dim_name, is_auto_dimension)
  character(len=*), intent(in) :: dim_name
  logical, intent(out), optional :: is_auto_dimension

  integer :: dash_idx
  logical :: is_modified_size

  ! index returns 0 when there is no underscore, hence -1 after the adjustment means "not found"
  dash_idx=index(dim_name, "_")-1
  is_modified_size=dash_idx .ne. -1
  if (.not. is_modified_size) dash_idx=len_trim(dim_name)

  select case (dim_name(:dash_idx))
  case ("z", "zn")
    get_dimension_identifier=z_index
  case ("y")
    get_dimension_identifier=y_index
  case ("x")
    get_dimension_identifier=x_index
  case default
    ! Previously the result was left undefined for unrecognised names, give it a defined value.
    ! NOTE(review): -1 assumes the grid index parameters are all positive - confirm against grids_mod
    get_dimension_identifier=-1
  end select

  if (present(is_auto_dimension)) is_auto_dimension=is_modified_size
end function get_dimension_identifier
1553 end module writer_federator_mod
type(writer_type), dimension(:), allocatable, volatile writer_entries
type(map_type) function sort_applicable_time_points(unsorted_timepoints)
Sorts the time points based upon their timestep, smallest to largest. Note that this is a bubble sort...
type(writer_type) function, pointer, public get_writer_entry_from_netcdf(field_name, timestep, terminated)
Looks up and retrieves the writer entry that corresponds to this NetCDF file state.
Performs time averaged, time manipulation and only returns a value if the output frequency determines...
type(map_type) function extract_applicable_time_points(start_time, end_time)
Extracts the applicable time points from the overall map that lie within a specific range...
logical, volatile currently_writing
integer function forthread_rwlock_init(rwlock_id, attr_id)
Definition: forthread.F90:504
Retrieves the key currently being held at a specific index in the map or "" if the index > map elemen...
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
subroutine, public finalise_instantaneous_manipulation()
Finalises the instantaneous time manipulation.
logical function, public is_io_server_state_writer_ready(timestep)
Determines whether the IO server state writer is ready (i.e. state is at a specific level for the tim...
Returns whether a collection is empty.
subroutine handle_duplicate_field_names(writer_entry, duplicate_field_names)
Marks duplicate field names in a writer entry as duplicates so that the NetCDF layer can then deal wi...
Puts an integer key-value pair into the map.
type(data_values_type) function, public perform_none_time_manipulation(instant_values, output_frequency, field_name, timestep, time)
Performs no time manipulation and returns data.
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
type(data_values_type) function, public perform_instantaneous_time_manipulation(instant_values, output_frequency, field_name, timestep, time)
Performs the instantaneous time manipulation and returns data only if this is to be written to the st...
subroutine, public inform_writer_federator_fields_present(io_configuration, field_names, diag_field_names_and_roots)
Informs the writer federator that specific fields are present and should be reflected in the diagnost...
Pushes a generic element onto the stack or queue.
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
type(hashset_type), volatile q_field_names
integer, parameter, public array_field_type
Definition: ioclient.F90:38
subroutine, public provide_ordered_field_to_writer_federator(io_configuration, field_name, field_namespace, field_values, timestep, time, source)
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
logical function, public get_diagnostic_field_configuration(io_configuration, field_name, field_namespace, diagnostic_config)
Retrieves the diagnostics field configuration corresponding to a specific field name and returns whet...
Reads the IO server state that was stored in a NetCDF checkpoint file.
subroutine, public close_netcdf_file(io_configuration, field_name, timestep)
Call back for the inter IO reduction which actually does the NetCDF file closing which is a collectiv...
Contains functionality for managing and extracting data from the raw data dumps that the IO server re...
Definition: datautils.F90:3
Performs instantaneous time manipulation and only returns a value if the output frequency determines ...
integer function add_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, my_facet_index, field_name, field_namespace, writer_field_names, duplicate_field_names, diagnostic_generation_frequency)
Adds a field to the writer entry, this will split the Q fields. However at initialisation we don't kn...
Logging utility.
Definition: logging.F90:2
subroutine close_diagnostics_file(io_configuration, writer_entry, timestep, time)
Closes the diagnostics file, this is done via a global callback to issue the closes synchronously (co...
logical function, public get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single logical element (scalar) from the data dump.
Definition: datautils.F90:328
integer function get_dimension_identifier(dim_name, is_auto_dimension)
Translates a dimension name to its numeric corresponding identifier.
subroutine determine_if_outstanding_field_can_be_written(io_configuration, writer_entry, specific_field)
For a specific field will determine and handle any outstanding field writes until an outstanding writ...
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
integer, parameter, public z_index
Grid index parameters.
Definition: grids.F90:14
subroutine check_writer_trigger(io_configuration, writer_entry_index, timestep, time, terminated)
Checks a writer trigger and issues a file creation along with field write if the conditions (time or ...
integer, parameter, public io_state_type
integer, parameter, public field_type
subroutine, public issue_actual_write(io_configuration, writer_entry, timestep, time, terminated_write)
Issues the actual file creation, write of available fields and closure if all completed.
logical function, public is_field_used_by_writer_federator(field_name, field_namespace)
Determines whether a field is used by the writer federator or not.
Abstraction layer around MPI, this issues and marshals the lower level communication details...
logical function, public is_field_split_on_q(field_name)
Determines whether a field is split on Q or not.
integer function forthread_rwlock_rdlock(lock_id)
Definition: forthread.F90:514
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
type(hashmap_type), volatile time_points
subroutine determine_if_field_can_be_written(io_configuration, writer_entry, specific_field, timestep, previous_write_timestep, write_time, previous_write_time, field_written)
Determines if a file can be written to its overarching write representation. If so then a write is is...
Pops a generic element off the stack or queue.
subroutine, public log_master_log(level, message)
Will log just from the master process.
Definition: logging.F90:47
integer, parameter, public log_debug
Log DEBUG, INFO, WARNING and ERROR messages.
Definition: logging.F90:14
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
subroutine, public inform_writer_federator_time_point(io_configuration, source, data_id, data_dump)
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Gets a specific integer element out of the list, stack, queue or map with the corresponding key...
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
integer, parameter, public double_data_type
Definition: ioclient.F90:40
subroutine, public store_io_server_state(io_configuration, writer_entries, time_points, file_writer_information, timestep)
Stores the IO server state in the NetCDF file.
subroutine, public initialise_writer_federator(io_configuration, diagnostic_generation_frequency, continuation_run)
Initialises the write federator and configures it based on the user configuration. Also initialises the time manipulations.
Converts data types to strings.
Definition: conversions.F90:36
type(hashmap_type), volatile collective_q_field_dims
integer, parameter, public instantaneous_type
integer function forthread_rwlock_wrlock(lock_id)
Definition: forthread.F90:532
subroutine, public perform_global_callback(io_configuration, field_name, timestep, completion_procedure)
Performs a global callback.
The NetCDF file type writer which performs actual writing of NetCDF files to the parallel filesystem...
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
subroutine provide_ordered_field_to_writer_federator_real_values(io_configuration, field_name, field_namespace, field_values, timestep, time, source)
Provides fields (either diagnostics or prognostics) to the write federator which will action these as...
logical function, public get_prognostic_field_configuration(io_configuration, field_name, field_namespace, prognostic_config, prognostic_containing_data_defn)
Retrieves the prognostic field configuration corresponding to a specific field name and returns wheth...
Writer types which are shared across writing functionality. Also includes serialisation functionality...
Definition: writer_types.F90:2
integer, parameter, public string_data_type
Definition: ioclient.F90:40
This defines some constants and procedures that are useful to the IO server and clients that call it...
Definition: ioclient.F90:3
logical function, public is_field_present(io_configuration, source, data_id, key)
Definition: datautils.F90:146
Map data structure that holds string (length 20 maximum) key value pairs.
Definition: collections.F90:86
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
subroutine get_common_starts(dim, val, vals, common_starters, num_common)
Retrieves the number of common starting points that match a specific input value. ...
subroutine add_specific_field_to_writer_entry(io_configuration, writer_entry_index, io_config_facet_index, my_facet_index, field_name, field_namespace, writer_field_names, duplicate_field_names, timestep_frequency, diagnostic_field_configuration, prognostic_field_configuration)
Adds a specific field and its information to a writer entry.
Returns the number of elements in the collection.
subroutine write_collective_write_value(result_values, writer_index, contents_index, source, lookup_key)
Writes the collective values, this is held differently to independent values which are written direct...
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
subroutine, public init_instantaneous_manipulation()
Initialises the instantaneous time manipulation.
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine initialise_contiguous_data_regions(io_configuration, field_to_write_information)
Will initialise the collective data regions that form contiguous blocks within the data...
subroutine handle_close_diagnostics_globalcallback(io_configuration, values, field_name, timestep)
Call back for the inter IO reduction which actually does the NetCDF file closing which is a collectiv...
integer function get_field_number_of_fields(io_configuration, field_name, field_namespace, num_q_fields)
Retrieves the number of fields that make up this field, if it is a Q field then it will be split into...
logical function check_for_and_issue_chain_write(io_configuration, writer_entry)
Will check whether there are any pending writes and if so will issue a chain write for this...
subroutine, public define_netcdf_file(io_configuration, file_writer_information, timestep, time, time_points, termination_write)
Defines a NetCDF file - which creates it, defines all dimensions and variables. This must be called b...
subroutine, public unlock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then unlocks MPI.
Collection data structures.
Definition: collections.F90:7
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC's configuration in the IO data structure.
type(hashmap_type), volatile q_field_splits
subroutine register_pending_file_write(writer_entry_index, timestep, time, terminated_write)
Registers a pending file write which will be actioned later on.
subroutine, public check_writer_for_trigger(io_configuration, source, data_id, data_dump)
Checks all writer entries for any trigger fires and issues the underlying file storage.
Performs no time manipulation and returns the value, basically a no-op.
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
logical function, public is_time_averaged_time_manipulation_ready_to_write(latest_time, output_frequency, write_time, latest_timestep, write_timestep)
Converts data types to real.
Definition: conversions.F90:58
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
subroutine, public lock_mpi()
If we are explicitly managing MPI thread safety (SERIALIZED mode) then locks MPI. ...
logical function, public is_none_time_manipulation_ready_to_write(latest_time, output_frequency, write_time, latest_timestep, write_timestep)
integer function get_group_number_of_fields(io_configuration, group_members, num_q_fields, namespace)
Retrieves the number of fields within a group of fields.
integer, parameter, public time_averaged_type
This federates over the writing of diagnostic and prognostic data to the file system. It also manages the time manipulation of fields and groups.
List data structure which implements a doubly linked list. This list will preserve its order...
Definition: collections.F90:60
subroutine enable_specific_field_by_name(field_name, diagnostics_mode, expected_here)
Enables a specific field by its name, this will locate all the fields with this name and enable them...
Queue (FIFO) data structure.
Definition: collections.F90:70
Adds a generic element to the end of the list.
Functionality to support the different types of grid and abstraction between global grids and local o...
Definition: grids.F90:5
integer, volatile time_points_rwlock
integer function forthread_rwlock_destroy(rwlock_id)
Definition: forthread.F90:495
integer function add_group_of_fields_to_writer_entry(io_configuration, writer_entry_index, facet_index, current_field_index, writer_field_names, duplicate_field_names, diagnostic_generation_frequency)
Adds a group of fields to a writer entry, groups are expanded out into individual fields...
logical function, public log_is_master()
Determines whether the process is the master logging process. This might be preferable rather than ca...
Definition: logging.F90:66
subroutine, public finalise_time_averaged_manipulation()
Finalises the reduction action, waiting for all outstanding requests and then freeing data...
subroutine, public reactivate_writer_federator_state(io_configuration, writer_entries, time_points)
Reactivates the writer federator and everything beneath it (i.e. just not the writer field manager...
logical function, public is_instantaneous_time_manipulation_ready_to_write(latest_time, output_frequency, write_time, latest_timestep, write_timestep)
integer, volatile currently_writing_mutex
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
subroutine, public provide_q_field_names_to_writer_federator(q_provided_field_names)
Provides the Q field names to the write federator, this is required as on initialisation we don't kno...
integer function forthread_rwlock_unlock(lock_id)
Definition: forthread.F90:550
Puts a generic key-value pair into the map.
integer function, public get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single integer element (scalar) from the data dump.
Definition: datautils.F90:372
real(kind=double_precision) function, public get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a scalar real with a corresponding key from the raw data dump.
Definition: datautils.F90:416
integer, volatile collective_contiguous_initialisation_mutex
subroutine, public initialise_netcdf_filetype()
Initialises the NetCDF writing functionality.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
subroutine, public init_time_averaged_manipulation()
Initialises the reduction action.
integer function get_index_of_group(io_configuration, group_name)
Searches the IO server configuration for a group with a specific name and returns the index to that g...
type(hashset_type), volatile used_field_names
Configuration associated with the representation of a specific data field.
logical function get_next_applicable_writer_entry(field_name, field_namespace, writer_index_point, contents_index_point)
Retrieves the index of the next writer which uses a specific field. If none is found then returns fal...
Converts data types to integers.
Definition: conversions.F90:47
Determines whether or not a map contains a specific key.
subroutine, public finalise_writer_federator()
Finalises the write federator and the manipulations.
Gets a specific double precision real element out of the list, stack, queue or map with the correspon...
integer function get_size_of_collective_q(io_configuration, field_name, source)
Retrieves the data size for each Q entry of a collective Q field for the specific source MONC that ha...
Adds a string to the end of the list.
integer function, public log_get_logging_level()
Retrieves the current logging level.
Definition: logging.F90:122
integer, parameter, public map_field_type
Field data type identifiers.
Definition: ioclient.F90:38
integer, parameter, public y_index
Definition: grids.F90:14
integer function get_total_number_writer_fields(io_configuration, writer_entry_index)
Determines the total number of fields that make up a writer entry, this is all the fields of the grou...
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
subroutine, public write_variable(io_configuration, field_to_write_information, filename, timestep, time)
Writes the contents of a variable to the NetCDF file. This also removes the written entries from the ...
Retrieves the double precision real value held at the specific map index or null if index > map eleme...
subroutine, public finalise_netcdf_filetype()
Finalises the NetCDF writing functionality.
The IO server state module which will write out the current state of the IO server to a NetCDF file...
integer, parameter, public group_type
integer, parameter, public x_index
Definition: grids.F90:14
type(data_values_type) function, public perform_timeaveraged_time_manipulation(instant_values, output_frequency, field_name, timestep, time)
Performs the time averaged manipulation and only returns values if these are to be stored (i...
real(kind=double_precision) function, public conv_single_real_to_double(input_real)
Converts from a single to double precision real. This applies some rounding to a certain number of de...
subroutine provide_ordered_single_field_to_writer_federator(io_configuration, field_name, field_namespace, field_values, timestep, time, source)
Provides a single ordered field, i.e. Q fields have been split by this point.
integer, parameter, public none_type
Puts a double precision real key-value pair into the map.
Removes a specific element from the list or map.
subroutine determine_collective_type_and_optimise_if_possible(io_configuration, field_to_write_information)
Determines whether it can optimise a specific collective field. If the field fits into certain limite...