#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
template <class BlockIndex, class Index>
Index uniformWeightRangeStart(BlockIndex i, BlockIndex n, Index m)
{
  assert(i>=0 && i<=n && n>0);
  return (i*m)/n;    // first entry of block i when [0,m) is split into n uniform blocks
}
template <class Index>
Index uniformWeightRange(Index j, Index n, Index m)
{
  assert(j>=0 && j<m && n>0 && m>0);
  ...
  Index high = ((j+1)*n)/m;    // candidate for the block containing entry j
  ...
}
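As an illustration of the arithmetic (a standalone sketch, not the library code; rangeStart is a hypothetical copy of uniformWeightRangeStart): partitioning m = 10 entries into n = 3 uniform blocks yields the boundaries 0, 3, 6, 10.

#include <iostream>

// Hypothetical standalone copy of uniformWeightRangeStart, for illustration.
long rangeStart(long i, long n, long m) { return (i*m)/n; }

int main()
{
  long const n = 3, m = 10;                  // 3 blocks over 10 entries
  for (long i=0; i<=n; ++i)
    std::cout << rangeStart(i,n,m) << "\n";  // prints 0 3 6 10, i.e. blocks [0,3), [3,6), [6,10)
}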
// ConcurrentQueue(ConcurrentQueue&& q): move the contents under the source's lock
boost::lock_guard<boost::mutex> lock(q.mutex);
queue = std::move(q.queue);

// operator=(ConcurrentQueue const& q): lock the source before copying
boost::lock_guard<boost::mutex> lock(q.mutex);

// empty(): thread-safe emptiness check
boost::lock_guard<boost::mutex> lock(mutex);
return queue.empty();

// size(): thread-safe size query
boost::lock_guard<boost::mutex> lock(mutex);

// push_back(T&& t): append under lock
boost::lock_guard<boost::mutex> lock(mutex);
queue.push(std::move(t));
// pop_front(): block until an element is available, then take it
boost::unique_lock<boost::mutex> lock(mutex);
while (queue.empty())
  filled.wait(lock);             // woken when an element has been pushed
T t = std::move(queue.front());
// running(int n): adjust the worker count under lock, then report it
boost::lock_guard<boost::mutex> lock(mutex);
return runningWorkers;

// running() const: report the worker count
return runningWorkers;

// data members
mutable boost::mutex mutex;
boost::condition_variable filled;
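A minimal producer/consumer sketch (assuming the ConcurrentQueue interface documented below; the element type and counts are illustrative):

#include <boost/thread/thread.hpp>
#include <iostream>
#include <utility>

void producerConsumer(ConcurrentQueue<int>& q)
{
  boost::thread producer([&q] {
    for (int i=0; i<10; ++i)
      q.push_back(std::move(i));           // wakes a consumer blocked in pop_front
  });
  boost::thread consumer([&q] {
    for (int i=0; i<10; ++i)
      std::cout << q.pop_front() << "\n";  // blocks while the queue is empty
  });
  producer.join();
  consumer.join();
}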
typedef std::packaged_task<void()> Task;
// cpus(int node): number of CPUs on the given node
return cpuByNode[node].size();

// maxCpusOnNode()
return maxCpusPerNode;
int nCpu, nNode, maxCpusPerNode;
std::vector<ConcurrentQueue<Task>> nodeQueue;
boost::thread_group threads;
std::vector<int> nodeByCpu;
std::vector<std::vector<int>> cpuByNode;
std::map<void*,std::pair<size_t,int>> memBlocks;
std::vector<Kalloc> nodeMemory;
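A hedged sketch of direct task submission through the documented pool interface (instance(), run(), with Task and Ticket as defined above); the printed text is illustrative:

#include <iostream>

void poolExample()
{
  NumaThreadPool& pool = NumaThreadPool::instance();

  // Wrap a lambda into a Task (std::packaged_task<void()>) and submit it.
  Ticket t = pool.run(Task([] { std::cout << "hello from a worker\n"; }));
  t.wait();                               // Ticket is a std::future<void>

  std::cout << pool.cpus() << " CPUs on " << pool.nodes() << " NUMA nodes\n";
}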
template <class Func>
void parallelFor(Func const& f, int maxTasks)
{
  ...
  std::vector<Ticket> tickets(nTasks);
  for (int i=0; i<nTasks; ++i)
    tickets[i] = pool.run(Task([i,&f,nTasks] { f(i,nTasks); }));
  for (auto& ticket: tickets)
    ticket.wait();
}
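A usage sketch for parallelFor: the functor receives its task index and the total task count and computes its own subrange (the subrange arithmetic reuses uniformWeightRangeStart for illustration; scaleInParallel is a hypothetical name):

void scaleInParallel(std::vector<double>& x)
{
  parallelFor([&x](int i, int n)
  {
    size_t lo = uniformWeightRangeStart(i,   n, x.size());
    size_t hi = uniformWeightRangeStart(i+1, n, x.size());
    for (size_t k=lo; k<hi; ++k)
      x[k] *= 2;                 // task i of n doubles its own subrange
  });                            // parallelFor waits on all tickets before returning
}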
// Variant with dynamic load balancing over an index range [first,last) and
// task timing (tt) written out for gnuplot.
template <class Func>
...
std::vector<Ticket> tickets(nTasks);
...
for (size_t i=0; i<nTasks; ++i)
  tickets[i] = pool.run(Task([&f,&first,last,&mutex,i,&tt]
  {
    ...
    std::unique_lock<std::mutex> lock(mutex);
    size_t myPos = first;        // claim the next unprocessed position
    ...
    std::lock_guard<std::mutex> lock(mutex);
    ...
  }));
...
for (auto& ticket: tickets)
  ticket.wait();
std::ofstream out("timing.gnu"); // gnuplot output of the gathered task timings
template <class Func>
void parallelForNodes(Func const& f, int maxTasks)
{
  ...
  std::vector<Ticket> tickets(nTasks);
  for (int i=0; i<nTasks; ++i)
    tickets[i] = pool.runOnNode(i, Task([i,&f,&nTasks] { f(i,nTasks); }));
  for (auto& ticket: tickets)
    ticket.wait();
}
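A usage sketch: one task per NUMA node, e.g. to touch node-local data (the body is illustrative; initPerNode is a hypothetical name):

#include <iostream>

void initPerNode()
{
  parallelForNodes([](int node, int nNodes)
  {
    // executed on a CPU of NUMA node 'node', as scheduled by runOnNode
    std::cout << "initializing data on node " << node << " of " << nNodes << "\n";
  });
}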
namespace ThreadingDetail
{
  class NumaAllocatorBase
  {
  public:
    NumaAllocatorBase(int node);
    size_t max_size() const;
    void* allocate(size_t n);
    void deallocate(void* p, size_t n);
    // ...
  };
}
// max_size(): how many objects of type T fit at most
return alloc.max_size() / sizeof(T);

// allocate(n): request raw memory for n objects from the node-bound allocator
return static_cast<pointer>(alloc.allocate(n*sizeof(T)));

// deallocate(p,n)
alloc.deallocate(static_cast<void*>(p), n*sizeof(T));

// construct(): placement new into allocated storage
template <class U, class... Args>
void construct(U* p, Args&&... args)
{
  ::new((void*)p) U(std::forward<Args>(args)...);
}

ThreadingDetail::NumaAllocatorBase alloc;  // performs the node-bound allocation
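Since NumaAllocator models the STL allocator interface documented below, it can back standard containers; a sketch (node 0 is an arbitrary illustrative choice):

#include <vector>

void numaVectorExample()
{
  NumaAllocator<double> alloc(0);                        // allocate on NUMA node 0
  std::vector<double, NumaAllocator<double>> xs(alloc);
  xs.resize(1000, 3.14);                                 // storage comes from node 0
}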
boost::mutex& get() { return mutex; }
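The wrapper makes classes containing a mutex copyable; a sketch of the intended use (Counter is a hypothetical class):

#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>

class Counter
{
public:
  void increment()
  {
    boost::lock_guard<boost::mutex> lock(mutex.get());  // lock via get()
    ++count;
  }
private:
  Mutex mutex;     // copyable, unlike a raw boost::mutex
  int count = 0;
};

Counter a;
Counter b = a;     // fine: b receives its own, unlocked mutex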
size_t size() const
Returns the number of tasks waiting.
ConcurrentQueue()
Constructs an empty queue.
int running(int n)
Changes the number of running worker threads.
void push_back(T &&t)
Stores an element at the end of the queue.
ConcurrentQueue & operator=(ConcurrentQueue const &q)
Assignment.
T pop_front()
Retrieves the foremost element.
ConcurrentQueue(ConcurrentQueue &&q)
Moves a queue.
int running() const
Gets the number of running worker threads.
A simple memory manager for NUMA systems.
A utility class implementing appropriate copy semantics for boost mutexes.
boost::mutex & get()
Provides access to the mutex to perform the locking.
Mutex & operator=(Mutex const &m)
Assignment.
Mutex(Mutex const &m)
Copy constructor.
Mutex()=default
Default constructor.
An STL allocator that uses memory of a specific NUMA node only.
void construct(U *p, Args &&... args)
void deallocate(pointer p, size_type n)
NumaAllocator(int node)
Construct an allocator for allocating on the given NUMA node.
const_pointer address(const_reference x) const
pointer address(reference x) const
T const & const_reference
std::true_type propagate_on_container_copy_assignment
bool operator==(NumaAllocator< U > const &other) const
Comparison for equality.
bool operator!=(NumaAllocator< U > const &other) const
Comparison for inequality.
int node() const
Reports the node on which we allocate.
std::true_type propagate_on_container_swap
std::true_type propagate_on_container_move_assignment
size_type max_size() const
std::ptrdiff_t difference_type
pointer allocate(size_type n, std::allocator< void >::const_pointer=0)
Allocates the requested amount of memory.
Implementation of thread pools suitable for parallelization of (more or less) memory-bound algorithms...
int runningOnGlobalQueue() const
Reports how many worker threads are running to work on the global task queue.
static NumaThreadPool & instance(int maxThreads=std::numeric_limits< int >::max())
Returns a globally unique thread pool instance.
int nodes() const
Reports the number of NUMA nodes (i.e., memory interfaces/CPU sockets).
size_t alignment(int node) const
Reports the alignment size of the allocator at the given NUMA node.
void * allocate(size_t n, int node)
Allocates memory on a specific node.
int maxCpusOnNode() const
Reports the maximal number of CPUs on one node.
Ticket runOnNode(int node, Task &&task)
Schedules a task to be executed on a CPU belonging to the given NUMA node.
int cpus() const
Reports the total number of CPUs (usually a multiple of nodes).
void deallocate(void *p, size_t n, int node)
Frees a chunk of memory previously allocated.
void reserve(size_t n, size_t k, int node)
Tells the allocator to prepare for subsequent allocation of several memory blocks of same size.
bool isSequential() const
Returns true if tasks are executed sequentially. Sequential execution can be enforced by calling Numa...
int cpus(int node) const
Reports the number of CPUs on the given node (usually the same for all nodes).
Ticket run(Task &&task)
Schedules a task to be executed on an arbitrary CPU.
Kalloc & allocator(int node)
Returns the allocator used for the given node.
A class that gathers data on task timing and provides gnuplot visualization.
void start(int task)
Defines the start of the given task.
void stop(int task)
Defines the end of the given task.
Dune::FieldVector< T, n > max(Dune::FieldVector< T, n > x, Dune::FieldVector< T, n > const &y)
Componentwise maximum.
Dune::FieldVector< T, n > min(Dune::FieldVector< T, n > x, Dune::FieldVector< T, n > const &y)
Componentwise minimum.
Index uniformWeightRangeStart(BlockIndex i, BlockIndex n, Index m)
Computes partitioning points of ranges for uniform weight distributions.
std::packaged_task< void()> Task
Abstract interface for tasks to be scheduled for concurrent execution.
void runInBackground(std::function< void()> &f)
Executes a function in a child process.
std::future< void > Ticket
Abstract waitable job ticket for submitted tasks.
void parallelForNodes(Func const &f, int maxTasks=std::numeric_limits< int >::max())
A parallel for loop that executes the given functor in parallel on different NUMA nodes.
Index uniformWeightRange(Index j, Index n, Index m)
Computes the range in which an index is to be found when partitioned for uniform weights.
void equalWeightRanges(std::vector< size_t > &x, size_t n)
Computes partitioning points such that the sum of weights in each partition is roughly the same (a usage sketch follows at the end of this section).
boost::mutex refElementMutex
A global lock for the Dune::GenericReferenceElement singletons, which are not thread-safe.
void parallelFor(Func const &f, int maxTasks=std::numeric_limits< int >::max())
A parallel for loop that executes the given functor in parallel on different CPUs.
std::mutex DuneQuadratureRulesMutex
A global lock for the Dune::QuadratureRules factory, which is not thread-safe as of 2015-01-01.
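A hedged usage sketch for equalWeightRanges as documented above; the weights-in/boundaries-out convention and the example numbers are assumptions, not taken from the source:

#include <cstddef>
#include <vector>

void partitionExample()
{
  // Estimated per-item work, e.g. nonzeros per row of a sparse matrix.
  std::vector<size_t> x = { 4, 1, 1, 4, 2, 2, 4, 2 };  // total weight 20
  equalWeightRanges(x, 4);   // assumed: overwrites x with 4 partition boundaries
  // Each resulting index range should then carry a total weight of about 5.
}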