MONC
threadpool.F90
Go to the documentation of this file.
1 
12  use mpi, only : mpi_thread_multiple, mpi_thread_serialized
13  implicit none
14 
15 #ifndef TEST_MODE
16  private
17 #endif
18 
20  interface
21  subroutine thread_procedure(arguments, data_buffer)
22  integer, dimension(:), intent(in) :: arguments
23  character, dimension(:), allocatable, intent(inout), optional :: data_buffer
24  end subroutine thread_procedure
25  end interface
26 
29  procedure(thread_procedure), pointer, nopass :: proc
30  integer, dimension(:), allocatable :: arguments
31  character, dimension(:), allocatable :: data_buffer
33 
34  integer, parameter :: default_thread_pool_size=10
35  logical, volatile, dimension(:), allocatable :: thread_busy, thread_start
36  integer, volatile, dimension(:), allocatable :: thread_ids, thread_pass_data
37  integer, volatile, dimension(:), allocatable :: activate_thread_condition_variables, activate_thread_mutex
38  type(threaded_procedure_container_type), volatile, dimension(:), allocatable :: thread_entry_containers
39  integer, volatile :: netcdfmutex
40  integer, volatile :: next_suggested_idle_thread
41  logical, volatile :: threadpool_active
42 
44 
47 contains
48 
50  subroutine threadpool_init(io_configuration)
51  type(io_configuration_type), intent(inout) :: io_configuration
52 
53  integer :: n
54 
57  if (io_configuration%number_of_threads .ge. 1) then
58  total_number_of_threads=io_configuration%number_of_threads
59  else
60  if (io_configuration%my_io_rank==0) then
61  call log_log(log_warn, "No setting for IO server thread pool size which must be 1 or more so using default size")
62  end if
64  end if
69  threadpool_active=.true.
76  thread_busy(n)=.false.
77  thread_start(n)=.false.
78  thread_pass_data(n)=n
80  end do
81  end subroutine threadpool_init
82 
85 #ifdef ENFORCE_THREAD_SAFETY
87 #endif
88  end subroutine threadpool_lock_netcdf_access
89 
92 #ifdef ENFORCE_THREAD_SAFETY
94 #endif
95  end subroutine threadpool_unlock_netcdf_access
96 
101  subroutine threadpool_start_thread(proc, arguments, data_buffer)
102  procedure(thread_procedure) :: proc
103  integer, dimension(:), intent(in) :: arguments
104  character, dimension(:), allocatable, intent(in), optional :: data_buffer
105 
106  integer :: idle_thread_id
107 
108  if (.not. threadpool_active) call log_log(log_error, "Attemping to start IO thread on deactivated thread pool")
109 
110  idle_thread_id=find_idle_thread()
111  if (idle_thread_id .ne. -1) then
112  thread_busy(idle_thread_id)=.true.
113  thread_entry_containers(idle_thread_id)%proc=>proc
114  allocate(thread_entry_containers(idle_thread_id)%arguments(size(arguments)))
115  thread_entry_containers(idle_thread_id)%arguments=arguments
116  if (present(data_buffer)) allocate(thread_entry_containers(idle_thread_id)%data_buffer(size(data_buffer)), &
117  source=data_buffer)
118  ! Send the signal to the thread to wake up and start
120  thread_start(idle_thread_id)=.true.
123  end if
124  end subroutine threadpool_start_thread
125 
129  subroutine threadpool_thread_entry_procedure(thread_id)
130  integer :: thread_id
131 
132  do while (threadpool_active)
134  do while (.not. thread_start(thread_id) .and. threadpool_active)
136  activate_thread_mutex(thread_id)))
137  end do
139  if (.not. threadpool_active) return
140  thread_busy(thread_id)=.true.
141  thread_start(thread_id)=.false.
142 
146  if (allocated(thread_entry_containers(thread_id)%data_buffer)) then
147  call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments, &
148  data_buffer=thread_entry_containers(thread_id)%data_buffer)
149  deallocate(thread_entry_containers(thread_id)%data_buffer)
150  else
151  call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments)
152  end if
153  deallocate(thread_entry_containers(thread_id)%arguments)
157  thread_busy(thread_id)=.false.
158  end do
159  end subroutine threadpool_thread_entry_procedure
160 
163  logical function threadpool_is_idle()
168  end function threadpool_is_idle
169 
173  subroutine threadpool_deactivate()
174  integer :: i
175  integer, pointer :: retval
176 
177  allocate(retval)
178 
179  threadpool_active=.false.
187  end do
188  end subroutine threadpool_deactivate
189 
191  subroutine threadpool_finalise()
197  end subroutine threadpool_finalise
198 
201  integer function find_idle_thread()
203  do while (find_idle_thread == -1)
205  end do
206  end function find_idle_thread
207 
211  integer function get_index_of_idle_thread()
212  integer :: i
213 
215  if (.not. thread_busy(i)) then
219  return
220  end if
221  end do
224  end function get_index_of_idle_thread
225 
228  subroutine check_thread_status(ierr)
229  integer, intent(in) :: ierr
230 
231  if (ierr .ne. 0) then
232  call log_log(log_error, "Pthreads error in IO server, error code="//conv_to_string(ierr))
233  end if
234  end subroutine check_thread_status
235 end module threadpool_mod
type(threaded_procedure_container_type), dimension(:), allocatable, volatile thread_entry_containers
Definition: threadpool.F90:38
The thread call procedure interface.
Definition: threadpool.F90:21
integer function forthread_mutex_unlock(mutex_id)
Definition: forthread.F90:302
integer function forthread_cond_signal(cond_id)
Definition: forthread.F90:394
Wraps the thread procedure with the call itself and the data to pass to it.
Definition: threadpool.F90:28
integer function find_idle_thread()
Finds an idle thread, if one is not available then will block until one becomes free.
Definition: threadpool.F90:202
integer, volatile next_suggested_idle_thread
Definition: threadpool.F90:40
integer, dimension(:), allocatable, volatile activate_thread_mutex
Definition: threadpool.F90:37
integer, parameter, public log_error
Only log ERROR messages.
Definition: logging.F90:11
integer, volatile total_number_of_threads
Definition: threadpool.F90:43
subroutine, public threadpool_init(io_configuration)
Initialises the thread pool and marks each thread as idle.
Definition: threadpool.F90:51
integer function forthread_mutex_destroy(mutex_id)
Definition: forthread.F90:265
integer function forthread_kill(thread_id, sig)
Definition: forthread.F90:114
Logging utility.
Definition: logging.F90:2
integer function get_index_of_idle_thread()
Specifically gets the index of the next idle thread or -1 if they are all busy. This starts from a ne...
Definition: threadpool.F90:212
integer function forthread_join(thread_id, val)
Definition: forthread.F90:93
logical function, public threadpool_is_idle()
Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work) ...
Definition: threadpool.F90:164
integer function forthread_init()
Definition: forthread.F90:11
integer, dimension(:), allocatable, volatile thread_ids
Definition: threadpool.F90:36
subroutine, public threadpool_deactivate()
This waits for all busy threads to complete and then shuts all the pthreads down. The deactivation an...
Definition: threadpool.F90:174
integer function forthread_mutex_init(mutex_id, attr_id)
Definition: forthread.F90:274
Conversion between common inbuilt FORTRAN data types.
Definition: conversions.F90:5
integer function forthread_cond_init(cond_id, attr_id)
Definition: forthread.F90:356
Converts data types to strings.
Definition: conversions.F90:36
logical, dimension(:), allocatable, volatile thread_busy
Definition: threadpool.F90:35
subroutine, public threadpool_lock_netcdf_access()
Aquires the NetCDF thread lock, NetCDF is not thread safe so we need to manage thread calls to it...
Definition: threadpool.F90:85
integer function forthread_create(thread_id, attr_id, run, arg)
Definition: forthread.F90:33
subroutine, public log_log(level, message, str)
Logs a message at the specified level. If the level is above the current level then the message is ig...
Definition: logging.F90:75
logical, dimension(:), allocatable, volatile thread_start
Definition: threadpool.F90:35
This is a thread pool and the single management "main" thread will spawn out free threads in the pool...
Definition: threadpool.F90:5
subroutine, public threadpool_finalise()
Finalises the thread pool.
Definition: threadpool.F90:192
subroutine, public check_thread_status(ierr)
Checks the error status of any thread operation and reports an error if it failed.
Definition: threadpool.F90:229
integer function forthread_mutex_lock(mutex_id)
Definition: forthread.F90:284
integer, dimension(:), allocatable, volatile thread_pass_data
Definition: threadpool.F90:36
integer, volatile active_scalar_mutex
Definition: threadpool.F90:43
integer, parameter default_thread_pool_size
Number of threads in the pool.
Definition: threadpool.F90:34
subroutine threadpool_thread_entry_procedure(thread_id)
Entry point called by each thread creation in the pool, this calls out to the actual procedure to exe...
Definition: threadpool.F90:130
integer, parameter, public log_warn
Log WARNING and ERROR messages.
Definition: logging.F90:12
integer function forthread_destroy()
Definition: forthread.F90:23
integer, dimension(:), allocatable, volatile activate_thread_condition_variables
Definition: threadpool.F90:37
integer function forthread_cond_broadcast(cond_id)
Definition: forthread.F90:385
integer function forthread_cond_destroy(cond_id)
Definition: forthread.F90:347
integer function forthread_cond_wait(cond_id, mutex_id)
Definition: forthread.F90:376
subroutine, public threadpool_start_thread(proc, arguments, data_buffer)
Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.
Definition: threadpool.F90:102
subroutine, public threadpool_unlock_netcdf_access()
Releases the NetCDF thread lock, NetCDF is not thread safe so we need to manage thread calls to it...
Definition: threadpool.F90:92
logical, volatile threadpool_active
Definition: threadpool.F90:41
integer, volatile active_threads
Definition: threadpool.F90:43
Parses the XML configuration file to produce the io configuration description which contains the data...
integer, volatile netcdfmutex
Mutex used for controling NetCDF access.
Definition: threadpool.F90:39