SourceXtractorPlusPlus 1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
Prefetcher.cpp
Go to the documentation of this file.
1
17
18#include <ElementsKernel/Logging.h>
19#include "AlexandriaKernel/memory_tools.h"
21
23
24
25namespace SourceXtractor {
26
// Scoped helper that inverts a lock: the constructor calls unlock() on the
// wrapped lock and the block further down calls lock() on it again.
// NOTE(review): listing lines 31 and 36 are absent from this extract (the
// gutter jumps 30->32 and 35->37). Presumably they held the class header and
// the destructor signature -- confirm against the original Prefetcher.cpp.
30template<typename Lock>
// Keeps a reference to `lock` and releases it immediately.
32 explicit ReverseLock(Lock& lock) : m_lock(lock) {
33 m_lock.unlock();
34 }
35
// Re-acquires the wrapped lock (presumably the destructor body; its signature
// line is missing from this extract).
37 m_lock.lock();
38 }
39
40private:
// Reference to the wrapped lock; held by reference, so not owned by this helper.
41 Lock& m_lock;
42};
43
// Constructor: stores the shared thread pool, initialises the stop flag to
// false and constructs the semaphore from max_queue_size.
// NOTE(review): listing line 46 (the constructor body) is absent from this
// extract; presumably it is where m_output_thread gets created -- confirm.
44Prefetcher::Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size)
45 : m_thread_pool(thread_pool), m_stop(false), m_semaphore(max_queue_size) {
47}
48
// NOTE(review): the signature on listing line 49 is absent from this extract;
// presumably this is the Prefetcher destructor -- confirm.
// If the output thread is still joinable, call wait().
50 if (m_output_thread->joinable())
51 wait();
52}
53
// NOTE(review): the signature on listing line 54 is absent from this extract;
// presumably this is Prefetcher::receiveSource, with `message` being the
// incoming source pointer -- confirm.
// Records the incoming source in m_received, then submits a task to the thread
// pool that requests every property in m_prefetch_set and finally stores the
// source in m_finished_sources.
// Acquire the semaphore before queueing; the matching release() is issued by
// the output loop after a source has been sent downstream.
55 m_semaphore.acquire();
56
// The raw pointer value is used as the key identifying this source in both
// m_received and m_finished_sources.
57 intptr_t source_addr = reinterpret_cast<intptr_t>(message.get());
58 {
// NOTE(review): listing line 59 is missing here; presumably it takes a lock
// guarding m_received -- confirm.
60 m_received.emplace_back(EventType::SOURCE, source_addr);
61 }
62
63 // Pre-fetch in separate threads
// The lambda takes over `message` by move; it is declared mutable so that
// `message` can be moved out again once the properties have been requested.
64 auto lambda = [this, source_addr, message = std::move(message)]() mutable {
65 for (auto& prop : m_prefetch_set) {
66 message->getProperty(prop);
67 }
68 {
// NOTE(review): listing line 69 is missing here; presumably it takes the same
// lock before touching m_finished_sources -- confirm.
70 m_finished_sources.emplace(source_addr, std::move(message));
71 }
// Notify m_new_output so a waiter can re-check the queues.
72 m_new_output.notify_one();
73 };
// Wrap the lambda in a shared_ptr so the callable handed to submit() can be
// copied (presumably submit() requires a copyable callable -- confirm).
74 auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
75 (*lambda)();
76 };
77 m_thread_pool->submit(lambda_copyable);
78}
79
// Adds property_id to m_prefetch_set (the set iterated by the pre-fetch tasks)
// and logs the request at debug level.
80void Prefetcher::requestProperty(const PropertyId& property_id) {
81 m_prefetch_set.emplace(property_id);
82 logger.debug() << "Requesting prefetch of " << property_id.getString();
83}
84
// NOTE(review): the signature on listing line 85 is absent from this extract;
// judging by the log messages this is the prefetcher output loop -- confirm
// the exact function name.
// Drains m_received in arrival order: ProcessSourceEvents are forwarded with
// sendProcessSignal(), finished sources with sendSource(). The inner loop
// stops at the first source that is not yet in m_finished_sources.
86 logger.debug() << "Starting prefetcher output loop";
87
// Keep looping while the thread pool reports at least one active thread.
88 while (m_thread_pool->activeThreads() > 0) {
// NOTE(review): listing line 89 is missing here; presumably it declares
// `output_lock`, the lock used below -- confirm.
90
91 // Wait for something new
// Wakes on a notification or after at most 1000 ms; no predicate is passed, so
// the queues are re-checked on every wake-up.
92 m_new_output.wait_for(output_lock, std::chrono::milliseconds(1000));
93
94 // Process the output queue
95 // That is, release sources once the front of the received queue has been processed
96 while (!m_received.empty()) {
97 auto next = m_received.front();
98 // If the front is a ProcessSourceEvent, everything received before is done,
99 // so pass downstream
100 if (next.m_event_type == EventType::PROCESS_SOURCE) {
101 auto event = m_event_queue.front();
102 m_event_queue.pop_front();
103 logger.debug() << "ProcessSourceEvent released";
104 {
// output_lock is unlocked for the duration of the downstream call and locked
// again when release_lock goes out of scope.
105 ReverseLock<decltype(output_lock)> release_lock(output_lock);
106 sendProcessSignal(event);
107 }
108 m_received.pop_front();
109 continue;
110 }
111 // Find if the matching source is done
112 auto processed = m_finished_sources.find(next.m_source_addr);
113 // If not, we can't keep going, so exit here
114 if (processed == m_finished_sources.end()) {
115 logger.debug() << "Next source " << next.m_source_addr << " not done yet";
116 break;
117 }
118 // If it is, send it downstream
119 logger.debug() << "Source " << next.m_source_addr << " sent downstream";
120 {
// Same pattern as above: the lock is not held while sendSource() runs.
121 ReverseLock<decltype(output_lock)> release_lock(output_lock);
122 sendSource(std::move(processed->second));
123 }
// The map entry now holds a moved-from pointer; drop it together with the
// queue entry, then release the semaphore acquired when the source arrived.
124 m_finished_sources.erase(processed);
125 m_received.pop_front();
126 m_semaphore.release();
127 }
128
// Leave the loop once a stop was requested and nothing is left in m_received.
129 if (m_stop && m_received.empty()) {
130 break;
131 }
132 }
133 logger.debug() << "Stopping prefetcher output loop";
134}
135
// NOTE(review): the signature on listing line 136 is absent from this extract;
// presumably this is Prefetcher::receiveProcessSignal, with `message` being
// the received ProcessSourcesEvent -- confirm.
// Queues the event in m_event_queue, notifies m_new_output and logs at debug level.
137 {
// NOTE(review): listing lines 138-139 are missing here; presumably they take
// the queue lock and append a PROCESS_SOURCE marker to m_received -- confirm.
140 m_event_queue.emplace_back(message);
141 }
142 m_new_output.notify_one();
143 logger.debug() << "ProcessSourceEvent received";
144}
145
// NOTE(review): the signature on listing line 146 is absent from this extract;
// presumably this is Prefetcher::wait() -- confirm.
// Sets the stop flag read by the output loop, then joins the output thread.
147 m_stop = true;
148 m_output_thread->join();
149}
150
// NOTE(review): the signature on listing line 151 is absent from this extract,
// so the function name cannot be confirmed from here.
// Polls until m_received is empty. On each pass it also checks the thread pool
// for a worker exception and for the case of no active threads.
152 // Wait until the output queue is empty
153 while (true) {
154 {
// NOTE(review): listing line 155 is missing here; presumably it takes the lock
// guarding m_received for the duration of this scope -- confirm.
156 if (m_received.empty()) {
157 break;
158 }
// checkForException(false) is used as a test only; after logging, the call
// with `true` presumably rethrows the worker's exception -- confirm against
// the Euclid::ThreadPool documentation.
159 else if (m_thread_pool->checkForException(false)) {
160 logger.fatal() << "An exception was thrown from a worker thread";
161 m_thread_pool->checkForException(true);
162 }
// Queue not empty but no thread is active: treated as an internal error and
// reported by throwing.
163 else if (m_thread_pool->activeThreads() == 0) {
164 throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
165 }
166 }
// NOTE(review): listing line 167 is missing here; presumably a sleep between
// polls -- confirm.
168 }
169}
170
171} // end of namespace SourceXtractor
static Elements::Logging logger
static Logging getLogger(const std::string &name="")
void sendProcessSignal(const ProcessSourcesEvent &event) const
void sendSource(std::unique_ptr< SourceInterface > source) const
void requestProperty(const PropertyId &property_id)
void receiveProcessSignal(const ProcessSourcesEvent &event) override
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourceEvent, order preserved.
Definition Prefetcher.h:119
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition Prefetcher.h:109
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition Prefetcher.h:115
std::deque< EventType > m_received
Queue of the types of received events, used to pass events downstream in the order they were received.
Definition Prefetcher.h:121
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition Prefetcher.h:129
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition Prefetcher.h:111
std::map< intptr_t, std::unique_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition Prefetcher.h:117
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition Prefetcher.h:113
void receiveSource(std::unique_ptr< SourceInterface > source) override
std::atomic_bool m_stop
Termination condition for the output loop.
Definition Prefetcher.h:126
Identifier used to set and retrieve properties.
Definition PropertyId.h:40
std::string getString() const
T get(T... args)
T lock(T... args)
T make_shared(T... args)
T move(T... args)
static Elements::Logging logger
std::unique_ptr< T > make_unique(Args &&... args)
T next(T... args)
T sleep_for(T... args)
Event received by SourceGrouping to request the processing of some of the Sources stored.