MONC
diagnostic_federator.F90
Go to the documentation of this file.
1 
32  implicit none
33 
34 #ifndef TEST_MODE
35  private
36 #endif
37 
38 
40 
41 
43  integer :: communication_corresponding_activities_rwlock, completed_diagnostics_rwlock, completed_num, completed_num_mutex
44  type(list_type) :: diagnostic_entries
45  type(hashset_type) :: completed_diagnostics
46  type(hashmap_type) :: communication_corresponding_activities
48 
49 
51  integer :: timestep, completed_fields_rwlock, outstanding_fields_rwlock, activity_completion_mutex, source, &
52  source_location, number_diags_outstanding, number_datas_outstanding, deletion_metric_mutex
53  real(kind=DEFAULT_PRECISION) :: time
54  type(hashset_type) :: outstanding_fields, completed_activities
55  type(hashmap_type) :: completed_fields
57 
58 
60  character(len=STRING_LENGTH) :: diagnostic_name, diagnostic_namespace, uuid
61  type(list_type) :: activities
62  integer :: generation_timestep_frequency
63  logical :: collective
64  end type diagnostics_type
65 
66 
68  integer :: activity_type, communication_operator, root
69  real(kind=DEFAULT_PRECISION) :: result
70  type(list_type) :: required_fields
71  type(map_type) :: activity_attributes
72  character(len=STRING_LENGTH) :: result_name, activity_name, uuid
73  procedure(perform_activity), pointer, nopass :: operator_procedure
75 
78  type(diagnostics_type), volatile, dimension(:), allocatable :: diagnostic_definitions
81 
84 contains
85 
89  type(hashmap_type) function initialise_diagnostic_federator(io_configuration)
90  type(io_configuration_type), intent(inout) :: io_configuration
91 
96  call init_reduction_inter_io(io_configuration)
97  call init_broadcast_inter_io(io_configuration)
98  call init_allreduction_inter_io(io_configuration)
99  call init_global_callback_inter_io(io_configuration)
103  current_point=0
105 
110  logical function check_diagnostic_federator_for_completion(io_configuration)
111  type(io_configuration_type), intent(inout) :: io_configuration
112 
113  check_diagnostic_federator_for_completion=check_reduction_inter_io_for_completion(io_configuration)
115  check_diagnostic_federator_for_completion=check_broadcast_inter_io_for_completion(io_configuration)
117  check_diagnostic_federator_for_completion=check_allreduction_inter_io_for_completion(io_configuration)
118  end if
119  end if
121 
124  subroutine finalise_diagnostic_federator(io_configuration)
125  type(io_configuration_type), intent(inout) :: io_configuration
126 
127  call finalise_broadcast_inter_io()
128  call finalise_reduction_inter_io(io_configuration)
129  call finalise_allreduction_inter_io(io_configuration)
130  call finalise_global_callback_inter_io(io_configuration)
131  call check_thread_status(forthread_rwlock_destroy(timestep_entries_rwlock))
132  call check_thread_status(forthread_rwlock_destroy(all_diagnostics_per_timestep_rwlock))
133  call check_thread_status(forthread_mutex_destroy(clean_progress_mutex))
134  call finalise_operators()
135  end subroutine finalise_diagnostic_federator
136 
141  type(hashmap_type) function determine_diagnostics_fields_available(monc_field_names)
142  type(hashset_type), intent(inout) :: monc_field_names
143 
144  integer :: i, k, num_fields, diag_root
145  type(diagnostics_activity_type) :: specific_activity
146  type(hashset_type) :: result_names_for_activities
147  type(iterator_type) :: required_fields_iterator, activities_iterator
148  character(len=STRING_LENGTH) :: specific_field_name
149  logical :: diagnostic_provided
150 
151  if (.not. allocated(diagnostic_definitions)) return
152 
153  do i=1, size(diagnostic_definitions)
154  diag_root=-1
155  diagnostic_provided=.true.
156  activities_iterator=c_get_iterator(diagnostic_definitions(i)%activities)
157  do while (c_has_next(activities_iterator))
158  specific_activity=retrieve_next_activity(activities_iterator)
159  call c_add_string(result_names_for_activities, specific_activity%result_name)
160  end do
161 
162  activities_iterator=c_get_iterator(diagnostic_definitions(i)%activities)
163  do while (c_has_next(activities_iterator))
164  specific_activity=retrieve_next_activity(activities_iterator)
165  if (specific_activity%root .ne. -1 .and. diag_root == -1) diag_root=specific_activity%root
166  required_fields_iterator=c_get_iterator(specific_activity%required_fields)
167  do while (c_has_next(required_fields_iterator))
168  specific_field_name=c_next_string(required_fields_iterator)
169  if (.not. c_contains(result_names_for_activities, specific_field_name) .and. &
170  .not. c_contains(monc_field_names, specific_field_name)) then
171  diagnostic_provided=.false.
172  exit
173  end if
174  end do
175  if (.not. diagnostic_provided) exit
176  end do
177  if (diagnostic_provided) then
178  call c_put_integer(determine_diagnostics_fields_available, diagnostic_definitions(i)%diagnostic_name, diag_root)
179  call c_add_string(available_fields, diagnostic_definitions(i)%diagnostic_name)
180  end if
181  call c_free(result_names_for_activities)
182  end do
184 
191  subroutine pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
192  type(io_configuration_type), intent(inout) :: io_configuration
193  integer, intent(in) :: source, data_id
194  character, dimension(:), allocatable, intent(in) :: data_dump
195 
196  integer :: timestep
197  real(kind=DEFAULT_PRECISION) :: time
198  logical :: terminated
199  type(diagnostics_at_timestep_type), pointer :: timestep_entry
200  type(all_diagnostics_at_timestep_type), pointer :: diagnostics_by_timestep
201 
202  if (.not. allocated(diagnostic_definitions)) return
203  if (c_is_empty(available_fields)) return
204 
205  if (is_field_present(io_configuration, source, data_id, "timestep") .and. &
206  is_field_present(io_configuration, source, data_id, "time")) then
207  timestep=get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, "timestep")
208  time=get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, "time")
209  timestep_entry=>find_or_register_timestep_entry(io_configuration, timestep, source, time)
210  diagnostics_by_timestep=>get_diagnostics_by_timestep(timestep, .true.)
211  terminated=.false.
212  if (is_field_present(io_configuration, source, data_id, "terminated")) then
213  terminated=get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump, "terminated")
214  end if
215  if (.not. terminated) call clean_diagnostic_states(timestep)
216  call issue_communication_calls(io_configuration, timestep_entry, diagnostics_by_timestep, source, data_id, data_dump)
217  call check_diagnostics_entries_against_data(io_configuration, source, data_id, data_dump, timestep_entry)
218  call check_all_activities_against_completed_fields(io_configuration, timestep_entry, diagnostics_by_timestep)
219 
220  call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
221  timestep_entry%number_datas_outstanding=timestep_entry%number_datas_outstanding-1
222  call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
223  else
224  call log_log(log_warn, "Can not run the diagnostics federator without a timestep and time field in the MONC data")
225  end if
227 
231  subroutine check_all_activities_against_completed_fields(io_configuration, timestep_entry, diagnostics_by_timestep)
232  type(io_configuration_type), intent(inout) :: io_configuration
233  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
234  type(all_diagnostics_at_timestep_type), intent(inout) :: diagnostics_by_timestep
235 
236  integer :: j, num_diags
237  type(diagnostics_activity_type), pointer :: activity
238  character(len=STRING_LENGTH) :: field_name, activity_diag_key
239  logical :: updated_entry, entry_in_completed_diagnostics, operator_produced_values
240  type(data_values_type) :: value_to_send
241  type(iterator_type) :: activities_iterator
242 
243  updated_entry=.true.
244 
245  call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
246  do while (updated_entry)
247  updated_entry=.false.
248  num_diags=size(diagnostic_definitions)
249  do j=1, num_diags
250  call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
251  entry_in_completed_diagnostics=c_contains(diagnostics_by_timestep%completed_diagnostics, &
252  diagnostic_definitions(j)%diagnostic_name)
253  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
254  if (diagnostic_definitions(j)%collective .or. .not. entry_in_completed_diagnostics) then
255  activities_iterator=c_get_iterator(diagnostic_definitions(j)%activities)
256  do while (c_has_next(activities_iterator))
257  activity=>retrieve_next_activity(activities_iterator)
258  activity_diag_key=generate_activity_diagnostic_key(diagnostic_definitions(j), activity)
259  if (.not. c_contains(timestep_entry%completed_activities, activity_diag_key)) then
260  if (are_fields_available_for_activity(timestep_entry, activity)) then
261  call c_add_string(timestep_entry%completed_activities, activity_diag_key)
262  updated_entry=.true.
263  if (activity%activity_type == operator_type) then
264  operator_produced_values=handle_operator_completion(io_configuration, timestep_entry, activity)
265  if (operator_produced_values .and. activity%result_name == diagnostic_definitions(j)%diagnostic_name) then
266  call handle_diagnostic_calculation_completed(io_configuration, j, timestep_entry, diagnostics_by_timestep)
267  end if
268  else if (activity%activity_type == reduction_type .or. activity%activity_type == broadcast_type &
269  .or. activity%activity_type == allreduction_type) then
270  field_name=c_get_string(activity%required_fields, 1)
271  call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
272  value_to_send=get_data_value_by_field_name(timestep_entry%completed_fields, field_name)
273  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
274  call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
275  call perform_inter_io_communication(io_configuration, timestep_entry, diagnostics_by_timestep, &
276  activity, value_to_send, field_name)
277  call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
278  end if
279  end if
280  end if
281  end do
282  end if
283  end do
284  end do
285  call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
287 
291  logical function handle_operator_completion(io_configuration, timestep_entry, specific_activity)
292  type(io_configuration_type), intent(inout) :: io_configuration
293  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
294  type(diagnostics_activity_type), intent(inout) :: specific_activity
295 
296  type(data_values_type), pointer :: operator_result
297  real(kind=DEFAULT_PRECISION), dimension(:), allocatable :: operator_result_values
298  class(*), pointer :: generic
299 
300  call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
301  call specific_activity%operator_procedure(io_configuration, timestep_entry%completed_fields, &
302  specific_activity%activity_attributes, timestep_entry%source_location, timestep_entry%source, operator_result_values)
303  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
304 
305  if (allocated(operator_result_values)) then
306  allocate(operator_result)
307  allocate(operator_result%values(size(operator_result_values)), source=operator_result_values)
308  operator_result%values=operator_result_values
309  deallocate(operator_result_values)
310  generic=>operator_result
311  call check_thread_status(forthread_rwlock_wrlock(timestep_entry%completed_fields_rwlock))
312  call c_put_generic(timestep_entry%completed_fields, specific_activity%result_name, generic, .false.)
313  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
314  call check_thread_status(forthread_rwlock_wrlock(timestep_entry%outstanding_fields_rwlock))
315  call c_remove(timestep_entry%outstanding_fields, specific_activity%result_name)
316  call check_thread_status(forthread_rwlock_unlock(timestep_entry%outstanding_fields_rwlock))
318  else
320  end if
321  end function handle_operator_completion
322 
329  subroutine handle_completion(io_configuration, values, field_name, timestep)
330  type(io_configuration_type), intent(inout) :: io_configuration
331  real(DEFAULT_PRECISION), dimension(:) :: values
332  character(len=STRING_LENGTH) :: field_name
333  integer :: timestep
334 
335  type(all_diagnostics_at_timestep_type), pointer :: diagnostics_by_timestep
336  type(data_values_type), pointer :: result_to_add
337  type(iterator_type) :: iterator
338 
339  diagnostics_by_timestep=>get_diagnostics_by_timestep(timestep, .true.)
340  call check_thread_status(forthread_mutex_lock(diagnostics_by_timestep%completed_num_mutex))
341  diagnostics_by_timestep%completed_num=diagnostics_by_timestep%completed_num+1
342  call check_thread_status(forthread_mutex_unlock(diagnostics_by_timestep%completed_num_mutex))
343  if (.not. c_is_empty(diagnostics_by_timestep%diagnostic_entries)) then
344  iterator=c_get_iterator(diagnostics_by_timestep%diagnostic_entries)
345  do while (c_has_next(iterator))
346  allocate(result_to_add)
347  allocate(result_to_add%values(size(values)), source=values)
348  call handle_completion_for_specific_monc_timestep_entry(io_configuration, result_to_add, &
349  field_name, retrieve_next_specific_monc_timestep_entry(iterator), diagnostics_by_timestep)
350  end do
351  end if
352  call check_thread_status(forthread_rwlock_wrlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
353  call c_remove(diagnostics_by_timestep%communication_corresponding_activities, trim(field_name))
354  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
355  call check_thread_status(forthread_mutex_lock(diagnostics_by_timestep%completed_num_mutex))
356  diagnostics_by_timestep%completed_num=diagnostics_by_timestep%completed_num-1
357  call check_thread_status(forthread_mutex_unlock(diagnostics_by_timestep%completed_num_mutex))
358  end subroutine handle_completion
359 
367  subroutine handle_completion_for_specific_monc_timestep_entry(io_configuration, result_to_add, &
368  field_name, timestep_entry, diagnostics_by_timestep)
369  type(io_configuration_type), intent(inout) :: io_configuration
370  type(data_values_type), pointer, intent(in) :: result_to_add
371  character(len=STRING_LENGTH), intent(in) :: field_name
372  type(diagnostics_at_timestep_type), pointer :: timestep_entry
373  type(all_diagnostics_at_timestep_type), intent(inout) :: diagnostics_by_timestep
374 
375  logical :: entry_in_completed_diagnostics
376  type(diagnostics_activity_type), pointer :: activity
377  class(*), pointer :: generic
378  integer :: i
379 
380  activity=>get_comm_activity_from_fieldname(diagnostics_by_timestep, field_name)
381 
382  generic=>result_to_add
383  call check_thread_status(forthread_rwlock_wrlock(timestep_entry%completed_fields_rwlock))
384  call c_put_generic(timestep_entry%completed_fields, trim(activity%result_name), generic, .false.)
385  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
386 
387  call check_thread_status(forthread_rwlock_wrlock(timestep_entry%outstanding_fields_rwlock))
388  ! is field name here correct?
389  call c_remove(timestep_entry%outstanding_fields, trim(field_name))
390  call check_thread_status(forthread_rwlock_unlock(timestep_entry%outstanding_fields_rwlock))
391 
392  do i=1, size(diagnostic_definitions)
393  if (activity%result_name == diagnostic_definitions(i)%diagnostic_name) then
394  if (diagnostic_definitions(i)%collective) then
395  call handle_diagnostic_calculation_completed(io_configuration, i, timestep_entry, diagnostics_by_timestep)
396  else
397  call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
398  entry_in_completed_diagnostics=c_contains(diagnostics_by_timestep%completed_diagnostics, &
399  diagnostic_definitions(i)%diagnostic_name)
400  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
401  if (.not. entry_in_completed_diagnostics) then
402  call handle_diagnostic_calculation_completed(io_configuration, i, timestep_entry, diagnostics_by_timestep)
403  end if
404  end if
405  exit
406  end if
407  end do
408  call check_all_activities_against_completed_fields(io_configuration, timestep_entry, diagnostics_by_timestep)
410 
414  subroutine handle_diagnostic_calculation_completed(io_configuration, diagnostic_index, timestep_entry, diagnostics_by_timestep)
415  type(io_configuration_type), intent(inout) :: io_configuration
416  integer, intent(in) :: diagnostic_index
417  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
418  type(all_diagnostics_at_timestep_type), intent(inout) :: diagnostics_by_timestep
419 
420  type(data_values_type), pointer :: diagnostics_value_entry
421  type(iterator_type) :: iterator
422  type(diagnostics_at_timestep_type), pointer :: activity_at_index
423 
424  call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
425  diagnostics_value_entry=>get_data_value_by_field_name(timestep_entry%completed_fields, &
426  trim(diagnostic_definitions(diagnostic_index)%diagnostic_name))
427  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
428 
429  if (diagnostic_definitions(diagnostic_index)%collective) then
430  call provide_field_to_writer_federator(io_configuration, diagnostic_definitions(diagnostic_index)%diagnostic_name, &
431  diagnostic_definitions(diagnostic_index)%diagnostic_namespace, diagnostics_value_entry%values, &
432  timestep_entry%timestep, timestep_entry%time, &
433  diagnostic_definitions(diagnostic_index)%generation_timestep_frequency, timestep_entry%source)
434  call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
435  timestep_entry%number_diags_outstanding=timestep_entry%number_diags_outstanding-1
436  call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
437  if (allocated(diagnostics_value_entry%values)) deallocate(diagnostics_value_entry%values)
438  else
439  call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
440  if (.not. c_contains(diagnostics_by_timestep%completed_diagnostics, &
441  diagnostic_definitions(diagnostic_index)%diagnostic_name)) then
442  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
443  call check_thread_status(forthread_rwlock_wrlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
444  if (.not. c_contains(diagnostics_by_timestep%completed_diagnostics,&
445  diagnostic_definitions(diagnostic_index)%diagnostic_name)) then
446  call c_add_string(diagnostics_by_timestep%completed_diagnostics, &
447  diagnostic_definitions(diagnostic_index)%diagnostic_name)
448  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
449  call provide_field_to_writer_federator(io_configuration, diagnostic_definitions(diagnostic_index)%diagnostic_name, &
450  diagnostic_definitions(diagnostic_index)%diagnostic_namespace, diagnostics_value_entry%values, &
451  timestep_entry%timestep, timestep_entry%time, &
452  diagnostic_definitions(diagnostic_index)%generation_timestep_frequency)
453  iterator=c_get_iterator(diagnostics_by_timestep%diagnostic_entries)
454  do while (c_has_next(iterator))
455  activity_at_index=>retrieve_next_specific_monc_timestep_entry(iterator)
456  call check_thread_status(forthread_mutex_lock(activity_at_index%deletion_metric_mutex))
457  activity_at_index%number_diags_outstanding=activity_at_index%number_diags_outstanding-1
458  call check_thread_status(forthread_mutex_unlock(activity_at_index%deletion_metric_mutex))
459  end do
460  if (allocated(diagnostics_value_entry%values)) deallocate(diagnostics_value_entry%values)
461  else
462  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
463  end if
464  else
465  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
466  end if
467  end if
469 
474  logical function are_fields_available_for_activity(timestep_entry, activity)
475  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
476  type(diagnostics_activity_type) :: activity
477 
478  character(len=STRING_LENGTH) :: field_name
479  type(iterator_type) :: iterator
480 
482  if (.not. c_is_empty(activity%required_fields)) then
483  call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
484  iterator=c_get_iterator(activity%required_fields)
485  do while (c_has_next(iterator))
486  field_name=c_next_string(iterator)
487  if (.not. c_contains(timestep_entry%completed_fields, field_name)) then
489  exit
490  end if
491  end do
492  call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
493  end if
495 
502  subroutine perform_inter_io_communication(io_configuration, timestep_entry, all_entries_at_timestep, &
503  activity, value_to_send, communication_field_name)
504  type(io_configuration_type), intent(inout) :: io_configuration
505  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
506  type(all_diagnostics_at_timestep_type), intent(inout) :: all_entries_at_timestep
507  type(diagnostics_activity_type), pointer, intent(in) :: activity
508  type(data_values_type), intent(in) :: value_to_send
509  character(len=STRING_LENGTH), intent(in) :: communication_field_name
510 
511  class(*), pointer :: generic
512 
513  generic=>activity
514  call check_thread_status(forthread_rwlock_wrlock(all_entries_at_timestep%communication_corresponding_activities_rwlock))
515  call c_put_generic(all_entries_at_timestep%communication_corresponding_activities, trim(communication_field_name), &
516  generic, .false.)
517  call check_thread_status(forthread_rwlock_unlock(all_entries_at_timestep%communication_corresponding_activities_rwlock))
518 
519  if (activity%activity_type == reduction_type) then
520  call perform_inter_io_reduction(io_configuration, value_to_send%values, size(value_to_send%values), &
521  communication_field_name, activity%communication_operator, activity%root, timestep_entry%timestep, handle_completion)
522  if (activity%root .ne. io_configuration%my_io_rank) then
523  call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
524  timestep_entry%number_diags_outstanding=timestep_entry%number_diags_outstanding-1
525  call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
526  end if
527  else if (activity%activity_type == allreduction_type) then
528  call perform_inter_io_allreduction(io_configuration, value_to_send%values, size(value_to_send%values), &
529  communication_field_name, activity%communication_operator, activity%root, timestep_entry%timestep, handle_completion)
530  else if (activity%activity_type == broadcast_type) then
531  call perform_inter_io_broadcast(io_configuration, value_to_send%values, size(value_to_send%values), &
532  communication_field_name, activity%root, timestep_entry%timestep, handle_completion)
533  end if
534  end subroutine perform_inter_io_communication
535 
542  subroutine issue_communication_calls(io_configuration, timestep_entry, diagnostics_by_timestep, source, data_id, data_dump)
543  type(io_configuration_type), intent(inout) :: io_configuration
544  type(diagnostics_at_timestep_type), intent(inout) :: timestep_entry
545  integer, intent(in) :: source, data_id
546  character, dimension(:), allocatable, intent(in) :: data_dump
547  type(all_diagnostics_at_timestep_type), intent(inout) :: diagnostics_by_timestep
548 
549  logical :: completed_diagnostics_entry
550  integer :: j, num_diags
551  type(iterator_type) :: iterator
552  type(diagnostics_activity_type), pointer :: activity
553  character(len=STRING_LENGTH) :: communication_field_name, activity_diag_key
554 
555  call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
556  num_diags=size(diagnostic_definitions)
557  do j=1, num_diags
558  call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
559  completed_diagnostics_entry=c_contains(diagnostics_by_timestep%completed_diagnostics, &
560  diagnostic_definitions(j)%diagnostic_name)
561  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
562  if (.not. completed_diagnostics_entry) then
563  iterator=c_get_iterator(diagnostic_definitions(j)%activities)
564  do while (c_has_next(iterator))
565  activity=>retrieve_next_activity(iterator)
566  activity_diag_key=generate_activity_diagnostic_key(diagnostic_definitions(j), activity)
567  if (.not. c_contains(timestep_entry%completed_activities, activity_diag_key)) then
568  if ((activity%activity_type == reduction_type .or. activity%activity_type == broadcast_type &
569  .or. activity%activity_type == allreduction_type)) then
570  if (.not. c_is_empty(activity%required_fields)) then
571  communication_field_name=c_get_string(activity%required_fields, 1)
572  if (is_field_present(io_configuration, source, data_id, communication_field_name)) then
573  call c_add_string(timestep_entry%completed_activities, activity_diag_key)
574  call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
575  call perform_inter_io_communication(io_configuration, timestep_entry, diagnostics_by_timestep, activity, &
576  get_value_from_monc_data(io_configuration, source, data_id, data_dump, communication_field_name), &
577  communication_field_name)
578  call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
579  end if
580  end if
581  end if
582  end if
583  end do
584  end if
585  end do
586  call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
587  end subroutine issue_communication_calls
588 
596  subroutine check_diagnostics_entries_against_data(io_configuration, source, data_id, data_dump, &
597  timestep_diagnostics_entry)
598  type(io_configuration_type), intent(inout) :: io_configuration
599  integer, intent(in) :: source, data_id
600  character, dimension(:), allocatable, intent(in) :: data_dump
601  type(diagnostics_at_timestep_type), intent(inout) :: timestep_diagnostics_entry
602 
603  type(iterator_type) :: iterator
604  character(len=STRING_LENGTH) :: field_name
605  type(data_values_type), pointer :: field_value
606  type(hashset_type) :: removed_entries
607  class(*), pointer :: generic
608 
609  if (.not. c_is_empty(timestep_diagnostics_entry%outstanding_fields)) then
610  call check_thread_status(forthread_rwlock_rdlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
611  iterator=c_get_iterator(timestep_diagnostics_entry%outstanding_fields)
612  do while (c_has_next(iterator))
613  field_name=c_next_string(iterator)
614  if (is_field_present(io_configuration, source, data_id, field_name)) then
615  field_value=>get_value_from_monc_data(io_configuration, source, data_id, data_dump, field_name)
616  generic=>field_value
617  call check_thread_status(forthread_rwlock_wrlock(timestep_diagnostics_entry%completed_fields_rwlock))
618  call c_put_generic(timestep_diagnostics_entry%completed_fields, trim(field_name), generic, .false.)
619  call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%completed_fields_rwlock))
620  call c_add_string(removed_entries, field_name)
621  end if
622  end do
623  call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
624  if (.not. c_is_empty(removed_entries)) then
625  iterator=c_get_iterator(removed_entries)
626  call check_thread_status(forthread_rwlock_wrlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
627  do while (c_has_next(iterator))
628  call c_remove(timestep_diagnostics_entry%outstanding_fields, c_next_string(iterator))
629  end do
630  call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
631  end if
632  end if
633  call c_free(removed_entries)
635 
638  subroutine clean_diagnostic_states(current_timestep)
639  integer, intent(in) :: current_timestep
640 
641  integer :: have_lock, outstanding_diags, outstanding_datas
642  type(list_type) :: entries_to_remove
643  type(iterator_type) :: iterator, all_diagnostics_iterator
644  type(mapentry_type) :: all_diag_mapentry
645  type(all_diagnostics_at_timestep_type), pointer :: specific_all_diagnostics_for_ts
646  type(diagnostics_at_timestep_type), pointer :: specific_monc_timestep_entry
647  logical :: all_completed
648  character(len=STRING_LENGTH) :: entry_key
649 
650  have_lock=forthread_mutex_trylock(clean_progress_mutex)
651  if (have_lock == 0) then
652  if (previous_viewed_timestep .ne. current_timestep) then
654  previous_viewed_timestep=current_timestep
655  end if
658  call check_thread_status(forthread_rwlock_rdlock(all_diagnostics_per_timestep_rwlock))
659  all_diagnostics_iterator=c_get_iterator(all_diagnostics_at_timestep)
660  do while(c_has_next(all_diagnostics_iterator))
661  all_diag_mapentry=c_next_mapentry(all_diagnostics_iterator)
662  specific_all_diagnostics_for_ts=>retrieve_diagnostics(all_diag_mapentry)
663  call check_thread_status(forthread_mutex_lock(specific_all_diagnostics_for_ts%completed_num_mutex))
664  if (specific_all_diagnostics_for_ts%completed_num == 0) then
665  iterator=c_get_iterator(specific_all_diagnostics_for_ts%diagnostic_entries)
666  all_completed=.true.
667  do while (c_has_next(iterator))
668  specific_monc_timestep_entry=>retrieve_next_specific_monc_timestep_entry(iterator)
669  call check_thread_status(forthread_mutex_lock(specific_monc_timestep_entry%deletion_metric_mutex))
670  outstanding_diags=specific_monc_timestep_entry%number_diags_outstanding
671  outstanding_datas=specific_monc_timestep_entry%number_datas_outstanding
672  call check_thread_status(forthread_mutex_unlock(specific_monc_timestep_entry%deletion_metric_mutex))
673  if (outstanding_diags .gt. 0 .or. outstanding_datas .gt. 0) then
674  all_completed=.false.
675  exit
676  end if
677  end do
678  if (all_completed) then
679  call c_add_string(entries_to_remove, all_diag_mapentry%key)
680  end if
681  end if
682  call check_thread_status(forthread_mutex_unlock(specific_all_diagnostics_for_ts%completed_num_mutex))
683  end do
684  call check_thread_status(forthread_rwlock_unlock(all_diagnostics_per_timestep_rwlock))
685 
686  if (.not. c_is_empty(entries_to_remove)) then
687  call check_thread_status(forthread_rwlock_wrlock(all_diagnostics_per_timestep_rwlock))
688  iterator=c_get_iterator(entries_to_remove)
689  do while (c_has_next(iterator))
690  entry_key=c_next_string(iterator)
691  call deallocate_diagnostics_at_timestep(entry_key)
692  call c_remove(all_diagnostics_at_timestep, entry_key)
693  end do
694  call check_thread_status(forthread_rwlock_unlock(all_diagnostics_per_timestep_rwlock))
695  call c_free(entries_to_remove)
696  end if
697  end if
698  call check_thread_status(forthread_mutex_unlock(clean_progress_mutex))
699  end if
700  end subroutine clean_diagnostic_states
701 
705  subroutine deallocate_diagnostics_at_timestep(key)
706  character(len=*), intent(in) :: key
707 
708  type(all_diagnostics_at_timestep_type), pointer :: all_diagnostics_at_ts
709  type(diagnostics_at_timestep_type), pointer :: specific_monc_timestep_entry
710  type(data_values_type), pointer :: field_data_value
711  integer :: cfentries, j
712  type(iterator_type) :: iterator, completed_fields_iterator
713  class(*), pointer :: generic
714 
715  all_diagnostics_at_ts=>get_diagnostic_by_key(key)
716  if (associated(all_diagnostics_at_ts)) then
717  iterator=c_get_iterator(all_diagnostics_at_ts%diagnostic_entries)
718  do while (c_has_next(iterator))
719  specific_monc_timestep_entry=>retrieve_next_specific_monc_timestep_entry(iterator)
720  call check_thread_status(forthread_rwlock_destroy(specific_monc_timestep_entry%completed_fields_rwlock))
721  call check_thread_status(forthread_rwlock_destroy(specific_monc_timestep_entry%outstanding_fields_rwlock))
722  call check_thread_status(forthread_mutex_destroy(specific_monc_timestep_entry%activity_completion_mutex))
723  call check_thread_status(forthread_mutex_destroy(specific_monc_timestep_entry%deletion_metric_mutex))
724  call c_free(specific_monc_timestep_entry%outstanding_fields)
725  call c_free(specific_monc_timestep_entry%completed_activities)
726  completed_fields_iterator=c_get_iterator(specific_monc_timestep_entry%completed_fields)
727  do while (c_has_next(completed_fields_iterator))
728  field_data_value=>get_data_value_from_map_entry(c_next_mapentry(completed_fields_iterator))
729  if (allocated(field_data_value%values)) deallocate(field_data_value%values)
730  deallocate(field_data_value)
731  end do
732  call c_free(specific_monc_timestep_entry%completed_fields)
733  end do
734  call c_free(all_diagnostics_at_ts%completed_diagnostics)
735  call c_free(all_diagnostics_at_ts%communication_corresponding_activities)
736  iterator=c_get_iterator(all_diagnostics_at_ts%diagnostic_entries)
737  do while (c_has_next(iterator))
738  generic=>c_next_generic(iterator)
739  if (associated(generic)) deallocate(generic)
740  end do
741  call c_free(all_diagnostics_at_ts%diagnostic_entries)
742  call check_thread_status(forthread_rwlock_destroy(all_diagnostics_at_ts%communication_corresponding_activities_rwlock))
743  call check_thread_status(forthread_rwlock_destroy(all_diagnostics_at_ts%completed_diagnostics_rwlock))
744  call check_thread_status(forthread_mutex_destroy(all_diagnostics_at_ts%completed_num_mutex))
745  end if
747 
755  function get_value_from_monc_data(io_configuration, source, data_id, data_dump, field_name)
756  type(io_configuration_type), intent(inout) :: io_configuration
757  integer, intent(in) :: source, data_id
758  character, dimension(:), allocatable, intent(in) :: data_dump
759  character(len=*), intent(in) :: field_name
760  type(data_values_type), pointer :: get_value_from_monc_data
761 
762  integer :: field_data_type, i
763  integer, dimension(:), allocatable :: int_values
764 
765  allocate(get_value_from_monc_data)
766  field_data_type=get_datatype_of_field(io_configuration%data_definitions(data_id)%fields, field_name)
767  if (field_data_type == 0) then
768  call log_log(log_error, "No data type for field '"//trim(field_name)//"'")
769  end if
770  get_value_from_monc_data%dimensions=get_number_field_dimensions(io_configuration, field_name, source, data_id)
771  if (field_data_type == double_data_type) then
772  get_value_from_monc_data%values=get_array_double_from_monc(io_configuration, source, data_id, data_dump, field_name)
773  else if (field_data_type == integer_data_type) then
774  int_values=get_array_integer_from_monc(io_configuration, source, data_id, data_dump, field_name)
775  allocate(get_value_from_monc_data%values(size(int_values)))
776  do i=1, size(int_values)
777  get_value_from_monc_data%values(i)=conv_to_real(int_values(i))
778  end do
779  deallocate(int_values)
780  end if
781  end function get_value_from_monc_data
782 
787  integer function get_datatype_of_field(fields, field_name)
788  type(io_configuration_field_type), dimension(:), intent(in) :: fields
789  character(len=*), intent(in) :: field_name
790 
791  integer :: i
792 
793  do i=1, size(fields)
794  if (fields(i)%name .eq. field_name) then
795  get_datatype_of_field=fields(i)%data_type
796  return
797  end if
798  end do
800  end function get_datatype_of_field
801 
807  function find_or_register_timestep_entry(io_configuration, timestep, source, time)
808  type(io_configuration_type), intent(inout) :: io_configuration
809  integer, intent(in) :: timestep, source
810  real(kind=DEFAULT_PRECISION), intent(in) :: time
811  type(diagnostics_at_timestep_type), pointer :: find_or_register_timestep_entry
812 
813  class(*), pointer :: generic
814  type(all_diagnostics_at_timestep_type), pointer :: all_diags_by_timestep
815 
816  find_or_register_timestep_entry=>get_timestep_entry(timestep, source, .true.)
817  if (.not. associated(find_or_register_timestep_entry)) then
818  call check_thread_status(forthread_rwlock_wrlock(timestep_entries_rwlock))
819  find_or_register_timestep_entry=>get_timestep_entry(timestep, source, .false.)
820  if (.not. associated(find_or_register_timestep_entry)) then
821  find_or_register_timestep_entry=>create_timestep_entry(io_configuration, timestep, time, source)
822  generic=>find_or_register_timestep_entry
823  call c_put_generic(diagnostics_per_monc_at_timestep, conv_to_string(timestep)//"#"//conv_to_string(source), &
824  generic, .false.)
825  all_diags_by_timestep=>find_or_add_diagnostics_by_timestep(timestep)
826  call check_thread_status(forthread_rwlock_wrlock(all_diagnostics_per_timestep_rwlock))
827  call c_add_generic(all_diags_by_timestep%diagnostic_entries, generic, .false.)
828  call check_thread_status(forthread_rwlock_unlock(all_diagnostics_per_timestep_rwlock))
829  end if
830  call check_thread_status(forthread_rwlock_unlock(timestep_entries_rwlock))
831  end if
832  end function find_or_register_timestep_entry
833 
837  function create_timestep_entry(io_configuration, timestep, time, source)
838  type(io_configuration_type), intent(inout) :: io_configuration
839  integer, intent(in) :: timestep, source
840  real(kind=DEFAULT_PRECISION), intent(in) :: time
841  type(diagnostics_at_timestep_type), pointer :: create_timestep_entry
842 
843  type(iterator_type) :: iterator
844  integer :: i, matched_datadefn_index
845 
846  allocate(create_timestep_entry)
847  create_timestep_entry%timestep=timestep
848  create_timestep_entry%time=time
849  create_timestep_entry%number_diags_outstanding=c_size(available_fields)
850  create_timestep_entry%source=source
851  create_timestep_entry%source_location=get_monc_location(io_configuration, source)
852  create_timestep_entry%number_datas_outstanding=0
853  do i=1, size(io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names)
854  matched_datadefn_index=retrieve_data_definition(io_configuration, &
855  io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names(i))
856  if (matched_datadefn_index .gt. 0) then
857  if (io_configuration%data_definitions(matched_datadefn_index)%frequency .gt. 0) then
858  if (mod(timestep, io_configuration%data_definitions(matched_datadefn_index)%frequency) == 0) then
859  create_timestep_entry%number_datas_outstanding=create_timestep_entry%number_datas_outstanding+1
860  end if
861  end if
862  else
863  call log_log(log_warn, "IO server can not find data definition with name "&
864  //io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names(i))
865  end if
866  end do
867  iterator=c_get_iterator(all_outstanding_fields)
868  do while (c_has_next(iterator))
869  call c_add_string(create_timestep_entry%outstanding_fields, c_next_string(iterator))
870  end do
871  call check_thread_status(forthread_mutex_init(create_timestep_entry%activity_completion_mutex, -1))
872  call check_thread_status(forthread_mutex_init(create_timestep_entry%deletion_metric_mutex, -1))
873  call check_thread_status(forthread_rwlock_init(create_timestep_entry%completed_fields_rwlock, -1))
874  call check_thread_status(forthread_rwlock_init(create_timestep_entry%outstanding_fields_rwlock, -1))
875  end function create_timestep_entry
876 
879  subroutine add_required_fields_if_needed(required_fields)
880  type(list_type), intent(inout) :: required_fields
881 
882  type(iterator_type) :: iterator
883 
884  if (.not. c_is_empty(required_fields)) then
885  iterator=c_get_iterator(required_fields)
886  do while (c_has_next(iterator))
887  call c_add_string(all_outstanding_fields, c_next_string(iterator))
888  end do
889  end if
890  end subroutine add_required_fields_if_needed
891 
895  function retrieve_next_activity(iterator)
896  type(iterator_type), intent(inout) :: iterator
897  type(diagnostics_activity_type), pointer :: retrieve_next_activity
898 
899  class(*), pointer :: generic
900 
901  generic=>c_next_generic(iterator)
902 
903  if (associated(generic)) then
904  select type(generic)
906  retrieve_next_activity=>generic
907  end select
908  else
909  retrieve_next_activity=>null()
910  end if
911  end function retrieve_next_activity
912 
918  function get_timestep_entry(timestep, source, do_lock)
919  integer, intent(in) :: timestep, source
920  logical, intent(in) :: do_lock
921  type(diagnostics_at_timestep_type), pointer :: get_timestep_entry
922 
923  class(*), pointer :: generic
924 
925  if (do_lock) call check_thread_status(forthread_rwlock_rdlock(timestep_entries_rwlock))
926  generic=>c_get_generic(diagnostics_per_monc_at_timestep, conv_to_string(timestep)//"#"//conv_to_string(source))
927  if (do_lock) call check_thread_status(forthread_rwlock_unlock(timestep_entries_rwlock))
928  if (associated(generic)) then
929  select type(generic)
931  get_timestep_entry=>generic
932  end select
933  else
934  get_timestep_entry=>null()
935  end if
936  end function get_timestep_entry
937 
942  function find_or_add_diagnostics_by_timestep(timestep)
943  integer, intent(in) :: timestep
944  type(all_diagnostics_at_timestep_type), pointer :: find_or_add_diagnostics_by_timestep
945 
946  class(*), pointer :: generic
947 
948  find_or_add_diagnostics_by_timestep=>get_diagnostics_by_timestep(timestep, .true.)
949  if (.not. associated(find_or_add_diagnostics_by_timestep)) then
950  call check_thread_status(forthread_rwlock_wrlock(all_diagnostics_per_timestep_rwlock))
951  find_or_add_diagnostics_by_timestep=>get_diagnostics_by_timestep(timestep, .false.)
952  if (.not. associated(find_or_add_diagnostics_by_timestep)) then
953  allocate(find_or_add_diagnostics_by_timestep)
954  call check_thread_status(forthread_rwlock_init(&
955  find_or_add_diagnostics_by_timestep%communication_corresponding_activities_rwlock, -1))
956  call check_thread_status(forthread_rwlock_init(find_or_add_diagnostics_by_timestep%completed_diagnostics_rwlock, -1))
957  call check_thread_status(forthread_mutex_init(find_or_add_diagnostics_by_timestep%completed_num_mutex, -1))
958  find_or_add_diagnostics_by_timestep%completed_num=0
959  generic=>find_or_add_diagnostics_by_timestep
960  call c_put_generic(all_diagnostics_at_timestep, conv_to_string(timestep), generic, .false.)
961  end if
962  call check_thread_status(forthread_rwlock_unlock(all_diagnostics_per_timestep_rwlock))
963  end if
965 
970  function get_diagnostics_by_timestep(timestep, do_lock)
971  integer, intent(in) :: timestep
972  logical, intent(in) :: do_lock
973  type(all_diagnostics_at_timestep_type), pointer :: get_diagnostics_by_timestep
974 
975  class(*), pointer :: generic
976 
977  if (do_lock) call check_thread_status(forthread_rwlock_rdlock(all_diagnostics_per_timestep_rwlock))
978  generic=>c_get_generic(all_diagnostics_at_timestep, conv_to_string(timestep))
979  if (do_lock) call check_thread_status(forthread_rwlock_unlock(all_diagnostics_per_timestep_rwlock))
980  if (associated(generic)) then
981  select type(generic)
983  get_diagnostics_by_timestep=>generic
984  end select
985  else
986  get_diagnostics_by_timestep=>null()
987  end if
988  end function get_diagnostics_by_timestep
989 
994  type(iterator_type), intent(inout) :: iterator
995  type(diagnostics_at_timestep_type), pointer :: retrieve_next_specific_monc_timestep_entry
996 
997  class(*), pointer :: generic
998 
999  generic=>c_next_generic(iterator)
1000 
1001  if (associated(generic)) then
1002  select type(generic)
1004  retrieve_next_specific_monc_timestep_entry=>generic
1005  end select
1006  else
1007  retrieve_next_specific_monc_timestep_entry=>null()
1008  end if
1010 
1014  function get_diagnostic_by_key(key)
1015  character(len=*), intent(in) :: key
1016 
1017  type(all_diagnostics_at_timestep_type), pointer :: get_diagnostic_by_key
1018 
1019  class(*), pointer :: generic
1020 
1021  generic=>c_get_generic(all_diagnostics_at_timestep, key)
1022  if (associated(generic)) then
1023  select type(generic)
1025  get_diagnostic_by_key=>generic
1026  end select
1027  else
1028  get_diagnostic_by_key=>null()
1029  end if
1030  end function get_diagnostic_by_key
1031 
1035  function retrieve_diagnostics(mapentry)
1036  type(mapentry_type), intent(in) :: mapentry
1037  type(all_diagnostics_at_timestep_type), pointer :: retrieve_diagnostics
1038 
1039  class(*), pointer :: generic
1040 
1041  generic=>c_get_generic(mapentry)
1042 
1043  if (associated(generic)) then
1044  select type(generic)
1046  retrieve_diagnostics=>generic
1047  end select
1048  else
1049  retrieve_diagnostics=>null()
1050  end if
1051  end function retrieve_diagnostics
1052 
1057  function get_comm_activity_from_fieldname(diagnostics_by_timestep, field_name)
1058  type(all_diagnostics_at_timestep_type), intent(inout) :: diagnostics_by_timestep
1059  character(len=*), intent(in) :: field_name
1060  type(diagnostics_activity_type), pointer :: get_comm_activity_from_fieldname
1061 
1062  class(*), pointer :: generic
1063 
1064  call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
1065  generic=>c_get_generic(diagnostics_by_timestep%communication_corresponding_activities, field_name)
1066  call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
1067  if (associated(generic)) then
1068  select type(generic)
1069  type is(diagnostics_activity_type)
1070  get_comm_activity_from_fieldname=>generic
1071  end select
1072  else
1073  get_comm_activity_from_fieldname=>null()
1074  end if
1075  end function get_comm_activity_from_fieldname
1076 
1081  function get_misc_action_at_index(action_members, index)
1082  type(list_type), intent(inout) :: action_members
1083  integer, intent(in) :: index
1084  type(io_configuration_misc_item_type), pointer :: get_misc_action_at_index
1085 
1086  class(*), pointer :: generic
1087 
1088  generic=>c_get_generic(action_members, index)
1089  if (associated(generic)) then
1090  select type(generic)
1091  type is(io_configuration_misc_item_type)
1092  get_misc_action_at_index=>generic
1093  end select
1094  else
1095  get_misc_action_at_index=>null()
1096  end if
1097  end function get_misc_action_at_index
1098 
1103  subroutine define_diagnostics(io_configuration, diagnostic_generation_frequency)
1104  type(io_configuration_type), intent(inout) :: io_configuration
1105  type(hashmap_type), intent(out) :: diagnostic_generation_frequency
1106 
1107  integer :: i, j, entries, action_entities, activity_freq
1108  type(io_configuration_misc_item_type), pointer :: misc_action
1109  type(diagnostics_activity_type), pointer :: item
1110  character(len=STRING_LENGTH) :: activity_name
1111 
1112  class(*), pointer :: generic
1113 
1114  entries=io_configuration%number_of_diagnostics
1115  if (entries .gt. 0) then
1116  allocate(diagnostic_definitions(entries))
1117 
1118  do i=1, entries
1119  diagnostic_definitions(i)%uuid=conv_to_string(i)
1120  diagnostic_definitions(i)%generation_timestep_frequency=0
1121  diagnostic_definitions(i)%diagnostic_name=io_configuration%diagnostics(i)%name
1122  diagnostic_definitions(i)%diagnostic_namespace=io_configuration%diagnostics(i)%namespace
1123  diagnostic_definitions(i)%collective=io_configuration%diagnostics(i)%collective
1124  action_entities=c_size(io_configuration%diagnostics(i)%members)
1125  if (action_entities .gt. 0) then
1126  do j=1, action_entities
1127  misc_action=>get_misc_action_at_index(io_configuration%diagnostics(i)%members, j)
1128  allocate(item)
1129  item%uuid=conv_to_string(j)
1130  item%result_name=c_get_string(misc_action%embellishments, "result")
1131  if (c_contains(misc_action%embellishments, "root")) then
1132  if (get_action_attribute_string(misc_action%embellishments, "root") .eq. "auto") then
1133  item%root=mod(i, io_configuration%number_of_io_servers)
1134  else
1135  item%root=get_action_attribute_integer(misc_action%embellishments, "root")
1136  end if
1137  else
1138  item%root=-1
1139  end if
1140  if (misc_action%type .eq. "operator") then
1141  activity_name=c_get_string(misc_action%embellishments, "name")
1142  item%activity_name=activity_name
1143  item%required_fields=get_operator_required_fields(activity_name, misc_action%embellishments)
1144  item%activity_attributes=misc_action%embellishments
1145  item%operator_procedure=>get_operator_perform_procedure(activity_name)
1146  item%activity_type=operator_type
1147  else if (misc_action%type .eq. "communication") then
1148  if (item%root .lt. 0) call log_log(log_error, "Root must be supplied and 0 or greater for communication actions")
1149  activity_name=c_get_string(misc_action%embellishments, "name")
1150  if (activity_name .eq. "reduction" .or. activity_name .eq. "allreduction") then
1151  call c_add_string(item%required_fields, c_get_string(misc_action%embellishments, "field"))
1152  item%activity_type=merge(reduction_type, allreduction_type, activity_name .eq. "reduction")
1153  item%communication_operator=get_reduction_operator(c_get_string(misc_action%embellishments, "operator"))
1154  else if (activity_name .eq. "broadcast") then
1155  call c_add_string(item%required_fields, c_get_string(misc_action%embellishments, "field"))
1156  item%activity_type=broadcast_type
1157  end if
1158  end if
1159  call add_required_fields_if_needed(item%required_fields)
1160  activity_freq=get_diagnostic_generation_frequency(io_configuration, item%required_fields)
1161  if (diagnostic_definitions(i)%generation_timestep_frequency .lt. activity_freq) then
1162  diagnostic_definitions(i)%generation_timestep_frequency=activity_freq
1163  end if
1164  generic=>item
1165  call c_add_generic(diagnostic_definitions(i)%activities, generic, .false.)
1166  end do
1167  end if
1168  call c_put_integer(diagnostic_generation_frequency, diagnostic_definitions(i)%diagnostic_name, &
1169  diagnostic_definitions(i)%generation_timestep_frequency)
1170  call process_auto_dimensions(io_configuration, io_configuration%diagnostics(i), i)
1171  end do
1172  end if
1173  end subroutine define_diagnostics
1174 
1179  function get_diagnostic_activity_by_result_name(result_name, diagnostic_entry_index)
1180  character(len=STRING_LENGTH), intent(inout) :: result_name
1181  integer, intent(in) :: diagnostic_entry_index
1182  type(diagnostics_activity_type), pointer :: get_diagnostic_activity_by_result_name
1183 
1184  type(iterator_type) :: iterator
1185 
1186  iterator=c_get_iterator(diagnostic_definitions(diagnostic_entry_index)%activities)
1187  do while (c_has_next(iterator))
1188  get_diagnostic_activity_by_result_name=>retrieve_next_activity(iterator)
1189  if (get_diagnostic_activity_by_result_name%result_name == result_name) then
1190  return
1191  end if
1192  end do
1193  get_diagnostic_activity_by_result_name=>null()
1195 
1200  subroutine process_auto_dimensions(io_configuration, diagnostic_configuration, entry_index)
1201  type(io_configuration_type), intent(inout) :: io_configuration
1202  type(io_configuration_diagnostic_field_type), intent(inout) :: diagnostic_configuration
1203  integer, intent(in) :: entry_index
1204 
1205  integer :: i, auto_index, diag_modified_dim_size
1206  character(len=STRING_LENGTH) :: specific_dimension
1207  type(diagnostics_activity_type), pointer :: diagnostic_activity
1208 
1209  diagnostic_activity=>get_diagnostic_activity_by_result_name(diagnostic_definitions(entry_index)%diagnostic_name, entry_index)
1210  if (associated(diagnostic_activity)) then
1211  if (diagnostic_activity%activity_type==operator_type) then
1212  do i=1, diagnostic_configuration%dimensions
1213  auto_index=index(diagnostic_configuration%dim_size_defns(i), "-auto")
1214  if (auto_index .ne. 0) then
1215  specific_dimension=diagnostic_configuration%dim_size_defns(i)(1:auto_index-1)
1216  diag_modified_dim_size=get_operator_auto_size(io_configuration, diagnostic_activity%activity_name, &
1217  specific_dimension, diagnostic_activity%activity_attributes)
1218  if (diag_modified_dim_size .ge. 0) then
1219  specific_dimension=trim(specific_dimension)//"_"//trim(conv_to_string(diag_modified_dim_size))
1220  diagnostic_configuration%dim_size_defns(i)=specific_dimension
1221  call c_put_integer(io_configuration%dimension_sizing, specific_dimension, diag_modified_dim_size)
1222  else
1223  diagnostic_configuration%dim_size_defns(i)=specific_dimension
1224  end if
1225  end if
1226  end do
1227  end if
1228  end if
1229  end subroutine process_auto_dimensions
1230 
1235  integer function get_diagnostic_generation_frequency(io_configuration, required_fields)
1236  type(io_configuration_type), intent(inout) :: io_configuration
1237  type(list_type), intent(inout) :: required_fields
1238 
1239  integer :: field_freq
1240  type(iterator_type) :: iterator
1241 
1243  iterator=c_get_iterator(required_fields)
1244  do while (c_has_next(iterator))
1245  field_freq=get_field_frequency(io_configuration, c_next_string(iterator))
1247  end do
1249 
1254  integer function get_field_frequency(io_configuration, field_name)
1255  type(io_configuration_type), intent(inout) :: io_configuration
1256  character(len=*), intent(in) :: field_name
1257 
1258  integer :: i, j
1259  do i=1, io_configuration%number_of_data_definitions
1260  do j=1, io_configuration%data_definitions(i)%number_of_data_fields
1261  if (io_configuration%data_definitions(i)%fields(j)%name == field_name) then
1262  get_field_frequency=io_configuration%data_definitions(i)%frequency
1263  return
1264  end if
1265  end do
1266  end do
1268  end function get_field_frequency
1269 
1274  character(len=STRING_LENGTH) function generate_activity_diagnostic_key(diagnostic, activity)
1275  type(diagnostics_type), intent(in) :: diagnostic
1276  type(diagnostics_activity_type), intent(in) :: activity
1277 
1278  generate_activity_diagnostic_key=trim(diagnostic%uuid)//"#"//trim(activity%uuid)
1280 end module diagnostic_federator_mod
logical function, public check_broadcast_inter_io_for_completion(io_configuration)
Checks the statuses for broadcast completion and returns whether they are all finished or not...
type(hashmap_type), volatile all_diagnostics_at_timestep
integer function, dimension(:), allocatable, public get_array_integer_from_monc(io_configuration, source, data_id, data_dump, key)
Retreives an array of integers with a corresponding key from the raw data dump. The size depends on t...
Definition: datautils.F90:529
type(all_diagnostics_at_timestep_type) function, pointer find_or_add_diagnostics_by_timestep(timestep)
Finds or adds diagnostics by timestep. This is used to maintain a list of all diagnostic entries for ...
type(hashmap_type) function, public determine_diagnostics_fields_available(monc_field_names)
Determines the diagnostics fields that are available based upon the input MONC fields on registration...
integer, volatile previous_clean_point
logical function, public check_allreduction_inter_io_for_completion(io_configuration)
Determines whether this all reduction inter IO functionality has completed or not.
integer function forthread_rwlock_init(rwlock_id, attr_id)
Definition: forthread.F90:504
All reduction, which does a reduce and then broadcasts the data to all IO servers.
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
Returns whether a collection is empty.
Puts an integer key-value pair into the map.
integer, parameter operator_type
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
subroutine, public init_reduction_inter_io(io_configuration)
Initialises the reduction action.
subroutine handle_completion(io_configuration, values, field_name, timestep)
Handles inter io reduction completion, it adds the resulting value to the appropriate completion list...
type(diagnostics_at_timestep_type) function, pointer get_timestep_entry(timestep, source, do_lock)
Retrieves the timestep at a specific timestep and source MONC.
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
subroutine handle_completion_for_specific_monc_timestep_entry(io_configuration, result_to_add, field_name, timestep_entry, diagnostics_by_timestep)
This handles inter IO completion for a specific timestep entry. This is required as at a timestep the...
type(diagnostics_at_timestep_type) function, pointer retrieve_next_specific_monc_timestep_entry(iterator)
Retrieves the next MONC timestep entry from the all diagnostics based upon a collections iterator...
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
subroutine define_diagnostics(io_configuration, diagnostic_generation_frequency)
Based upon the IO configuration this will define the diagnostics structure. It is done once at initia...
type(list_type) function, public get_operator_required_fields(operator_name, action_attributes)
Retrieves the list of fields required by an operator before it can run.
Definition: operator.F90:77
integer function get_datatype_of_field(fields, field_name)
Retrieves the data type of a field or 0 if the field was not found.
integer, volatile clean_progress_mutex
subroutine, public perform_inter_io_reduction(io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
Actually handles the processing for this data wrt the vertical reduction.
Contains functionality for managing and extracting data from the raw data dumps that the IO server re...
Definition: datautils.F90:3
logical function, public check_reduction_inter_io_for_completion(io_configuration)
Checks this action for completion, when all are completed then the IO server can shutdown as this is ...
Logging utility.
Definition: logging.F90:2
subroutine check_all_activities_against_completed_fields(io_configuration, timestep_entry, diagnostics_by_timestep)
Checks all pending activities against the completed fields and runs them if the required fields are n...
type(diagnostics_activity_type) function, pointer get_diagnostic_activity_by_result_name(result_name, diagnostic_entry_index)
Retrives a diagnostic activity based upon its result name or null if none is found.
subroutine perform_inter_io_communication(io_configuration, timestep_entry, all_entries_at_timestep, activity, value_to_send, communication_field_name)
Performs the actual inter IO communication by calling out to the appropriate inter IO module...
integer function get_diagnostic_generation_frequency(io_configuration, required_fields)
Retrieves the max diagnostic generation frequency for a set of fields.
logical function, public get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single logical element (scalar) from the data dump.
Definition: datautils.F90:328
logical function are_fields_available_for_activity(timestep_entry, activity)
Determines whether the fields required for an activity are available so that activity can be run...
integer, parameter broadcast_type
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
Definition: datadefn.F90:17
subroutine, public init_global_callback_inter_io(io_configuration)
Initialises the global callback.
The writer field manager will manage aspects of the fields being provided to the writer federator...
logical function handle_operator_completion(io_configuration, timestep_entry, specific_activity)
Handles the completion of the operator.
integer function, public get_action_attribute_integer(action_attributes, field_name)
Retrieves the name of a field from the attributes specified in the configuration. ...
Definition: datautils.F90:114
integer function forthread_rwlock_rdlock(lock_id)
Definition: forthread.F90:514
integer function get_field_frequency(io_configuration, field_name)
Retrieves the generation frequency for a specific field.
subroutine process_auto_dimensions(io_configuration, diagnostic_configuration, entry_index)
Processes all auto dimensions by looking them up and resolving them based upon the operators...
type(io_configuration_misc_item_type) function, pointer get_misc_action_at_index(action_members, index)
Retrieves a misc action from the parsed user XML configuration at a specific index.
Contains common definitions for the data and datatypes used by MONC.
Definition: datadefn.F90:2
Adds an integer element to the end of the list.
logical function, public check_diagnostic_federator_for_completion(io_configuration)
Checks whether the diagnostics federator has completed or not, this is really checking all the underl...
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
Definition: collections.F90:94
subroutine, public finalise_broadcast_inter_io()
Finalises the broadcast inter IO functionality.
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
integer, parameter, public double_data_type
Definition: ioclient.F90:40
type(diagnostics_activity_type) function, pointer get_comm_activity_from_fieldname(diagnostics_by_timestep, field_name)
Retrieves a communication activity from its field name.
Converts data types to strings.
Definition: conversions.F90:36
integer function forthread_rwlock_wrlock(lock_id)
Definition: forthread.F90:532
Operator federator which manages the different operators which are available. Operators take in any n...
Definition: operator.F90:3
subroutine, public finalise_allreduction_inter_io(io_configuration)
Finalises the all reduction inter IO functionality.
integer function forthread_mutex_trylock(mutex_id)
Definition: forthread.F90:293
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
type(diagnostics_activity_type) function, pointer retrieve_next_activity(iterator)
Retrieves the next activity in a collection being iterated over by an iterator.
real(kind=double_precision) function, dimension(:), allocatable, public get_array_double_from_monc(io_configuration, source, data_id, data_dump, key)
Retreives an array of doubles with a corresponding key from the raw data dump. The size depends on th...
Definition: datautils.F90:478
This defines some constants and procedures that are useful to the IO server and clients that call it...
Definition: ioclient.F90:3
integer, parameter, public integer_data_type
Definition: ioclient.F90:40
Broadcast inter IO communication which sends a value from one IO server to all others. This tracks field name and timestep and only issues one call (and one results call to completion) for that combination.
character(len=string_length) function generate_activity_diagnostic_key(diagnostic, activity)
Generates a unique key for an activity within a diagnostic, which is unique amongst all diagnostics a...
logical function, public is_field_present(io_configuration, source, data_id, key)
Definition: datautils.F90:146
Map data structure that holds string (length 20 maximum) key value pairs.
Definition: collections.F90:86
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
integer, parameter reduction_type
Returns the number of elements in the collection.
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer, parameter allreduction_type
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
subroutine, public perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, timestep, completion_procedure)
Performs an inter IO broadcast of data from the root to all other IO servers. Note that this is on th...
subroutine, public finalise_operators()
Finalises any operators that require finalisation.
Definition: operator.F90:48
subroutine clean_diagnostic_states(current_timestep)
Cleans the diagnostic states if required (based on the timestep period)
Collection data structures.
Definition: collections.F90:7
subroutine handle_diagnostic_calculation_completed(io_configuration, diagnostic_index, timestep_entry, diagnostics_by_timestep)
Handles completion of a diagnostic calculation and will then pass this onto interested parties...
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC's configuration in the IO data structure.
subroutine, public init_broadcast_inter_io(io_configuration)
Initialises the broadcast inter IO functionality.
procedure(perform_activity) function, pointer, public get_operator_perform_procedure(operator_name)
Retrieves the operator execution procedure of an operator with a specific name.
Definition: operator.F90:55
type(all_diagnostics_at_timestep_type) function, pointer retrieve_diagnostics(mapentry)
Retrieves the all diagnostics at a specific timestep from its map entry.
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
type(hashmap_type), volatile diagnostics_per_monc_at_timestep
type(hashset_type), volatile all_outstanding_fields
Converts data types to real.
Definition: conversions.F90:58
integer, parameter perform_clean_every
A wrapper type containing all the diagnostics for MONC source processes at a specific timestep...
integer, parameter, public string_length
Default length of strings.
Definition: datadefn.F90:10
logical function, public get_action_attribute_logical(action_attributes, field_name)
Retrieves a logical value from the attribute which corresponds to a specific key. ...
Definition: datautils.F90:128
subroutine check_diagnostics_entries_against_data(io_configuration, source, data_id, data_dump, timestep_diagnostics_entry)
Checks the outstanding fields of a time step entry against the data recieved from MONC and moves any ...
character(len=string_length) function, public get_action_attribute_string(action_attributes, field_name)
Retrieves the name of a field from the attributes specified in the configuration. ...
Definition: datautils.F90:101
integer function, public get_number_field_dimensions(io_configuration, field_name, source, data_id)
Retrieves the number of field dimensions that a specific field has from a MONC process within a data ...
List data structure which implements a doubly linked list. This list will preserve its order...
Definition: collections.F90:60
subroutine, public init_allreduction_inter_io(io_configuration)
Initialises the all reduction inter IO functionality.
type(diagnostics_type), dimension(:), allocatable, volatile diagnostic_definitions
Adds a generic element to the end of the list.
integer function, public retrieve_data_definition(io_configuration, key)
Retrieves a specific data definition from the configuration which matches a key.
subroutine, public finalise_global_callback_inter_io(io_configuration)
Finalises the global callback.
integer function forthread_rwlock_destroy(rwlock_id)
Definition: forthread.F90:495
subroutine add_required_fields_if_needed(required_fields)
Adds the required fields of an activity to the overall required fields which are cloned for each new ...
integer, volatile previous_viewed_timestep
subroutine deallocate_diagnostics_at_timestep(key)
Deallocates all the diagnostics at a specific timestep, this removes all the individual MONC timestep...
subroutine, public finalise_reduction_inter_io(io_configuration)
Finalises the reduction action, waiting for all outstanding requests and then freeing data...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
subroutine, public pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
Entry point into the diagnostics federator this runs the diagnostics, executing the defined rules bas...
integer function forthread_rwlock_unlock(lock_id)
Definition: forthread.F90:550
type(data_values_type) function, pointer, public get_data_value_from_map_entry(map_entry)
Retrieves the data value (wrapper) by field name or null if no entry was found in the provided map en...
Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as...
type(hashmap_type) function, public initialise_diagnostic_federator(io_configuration)
Initialises the diagnostics action and sets up the diagnostics master definitions.
Puts a generic key-value pair into the map.
integer function, public get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single integer element (scalar) from the data dump.
Definition: datautils.F90:372
real(kind=double_precision) function, public get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, key)
Retreives a scalar real with a corresponding key from the raw data dump.
Definition: datautils.F90:416
type(diagnostics_at_timestep_type) function, pointer find_or_register_timestep_entry(io_configuration, timestep, source, time)
Locates or registers a new (if it does not exist) time step entry based upon the timestep and source ...
integer function, public get_reduction_operator(op_string)
Given the map of action attributes this procedure will identify the reduction operator that has been ...
integer, volatile all_diagnostics_per_timestep_rwlock
type(data_values_type) function, pointer get_value_from_monc_data(io_configuration, source, data_id, data_dump, field_name)
Retrieves a value from the communicated MONC data. If this was an integer then converts to a real...
Retrieves the generic value held at the specific map index or null if index > map elements...
type(all_diagnostics_at_timestep_type) function, pointer get_diagnostics_by_timestep(timestep, do_lock)
Retrieves the diagnostics list (each MONC source) at a specific timestep.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
Configuration associated with the representation of a specific data field.
Determines whether or not a map contains a specific key.
subroutine, public perform_inter_io_allreduction(io_configuration, field_values, field_size, field_name, reduction_op, root, timestep, completion_procedure)
Performs the all reduction inter IO reduction.
Adds a string to the end of the list.
subroutine, public finalise_diagnostic_federator(io_configuration)
Finalises the diagnostics federator, waiting for all outstanding requests and then freeing data...
type(hashset_type), volatile available_fields
integer, volatile timestep_entries_rwlock
type(all_diagnostics_at_timestep_type) function, pointer get_diagnostic_by_key(key)
Retrieves all diagnostics at a timestep by its key.
integer function, public get_operator_auto_size(io_configuration, operator_name, auto_dimension, action_attributes)
Definition: operator.F90:96
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
This diagnostics federator will take in data fields sent from a MONC, perform operators on these as r...
subroutine issue_communication_calls(io_configuration, timestep_entry, diagnostics_by_timestep, source, data_id, data_dump)
Issues any inter io communucation calls that are appropriate based upon the data recieved from MONC...
subroutine, public initialise_operators()
Initialises any operators that require initialisation.
Definition: operator.F90:43
Removes a specific element from the list or map.
type(diagnostics_at_timestep_type) function, pointer create_timestep_entry(io_configuration, timestep, time, source)
Creates a timestep entry and processes all members, determining activities their required fields etc...