OpenShot Audio Library | OpenShotAudio  0.3.1
juce_ThreadPool.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2017 - ROLI Ltd.
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
26 struct ThreadPool::ThreadPoolThread : public Thread
27 {
28  ThreadPoolThread (ThreadPool& p, size_t stackSize)
29  : Thread ("Pool", stackSize), pool (p)
30  {
31  }
32 
33  void run() override
34  {
35  while (! threadShouldExit())
36  if (! pool.runNextJob (*this))
37  wait (500);
38  }
39 
40  std::atomic<ThreadPoolJob*> currentJob { nullptr };
41  ThreadPool& pool;
42 
43  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
44 };
45 
46 //==============================================================================
47 ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
48 {
49 }
50 
52 {
53  // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54  // to remove it first!
55  jassert (pool == nullptr || ! pool->contains (this));
56 }
57 
59 {
60  return jobName;
61 }
62 
63 void ThreadPoolJob::setJobName (const String& newName)
64 {
65  jobName = newName;
66 }
67 
69 {
70  shouldStop = true;
71  listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72 }
73 
75 {
76  listeners.add (listener);
77 }
78 
80 {
81  listeners.remove (listener);
82 }
83 
85 {
86  if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87  return t->currentJob.load();
88 
89  return nullptr;
90 }
91 
92 //==============================================================================
93 ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94 {
95  jassert (numThreads > 0); // not much point having a pool without any threads!
96 
97  createThreads (numThreads, threadStackSize);
98 }
99 
101 {
102  createThreads (SystemStats::getNumCpus());
103 }
104 
106 {
107  removeAllJobs (true, 5000);
108  stopThreads();
109 }
110 
111 void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112 {
113  for (int i = jmax (1, numThreads); --i >= 0;)
114  threads.add (new ThreadPoolThread (*this, threadStackSize));
115 
116  for (auto* t : threads)
117  t->startThread();
118 }
119 
120 void ThreadPool::stopThreads()
121 {
122  for (auto* t : threads)
123  t->signalThreadShouldExit();
124 
125  for (auto* t : threads)
126  t->stopThread (500);
127 }
128 
129 void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
130 {
131  jassert (job != nullptr);
132  jassert (job->pool == nullptr);
133 
134  if (job->pool == nullptr)
135  {
136  job->pool = this;
137  job->shouldStop = false;
138  job->isActive = false;
139  job->shouldBeDeleted = deleteJobWhenFinished;
140 
141  {
142  const ScopedLock sl (lock);
143  jobs.add (job);
144  }
145 
146  for (auto* t : threads)
147  t->notify();
148  }
149 }
150 
151 void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
152 {
153  struct LambdaJobWrapper : public ThreadPoolJob
154  {
155  LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156  JobStatus runJob() override { return job(); }
157 
158  std::function<ThreadPoolJob::JobStatus()> job;
159  };
160 
161  addJob (new LambdaJobWrapper (jobToRun), true);
162 }
163 
164 void ThreadPool::addJob (std::function<void()> jobToRun)
165 {
166  struct LambdaJobWrapper : public ThreadPoolJob
167  {
168  LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169  JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
170 
171  std::function<void()> job;
172  };
173 
174  addJob (new LambdaJobWrapper (jobToRun), true);
175 }
176 
177 int ThreadPool::getNumJobs() const noexcept
178 {
179  const ScopedLock sl (lock);
180  return jobs.size();
181 }
182 
183 int ThreadPool::getNumThreads() const noexcept
184 {
185  return threads.size();
186 }
187 
188 ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
189 {
190  const ScopedLock sl (lock);
191  return jobs [index];
192 }
193 
194 bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
195 {
196  const ScopedLock sl (lock);
197  return jobs.contains (const_cast<ThreadPoolJob*> (job));
198 }
199 
200 bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
201 {
202  const ScopedLock sl (lock);
203  return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
204 }
205 
206 void ThreadPool::moveJobToFront (const ThreadPoolJob* job) noexcept
207 {
208  const ScopedLock sl (lock);
209 
210  auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
211 
212  if (index > 0 && ! job->isActive)
213  jobs.move (index, 0);
214 }
215 
216 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
217 {
218  if (job != nullptr)
219  {
220  auto start = Time::getMillisecondCounter();
221 
222  while (contains (job))
223  {
224  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
225  return false;
226 
227  jobFinishedSignal.wait (2);
228  }
229  }
230 
231  return true;
232 }
233 
234 bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
235 {
236  bool dontWait = true;
237  OwnedArray<ThreadPoolJob> deletionList;
238 
239  if (job != nullptr)
240  {
241  const ScopedLock sl (lock);
242 
243  if (jobs.contains (job))
244  {
245  if (job->isActive)
246  {
247  if (interruptIfRunning)
248  job->signalJobShouldExit();
249 
250  dontWait = false;
251  }
252  else
253  {
254  jobs.removeFirstMatchingValue (job);
255  addToDeleteList (deletionList, job);
256  }
257  }
258  }
259 
260  return dontWait || waitForJobToFinish (job, timeOutMs);
261 }
262 
263 bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
264  ThreadPool::JobSelector* selectedJobsToRemove)
265 {
266  Array<ThreadPoolJob*> jobsToWaitFor;
267 
268  {
269  OwnedArray<ThreadPoolJob> deletionList;
270 
271  {
272  const ScopedLock sl (lock);
273 
274  for (int i = jobs.size(); --i >= 0;)
275  {
276  auto* job = jobs.getUnchecked(i);
277 
278  if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
279  {
280  if (job->isActive)
281  {
282  jobsToWaitFor.add (job);
283 
284  if (interruptRunningJobs)
285  job->signalJobShouldExit();
286  }
287  else
288  {
289  jobs.remove (i);
290  addToDeleteList (deletionList, job);
291  }
292  }
293  }
294  }
295  }
296 
297  auto start = Time::getMillisecondCounter();
298 
299  for (;;)
300  {
301  for (int i = jobsToWaitFor.size(); --i >= 0;)
302  {
303  auto* job = jobsToWaitFor.getUnchecked (i);
304 
305  if (! isJobRunning (job))
306  jobsToWaitFor.remove (i);
307  }
308 
309  if (jobsToWaitFor.size() == 0)
310  break;
311 
312  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
313  return false;
314 
315  jobFinishedSignal.wait (20);
316  }
317 
318  return true;
319 }
320 
321 StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
322 {
323  StringArray s;
324  const ScopedLock sl (lock);
325 
326  for (auto* job : jobs)
327  if (job->isActive || ! onlyReturnActiveJobs)
328  s.add (job->getJobName());
329 
330  return s;
331 }
332 
333 bool ThreadPool::setThreadPriorities (int newPriority)
334 {
335  bool ok = true;
336 
337  for (auto* t : threads)
338  if (! t->setPriority (newPriority))
339  ok = false;
340 
341  return ok;
342 }
343 
344 ThreadPoolJob* ThreadPool::pickNextJobToRun()
345 {
346  OwnedArray<ThreadPoolJob> deletionList;
347 
348  {
349  const ScopedLock sl (lock);
350 
351  for (int i = 0; i < jobs.size(); ++i)
352  {
353  if (auto* job = jobs[i])
354  {
355  if (! job->isActive)
356  {
357  if (job->shouldStop)
358  {
359  jobs.remove (i);
360  addToDeleteList (deletionList, job);
361  --i;
362  continue;
363  }
364 
365  job->isActive = true;
366  return job;
367  }
368  }
369  }
370  }
371 
372  return nullptr;
373 }
374 
375 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
376 {
377  if (auto* job = pickNextJobToRun())
378  {
379  auto result = ThreadPoolJob::jobHasFinished;
380  thread.currentJob = job;
381 
382  try
383  {
384  result = job->runJob();
385  }
386  catch (...)
387  {
388  jassertfalse; // Your runJob() method mustn't throw any exceptions!
389  }
390 
391  thread.currentJob = nullptr;
392 
393  OwnedArray<ThreadPoolJob> deletionList;
394 
395  {
396  const ScopedLock sl (lock);
397 
398  if (jobs.contains (job))
399  {
400  job->isActive = false;
401 
402  if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
403  {
404  jobs.removeFirstMatchingValue (job);
405  addToDeleteList (deletionList, job);
406 
407  jobFinishedSignal.signal();
408  }
409  else
410  {
411  // move the job to the end of the queue if it wants another go
412  jobs.move (jobs.indexOf (job), -1);
413  }
414  }
415  }
416 
417  return true;
418  }
419 
420  return false;
421 }
422 
423 void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
424 {
425  job->shouldStop = true;
426  job->pool = nullptr;
427 
428  if (job->shouldBeDeleted)
429  deletionList.add (job);
430 }
431 
432 } // namespace juce
int getNumJobs() const noexcept
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
void removeListener(Thread::Listener *)
int getNumThreads() const noexcept
Thread(const String &threadName, size_t threadStackSize=0)
Definition: juce_Thread.cpp:26
void add(const ElementType &newElement)
Definition: juce_Array.h:418
virtual bool isJobSuitable(ThreadPoolJob *job)=0
ElementType getUnchecked(int index) const
Definition: juce_Array.h:252
virtual JobStatus runJob()=0
static ThreadPoolJob * getCurrentThreadPoolJob()
bool isJobRunning(const ThreadPoolJob *job) const noexcept
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
bool threadShouldExit() const
ThreadPoolJob(const String &name)
void addListener(Thread::Listener *)
int size() const noexcept
Definition: juce_Array.h:215
virtual void exitSignalSent()=0
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
ObjectClass * add(ObjectClass *newObject)
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
bool setThreadPriorities(int newPriority)
String getJobName() const
static Thread *JUCE_CALLTYPE getCurrentThread()
void setJobName(const String &newName)
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
ThreadPoolJob * getJob(int index) const noexcept
void remove(int indexToRemove)
Definition: juce_Array.h:767
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
static int getNumCpus() noexcept
void add(String stringToAdd)
bool contains(const ThreadPoolJob *job) const noexcept
static uint32 getMillisecondCounter() noexcept
Definition: juce_Time.cpp:226