KASKADE 7 development version
threading.hh
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the library KASKADE 7 */
4/* https://www.zib.de/research/projects/kaskade7-finite-element-toolbox */
5/* */
6/* Copyright (C) 2012-2019 Zuse Institute Berlin */
7/* */
8/* KASKADE 7 is distributed under the terms of the ZIB Academic License. */
9/* see $KASKADE/academic.txt */
10/* */
11/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12
13#ifndef THREADING_HH
14#define THREADING_HH
15
16#include <fstream>
17#include <functional>
18#include <future>
19#include <map>
20#include <mutex>
21#include <queue>
22#include <utility>
23#include <vector>
24
25#include <boost/thread/condition_variable.hpp>
26#include <boost/thread/locks.hpp>
27#include <boost/thread/mutex.hpp>
28#include <boost/thread/thread.hpp>
29
30#include "utilities/kalloc.hh"
31#include "utilities/timing.hh"
32
33namespace Kaskade
34{
39 extern std::mutex DuneQuadratureRulesMutex;
40
45 extern boost::mutex refElementMutex;
46
47 //---------------------------------------------------------------------------
48
60 void equalWeightRanges(std::vector<size_t>& x, size_t n);
61
74 template <class BlockIndex, class Index>
75 Index uniformWeightRangeStart(BlockIndex i, BlockIndex n, Index m)
76 {
77 assert(i>=0 && i<=n && n>0);
78 return (i*m)/n;
79 }
80
90 template <class Index>
91 Index uniformWeightRange(Index j, Index n, Index m)
92 {
93 assert(j>=0 && j<m && n>0 && m>0);
94
95 // We're looking for i such that floor(i*m/n) <= j < floor((i+1)*m/n), i.e. index j has
96 // to be in the half-open range given by the partitioning points of the range. Now this
97 // implies i*m/n-1 <= j < (i+1)*m/n and also j*n/m-1 < i <= (j+1)*n/m. As i is integer,
98 // this implies floor(j*n/m) <= i <= floor((j+1)*n/m). Typically, n/m is small, in which
99 // case this closed interval is likely to contain just one natural number - the result.
100 Index low = (j*n)/m; // floor is implied by integer arithmetic
101 Index high = ((j+1)*n)/m; // floor is implied by integer arithmetic
102
103 if (low==high)
104 {
105 assert(uniformWeightRangeStart(low,n,m)<=j && j<uniformWeightRangeStart(low+1,n,m));
106 return low;
107 }
108
109 // The implied inequality chains above are not sharp estimates, therefore the interval
110 // [low,high] can contain several natural numbers. This is always the case if n>m, i.e., we have
111 // more ranges than entries and several ranges are empty. But it can also happen if j*n/m is
112 // just below the next integral number. Now we have to walk through all natural numbers in
113 // the interval to find the correct range. TODO: is there a direct way?
114 Index i = low;
115 while (i<high && !(uniformWeightRangeStart(i,n,m)<=j && j<uniformWeightRangeStart(i+1,n,m)))
116 ++i;
117
118 assert(uniformWeightRangeStart(i,n,m)<=j && j<uniformWeightRangeStart(i+1,n,m));
119 return i;
120 }
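A small worked example may help (a sketch, not part of threading.hh): partitioning m = 10 entries into n = 3 ranges yields the partitioning points 0, 3, 6, 10, i.e. the ranges [0,3), [3,6), [6,10), and entry 7 then belongs to range 2.

// usage sketch -- not part of threading.hh
size_t begin = uniformWeightRangeStart(1,3,10); // == 3: start of range 1
size_t end   = uniformWeightRangeStart(2,3,10); // == 6: one past the end of range 1
size_t range = uniformWeightRange(7,3,10);      // == 2: entry 7 lies in [6,10)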
121
122 //---------------------------------------------------------------------------
123
130 template <class T>
131 class ConcurrentQueue {
132 public:
136 ConcurrentQueue()
137 : runningWorkers(0)
138 {
139 }
140
146 ConcurrentQueue(ConcurrentQueue&& q)
147 {
148 boost::lock_guard<boost::mutex> lock(q.mutex);
149 queue = std::move(q.queue);
150 }
151
157 ConcurrentQueue& operator=(ConcurrentQueue const& q)
158 {
159 boost::lock_guard<boost::mutex> lock(q.mutex);
160 queue = q.queue;
161 return *this;
162 }
163
164 bool empty() const
165 {
166 // we need a lock here because someone may add / remove tasks right now
167 boost::lock_guard<boost::mutex> lock(mutex);
168 return queue.empty();
169 }
170
174 size_t size() const
175 {
176 // we need a lock here because someone may add / remove tasks right now
177 boost::lock_guard<boost::mutex> lock(mutex);
178 return queue.size();
179 }
180
184 void push_back(T&& t)
185 {
186 {
187 boost::lock_guard<boost::mutex> lock(mutex);
188 queue.push(std::move(t));
189 } // release lock before waking up consumers
190 filled.notify_one();
191 }
192
198 T pop_front()
199 {
200 boost::unique_lock<boost::mutex> lock(mutex);
201
202 // Wait for data to become available.
203 while (queue.empty())
204 filled.wait(lock);
205
206 // extract and remove data. When the move construction throws an exception, the queue
207 // remains unmodified
208 T t = std::move(queue.front());
209 queue.pop();
210 return t;
211 }
212
219 int running(int n)
220 {
221 boost::lock_guard<boost::mutex> lock(mutex);
222 runningWorkers += n;
223 return runningWorkers;
224 }
225
230 int running() const
231 {
232 return runningWorkers;
233 }
234
235 private:
236 std::queue<T> queue;
237 mutable boost::mutex mutex; // has to be mutable because of empty/size
238 boost::condition_variable filled;
239 int runningWorkers; // number of worker threads running on this queue
240 };
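A minimal producer/consumer sketch (not part of the header): the worker blocks in pop_front() until an element arrives, and push_back() stores the value and wakes it up.

// usage sketch -- not part of threading.hh
ConcurrentQueue<int> jobs;
boost::thread worker([&jobs]
{
  int j = jobs.pop_front();   // blocks until an element is available
  // ... process j ...
});
jobs.push_back(42);           // stores the value and notifies the waiting worker
worker.join();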
241
242 //----------------------------------------------------------------------------
243
248 typedef std::packaged_task<void()> Task;
249
254 typedef std::future<void> Ticket;
255
256
257
258 //----------------------------------------------------------------------------
259
292 class NumaThreadPool
293 {
294 public:
307 static NumaThreadPool& instance(int maxThreads = std::numeric_limits<int>::max());
316 int nodes() const
317 {
318 return nNode;
319 }
320
327 int cpus() const
328 {
329 return nCpu;
330 }
331
335 int runningOnGlobalQueue() const
336 {
337 return globalQueue.running();
338 }
339
343 int cpus(int node) const
344 {
345 return cpuByNode[node].size();
346 }
347
351 int maxCpusOnNode() const
352 {
353 return maxCpusPerNode;
354 }
355
362 bool isSequential() const
363 {
364 return sequential;
365 }
366
386 Ticket run(Task&& task);
387
399 Ticket runOnNode(int node, Task&& task);
400
413 Kalloc& allocator(int node);
414
421 void* allocate(size_t n, int node);
422
426 void deallocate(void* p, size_t n, int node);
427
431 size_t alignment(int node) const;
432
441 void reserve(size_t n, size_t k, int node);
442
447 private:
448
449 // private constructor to be called by instance()
450 NumaThreadPool(int maxThreads);
452
453 Ticket runOnQueue(ConcurrentQueue<Task>& queue, Task&& task);
454
455
456 int nCpu, nNode, maxCpusPerNode;
457 std::vector<ConcurrentQueue<Task>> nodeQueue; // task queues for passing to worker threads on nodes
458 ConcurrentQueue<Task> globalQueue;
459 boost::thread_group threads; // worker threads
460 std::vector<int> nodeByCpu; // NUMA topology info
461 std::vector<std::vector<int>> cpuByNode; // NUMA topology info
462 std::map<void*,std::pair<size_t,int>> memBlocks; // NUMA memory allocations
463 std::vector<Kalloc> nodeMemory; // NUMA memory management
464 bool sequential; // if true, execute all tasks immediately
465 };
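A usage sketch (not part of the header): work is wrapped into Task objects, submitted to the pool, and awaited via the returned Ticket; get() rethrows any exception thrown inside the task.

// usage sketch -- not part of threading.hh
NumaThreadPool& pool = NumaThreadPool::instance();

Ticket any   = pool.run(Task([] { /* work on any CPU */ }));
Ticket node0 = pool.runOnNode(0, Task([] { /* work close to the memory of node 0 */ }));

any.get();    // wait for completion, rethrowing exceptions from the task
node0.get();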
466
467 //-----------------------------------------------------------------------------------------------------
468
488 template <class Func>
489 void parallelFor(Func const& f, int maxTasks = std::numeric_limits<int>::max())
490 {
491 NumaThreadPool& pool = NumaThreadPool::instance();
492
493 // If tasks are to be executed sequentially, we shortcut the task pool -- not so much to avoid overhead
494 // as to avoid task packaging, which loses call stack info if exceptions are thrown.
495 if (pool.isSequential())
496 {
497 f(0,1);
498 return;
499 }
500
501 int nTasks = std::min(4*pool.cpus(),maxTasks);
502
503 std::vector<Ticket> tickets(nTasks);
504 for (int i=0; i<nTasks; ++i)
505 tickets[i] = pool.run(Task([i,&f,nTasks] { f(i,nTasks); }));
506
507 for (auto& ticket: tickets) // wait for the tasks to be completed
508 ticket.get(); // we use get() instead of wait() to rethrow any exceptions
509 }
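A sketch of a typical call (not part of the header): the functor receives its task index i and the total number of tasks n and selects its slice of the data with uniformWeightRangeStart. The vector xs is purely illustrative.

// usage sketch -- not part of threading.hh
std::vector<double> xs(100000, 1.0);
parallelFor([&xs](int i, int n)
{
  size_t begin = uniformWeightRangeStart(i,  n, xs.size());
  size_t end   = uniformWeightRangeStart(i+1,n, xs.size());
  for (size_t k=begin; k<end; ++k)
    xs[k] *= 2;
});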
510
530 template <class Func>
531 void parallelFor(size_t first, size_t last, Func const& f, size_t nTasks=std::numeric_limits<size_t>::max())
532 {
533 assert(last>=first);
534 NumaThreadPool& pool = NumaThreadPool::instance();
535 nTasks = std::min(last-first,std::min(nTasks,size_t(pool.cpus())));
536
537 std::vector<Ticket> tickets(nTasks);
538 std::mutex mutex;
539
540 TaskTiming tt(nTasks+2);
541
542 tt.start(nTasks);
543 for (size_t i=0; i<nTasks; ++i)
544 {
545 tickets[i] = pool.run(Task([&f,&first,last,&mutex,i,&tt]
546 {
547 tt.start(i);
548 while (true)
549 {
550 std::unique_lock<std::mutex> lock(mutex);
551 size_t myPos = first;
552 ++first;
553 lock.unlock();
554
555 if (myPos>=last)
556 {
557 tt.stop(i);
558 return;
559 }
560
561 f(myPos);
562 }
563 }));
564
565 // Waking up threads appears to be quite time-consuming. If we have many threads in the pool and not so much work,
566 // the last threads are woken up when all the work has already been done. Then tasks are created without benefit.
567 // In comparison, locking on a mutex appears to be blazingly fast, so we check whether there is still work to be
568 // done, and if not, leave the loop. This should be particularly beneficial in hyperthreaded systems.
569 std::lock_guard<std::mutex> lock(mutex);
570 if (first>=last)
571 {
572 tickets.resize(i+1);
573 break;
574 }
575 }
576 tt.stop(nTasks);
577
578 tt.start(nTasks+1);
579 for (auto& ticket: tickets) // wait for the tasks to be completed
580 ticket.get(); // we use get() instead of wait() to rethrow any exceptions
581 tt.stop(nTasks+1);
582
583 std::ofstream out("timing.gnu");
584 out << tt;
585 }
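A corresponding sketch for the index-based overload (not part of the header): f is called once for every index in [first,last), with the indices handed out dynamically to the worker tasks. The vector ys is purely illustrative.

// usage sketch -- not part of threading.hh
std::vector<double> ys(1000);
parallelFor(0, ys.size(), [&ys](size_t k)
{
  ys[k] = 2.0*k;
});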
586
587
603 template <class Func>
604 void parallelForNodes(Func const& f, int maxTasks = std::numeric_limits<int>::max())
605 {
606 NumaThreadPool& pool = NumaThreadPool::instance();
607 int nTasks = std::min(pool.nodes(),maxTasks);
608 std::vector<Ticket> tickets(nTasks);
609 for (int i=0; i<nTasks; ++i)
610 tickets[i] = pool.runOnNode(i,Task([i,&f,&nTasks] { f(i,nTasks); }));
611 for (auto& ticket: tickets) // wait for the tasks to be completed
612 ticket.get(); // we use get() instead of wait() to rethrow any exceptions
613 }
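A sketch (not part of the header): one task per NUMA node, here used to allocate and initialize node-local buffers through the pool's NUMA-aware allocation interface. The buffer size and fill value are arbitrary illustration choices.

// usage sketch -- not part of threading.hh
NumaThreadPool& pool = NumaThreadPool::instance();
std::vector<double*> buffer(pool.nodes());

parallelForNodes([&](int node, int /* nNodes */)
{
  buffer[node] = static_cast<double*>(pool.allocate(1024*sizeof(double), node));
  for (size_t k=0; k<1024; ++k)
    buffer[node][k] = 0.0;      // first touch happens on the owning node
});

for (int node=0; node<pool.nodes(); ++node)
  pool.deallocate(buffer[node], 1024*sizeof(double), node);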
614
615 //----------------------------------------------------------------------------
616
620 namespace ThreadingDetail
621 {
622 class NumaAllocatorBase
623 {
624 public:
625 NumaAllocatorBase(int node);
626
627 size_t max_size() const;
628
629 void* allocate(size_t n);
630
631 void deallocate(void* p, size_t n);
632
633 int node() const
634 {
635 return nod;
636 }
637
638 private:
639 int nod;
640 Kalloc* allocator;
641 };
642 }
651 template <class T>
652 class NumaAllocator
653 {
654 public:
655 typedef T value_type;
656 typedef T* pointer;
657 typedef T const* const_pointer;
658 typedef T& reference;
659 typedef T const& const_reference;
660 typedef std::size_t size_type;
661 typedef std::ptrdiff_t difference_type;
662
663 // make sure that on copying, the target copy has its data in the same
664 // NUMA memory region as the source by default -- this improves data locality.
665 typedef std::true_type propagate_on_container_copy_assignment;
666 typedef std::true_type propagate_on_container_move_assignment;
667 typedef std::true_type propagate_on_container_swap;
668
669 template <class U>
670 struct rebind
671 {
672 typedef NumaAllocator<U> other;
673 };
674
681 NumaAllocator(int node): alloc(node)
682 {
683 }
684
688 int node() const
689 {
690 return alloc.node();
691 }
692
693 pointer address(reference x) const
694 {
695 return &x;
696 }
697
698 const_pointer address(const_reference x) const
699 {
700 return &x;
701 }
702
703 size_type max_size() const
704 {
705 return alloc.max_size() / sizeof(T);
706 }
707
716 pointer allocate(size_type n, std::allocator<void>::const_pointer /* hint */ = 0)
717 {
718 if (n>0)
719 return static_cast<pointer>(alloc.allocate(n*sizeof(T)));
720 else
721 return nullptr;
722 }
723
724 void deallocate(pointer p, size_type n)
725 {
726 if (p)
727 alloc.deallocate(static_cast<void*>(p),n*sizeof(T));
728 }
729
730 template< class U, class... Args >
731 void construct(U* p, Args&&... args)
732 {
733 ::new((void*)p) U(std::forward<Args>(args)...);
734 }
735
736 template <class U>
737 void destroy(U* p)
738 {
739 p->~U();
740 }
741
747 template <class U>
748 bool operator==(NumaAllocator<U> const& other) const
749 {
750 return node()==other.node();
751 }
752
753 template <class U>
754 bool operator!=(NumaAllocator<U> const& other) const
755 {
756 return !(node() == other.node());
757 }
758
759 private:
760 ThreadingDetail::NumaAllocatorBase alloc;
761 };
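A sketch (not part of the header): a std::vector whose elements live in the memory of NUMA node 0. The node index 0 is just an example.

// usage sketch -- not part of threading.hh
NumaAllocator<double> alloc(0);                       // allocate on NUMA node 0
std::vector<double, NumaAllocator<double>> v(alloc);  // vector with node-local storage
v.resize(1<<20, 3.14);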
762
763 //----------------------------------------------------------------------------
764
771 class Mutex
772 {
773 public:
779 Mutex() = default;
780
786 Mutex(Mutex const& m) {}
787
793 Mutex& operator=(Mutex const& m) { return *this; }
794
798 boost::mutex& get() { return mutex; }
799
800 private:
801 boost::mutex mutex;
802 };
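A sketch (not part of the header): a copyable class whose state is protected by a Kaskade::Mutex. Each copy gets its own, freshly constructed mutex, which is exactly the copy semantics implemented above.

// usage sketch -- not part of threading.hh
class Counter
{
public:
  void increment()
  {
    boost::lock_guard<boost::mutex> lock(mtx.get());
    ++count;
  }

private:
  int count = 0;
  Mutex mtx;    // copyable, in contrast to a plain boost::mutex
};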
803
804 //-------------------------------------------------------------------------------------
805
817 void runInBackground(std::function<void()>& f);
818}
819
820#endif
A concurrent fifo queue.
Definition: threading.hh:131
size_t size() const
Returns the number of tasks waiting.
Definition: threading.hh:174
ConcurrentQueue()
Constructs an empty queue.
Definition: threading.hh:136
int running(int n)
Change the number of running worker threads.
Definition: threading.hh:219
void push_back(T &&t)
Stores an element at the end of the queue.
Definition: threading.hh:184
ConcurrentQueue & operator=(ConcurrentQueue const &q)
Assignment.
Definition: threading.hh:157
T pop_front()
Retrieves the foremost element.
Definition: threading.hh:198
ConcurrentQueue(ConcurrentQueue &&q)
Moves a queue.
Definition: threading.hh:146
int running() const
Get the number of running worker threads.
Definition: threading.hh:230
A simple memory manager for NUMA systems.
Definition: kalloc.hh:40
A utility class implementing appropriate copy semantics for boost mutexes.
Definition: threading.hh:772
boost::mutex & get()
provides access to the mutex to perform the locking.
Definition: threading.hh:798
Mutex & operator=(Mutex const &m)
Assignment.
Definition: threading.hh:793
Mutex(Mutex const &m)
Copy constructor.
Definition: threading.hh:786
Mutex()=default
Default constructor.
An STL allocator that uses memory of a specific NUMA node only.
Definition: threading.hh:653
void construct(U *p, Args &&... args)
Definition: threading.hh:731
void deallocate(pointer p, size_type n)
Definition: threading.hh:724
NumaAllocator(int node)
Construct an allocator for allocating on the given NUMA node.
Definition: threading.hh:681
const_pointer address(const_reference x) const
Definition: threading.hh:698
pointer address(reference x) const
Definition: threading.hh:693
T const & const_reference
Definition: threading.hh:659
std::true_type propagate_on_container_copy_assignment
Definition: threading.hh:665
bool operator==(NumaAllocator< U > const &other) const
comparison for equality
Definition: threading.hh:748
bool operator!=(NumaAllocator< U > const &other) const
Definition: threading.hh:754
int node() const
Reports the node on which we allocate.
Definition: threading.hh:688
std::true_type propagate_on_container_swap
Definition: threading.hh:667
std::size_t size_type
Definition: threading.hh:660
std::true_type propagate_on_container_move_assignment
Definition: threading.hh:666
size_type max_size() const
Definition: threading.hh:703
std::ptrdiff_t difference_type
Definition: threading.hh:661
pointer allocate(size_type n, std::allocator< void >::const_pointer=0)
Allocates the requested amount of memory.
Definition: threading.hh:716
Implementation of thread pools suitable for parallelization of (more or less) memory-bound algorithms...
Definition: threading.hh:293
int runningOnGlobalQueue() const
Reports how many worker threads are running to work on the global task queue.
Definition: threading.hh:335
static NumaThreadPool & instance(int maxThreads=std::numeric_limits< int >::max())
Returns a globally unique thread pool instance.
int nodes() const
Reports the number of NUMA nodes (i.e., memory interfaces/CPU sockets)
Definition: threading.hh:316
size_t alignment(int node) const
Reports the alignment size of allocator at given NUMA node.
void * allocate(size_t n, int node)
Allocates memory on a specific node.
int maxCpusOnNode() const
Reports the maximal number of CPUs on one node.
Definition: threading.hh:351
Ticket runOnNode(int node, Task &&task)
Schedules a task to be executed on a CPU belonging to the given NUMA node.
int cpus() const
Reports the total number of CPUs (usually a multiple of nodes).
Definition: threading.hh:327
void deallocate(void *p, size_t n, int node)
frees a chunk of memory previously allocated
void reserve(size_t n, size_t k, int node)
Tells the allocator to prepare for subsequent allocation of several memory blocks of same size.
bool isSequential() const
Returns true if tasks are executed sequentially. Sequential execution can be enforced by calling Numa...
Definition: threading.hh:362
int cpus(int node) const
Reports the number of CPUs on the given node (usually the same for all nodes).
Definition: threading.hh:343
Ticket run(Task &&task)
Schedules a task to be executed on an arbitrary CPU.
Kalloc & allocator(int node)
Returns the allocator used for the given node.
A class that gathers data on task timing and provides gnuplot visualization.
Definition: timing.hh:291
void start(int task)
defines the start of the given task.
void stop(int task)
defines the end of the given task.
Index uniformWeightRangeStart(BlockIndex i, BlockIndex n, Index m)
Computes partitioning points of ranges for uniform weight distributions.
Definition: threading.hh:75
std::packaged_task< void()> Task
Abstract interface for tasks to be scheduled for concurrent execution.
Definition: threading.hh:248
void runInBackground(std::function< void()> &f)
Executes a function in a child process.
std::future< void > Ticket
Abstract waitable job ticket for submitted tasks.
Definition: threading.hh:254
void parallelForNodes(Func const &f, int maxTasks=std::numeric_limits< int >::max())
A parallel for loop that executes the given functor in parallel on different NUMA nodes.
Definition: threading.hh:604
Index uniformWeightRange(Index j, Index n, Index m)
Computes the range in which an index is to be found when partitioned for uniform weights.
Definition: threading.hh:91
void equalWeightRanges(std::vector< size_t > &x, size_t n)
Computes partitioning points such that the sum of weights in each partition is roughly the same.
boost::mutex refElementMutex
A global lock for the Dune::GenericReferenceElement singletons, which are not thread-safe.
void parallelFor(Func const &f, int maxTasks=std::numeric_limits< int >::max())
A parallel for loop that executes the given functor in parallel on different CPUs.
Definition: threading.hh:489
std::mutex DuneQuadratureRulesMutex
A global lock for the Dune::QuadratureRules factory, which is not thread-safe as of 2015-01-01.
NumaAllocator< U > other
Definition: threading.hh:672