SourceXtractorPlusPlus
1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultiThreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <chrono>
25
#include <ElementsKernel/Logging.h>
26
#include <csignal>
27
28
#include "
SEImplementation/Plugin/SourceIDs/SourceID.h
"
29
#include "
SEImplementation/Measurement/MultithreadedMeasurement.h
"
30
31
using namespace
SourceXtractor
;
32
33
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
34
35
36
MultithreadedMeasurement::~MultithreadedMeasurement
() {
37
if
(
m_output_thread
->joinable()) {
38
m_output_thread
->join();
39
}
40
}
41
42
void
MultithreadedMeasurement::startThreads
() {
43
m_output_thread
=
Euclid::make_unique<std::thread>
(
outputThreadStatic
,
this
);
44
}
45
46
void
MultithreadedMeasurement::stopThreads
() {
47
m_input_done
=
true
;
48
m_thread_pool
->block();
49
m_output_thread
->join();
50
logger
.debug() <<
"All worker threads done!"
;
51
}
52
53
void
MultithreadedMeasurement::synchronizeThreads
() {
54
// Wait until all worker threads are done
55
m_thread_pool
->block();
56
57
// Wait until the output queue is empty
58
while
(
true
) {
59
{
60
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
61
if
(
m_output_queue
.empty()) {
62
break
;
63
}
64
else
if
(
m_thread_pool
->checkForException(
false
)) {
65
logger
.fatal() <<
"An exception was thrown from a worker thread"
;
66
m_thread_pool
->checkForException(
true
);
67
}
68
else
if
(
m_thread_pool
->activeThreads() == 0) {
69
throw
Elements::Exception
() <<
"No active threads and the queue is not empty! Please, report this as a bug"
;
70
}
71
}
72
std::this_thread::sleep_for
(
std::chrono::milliseconds
(100));
73
}
74
}
75
76
void
MultithreadedMeasurement::receiveSource
(
std::unique_ptr<SourceGroupInterface>
source_group) {
77
// Force computation of SourceID here, where the order is still deterministic
78
for
(
auto
& source : *source_group) {
79
source.getProperty<
SourceID
>();
80
}
81
82
// Put the new SourceGroup into the input queue
83
auto
order_number =
m_group_counter
;
84
auto
lambda = [
this
, order_number, source_group =
std::move
(source_group)]()
mutable
{
85
// Trigger measurements
86
for
(
auto
& source : *source_group) {
87
m_source_to_row
(source);
88
}
89
// Pass to the output thread
90
{
91
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
92
m_output_queue
.emplace_back(order_number,
std::move
(source_group));
93
}
94
m_new_output
.notify_one();
95
};
96
auto
lambda_copyable = [lambda =
std::make_shared<decltype(lambda)>
(
std::move
(lambda))](){
97
(*lambda)();
98
};
99
m_thread_pool
->submit(lambda_copyable);
100
++
m_group_counter
;
101
}
102
103
void
MultithreadedMeasurement::outputThreadStatic
(
MultithreadedMeasurement
*measurement) {
104
logger
.debug() <<
"Starting output thread"
;
105
try
{
106
measurement->
outputThreadLoop
();
107
}
108
catch
(
const
Elements::Exception
& e) {
109
logger
.fatal() <<
"Output thread got an exception!"
;
110
logger
.fatal() << e.what();
111
if
(!measurement->
m_abort_raised
.exchange(
true
)) {
112
logger
.fatal() <<
"Aborting the execution"
;
113
::raise
(SIGTERM);
114
}
115
}
116
logger
.debug() <<
"Stopping output thread"
;
117
}
118
119
void
MultithreadedMeasurement::outputThreadLoop
() {
120
while
(
m_thread_pool
->activeThreads() > 0) {
121
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
122
123
// Wait for something in the output queue
124
if
(
m_output_queue
.empty()) {
125
m_new_output
.wait_for(output_lock,
std::chrono::milliseconds
(100));
126
}
127
128
// Process the output queue
129
while
(!
m_output_queue
.empty()) {
130
sendSource
(
std::move
(
m_output_queue
.front().second));
131
m_output_queue
.pop_front();
132
}
133
134
if
(
m_input_done
&&
m_thread_pool
->running() +
m_thread_pool
->queued() == 0 &&
135
m_output_queue
.empty()) {
136
break
;
137
}
138
}
139
}
140
141
void
MultithreadedMeasurement::receiveProcessSignal
(
const
ProcessSourcesEvent
& event) {
142
sendProcessSignal
(event);
143
}
logger
static Elements::Logging logger
MultithreadedMeasurement.h
SourceID.h
Elements::Exception
Elements::Logging
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition
MultithreadedMeasurement.h:64
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
~MultithreadedMeasurement() override
Definition
MultithreadedMeasurement.cpp:36
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition
MultithreadedMeasurement.h:67
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition
MultithreadedMeasurement.h:71
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition
MultithreadedMeasurement.cpp:119
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::synchronizeThreads
void synchronizeThreads() override
Definition
MultithreadedMeasurement.cpp:53
SourceXtractor::MultithreadedMeasurement::receiveSource
void receiveSource(std::unique_ptr< SourceGroupInterface > source_group) override
Definition
MultithreadedMeasurement.cpp:76
SourceXtractor::MultithreadedMeasurement::MultithreadedMeasurement
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition
MultithreadedMeasurement.h:42
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::unique_ptr< SourceGroupInterface > > > m_output_queue
Definition
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::stopThreads
void stopThreads() override
Definition
MultithreadedMeasurement.cpp:46
SourceXtractor::MultithreadedMeasurement::receiveProcessSignal
void receiveProcessSignal(const ProcessSourcesEvent &event) override
Definition
MultithreadedMeasurement.cpp:141
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition
MultithreadedMeasurement.h:62
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition
MultithreadedMeasurement.cpp:103
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition
MultithreadedMeasurement.cpp:42
SourceXtractor::PipelineEmitter< SourceGroupInterface >::sendProcessSignal
void sendProcessSignal(const ProcessSourcesEvent &event) const
Definition
PipelineStage.h:92
SourceXtractor::PipelineEmitter< SourceGroupInterface >::sendSource
void sendSource(std::unique_ptr< SourceGroupInterface > source) const
Definition
PipelineStage.h:85
SourceXtractor::SourceID
Definition
SourceID.h:33
std::chrono::milliseconds
std::make_shared
T make_shared(T... args)
std::move
T move(T... args)
Euclid::Configuration::logger
static Elements::Logging logger
Euclid::make_unique
std::unique_ptr< T > make_unique(Args &&... args)
SourceXtractor
Definition
Aperture.h:30
std::raise
T raise(T... args)
std::this_thread::sleep_for
T sleep_for(T... args)
SourceXtractor::ProcessSourcesEvent
Event received by SourceGrouping to request the processing of some of the Sources stored.
Definition
PipelineStage.h:33
std::unique_lock
std::unique_ptr
Generated by
1.15.0