MONC
threadpool_mod Module Reference

A thread pool in which a single management ("main") thread hands work to free threads in the pool. If no thread is free, the caller blocks until one becomes available. The pool uses ForThreads, which is a wrapper around pthreads. It creates a fixed number of threads up front and passes work to them, rather than creating a new thread for each piece of work.

Data Types

interface  thread_procedure
 The thread call procedure interface.

type  threaded_procedure_container_type
 Wraps the thread procedure with the call itself and the data to pass to it.
 

Functions/Subroutines

subroutine, public threadpool_init (io_configuration)
 Initialises the thread pool and marks each thread as idle.

subroutine, public threadpool_lock_netcdf_access ()
 Acquires the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.

subroutine, public threadpool_unlock_netcdf_access ()
 Releases the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.

subroutine, public threadpool_start_thread (proc, arguments, data_buffer)
 Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.

subroutine threadpool_thread_entry_procedure (thread_id)
 Entry point run by each thread created in the pool. It calls out to the actual procedure to execute, which lets the pool perform management actions before and after the call.

logical function, public threadpool_is_idle ()
 Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work).

subroutine, public threadpool_deactivate ()
 Waits for all busy threads to complete and then shuts down all the pthreads. Deactivation and finalisation are separate procedures so that the pool can first be deactivated (ensuring no threads are running actions), the actions can then be finalised (which might involve destroying mutexes), and the threading environment can finally be destroyed in finalisation.

subroutine, public threadpool_finalise ()
 Finalises the thread pool.

integer function find_idle_thread ()
 Finds an idle thread. If none is available, this blocks until one becomes free.

integer function get_index_of_idle_thread ()
 Gets the index of the next idle thread, or -1 if no idle thread is found. The search starts from the next suggested idle thread rather than from the beginning, because the next thread is often idle; after an unsuccessful scan the suggestion wraps back to the first thread.

subroutine, public check_thread_status (ierr)
 Checks the error status of any thread operation and reports an error if it failed.
 

Variables

integer, parameter default_thread_pool_size =10
 Default number of threads in the pool, used when the configuration does not give a size of 1 or more.
 
logical, dimension(:), allocatable, volatile thread_busy
 
logical, dimension(:), allocatable, volatile thread_start
 
integer, dimension(:), allocatable, volatile thread_ids
 
integer, dimension(:), allocatable, volatile thread_pass_data
 
integer, dimension(:), allocatable, volatile activate_thread_condition_variables
 
integer, dimension(:), allocatable, volatile activate_thread_mutex
 
type(threaded_procedure_container_type), dimension(:), allocatable, volatile thread_entry_containers
 
integer, volatile netcdfmutex
 Mutex used for controlling NetCDF access.
 
integer, volatile next_suggested_idle_thread
 
logical, volatile threadpool_active
 
integer, volatile active_threads
 
integer, volatile total_number_of_threads
 
integer, volatile active_scalar_mutex
 

Detailed Description

A thread pool in which a single management ("main") thread hands work to free threads in the pool. If no thread is free, the caller blocks until one becomes available. The pool uses ForThreads, which is a wrapper around pthreads. It creates a fixed number of threads up front and passes work to them, rather than creating a new thread for each piece of work.

Function/Subroutine Documentation

◆ check_thread_status()

subroutine, public threadpool_mod::check_thread_status ( integer, intent(in)  ierr)

Checks the error status of any thread operation and reports an error if it failed.

Parameters
ierr  The error/success flag returned from the ForThreads library, which itself is returned from pthreads

Definition at line 229 of file threadpool.F90.

229  integer, intent(in) :: ierr
230
231  if (ierr .ne. 0) then
232    call log_log(log_error, "Pthreads error in IO server, error code="//conv_to_string(ierr))
233  end if

◆ find_idle_thread()

integer function threadpool_mod::find_idle_thread ( )
private

Finds an idle thread. If none is available, it calls get_index_of_idle_thread repeatedly in a loop until one becomes free.

Returns
The id of the idle thread which can be used

Definition at line 202 of file threadpool.F90.

202  find_idle_thread=get_index_of_idle_thread()
203  do while (find_idle_thread == -1)
204    find_idle_thread=get_index_of_idle_thread()
205  end do

◆ get_index_of_idle_thread()

integer function threadpool_mod::get_index_of_idle_thread ( )
private

Gets the index of the next idle thread, or -1 if no idle thread is found. The search starts from the next suggested idle thread rather than from the beginning, because the next thread is often idle; after an unsuccessful scan the suggestion wraps back to the first thread.

Returns
The index of the next idle thread or -1 if there is none

Definition at line 212 of file threadpool.F90.

212  integer :: i
213
214  do i=next_suggested_idle_thread, total_number_of_threads
215    if (.not. thread_busy(i)) then
216      get_index_of_idle_thread=i
217      next_suggested_idle_thread=i+1
218      if (next_suggested_idle_thread .gt. total_number_of_threads) next_suggested_idle_thread=1
219      return
220    end if
221  end do
222  next_suggested_idle_thread=1
223  get_index_of_idle_thread=-1
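
The search and its wrap-around behaviour can be tried outside the pool. The following is a minimal sketch, not part of threadpool_mod: the module name idle_search_sketch_mod, the fixed pool size of 4 and the plain (non-volatile) state are assumptions made for illustration, while the function body follows the listing above. It suggests that a single call scans only from the suggested index to the end of the pool, so a thread that is idle before the suggested index is only found on the following call, which is what the retry loop in find_idle_thread relies on.

```fortran
module idle_search_sketch_mod
  implicit none

  ! Illustrative stand-ins for the module state (assumed pool size of 4)
  integer, parameter :: total_number_of_threads = 4
  logical :: thread_busy(total_number_of_threads) = .false.
  integer :: next_suggested_idle_thread = 1

contains

  ! Same logic as the listing: scan from the suggested index to the end of the pool
  integer function get_index_of_idle_thread()
    integer :: i

    do i = next_suggested_idle_thread, total_number_of_threads
      if (.not. thread_busy(i)) then
        get_index_of_idle_thread = i
        next_suggested_idle_thread = i + 1
        if (next_suggested_idle_thread > total_number_of_threads) next_suggested_idle_thread = 1
        return
      end if
    end do
    ! Nothing idle in the scanned range: restart from the first thread next time
    next_suggested_idle_thread = 1
    get_index_of_idle_thread = -1
  end function get_index_of_idle_thread

end module idle_search_sketch_mod

program idle_search_sketch
  use idle_search_sketch_mod
  implicit none
  integer :: first, second, third

  ! Only thread 1 is idle, but the suggestion points past it
  thread_busy = [.false., .true., .true., .true.]
  next_suggested_idle_thread = 2

  first = get_index_of_idle_thread()    ! scans 2..4, finds nothing, resets the suggestion to 1
  second = get_index_of_idle_thread()   ! scans 1..4, finds thread 1
  thread_busy(second) = .true.          ! the caller marks the thread busy
  third = get_index_of_idle_thread()    ! scans 2..4, every thread is busy

  print *, first, second, third
end program idle_search_sketch
```

Compiled on its own, the program prints the values -1, 1 and -1.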

◆ threadpool_deactivate()

subroutine, public threadpool_mod::threadpool_deactivate ( )

Waits for all busy threads to complete and then shuts down all the pthreads. Deactivation and finalisation are separate procedures so that the pool can first be deactivated (ensuring no threads are running actions), the actions can then be finalised (which might involve destroying mutexes), and the threading environment can finally be destroyed in finalisation.

Definition at line 174 of file threadpool.F90.

174  integer :: i
175  integer, pointer :: retval
176
177  allocate(retval)
178
179  threadpool_active=.false.
180  do i=1, total_number_of_threads
181    call check_thread_status(forthread_mutex_lock(activate_thread_mutex(i)))
182    call check_thread_status(forthread_cond_signal(activate_thread_condition_variables(i)))
183    call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(i)))
184    call check_thread_status(forthread_join(thread_ids(i),retval))
185    call check_thread_status(forthread_mutex_destroy(activate_thread_mutex(i)))
186    call check_thread_status(forthread_cond_destroy(activate_thread_condition_variables(i)))
187  end do

◆ threadpool_finalise()

subroutine, public threadpool_mod::threadpool_finalise ( )

Finalises the thread pool.

Definition at line 192 of file threadpool.F90.

192  call check_thread_status(forthread_mutex_destroy(netcdfmutex))
193  call check_thread_status(forthread_mutex_destroy(active_scalar_mutex))
194  deallocate(thread_busy, thread_start, thread_ids, thread_pass_data, activate_thread_condition_variables, &
195       activate_thread_mutex, thread_entry_containers)
196  call check_thread_status(forthread_destroy())

◆ threadpool_init()

subroutine, public threadpool_mod::threadpool_init ( type(io_configuration_type), intent(inout)  io_configuration)

Initialises the thread pool and marks each thread as idle.

Definition at line 51 of file threadpool.F90.

51  type(io_configuration_type), intent(inout) :: io_configuration
52
53  integer :: n
54
55  call check_thread_status(forthread_init())
56  call check_thread_status(forthread_mutex_init(netcdfmutex, -1))
57  if (io_configuration%number_of_threads .ge. 1) then
58    total_number_of_threads=io_configuration%number_of_threads
59  else
60    if (io_configuration%my_io_rank==0) then
61      call log_log(log_warn, "No setting for IO server thread pool size which must be 1 or more so using default size")
62    end if
63    total_number_of_threads=default_thread_pool_size
64  end if
65  allocate(thread_busy(total_number_of_threads), thread_start(total_number_of_threads), &
66       thread_ids(total_number_of_threads), thread_pass_data(total_number_of_threads), &
67       activate_thread_condition_variables(total_number_of_threads), activate_thread_mutex(total_number_of_threads), &
68       thread_entry_containers(total_number_of_threads))
69  threadpool_active=.true.
70  active_threads=total_number_of_threads
71  next_suggested_idle_thread=1
72  call check_thread_status(forthread_mutex_init(active_scalar_mutex, -1))
73  do n=1, total_number_of_threads
74    call check_thread_status(forthread_cond_init(activate_thread_condition_variables(n), -1))
75    call check_thread_status(forthread_mutex_init(activate_thread_mutex(n), -1))
76    thread_busy(n)=.false.
77    thread_start(n)=.false.
78    thread_pass_data(n)=n
79    call check_thread_status(forthread_create(thread_ids(n), -1, threadpool_thread_entry_procedure, thread_pass_data(n)))
80  end do

◆ threadpool_is_idle()

logical function, public threadpool_mod::threadpool_is_idle ( )

Determines whether the thread pool is idle or not (i.e. all threads are idle and waiting for work)

Returns
Whether the thread pool is idle

Definition at line 164 of file threadpool.F90.

164 
165  call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
166  threadpool_is_idle = active_threads==total_number_of_threads
167  call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
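
Despite its name, active_threads behaves as a count of idle threads: threadpool_init sets it to total_number_of_threads, the thread entry procedure decrements it when work starts and increments it again when work finishes, and the pool counts as idle when the two values are equal. A minimal single-threaded sketch of that bookkeeping follows. The program and helper names (pool_is_idle, begin_work, end_work) and the pool size of 3 are illustrative assumptions, and the mutex that protects the counter in the real module is omitted.

```fortran
program idle_counter_sketch
  implicit none
  integer, parameter :: total_number_of_threads = 3   ! assumed pool size
  integer :: active_threads
  logical :: idle_at_start, idle_while_working, idle_at_end

  ! As in threadpool_init: every thread starts out idle
  active_threads = total_number_of_threads
  idle_at_start = pool_is_idle()

  ! Two threads pick up work
  call begin_work()
  call begin_work()
  idle_while_working = pool_is_idle()

  ! Both threads finish
  call end_work()
  call end_work()
  idle_at_end = pool_is_idle()

  print *, idle_at_start, idle_while_working, idle_at_end

contains

  ! Same comparison as threadpool_is_idle, without the mutex
  logical function pool_is_idle()
    pool_is_idle = active_threads == total_number_of_threads
  end function pool_is_idle

  ! Mirrors the decrement performed before the work procedure is called
  subroutine begin_work()
    active_threads = active_threads - 1
  end subroutine begin_work

  ! Mirrors the increment performed after the work procedure returns
  subroutine end_work()
    active_threads = active_threads + 1
  end subroutine end_work

end program idle_counter_sketch
```

The three printed values are true, false and true: the pool is idle before any work starts, not idle while two threads are working, and idle again once both have finished.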

◆ threadpool_lock_netcdf_access()

subroutine, public threadpool_mod::threadpool_lock_netcdf_access ( )

Acquires the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.

Definition at line 85 of file threadpool.F90.

85 #ifdef ENFORCE_THREAD_SAFETY
86  call check_thread_status(forthread_mutex_lock(netcdfmutex))
87 #endif

◆ threadpool_start_thread()

subroutine, public threadpool_mod::threadpool_start_thread ( procedure(thread_procedure)  proc,
integer, dimension(:), intent(in)  arguments,
character, dimension(:), intent(in), optional, allocatable  data_buffer 
)

Starts an idle thread from the pool to execute a specific procedure with some data. If there is no thread available then this will block until one becomes idle.

Parameters
proc  The procedure for the thread to execute
data  Data to pass into the thread (the signature names these parameters arguments and data_buffer)

Definition at line 102 of file threadpool.F90.

102  procedure(thread_procedure) :: proc
103  integer, dimension(:), intent(in) :: arguments
104  character, dimension(:), allocatable, intent(in), optional :: data_buffer
105
106  integer :: idle_thread_id
107
108  if (.not. threadpool_active) call log_log(log_error, "Attemping to start IO thread on deactivated thread pool")
109
110  idle_thread_id=find_idle_thread()
111  if (idle_thread_id .ne. -1) then
112    thread_busy(idle_thread_id)=.true.
113    thread_entry_containers(idle_thread_id)%proc=>proc
114    allocate(thread_entry_containers(idle_thread_id)%arguments(size(arguments)))
115    thread_entry_containers(idle_thread_id)%arguments=arguments
116    if (present(data_buffer)) allocate(thread_entry_containers(idle_thread_id)%data_buffer(size(data_buffer)), &
117         source=data_buffer)
118    ! Send the signal to the thread to wake up and start
119    call check_thread_status(forthread_mutex_lock(activate_thread_mutex(idle_thread_id)))
120    thread_start(idle_thread_id)=.true.
121    call check_thread_status(forthread_cond_signal(activate_thread_condition_variables(idle_thread_id)))
122    call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(idle_thread_id)))
123  end if
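
The way a work item is packaged and later dispatched can be sketched without any threading. The example below is an illustration under stated assumptions rather than the module's code: the abstract interface is inferred from how proc is called in the thread entry procedure, and container_type, pack_work, run_work and example_work are hypothetical names. It copies the arguments and the optional data buffer into a container, as threadpool_start_thread does, then invokes the stored procedure synchronously, which the thread entry procedure would do on a pool thread.

```fortran
module container_sketch_mod
  implicit none

  ! Assumed shape of the thread_procedure interface (inferred, not taken from the source file)
  abstract interface
    subroutine thread_procedure(arguments, data_buffer)
      integer, dimension(:), intent(in) :: arguments
      character, dimension(:), allocatable, intent(in), optional :: data_buffer
    end subroutine thread_procedure
  end interface

  ! Hypothetical stand-in for threaded_procedure_container_type
  type container_type
    procedure(thread_procedure), pointer, nopass :: proc => null()
    integer, dimension(:), allocatable :: arguments
    character, dimension(:), allocatable :: data_buffer
  end type container_type

  ! Results recorded by the example work procedure
  integer :: last_sum = 0, last_buffer_size = -1

contains

  ! Example work item: sums its arguments and records the buffer size
  subroutine example_work(arguments, data_buffer)
    integer, dimension(:), intent(in) :: arguments
    character, dimension(:), allocatable, intent(in), optional :: data_buffer

    last_sum = sum(arguments)
    last_buffer_size = 0
    if (present(data_buffer)) last_buffer_size = size(data_buffer)
  end subroutine example_work

  ! Copies the procedure pointer, arguments and optional buffer into the container
  subroutine pack_work(container, proc, arguments, data_buffer)
    type(container_type), intent(inout) :: container
    procedure(thread_procedure) :: proc
    integer, dimension(:), intent(in) :: arguments
    character, dimension(:), allocatable, intent(in), optional :: data_buffer

    container%proc => proc
    allocate(container%arguments(size(arguments)))
    container%arguments = arguments
    if (present(data_buffer)) allocate(container%data_buffer(size(data_buffer)), source=data_buffer)
  end subroutine pack_work

  ! Calls the stored procedure, then releases the copied data
  subroutine run_work(container)
    type(container_type), intent(inout) :: container

    if (allocated(container%data_buffer)) then
      call container%proc(container%arguments, data_buffer=container%data_buffer)
      deallocate(container%data_buffer)
    else
      call container%proc(container%arguments)
    end if
    deallocate(container%arguments)
  end subroutine run_work

end module container_sketch_mod

program container_sketch
  use container_sketch_mod
  implicit none
  type(container_type) :: container
  character, dimension(:), allocatable :: buffer

  allocate(buffer(4))
  buffer = 'x'
  call pack_work(container, example_work, [1, 2, 3], buffer)
  call run_work(container)
  print *, last_sum, last_buffer_size
end program container_sketch
```

The program prints the values 6 and 4. Because the container holds its own copies, the caller's arguments and buffer can be changed or released as soon as pack_work returns, which is presumably why the pool copies them before waking the thread.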

◆ threadpool_thread_entry_procedure()

subroutine threadpool_mod::threadpool_thread_entry_procedure ( integer  thread_id)
private

Entry point run by each thread created in the pool. It calls out to the actual procedure to execute, which lets the pool perform management actions before and after the call.

Parameters
thread_id  The thread pool id (index) of this thread

Definition at line 130 of file threadpool.F90.

130  integer :: thread_id
131
132  do while (threadpool_active)
133    call check_thread_status(forthread_mutex_lock(activate_thread_mutex(thread_id)))
134    do while (.not. thread_start(thread_id) .and. threadpool_active)
135      call check_thread_status(forthread_cond_wait(activate_thread_condition_variables(thread_id), &
136           activate_thread_mutex(thread_id)))
137    end do
138    call check_thread_status(forthread_mutex_unlock(activate_thread_mutex(thread_id)))
139    if (.not. threadpool_active) return
140    thread_busy(thread_id)=.true.
141    thread_start(thread_id)=.false.
142
143    call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
144    active_threads=active_threads-1
145    call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
146    if (allocated(thread_entry_containers(thread_id)%data_buffer)) then
147      call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments, &
148           data_buffer=thread_entry_containers(thread_id)%data_buffer)
149      deallocate(thread_entry_containers(thread_id)%data_buffer)
150    else
151      call thread_entry_containers(thread_id)%proc(thread_entry_containers(thread_id)%arguments)
152    end if
153    deallocate(thread_entry_containers(thread_id)%arguments)
154    call check_thread_status(forthread_mutex_lock(active_scalar_mutex))
155    active_threads=active_threads+1
156    call check_thread_status(forthread_mutex_unlock(active_scalar_mutex))
157    thread_busy(thread_id)=.false.
158  end do

◆ threadpool_unlock_netcdf_access()

subroutine, public threadpool_mod::threadpool_unlock_netcdf_access ( )

Releases the NetCDF thread lock. NetCDF is not thread safe, so thread calls to it need to be managed.

Definition at line 92 of file threadpool.F90.

92 #ifdef ENFORCE_THREAD_SAFETY
93  call check_thread_status(forthread_mutex_unlock(netcdfmutex))
94 #endif
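
A typical caller would bracket each group of NetCDF calls with the lock and unlock routines. The sketch below uses stand-in versions of the two routines that only track a counter, so it runs without ForThreads or NetCDF; the module name, write_field, lock_depth and writes_done are hypothetical names introduced for illustration. Note from the listings that the real routines only take the mutex when the code is compiled with ENFORCE_THREAD_SAFETY defined.

```fortran
module netcdf_lock_sketch_mod
  implicit none

  integer :: lock_depth = 0    ! stand-in for the state of netcdfmutex
  integer :: writes_done = 0   ! counts values "written" by the example

contains

  ! Stub with the same name as the real routine; it only tracks a counter
  subroutine threadpool_lock_netcdf_access()
    lock_depth = lock_depth + 1
  end subroutine threadpool_lock_netcdf_access

  ! Stub with the same name as the real routine; it only tracks a counter
  subroutine threadpool_unlock_netcdf_access()
    lock_depth = lock_depth - 1
  end subroutine threadpool_unlock_netcdf_access

  ! Hypothetical writer: every exit path releases the lock it acquired
  subroutine write_field(values)
    real, dimension(:), intent(in) :: values

    call threadpool_lock_netcdf_access()
    if (size(values) == 0) then
      ! Nothing to write, but the lock must still be released before returning
      call threadpool_unlock_netcdf_access()
      return
    end if
    ! Calls into the NetCDF library would go here
    writes_done = writes_done + size(values)
    call threadpool_unlock_netcdf_access()
  end subroutine write_field

end module netcdf_lock_sketch_mod

program netcdf_lock_sketch
  use netcdf_lock_sketch_mod
  implicit none
  real :: empty(0)

  call write_field([1.0, 2.0, 3.0])
  call write_field(empty)
  print *, writes_done, lock_depth
end program netcdf_lock_sketch
```

The program prints the values 3 and 0: three values were recorded and the lock counter is balanced after both calls, including the early return.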

Variable Documentation

◆ activate_thread_condition_variables

integer, dimension(:), allocatable, volatile threadpool_mod::activate_thread_condition_variables
private

Definition at line 37 of file threadpool.F90.

37  integer, volatile, dimension(:), allocatable :: activate_thread_condition_variables, activate_thread_mutex

◆ activate_thread_mutex

integer, dimension(:), allocatable, volatile threadpool_mod::activate_thread_mutex
private

Definition at line 37 of file threadpool.F90.

◆ active_scalar_mutex

integer, volatile threadpool_mod::active_scalar_mutex
private

Definition at line 43 of file threadpool.F90.

◆ active_threads

integer, volatile threadpool_mod::active_threads
private

Definition at line 43 of file threadpool.F90.

43  integer, volatile :: active_threads, total_number_of_threads, active_scalar_mutex

◆ default_thread_pool_size

integer, parameter threadpool_mod::default_thread_pool_size =10
private

Default number of threads in the pool, used when the configuration does not give a size of 1 or more.

Definition at line 34 of file threadpool.F90.

34  integer, parameter :: default_thread_pool_size=10

◆ netcdfmutex

integer, volatile threadpool_mod::netcdfmutex
private

Mutex used for controlling NetCDF access.

Definition at line 39 of file threadpool.F90.

39  integer, volatile :: netcdfmutex

◆ next_suggested_idle_thread

integer, volatile threadpool_mod::next_suggested_idle_thread
private

Definition at line 40 of file threadpool.F90.

40  integer, volatile :: next_suggested_idle_thread

◆ thread_busy

logical, dimension(:), allocatable, volatile threadpool_mod::thread_busy
private

Definition at line 35 of file threadpool.F90.

35  logical, volatile, dimension(:), allocatable :: thread_busy, thread_start

◆ thread_entry_containers

type(threaded_procedure_container_type), dimension(:), allocatable, volatile threadpool_mod::thread_entry_containers
private

Definition at line 38 of file threadpool.F90.

38  type(threaded_procedure_container_type), volatile, dimension(:), allocatable :: thread_entry_containers

◆ thread_ids

integer, dimension(:), allocatable, volatile threadpool_mod::thread_ids
private

Definition at line 36 of file threadpool.F90.

36  integer, volatile, dimension(:), allocatable :: thread_ids, thread_pass_data

◆ thread_pass_data

integer, dimension(:), allocatable, volatile threadpool_mod::thread_pass_data
private

Definition at line 36 of file threadpool.F90.

◆ thread_start

logical, dimension(:), allocatable, volatile threadpool_mod::thread_start
private

Definition at line 35 of file threadpool.F90.

◆ threadpool_active

logical, volatile threadpool_mod::threadpool_active
private

Definition at line 41 of file threadpool.F90.

41  logical, volatile :: threadpool_active

◆ total_number_of_threads

integer, volatile threadpool_mod::total_number_of_threads
private

Definition at line 43 of file threadpool.F90.