43 integer :: communication_corresponding_activities_rwlock, completed_diagnostics_rwlock, completed_num, completed_num_mutex
51 integer :: timestep, completed_fields_rwlock, outstanding_fields_rwlock, activity_completion_mutex, source, &
52 source_location, number_diags_outstanding, number_datas_outstanding, deletion_metric_mutex
53 real(kind=DEFAULT_PRECISION) :: time
60 character(len=STRING_LENGTH) :: diagnostic_name, diagnostic_namespace, uuid
62 integer :: generation_timestep_frequency
68 integer :: activity_type, communication_operator, root
69 real(kind=DEFAULT_PRECISION) :: result
72 character(len=STRING_LENGTH) :: result_name, activity_name, uuid
111 type(io_configuration_type),
intent(inout) :: io_configuration
125 type(io_configuration_type),
intent(inout) :: io_configuration
127 call finalise_broadcast_inter_io()
128 call finalise_reduction_inter_io(io_configuration)
129 call finalise_allreduction_inter_io(io_configuration)
130 call finalise_global_callback_inter_io(io_configuration)
134 call finalise_operators()
141 type(hashmap_type) function determine_diagnostics_fields_available(monc_field_names)
142 type(hashset_type),
intent(inout) :: monc_field_names
144 integer :: i, k, num_fields, diag_root
146 type(hashset_type) :: result_names_for_activities
147 type(iterator_type) :: required_fields_iterator, activities_iterator
148 character(len=STRING_LENGTH) :: specific_field_name
149 logical :: diagnostic_provided
155 diagnostic_provided=.true.
157 do while (c_has_next(activities_iterator))
159 call c_add_string(result_names_for_activities, specific_activity%result_name)
163 do while (c_has_next(activities_iterator))
165 if (specific_activity%root .ne. -1 .and. diag_root == -1) diag_root=specific_activity%root
166 required_fields_iterator=c_get_iterator(specific_activity%required_fields)
167 do while (c_has_next(required_fields_iterator))
168 specific_field_name=c_next_string(required_fields_iterator)
169 if (.not. c_contains(result_names_for_activities, specific_field_name) .and. &
170 .not. c_contains(monc_field_names, specific_field_name))
then 171 diagnostic_provided=.false.
175 if (.not. diagnostic_provided)
exit 177 if (diagnostic_provided)
then 181 call c_free(result_names_for_activities)
192 type(io_configuration_type),
intent(inout) :: io_configuration
193 integer,
intent(in) :: source, data_id
194 character,
dimension(:),
allocatable,
intent(in) :: data_dump
197 real(kind=DEFAULT_PRECISION) :: time
198 logical :: terminated
205 if (is_field_present(io_configuration, source, data_id,
"timestep") .and. &
206 is_field_present(io_configuration, source, data_id,
"time"))
then 207 timestep=get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump,
"timestep")
208 time=get_scalar_real_from_monc(io_configuration, source, data_id, data_dump,
"time")
212 if (is_field_present(io_configuration, source, data_id,
"terminated"))
then 213 terminated=get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump,
"terminated")
220 call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
221 timestep_entry%number_datas_outstanding=timestep_entry%number_datas_outstanding-1
222 call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
224 call log_log(log_warn,
"Can not run the diagnostics federator without a timestep and time field in the MONC data")
232 type(io_configuration_type),
intent(inout) :: io_configuration
236 integer :: j, num_diags
238 character(len=STRING_LENGTH) :: field_name, activity_diag_key
239 logical :: updated_entry, entry_in_completed_diagnostics, operator_produced_values
240 type(data_values_type) :: value_to_send
241 type(iterator_type) :: activities_iterator
245 call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
246 do while (updated_entry)
247 updated_entry=.false.
250 call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
251 entry_in_completed_diagnostics=c_contains(diagnostics_by_timestep%completed_diagnostics, &
253 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
256 do while (c_has_next(activities_iterator))
259 if (.not. c_contains(timestep_entry%completed_activities, activity_diag_key))
then 261 call c_add_string(timestep_entry%completed_activities, activity_diag_key)
265 if (operator_produced_values .and. activity%result_name ==
diagnostic_definitions(j)%diagnostic_name)
then 270 field_name=c_get_string(activity%required_fields, 1)
271 call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
272 value_to_send=get_data_value_by_field_name(timestep_entry%completed_fields, field_name)
273 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
274 call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
276 activity, value_to_send, field_name)
277 call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
285 call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
292 type(io_configuration_type),
intent(inout) :: io_configuration
296 type(data_values_type),
pointer :: operator_result
297 real(kind=DEFAULT_PRECISION),
dimension(:),
allocatable :: operator_result_values
298 class(*),
pointer :: generic
300 call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
301 call specific_activity%operator_procedure(io_configuration, timestep_entry%completed_fields, &
302 specific_activity%activity_attributes, timestep_entry%source_location, timestep_entry%source, operator_result_values)
303 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
305 if (
allocated(operator_result_values))
then 306 allocate(operator_result)
307 allocate(operator_result%values(
size(operator_result_values)), source=operator_result_values)
308 operator_result%values=operator_result_values
309 deallocate(operator_result_values)
310 generic=>operator_result
311 call check_thread_status(forthread_rwlock_wrlock(timestep_entry%completed_fields_rwlock))
312 call c_put_generic(timestep_entry%completed_fields, specific_activity%result_name, generic, .false.)
313 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
314 call check_thread_status(forthread_rwlock_wrlock(timestep_entry%outstanding_fields_rwlock))
315 call c_remove(timestep_entry%outstanding_fields, specific_activity%result_name)
316 call check_thread_status(forthread_rwlock_unlock(timestep_entry%outstanding_fields_rwlock))
330 type(io_configuration_type),
intent(inout) :: io_configuration
331 real(DEFAULT_PRECISION),
dimension(:) :: values
332 character(len=STRING_LENGTH) :: field_name
336 type(data_values_type),
pointer :: result_to_add
337 type(iterator_type) :: iterator
340 call check_thread_status(forthread_mutex_lock(diagnostics_by_timestep%completed_num_mutex))
341 diagnostics_by_timestep%completed_num=diagnostics_by_timestep%completed_num+1
342 call check_thread_status(forthread_mutex_unlock(diagnostics_by_timestep%completed_num_mutex))
343 if (.not. c_is_empty(diagnostics_by_timestep%diagnostic_entries))
then 344 iterator=c_get_iterator(diagnostics_by_timestep%diagnostic_entries)
345 do while (c_has_next(iterator))
346 allocate(result_to_add)
347 allocate(result_to_add%values(
size(values)), source=values)
352 call check_thread_status(forthread_rwlock_wrlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
353 call c_remove(diagnostics_by_timestep%communication_corresponding_activities, trim(field_name))
354 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
355 call check_thread_status(forthread_mutex_lock(diagnostics_by_timestep%completed_num_mutex))
356 diagnostics_by_timestep%completed_num=diagnostics_by_timestep%completed_num-1
357 call check_thread_status(forthread_mutex_unlock(diagnostics_by_timestep%completed_num_mutex))
368 field_name, timestep_entry, diagnostics_by_timestep)
369 type(io_configuration_type),
intent(inout) :: io_configuration
370 type(data_values_type),
pointer,
intent(in) :: result_to_add
371 character(len=STRING_LENGTH),
intent(in) :: field_name
375 logical :: entry_in_completed_diagnostics
377 class(*),
pointer :: generic
382 generic=>result_to_add
383 call check_thread_status(forthread_rwlock_wrlock(timestep_entry%completed_fields_rwlock))
384 call c_put_generic(timestep_entry%completed_fields, trim(activity%result_name), generic, .false.)
385 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
387 call check_thread_status(forthread_rwlock_wrlock(timestep_entry%outstanding_fields_rwlock))
389 call c_remove(timestep_entry%outstanding_fields, trim(field_name))
390 call check_thread_status(forthread_rwlock_unlock(timestep_entry%outstanding_fields_rwlock))
397 call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
398 entry_in_completed_diagnostics=c_contains(diagnostics_by_timestep%completed_diagnostics, &
400 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
401 if (.not. entry_in_completed_diagnostics)
then 415 type(io_configuration_type),
intent(inout) :: io_configuration
416 integer,
intent(in) :: diagnostic_index
420 type(data_values_type),
pointer :: diagnostics_value_entry
421 type(iterator_type) :: iterator
424 call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
425 diagnostics_value_entry=>get_data_value_by_field_name(timestep_entry%completed_fields, &
427 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
430 call provide_field_to_writer_federator(io_configuration,
diagnostic_definitions(diagnostic_index)%diagnostic_name, &
432 timestep_entry%timestep, timestep_entry%time, &
434 call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
435 timestep_entry%number_diags_outstanding=timestep_entry%number_diags_outstanding-1
436 call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
437 if (
allocated(diagnostics_value_entry%values))
deallocate(diagnostics_value_entry%values)
439 call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
440 if (.not. c_contains(diagnostics_by_timestep%completed_diagnostics, &
442 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
443 call check_thread_status(forthread_rwlock_wrlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
444 if (.not. c_contains(diagnostics_by_timestep%completed_diagnostics,&
446 call c_add_string(diagnostics_by_timestep%completed_diagnostics, &
448 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
449 call provide_field_to_writer_federator(io_configuration,
diagnostic_definitions(diagnostic_index)%diagnostic_name, &
451 timestep_entry%timestep, timestep_entry%time, &
453 iterator=c_get_iterator(diagnostics_by_timestep%diagnostic_entries)
454 do while (c_has_next(iterator))
456 call check_thread_status(forthread_mutex_lock(activity_at_index%deletion_metric_mutex))
457 activity_at_index%number_diags_outstanding=activity_at_index%number_diags_outstanding-1
458 call check_thread_status(forthread_mutex_unlock(activity_at_index%deletion_metric_mutex))
460 if (
allocated(diagnostics_value_entry%values))
deallocate(diagnostics_value_entry%values)
462 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
465 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
478 character(len=STRING_LENGTH) :: field_name
479 type(iterator_type) :: iterator
482 if (.not. c_is_empty(activity%required_fields))
then 483 call check_thread_status(forthread_rwlock_rdlock(timestep_entry%completed_fields_rwlock))
484 iterator=c_get_iterator(activity%required_fields)
485 do while (c_has_next(iterator))
486 field_name=c_next_string(iterator)
487 if (.not. c_contains(timestep_entry%completed_fields, field_name))
then 492 call check_thread_status(forthread_rwlock_unlock(timestep_entry%completed_fields_rwlock))
503 activity, value_to_send, communication_field_name)
504 type(io_configuration_type),
intent(inout) :: io_configuration
508 type(data_values_type),
intent(in) :: value_to_send
509 character(len=STRING_LENGTH),
intent(in) :: communication_field_name
511 class(*),
pointer :: generic
514 call check_thread_status(forthread_rwlock_wrlock(all_entries_at_timestep%communication_corresponding_activities_rwlock))
515 call c_put_generic(all_entries_at_timestep%communication_corresponding_activities, trim(communication_field_name), &
517 call check_thread_status(forthread_rwlock_unlock(all_entries_at_timestep%communication_corresponding_activities_rwlock))
520 call perform_inter_io_reduction(io_configuration, value_to_send%values,
size(value_to_send%values), &
521 communication_field_name, activity%communication_operator, activity%root, timestep_entry%timestep,
handle_completion)
522 if (activity%root .ne. io_configuration%my_io_rank)
then 523 call check_thread_status(forthread_mutex_lock(timestep_entry%deletion_metric_mutex))
524 timestep_entry%number_diags_outstanding=timestep_entry%number_diags_outstanding-1
525 call check_thread_status(forthread_mutex_unlock(timestep_entry%deletion_metric_mutex))
528 call perform_inter_io_allreduction(io_configuration, value_to_send%values,
size(value_to_send%values), &
529 communication_field_name, activity%communication_operator, activity%root, timestep_entry%timestep,
handle_completion)
531 call perform_inter_io_broadcast(io_configuration, value_to_send%values,
size(value_to_send%values), &
532 communication_field_name, activity%root, timestep_entry%timestep,
handle_completion)
542 subroutine issue_communication_calls(io_configuration, timestep_entry, diagnostics_by_timestep, source, data_id, data_dump)
543 type(io_configuration_type),
intent(inout) :: io_configuration
545 integer,
intent(in) :: source, data_id
546 character,
dimension(:),
allocatable,
intent(in) :: data_dump
549 logical :: completed_diagnostics_entry
550 integer :: j, num_diags
551 type(iterator_type) :: iterator
553 character(len=STRING_LENGTH) :: communication_field_name, activity_diag_key
555 call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
558 call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
559 completed_diagnostics_entry=c_contains(diagnostics_by_timestep%completed_diagnostics, &
561 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%completed_diagnostics_rwlock))
562 if (.not. completed_diagnostics_entry)
then 564 do while (c_has_next(iterator))
567 if (.not. c_contains(timestep_entry%completed_activities, activity_diag_key))
then 570 if (.not. c_is_empty(activity%required_fields))
then 571 communication_field_name=c_get_string(activity%required_fields, 1)
572 if (is_field_present(io_configuration, source, data_id, communication_field_name))
then 573 call c_add_string(timestep_entry%completed_activities, activity_diag_key)
574 call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
577 communication_field_name)
578 call check_thread_status(forthread_mutex_lock(timestep_entry%activity_completion_mutex))
586 call check_thread_status(forthread_mutex_unlock(timestep_entry%activity_completion_mutex))
597 timestep_diagnostics_entry)
598 type(io_configuration_type),
intent(inout) :: io_configuration
599 integer,
intent(in) :: source, data_id
600 character,
dimension(:),
allocatable,
intent(in) :: data_dump
603 type(iterator_type) :: iterator
604 character(len=STRING_LENGTH) :: field_name
605 type(data_values_type),
pointer :: field_value
606 type(hashset_type) :: removed_entries
607 class(*),
pointer :: generic
609 if (.not. c_is_empty(timestep_diagnostics_entry%outstanding_fields))
then 610 call check_thread_status(forthread_rwlock_rdlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
611 iterator=c_get_iterator(timestep_diagnostics_entry%outstanding_fields)
612 do while (c_has_next(iterator))
613 field_name=c_next_string(iterator)
614 if (is_field_present(io_configuration, source, data_id, field_name))
then 617 call check_thread_status(forthread_rwlock_wrlock(timestep_diagnostics_entry%completed_fields_rwlock))
618 call c_put_generic(timestep_diagnostics_entry%completed_fields, trim(field_name), generic, .false.)
619 call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%completed_fields_rwlock))
620 call c_add_string(removed_entries, field_name)
623 call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
624 if (.not. c_is_empty(removed_entries))
then 625 iterator=c_get_iterator(removed_entries)
626 call check_thread_status(forthread_rwlock_wrlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
627 do while (c_has_next(iterator))
628 call c_remove(timestep_diagnostics_entry%outstanding_fields, c_next_string(iterator))
630 call check_thread_status(forthread_rwlock_unlock(timestep_diagnostics_entry%outstanding_fields_rwlock))
633 call c_free(removed_entries)
639 integer,
intent(in) :: current_timestep
641 integer :: have_lock, outstanding_diags, outstanding_datas
642 type(list_type) :: entries_to_remove
643 type(iterator_type) :: iterator, all_diagnostics_iterator
644 type(mapentry_type) :: all_diag_mapentry
647 logical :: all_completed
648 character(len=STRING_LENGTH) :: entry_key
651 if (have_lock == 0)
then 660 do while(c_has_next(all_diagnostics_iterator))
661 all_diag_mapentry=c_next_mapentry(all_diagnostics_iterator)
663 call check_thread_status(forthread_mutex_lock(specific_all_diagnostics_for_ts%completed_num_mutex))
664 if (specific_all_diagnostics_for_ts%completed_num == 0)
then 665 iterator=c_get_iterator(specific_all_diagnostics_for_ts%diagnostic_entries)
667 do while (c_has_next(iterator))
669 call check_thread_status(forthread_mutex_lock(specific_monc_timestep_entry%deletion_metric_mutex))
670 outstanding_diags=specific_monc_timestep_entry%number_diags_outstanding
671 outstanding_datas=specific_monc_timestep_entry%number_datas_outstanding
672 call check_thread_status(forthread_mutex_unlock(specific_monc_timestep_entry%deletion_metric_mutex))
673 if (outstanding_diags .gt. 0 .or. outstanding_datas .gt. 0)
then 674 all_completed=.false.
678 if (all_completed)
then 679 call c_add_string(entries_to_remove, all_diag_mapentry%key)
682 call check_thread_status(forthread_mutex_unlock(specific_all_diagnostics_for_ts%completed_num_mutex))
686 if (.not. c_is_empty(entries_to_remove))
then 688 iterator=c_get_iterator(entries_to_remove)
689 do while (c_has_next(iterator))
690 entry_key=c_next_string(iterator)
695 call c_free(entries_to_remove)
706 character(len=*),
intent(in) :: key
710 type(data_values_type),
pointer :: field_data_value
711 integer :: cfentries, j
712 type(iterator_type) :: iterator, completed_fields_iterator
713 class(*),
pointer :: generic
716 if (
associated(all_diagnostics_at_ts))
then 717 iterator=c_get_iterator(all_diagnostics_at_ts%diagnostic_entries)
718 do while (c_has_next(iterator))
720 call check_thread_status(forthread_rwlock_destroy(specific_monc_timestep_entry%completed_fields_rwlock))
721 call check_thread_status(forthread_rwlock_destroy(specific_monc_timestep_entry%outstanding_fields_rwlock))
722 call check_thread_status(forthread_mutex_destroy(specific_monc_timestep_entry%activity_completion_mutex))
723 call check_thread_status(forthread_mutex_destroy(specific_monc_timestep_entry%deletion_metric_mutex))
724 call c_free(specific_monc_timestep_entry%outstanding_fields)
725 call c_free(specific_monc_timestep_entry%completed_activities)
726 completed_fields_iterator=c_get_iterator(specific_monc_timestep_entry%completed_fields)
727 do while (c_has_next(completed_fields_iterator))
728 field_data_value=>get_data_value_from_map_entry(c_next_mapentry(completed_fields_iterator))
729 if (
allocated(field_data_value%values))
deallocate(field_data_value%values)
730 deallocate(field_data_value)
732 call c_free(specific_monc_timestep_entry%completed_fields)
734 call c_free(all_diagnostics_at_ts%completed_diagnostics)
735 call c_free(all_diagnostics_at_ts%communication_corresponding_activities)
736 iterator=c_get_iterator(all_diagnostics_at_ts%diagnostic_entries)
737 do while (c_has_next(iterator))
738 generic=>c_next_generic(iterator)
739 if (
associated(generic))
deallocate(generic)
741 call c_free(all_diagnostics_at_ts%diagnostic_entries)
742 call check_thread_status(forthread_rwlock_destroy(all_diagnostics_at_ts%communication_corresponding_activities_rwlock))
743 call check_thread_status(forthread_rwlock_destroy(all_diagnostics_at_ts%completed_diagnostics_rwlock))
744 call check_thread_status(forthread_mutex_destroy(all_diagnostics_at_ts%completed_num_mutex))
756 type(io_configuration_type),
intent(inout) :: io_configuration
757 integer,
intent(in) :: source, data_id
758 character,
dimension(:),
allocatable,
intent(in) :: data_dump
759 character(len=*),
intent(in) :: field_name
760 type(data_values_type),
pointer :: get_value_from_monc_data
762 integer :: field_data_type, i
763 integer,
dimension(:),
allocatable :: int_values
765 allocate(get_value_from_monc_data)
767 if (field_data_type == 0)
then 768 call log_log(log_error,
"No data type for field '"//trim(field_name)//
"'")
770 get_value_from_monc_data%dimensions=get_number_field_dimensions(io_configuration, field_name, source, data_id)
771 if (field_data_type == double_data_type)
then 772 get_value_from_monc_data%values=get_array_double_from_monc(io_configuration, source, data_id, data_dump, field_name)
773 else if (field_data_type == integer_data_type)
then 774 int_values=get_array_integer_from_monc(io_configuration, source, data_id, data_dump, field_name)
775 allocate(get_value_from_monc_data%values(
size(int_values)))
776 do i=1,
size(int_values)
777 get_value_from_monc_data%values(i)=conv_to_real(int_values(i))
779 deallocate(int_values)
788 type(io_configuration_field_type),
dimension(:),
intent(in) :: fields
789 character(len=*),
intent(in) :: field_name
794 if (fields(i)%name .eq. field_name)
then 808 type(io_configuration_type),
intent(inout) :: io_configuration
809 integer,
intent(in) :: timestep, source
810 real(kind=DEFAULT_PRECISION),
intent(in) :: time
813 class(*),
pointer :: generic
817 if (.not.
associated(find_or_register_timestep_entry))
then 820 if (.not.
associated(find_or_register_timestep_entry))
then 822 generic=>find_or_register_timestep_entry
827 call c_add_generic(all_diags_by_timestep%diagnostic_entries, generic, .false.)
838 type(io_configuration_type),
intent(inout) :: io_configuration
839 integer,
intent(in) :: timestep, source
840 real(kind=DEFAULT_PRECISION),
intent(in) :: time
843 type(iterator_type) :: iterator
844 integer :: i, matched_datadefn_index
846 allocate(create_timestep_entry)
847 create_timestep_entry%timestep=timestep
848 create_timestep_entry%time=time
850 create_timestep_entry%source=source
851 create_timestep_entry%source_location=get_monc_location(io_configuration, source)
852 create_timestep_entry%number_datas_outstanding=0
853 do i=1,
size(io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names)
854 matched_datadefn_index=retrieve_data_definition(io_configuration, &
855 io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names(i))
856 if (matched_datadefn_index .gt. 0)
then 857 if (io_configuration%data_definitions(matched_datadefn_index)%frequency .gt. 0)
then 858 if (mod(timestep, io_configuration%data_definitions(matched_datadefn_index)%frequency) == 0)
then 859 create_timestep_entry%number_datas_outstanding=create_timestep_entry%number_datas_outstanding+1
863 call log_log(log_warn,
"IO server can not find data definition with name "&
864 //io_configuration%registered_moncs(create_timestep_entry%source_location)%definition_names(i))
868 do while (c_has_next(iterator))
869 call c_add_string(create_timestep_entry%outstanding_fields, c_next_string(iterator))
871 call check_thread_status(forthread_mutex_init(create_timestep_entry%activity_completion_mutex, -1))
872 call check_thread_status(forthread_mutex_init(create_timestep_entry%deletion_metric_mutex, -1))
873 call check_thread_status(forthread_rwlock_init(create_timestep_entry%completed_fields_rwlock, -1))
874 call check_thread_status(forthread_rwlock_init(create_timestep_entry%outstanding_fields_rwlock, -1))
880 type(list_type),
intent(inout) :: required_fields
882 type(iterator_type) :: iterator
884 if (.not. c_is_empty(required_fields))
then 885 iterator=c_get_iterator(required_fields)
886 do while (c_has_next(iterator))
896 type(iterator_type),
intent(inout) :: iterator
899 class(*),
pointer :: generic
901 generic=>c_next_generic(iterator)
903 if (
associated(generic))
then 906 retrieve_next_activity=>generic
909 retrieve_next_activity=>null()
919 integer,
intent(in) :: timestep, source
920 logical,
intent(in) :: do_lock
923 class(*),
pointer :: generic
928 if (
associated(generic))
then 931 get_timestep_entry=>generic
934 get_timestep_entry=>null()
943 integer,
intent(in) :: timestep
946 class(*),
pointer :: generic
949 if (.not.
associated(find_or_add_diagnostics_by_timestep))
then 952 if (.not.
associated(find_or_add_diagnostics_by_timestep))
then 953 allocate(find_or_add_diagnostics_by_timestep)
954 call check_thread_status(forthread_rwlock_init(&
955 find_or_add_diagnostics_by_timestep%communication_corresponding_activities_rwlock, -1))
956 call check_thread_status(forthread_rwlock_init(find_or_add_diagnostics_by_timestep%completed_diagnostics_rwlock, -1))
957 call check_thread_status(forthread_mutex_init(find_or_add_diagnostics_by_timestep%completed_num_mutex, -1))
958 find_or_add_diagnostics_by_timestep%completed_num=0
959 generic=>find_or_add_diagnostics_by_timestep
971 integer,
intent(in) :: timestep
972 logical,
intent(in) :: do_lock
975 class(*),
pointer :: generic
980 if (
associated(generic))
then 983 get_diagnostics_by_timestep=>generic
986 get_diagnostics_by_timestep=>null()
994 type(iterator_type),
intent(inout) :: iterator
997 class(*),
pointer :: generic
999 generic=>c_next_generic(iterator)
1001 if (
associated(generic))
then 1002 select type(generic)
1004 retrieve_next_specific_monc_timestep_entry=>generic
1007 retrieve_next_specific_monc_timestep_entry=>null()
1015 character(len=*),
intent(in) :: key
1019 class(*),
pointer :: generic
1022 if (
associated(generic))
then 1023 select type(generic)
1025 get_diagnostic_by_key=>generic
1028 get_diagnostic_by_key=>null()
1036 type(mapentry_type),
intent(in) :: mapentry
1039 class(*),
pointer :: generic
1041 generic=>c_get_generic(mapentry)
1043 if (
associated(generic))
then 1044 select type(generic)
1046 retrieve_diagnostics=>generic
1049 retrieve_diagnostics=>null()
1059 character(len=*),
intent(in) :: field_name
1062 class(*),
pointer :: generic
1064 call check_thread_status(forthread_rwlock_rdlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
1065 generic=>c_get_generic(diagnostics_by_timestep%communication_corresponding_activities, field_name)
1066 call check_thread_status(forthread_rwlock_unlock(diagnostics_by_timestep%communication_corresponding_activities_rwlock))
1067 if (
associated(generic))
then 1068 select type(generic)
1070 get_comm_activity_from_fieldname=>generic
1073 get_comm_activity_from_fieldname=>null()
1082 type(list_type),
intent(inout) :: action_members
1083 integer,
intent(in) :: index
1084 type(io_configuration_misc_item_type),
pointer :: get_misc_action_at_index
1086 class(*),
pointer :: generic
1088 generic=>c_get_generic(action_members, index)
1089 if (
associated(generic))
then 1090 select type(generic)
1091 type is(io_configuration_misc_item_type)
1092 get_misc_action_at_index=>generic
1095 get_misc_action_at_index=>null()
1104 type(io_configuration_type),
intent(inout) :: io_configuration
1105 type(hashmap_type),
intent(out) :: diagnostic_generation_frequency
1107 integer :: i, j, entries, action_entities, activity_freq
1108 type(io_configuration_misc_item_type),
pointer :: misc_action
1110 character(len=STRING_LENGTH) :: activity_name
1112 class(*),
pointer :: generic
1114 entries=io_configuration%number_of_diagnostics
1115 if (entries .gt. 0)
then 1124 action_entities=c_size(io_configuration%diagnostics(i)%members)
1125 if (action_entities .gt. 0)
then 1126 do j=1, action_entities
1129 item%uuid=conv_to_string(j)
1130 item%result_name=c_get_string(misc_action%embellishments,
"result")
1131 if (c_contains(misc_action%embellishments,
"root"))
then 1132 if (get_action_attribute_string(misc_action%embellishments,
"root") .eq.
"auto")
then 1133 item%root=mod(i, io_configuration%number_of_io_servers)
1135 item%root=get_action_attribute_integer(misc_action%embellishments,
"root")
1140 if (misc_action%type .eq.
"operator")
then 1141 activity_name=c_get_string(misc_action%embellishments,
"name")
1142 item%activity_name=activity_name
1143 item%required_fields=get_operator_required_fields(activity_name, misc_action%embellishments)
1144 item%activity_attributes=misc_action%embellishments
1145 item%operator_procedure=>get_operator_perform_procedure(activity_name)
1147 else if (misc_action%type .eq.
"communication")
then 1148 if (item%root .lt. 0)
call log_log(log_error,
"Root must be supplied and 0 or greater for communication actions")
1149 activity_name=c_get_string(misc_action%embellishments,
"name")
1150 if (activity_name .eq.
"reduction" .or. activity_name .eq.
"allreduction")
then 1151 call c_add_string(item%required_fields, c_get_string(misc_action%embellishments,
"field"))
1153 item%communication_operator=get_reduction_operator(c_get_string(misc_action%embellishments,
"operator"))
1154 else if (activity_name .eq.
"broadcast")
then 1155 call c_add_string(item%required_fields, c_get_string(misc_action%embellishments,
"field"))
1180 character(len=STRING_LENGTH),
intent(inout) :: result_name
1181 integer,
intent(in) :: diagnostic_entry_index
1184 type(iterator_type) :: iterator
1187 do while (c_has_next(iterator))
1189 if (get_diagnostic_activity_by_result_name%result_name == result_name)
then 1193 get_diagnostic_activity_by_result_name=>null()
1201 type(io_configuration_type),
intent(inout) :: io_configuration
1202 type(io_configuration_diagnostic_field_type),
intent(inout) :: diagnostic_configuration
1203 integer,
intent(in) :: entry_index
1205 integer :: i, auto_index, diag_modified_dim_size
1206 character(len=STRING_LENGTH) :: specific_dimension
1210 if (
associated(diagnostic_activity))
then 1212 do i=1, diagnostic_configuration%dimensions
1213 auto_index=index(diagnostic_configuration%dim_size_defns(i),
"-auto")
1214 if (auto_index .ne. 0)
then 1215 specific_dimension=diagnostic_configuration%dim_size_defns(i)(1:auto_index-1)
1216 diag_modified_dim_size=get_operator_auto_size(io_configuration, diagnostic_activity%activity_name, &
1217 specific_dimension, diagnostic_activity%activity_attributes)
1218 if (diag_modified_dim_size .ge. 0)
then 1219 specific_dimension=trim(specific_dimension)//
"_"//trim(conv_to_string(diag_modified_dim_size))
1220 diagnostic_configuration%dim_size_defns(i)=specific_dimension
1221 call c_put_integer(io_configuration%dimension_sizing, specific_dimension, diag_modified_dim_size)
1223 diagnostic_configuration%dim_size_defns(i)=specific_dimension
1236 type(io_configuration_type),
intent(inout) :: io_configuration
1237 type(list_type),
intent(inout) :: required_fields
1239 integer :: field_freq
1240 type(iterator_type) :: iterator
1243 iterator=c_get_iterator(required_fields)
1244 do while (c_has_next(iterator))
1255 type(io_configuration_type),
intent(inout) :: io_configuration
1256 character(len=*),
intent(in) :: field_name
1259 do i=1, io_configuration%number_of_data_definitions
1260 do j=1, io_configuration%data_definitions(i)%number_of_data_fields
1261 if (io_configuration%data_definitions(i)%fields(j)%name == field_name)
then logical function, public check_broadcast_inter_io_for_completion(io_configuration)
Checks the statuses for broadcast completion and returns whether they are all finished or not...
type(hashmap_type), volatile all_diagnostics_at_timestep
integer function, dimension(:), allocatable, public get_array_integer_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves an array of integers with a corresponding key from the raw data dump. The size depends on t...
type(all_diagnostics_at_timestep_type) function, pointer find_or_add_diagnostics_by_timestep(timestep)
Finds or adds diagnostics by timestep. This is used to maintain a list of all diagnostic entries for ...
type(hashmap_type) function, public determine_diagnostics_fields_available(monc_field_names)
Determines the diagnostics fields that are available based upon the input MONC fields on registration...
integer, volatile previous_clean_point
logical function, public check_allreduction_inter_io_for_completion(io_configuration)
Determines whether this all reduction inter IO functionality has completed or not.
integer function forthread_rwlock_init(rwlock_id, attr_id)
All reduction, which does a reduce and then broadcasts the data to all IO servers.
Gets a specific generic element out of the list, stack, queue or map with the corresponding key...
Returns whether a collection is empty.
Puts an integer key-value pair into the map.
integer, parameter operator_type
integer function forthread_mutex_unlock(mutex_id)
subroutine, public init_reduction_inter_io(io_configuration)
Initialises the reduction action.
subroutine handle_completion(io_configuration, values, field_name, timestep)
Handles inter io reduction completion, it adds the resulting value to the appropriate completion list...
Overall IO configuration.
type(diagnostics_at_timestep_type) function, pointer get_timestep_entry(timestep, source, do_lock)
Retrieves the timestep at a specific timestep and source MONC.
integer, parameter, public log_error
Only log ERROR messages.
subroutine handle_completion_for_specific_monc_timestep_entry(io_configuration, result_to_add, field_name, timestep_entry, diagnostics_by_timestep)
This handles inter IO completion for a specific timestep entry. This is required as at a timestep the...
type(diagnostics_at_timestep_type) function, pointer retrieve_next_specific_monc_timestep_entry(iterator)
Retrieves the next MONC timestep entry from the all diagnostics based upon a collections iterator...
integer function forthread_mutex_destroy(mutex_id)
subroutine define_diagnostics(io_configuration, diagnostic_generation_frequency)
Based upon the IO configuration this will define the diagnostics structure. It is done once at initia...
type(list_type) function, public get_operator_required_fields(operator_name, action_attributes)
Retrieves the list of fields required by an operator before it can run.
integer function get_datatype_of_field(fields, field_name)
Retrieves the data type of a field or 0 if the field was not found.
integer, volatile clean_progress_mutex
subroutine, public perform_inter_io_reduction(io_configuration, field_values, field_size, reduction_field_name, reduction_op, root, timestep, completion_procedure)
Actually handles the processing for this data wrt the vertical reduction.
Contains functionality for managing and extracting data from the raw data dumps that the IO server re...
logical function, public check_reduction_inter_io_for_completion(io_configuration)
Checks this action for completion, when all are completed then the IO server can shutdown as this is ...
subroutine check_all_activities_against_completed_fields(io_configuration, timestep_entry, diagnostics_by_timestep)
Checks all pending activities against the completed fields and runs them if the required fields are n...
type(diagnostics_activity_type) function, pointer get_diagnostic_activity_by_result_name(result_name, diagnostic_entry_index)
Retrieves a diagnostic activity based upon its result name or null if none is found.
subroutine perform_inter_io_communication(io_configuration, timestep_entry, all_entries_at_timestep, activity, value_to_send, communication_field_name)
Performs the actual inter IO communication by calling out to the appropriate inter IO module...
integer function get_diagnostic_generation_frequency(io_configuration, required_fields)
Retrieves the max diagnostic generation frequency for a set of fields.
logical function, public get_scalar_logical_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single logical element (scalar) from the data dump.
logical function are_fields_available_for_activity(timestep_entry, activity)
Determines whether the fields required for an activity are available so that activity can be run...
integer, parameter broadcast_type
integer, parameter, public default_precision
MPI communication type which we use for the prognostic and calculation data.
subroutine, public init_global_callback_inter_io(io_configuration)
Initialises the global callback.
The writer field manager will manage aspects of the fields being provided to the writer federator...
logical function handle_operator_completion(io_configuration, timestep_entry, specific_activity)
Handles the completion of the operator.
integer function, public get_action_attribute_integer(action_attributes, field_name)
Retrieves the name of a field from the attributes specified in the configuration. ...
integer function forthread_rwlock_rdlock(lock_id)
integer function get_field_frequency(io_configuration, field_name)
Retrieves the generation frequency for a specific field.
subroutine process_auto_dimensions(io_configuration, diagnostic_configuration, entry_index)
Processes all auto dimensions by looking them up and resolving them based upon the operators...
type(io_configuration_misc_item_type) function, pointer get_misc_action_at_index(action_members, index)
Retrieves a misc action from the parsed user XML configuration at a specific index.
Contains common definitions for the data and datatypes used by MONC.
Adds an integer element to the end of the list.
logical function, public check_diagnostic_federator_for_completion(io_configuration)
Checks whether the diagnostics federator has completed or not, this is really checking all the underl...
A hashmap structure, the same as a map but uses hashing for greatly improved performance when storing...
subroutine, public finalise_broadcast_inter_io()
Finalises the broadcast inter IO functionality.
integer function forthread_mutex_init(mutex_id, attr_id)
Conversion between common inbuilt FORTRAN data types.
integer, parameter, public double_data_type
type(diagnostics_activity_type) function, pointer get_comm_activity_from_fieldname(diagnostics_by_timestep, field_name)
Retrieves a communication activity from its field name.
Converts data types to strings.
integer function forthread_rwlock_wrlock(lock_id)
Operator federator which manages the different operators which are available. Operators take in any n...
subroutine, public finalise_allreduction_inter_io(io_configuration)
Finalises the all reduction inter IO functionality.
integer function forthread_mutex_trylock(mutex_id)
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
type(diagnostics_activity_type) function, pointer retrieve_next_activity(iterator)
Retrieves the next activity in a collection being iterated over by an iterator.
real(kind=double_precision) function, dimension(:), allocatable, public get_array_double_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves an array of doubles with a corresponding key from the raw data dump. The size depends on th...
This defines some constants and procedures that are useful to the IO server and clients that call it...
integer, parameter, public integer_data_type
Broadcast inter IO communication which sends a value from one IO server to all others. This tracks field name and timestep and only issues one call (and one results call to completion) for that combination.
character(len=string_length) function generate_activity_diagnostic_key(diagnostic, activity)
Generates a unique key for an activity within a diagnostic, which is unique amongst all diagnostics a...
logical function, public is_field_present(io_configuration, source, data_id, key)
Map data structure that holds string (length 20 maximum) key value pairs.
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Global callback inter IO, which registers the callback with identifiers and then the procedure is act...
integer, parameter reduction_type
Returns the number of elements in the collection.
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
integer, parameter allreduction_type
integer function forthread_mutex_lock(mutex_id)
subroutine, public perform_inter_io_broadcast(io_configuration, field_values, field_size, field_name, root, timestep, completion_procedure)
Performs an inter IO broadcast of data from the root to all other IO servers. Note that this is on th...
subroutine, public finalise_operators()
Finalises any operators that require finalisation.
subroutine clean_diagnostic_states(current_timestep)
Cleans the diagnostic states if required (based on the timestep period)
Collection data structures.
subroutine handle_diagnostic_calculation_completed(io_configuration, diagnostic_index, timestep_entry, diagnostics_by_timestep)
Handles completion of a diagnostic calculation and will then pass this onto interested parties...
integer function, public get_monc_location(io_configuration, source)
A helper function to get the location of a MONC's configuration in the IO data structure.
subroutine, public init_broadcast_inter_io(io_configuration)
Initialises the broadcast inter IO functionality.
procedure(perform_activity) function, pointer, public get_operator_perform_procedure(operator_name)
Retrieves the operator execution procedure of an operator with a specific name.
type(all_diagnostics_at_timestep_type) function, pointer retrieve_diagnostics(mapentry)
Retrieves the all diagnostics at a specific timestep from its map entry.
integer, parameter, public log_warn
Log WARNING and ERROR messages.
type(hashmap_type), volatile diagnostics_per_monc_at_timestep
type(hashset_type), volatile all_outstanding_fields
Converts data types to real.
integer, parameter perform_clean_every
A wrapper type containing all the diagnostics for MONC source processes at a specific timestep...
integer, parameter, public string_length
Default length of strings.
logical function, public get_action_attribute_logical(action_attributes, field_name)
Retrieves a logical value from the attribute which corresponds to a specific key. ...
subroutine check_diagnostics_entries_against_data(io_configuration, source, data_id, data_dump, timestep_diagnostics_entry)
Checks the outstanding fields of a time step entry against the data received from MONC and moves any ...
character(len=string_length) function, public get_action_attribute_string(action_attributes, field_name)
Retrieves the name of a field from the attributes specified in the configuration. ...
integer function, public get_number_field_dimensions(io_configuration, field_name, source, data_id)
Retrieves the number of field dimensions that a specific field has from a MONC process within a data ...
List data structure which implements a doubly linked list. This list will preserve its order...
subroutine, public init_allreduction_inter_io(io_configuration)
Initialises the all reduction inter IO functionality.
type(diagnostics_type), dimension(:), allocatable, volatile diagnostic_definitions
Adds a generic element to the end of the list.
integer function, public retrieve_data_definition(io_configuration, key)
Retrieves a specific data definition from the configuration which matches a key.
subroutine, public finalise_global_callback_inter_io(io_configuration)
Finalises the global callback.
integer function forthread_rwlock_destroy(rwlock_id)
subroutine add_required_fields_if_needed(required_fields)
Adds the required fields of an activity to the overall required fields which are cloned for each new ...
integer, volatile previous_viewed_timestep
integer, volatile current_point
subroutine deallocate_diagnostics_at_timestep(key)
Deallocates all the diagnostics at a specific timestep, this removes all the individual MONC timestep...
subroutine, public finalise_reduction_inter_io(io_configuration)
Finalises the reduction action, waiting for all outstanding requests and then freeing data...
Frees up all the allocatable, heap, memory associated with a list, stack, queue or map...
subroutine, public pass_fields_to_diagnostics_federator(io_configuration, source, data_id, data_dump)
Entry point into the diagnostics federator this runs the diagnostics, executing the defined rules bas...
integer function forthread_rwlock_unlock(lock_id)
type(data_values_type) function, pointer, public get_data_value_from_map_entry(map_entry)
Retrieves the data value (wrapper) by field name or null if no entry was found in the provided map en...
Reduction inter IO action which will perform reductions between IO servers. This is not as trivial as...
type(hashmap_type) function, public initialise_diagnostic_federator(io_configuration)
Initialises the diagnostics action and sets up the diagnostics master definitions.
Puts a generic key-value pair into the map.
integer function, public get_scalar_integer_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a single integer element (scalar) from the data dump.
real(kind=double_precision) function, public get_scalar_real_from_monc(io_configuration, source, data_id, data_dump, key)
Retrieves a scalar real with a corresponding key from the raw data dump.
type(diagnostics_at_timestep_type) function, pointer find_or_register_timestep_entry(io_configuration, timestep, source, time)
Locates or registers a new (if it does not exist) time step entry based upon the timestep and source ...
integer function, public get_reduction_operator(op_string)
Given the map of action attributes this procedure will identify the reduction operator that has been ...
integer, volatile all_diagnostics_per_timestep_rwlock
type(data_values_type) function, pointer get_value_from_monc_data(io_configuration, source, data_id, data_dump, field_name)
Retrieves a value from the communicated MONC data. If this was an integer then converts to a real...
Retrieves the generic value held at the specific map index or null if index > map elements...
type(all_diagnostics_at_timestep_type) function, pointer get_diagnostics_by_timestep(timestep, do_lock)
Retrieves the diagnostics list (each MONC source) at a specific timestep.
Hashset structure which will store unique strings. The hashing aspect means that lookup is very fast ...
Configuration associated with the representation of a specific data field.
Determines whether or not a map contains a specific key.
subroutine, public perform_inter_io_allreduction(io_configuration, field_values, field_size, field_name, reduction_op, root, timestep, completion_procedure)
Performs the all reduction inter IO reduction.
Adds a string to the end of the list.
subroutine, public finalise_diagnostic_federator(io_configuration)
Finalises the diagnostics federator, waiting for all outstanding requests and then freeing data...
type(hashset_type), volatile available_fields
integer, volatile timestep_entries_rwlock
type(all_diagnostics_at_timestep_type) function, pointer get_diagnostic_by_key(key)
Retrieves all diagnostics at a timestep by its key.
integer function, public get_operator_auto_size(io_configuration, operator_name, auto_dimension, action_attributes)
Gets a specific string element out of the list, stack, queue or map with the corresponding key...
Parses the XML configuration file to produce the io configuration description which contains the data...
This diagnostics federator will take in data fields sent from a MONC, perform operators on these as r...
subroutine issue_communication_calls(io_configuration, timestep_entry, diagnostics_by_timestep, source, data_id, data_dump)
Issues any inter io communication calls that are appropriate based upon the data received from MONC...
subroutine, public initialise_operators()
Initialises any operators that require initialisation.
Removes a specific element from the list or map.
type(diagnostics_at_timestep_type) function, pointer create_timestep_entry(io_configuration, timestep, time, source)
Creates a timestep entry and processes all members, determining activities their required fields etc...