Code_TYMPAN  4.4.0
Industrial site acoustic simulation
threading.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) <2012-2014> <EDF-R&D> <FRANCE>
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10  * See the GNU General Public License for more details.
11  * You should have received a copy of the GNU General Public License along
12  * with this program; if not, write to the Free Software Foundation, Inc.,
13  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
14  */
15 
16 #include <QThread>
17 
18 #include "threading.h"
19 
20 OSlaveThread::OSlaveThread(OThreadPool* pool) : _pool(pool)
21 {
22  _bToEnd = false;
23 }
24 
26 {
27  this->terminate();
28 }
29 
30 void OSlaveThread::run()
31 {
32  while (isRunning() && !_bToEnd)
33  {
34  // Wait for available task.
36  if (isRunning() && !_bToEnd && !_pool->_tasks.empty())
37  {
38  // Dequeue next task.
39  LPOTask task = _pool->_tasks.front();
40  _pool->_tasks.pop();
41 
43 
44  // Signal all that task is running.
46  task->_running = true;
47 
48  // Run task.
49  task->main();
50 
51  // Signal all that task is completed.
52  task->_running = false;
53  task->_completed = true;
54  task->wakeAll();
56 
57  // Increment pool counter
59  _pool->_counter++;
61  }
62  else
64  }
65 }
66 
67 OTask::OTask() : _running(false), _completed(false), _canceled(false) {}
68 
70 {
71  while (!isCompleted() && !isCanceled())
72  {
73  wait(this);
74  }
75 }
76 
77 bool OTask::isRunning() const
78 {
80  return _running;
81 }
82 
83 bool OTask::isCompleted() const
84 {
86  return _completed;
87 }
88 
89 bool OTask::isCanceled() const
90 {
92  return _canceled;
93 }
94 
95 void OTask::reset()
96 {
98  _running = _completed = _canceled = false;
99 }
100 
101 OThreadPool::OThreadPool(unsigned int slaves) : _totalCount(0), _counter(0)
102 {
103  // Allocate slave threads.
104  for (unsigned int i = 0; i < slaves; ++i)
105  {
106  OSlaveThread* thread = new OSlaveThread(this);
107  push_back(thread);
108  }
109 }
110 
112 {
114 
115  // Wait for queue to become empty.
116  while (!_tasks.empty())
117  {
118  // Wait for last task in queue to at least start running.
119  LPOTask task = _tasks.back();
120  task->_completed = true;
121  task->_canceled = true;
122  }
123 
124  // Now terminate all threads.
125  unsigned int i = 0;
126  for (i = 0; i < size(); ++i)
127  {
128  (*this)[i]->_bToEnd = true;
129  (*this)[i]->terminate();
130  }
131 
132  // Signal them to wake up.
134 
135  // Then delete them (the thread destructor will wait for thread completion).
136  for (i = 0; i < size(); ++i)
137  {
138  delete (*this)[i];
139  }
140 }
141 
142 void OThreadPool::push(OTask* task)
143 {
144  // Reset task flags.
145  task->reset();
146 
147  // Push task onto queue and signal availability.
149  _tasks.push(task);
151 }
152 
153 unsigned int OThreadPool::getTotalCount() const
154 {
156  return _totalCount;
157 }
158 
159 unsigned int OThreadPool::getCount() const
160 {
162  return _counter;
163 }
164 
165 void OThreadPool::begin(unsigned int count)
166 {
167  TY_LOCK_SHARED_MUTEX(this);
168  _totalCount = count;
169  _counter = 0;
171 }
172 
174 {
175 
176  for (unsigned int i = 0; i < size(); ++i)
177  {
178  (*this)[i]->_bToEnd = false;
179  (*this)[i]->start();
180  }
181 }
182 
183 bool OThreadPool::end()
184 {
185  unsigned int totalCount = getTotalCount();
186 
187  unsigned int last = 0;
188  while (last < totalCount)
189  {
190  unsigned int current = getCount();
191  last = current;
192  OSleeper::msleep(5);
193  }
194  stop();
195  return true;
196 }
197 
198 void OThreadPool::stop()
199 {
200  TY_LOCK_SHARED_MUTEX(this);
201 
202  // For each task
203  for (size_t i = 0; i < _tasks.size(); ++i)
204  {
205  // Dequeue next task
206  LPOTask task = _tasks.front();
207  _tasks.pop();
208 
209  // Cancel task
211  task->_canceled = true;
213 
214  // Increment counter
215  _counter++;
216  }
217 
218  for (unsigned int i = 0; i < size(); ++i)
219  {
220  (*this)[i]->_bToEnd = true;
221  }
222 
224 
225  // Waiting for thread termination
226  while (getCount() < getTotalCount())
227  {
228  OSleeper::msleep(1);
229  }
230 }
#define TY_LOCK_SHARED_MUTEX(name)
Definition: threading.h:81
#define TY_UNLOCK_SHARED_MUTEX(name)
Definition: threading.h:82
#define TY_OMUTEXLOCKER_SHARED_MUTEX(name)
Definition: threading.h:83
This class defines a thread for running tasks in a threads collection. Slave thread for the threads c...
Definition: threading.h:132
~OSlaveThread()
Destroy the slave thread; wait for the end of the thread.
Definition: threading.cpp:25
bool _bToEnd
Definition: threading.h:140
OSlaveThread(OThreadPool *pool)
Build a slave thread for a threads collection.
Definition: threading.cpp:20
void run()
Run a waiting task.
Definition: threading.cpp:30
OThreadPool * _pool
Pointer on the parent threads collection.
Definition: threading.h:144
static void msleep(unsigned long msecs)
Definition: threading.h:47
Task of a threads collection.
Definition: threading.h:168
bool isCanceled() const
Return true if the task has been cancelled, false otherwise.
Definition: threading.cpp:89
void reset()
Reset the task status (_running=false and _completed=false)
Definition: threading.cpp:95
bool _running
Running flag.
Definition: threading.h:209
bool isCompleted() const
Return true if the task is completed, false otherwise.
Definition: threading.cpp:83
bool isRunning() const
Return true if the task is running, false otherwise.
Definition: threading.cpp:77
bool _completed
Completed flag.
Definition: threading.h:212
virtual ~OTask()
Destructor : waits for the end of the task to destroy it.
Definition: threading.cpp:69
friend class OSlaveThread
Definition: threading.h:217
OTask()
Default constructor.
Definition: threading.cpp:67
bool _canceled
Cancel flag.
Definition: threading.h:215
Slave threads collection.
Definition: threading.h:225
void begin(unsigned int count)
Begin solver.
Definition: threading.cpp:165
void startPool()
Definition: threading.cpp:173
virtual ~OThreadPool()
Destructor.
Definition: threading.cpp:111
bool end()
End solver.
Definition: threading.cpp:183
unsigned int _counter
Total number of ended tasks.
Definition: threading.h:276
std::queue< LPOTask > _tasks
Tasks queue.
Definition: threading.h:270
unsigned int getCount() const
Return the counter.
Definition: threading.cpp:159
void stop()
Cancel the pending tasks.
Definition: threading.cpp:198
virtual void push(OTask *task)
Add a task to the queue.
Definition: threading.cpp:142