SourceXtractorPlusPlus 1.0.3
SourceXtractor++, the next generation SExtractor
Loading...
Searching...
No Matches
SourceXtractor.cpp
Go to the documentation of this file.
1/*
2 * Copyright © 2019-2022 Université de Genève, LMU Munich - Faculty of Physics, IAP-CNRS/Sorbonne Université
3 *
4 * This library is free software; you can redistribute it and/or modify it under
5 * the terms of the GNU Lesser General Public License as published by the Free
6 * Software Foundation; either version 3.0 of the License, or (at your option)
7 * any later version.
8 *
9 * This library is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 * details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this library; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
23
24#include <dlfcn.h>
25#include <iomanip>
26#include <map>
27#include <string>
28#include <typeinfo>
29
30#include <boost/program_options.hpp>
31#include <boost/algorithm/string/predicate.hpp>
33
34#include "ElementsKernel/Main.h"
35#include "ElementsKernel/System.h"
36#include "ElementsKernel/Temporary.h"
37
38#include "Configuration/ConfigManager.h"
39#include "Configuration/Utils.h"
40
42
50
52
55
79
81#include "SEMain/PluginConfig.h"
82#include "SEMain/Sorter.h"
83
84
85namespace po = boost::program_options;
86namespace fs = boost::filesystem;
87using namespace SourceXtractor;
88using namespace Euclid::Configuration;
89
91
92static const std::string LIST_OUTPUT_PROPERTIES {"list-output-properties"};
93static const std::string PROPERTY_COLUMN_MAPPING_ALL {"property-column-mapping-all"};
94static const std::string PROPERTY_COLUMN_MAPPING {"property-column-mapping"};
95static const std::string DUMP_CONFIG {"dump-default-config"};
96
97class GroupObserver : public Observer<std::shared_ptr<SourceGroupInterface>> {
98public:
99 virtual void handleMessage(const std::shared_ptr<SourceGroupInterface>& group) override {
100 m_list.push_back(group);
101 }
102
104};
105
106class SourceObserver : public Observer<std::shared_ptr<SourceWithOnDemandProperties>> {
107public:
109 m_list.push_back(source);
110 }
111
113};
114
116
117static void setupEnvironment(void) {
118 // Some parts of boost (including boost::filesystem) can throw an exception when the
119 // locale as configured in the environment is invalid.
120 // We work around that overriding the locale if we find an invalid one.
121 // See https://svn.boost.org/trac10/ticket/10205
122 try {
123 std::locale("");
124 }
125 catch (...) {
126 ::setenv("LC_ALL", "C", 1);
127 }
128}
129
136 // Despite the documentation, the methods following C ABI are capitalized
137 void (*set_num_threads)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Num_Threads"));
138 void (*set_dynamic)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Dynamic"));
139 void (*openblas_set_num_threads)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "openblas_set_num_threads"));
140 if (set_num_threads) {
141 logger.debug() << "Disabling multithreading";
142 set_num_threads(1);
143 }
144 if (openblas_set_num_threads) {
145 logger.debug() << "Disabling OpenBLAS multithreading";
146 openblas_set_num_threads(1);
147 }
148 if (set_dynamic) {
149 logger.debug() << "Disabling dynamic multithreading";
150 set_dynamic(0);
151 }
152}
153
154class SEMain : public Elements::Program {
155
171
172 bool config_initialized = false;
173 po::options_description config_parameters;
174
175public:
176
177 SEMain(const std::string& plugin_path, const std::vector<std::string>& plugin_list)
178 : plugin_manager { task_factory_registry, output_registry, plugin_path, plugin_list } {
179 }
180
184 po::options_description getConfigParameters() {
185 if (!config_initialized) {
186 auto& config_manager = ConfigManager::getInstance(config_manager_id);
187 config_manager.registerConfiguration<SourceXtractorConfig>();
188 config_manager.registerConfiguration<BackgroundConfig>();
189 config_manager.registerConfiguration<SE2BackgroundConfig>();
190 config_manager.registerConfiguration<MemoryConfig>();
191 config_manager.registerConfiguration<BackgroundAnalyzerFactory>();
192 config_manager.registerConfiguration<SamplingConfig>();
193 config_manager.registerConfiguration<DetectionFrameConfig>();
194
196
197 //plugins need to be registered before reportConfigDependencies()
198 plugin_manager.loadPlugins();
199 task_factory_registry->reportConfigDependencies(config_manager);
200 segmentation_factory.reportConfigDependencies(config_manager);
201 partition_factory.reportConfigDependencies(config_manager);
202 grouping_factory.reportConfigDependencies(config_manager);
203 deblending_factory.reportConfigDependencies(config_manager);
204 measurement_factory.reportConfigDependencies(config_manager);
205 output_factory.reportConfigDependencies(config_manager);
206
207 config_parameters.add(config_manager.closeRegistration());
208 config_initialized = true;
209 }
210 return config_parameters;
211 }
212
215 auto options = getConfigParameters();
216
217 options.add_options() (LIST_OUTPUT_PROPERTIES.c_str(), po::bool_switch(),
218 "List the possible output properties for the given input parameters and exit");
219 options.add_options() (PROPERTY_COLUMN_MAPPING_ALL.c_str(), po::bool_switch(),
220 "Show the columns created for each property");
221 options.add_options() (PROPERTY_COLUMN_MAPPING.c_str(), po::bool_switch(),
222 "Show the columns created for each property, for the given configuration");
223 options.add_options() (DUMP_CONFIG.c_str(), po::bool_switch(),
224 "Dump parameters with default values into a configuration file");
225 progress_printer_factory.addOptions(options);
226
227 // Allow to pass Python options as positional following --
228 po::positional_options_description p;
229 p.add("python-arg", -1);
230
231 return {options, p};
232 }
233
235 template <typename T>
236 static void writeDefault(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
237 out << opt.long_name() << '=' << boost::any_cast<T>(default_value) << std::endl;
238 }
239
241 template <typename T>
242 static void writeDefaultMultiple(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
243 auto values = boost::any_cast<std::vector<T>>(default_value);
244 if (values.empty()) {
245 out << "# " << opt.long_name() << '=' << std::endl;
246 }
247 else {
248 for (const auto& v : values)
249 out << opt.long_name() << '=' << v << std::endl;
250 }
251 }
252
255 typedef std::function<void(std::ostream&, const po::option_description&, const boost::any&)> PrinterFunction;
257 {typeid(bool), &writeDefault<bool>},
258 {typeid(int), &writeDefault<int>},
259 {typeid(double), &writeDefault<double>},
262 };
263 decltype(printers)::const_iterator printer;
264
266 for (const auto& p : config_parameters.options()) {
267 boost::any default_value;
268
269 std::cout << "# " << p->description() << std::endl;
270 if (!p->semantic()->apply_default(default_value)) {
271 std::cout << '#' << p->long_name() << "=" << std::endl;
272 }
273 else if ((printer = printers.find(default_value.type())) == printers.end()) {
274 std::cout << '#' << p->long_name() << "=<Unknown type " << default_value.type().name() << '>' << std::endl;
275 }
276 else {
277 printer->second(std::cout, *p, default_value);
278 }
280 }
281
282 // We need to print the log options manually, as that is set up by Elements
283 std::cout << "# Log level: FATAL, ERROR, WARN, INFO, DEBUG" << std::endl;
284 std::cout << "log-level=INFO" << std::endl;
285 std::cout << "# Log file" << std::endl;
286 std::cout << "#log-file" << std::endl;
287 }
288
290
291 // If the user just requested to see the possible output columns we show
292 // them and we do nothing else
293
294 if (args.at(LIST_OUTPUT_PROPERTIES).as<bool>()) {
295 for (auto& name : output_registry->getOutputPropertyNames()) {
296 std::cout << name << std::endl;
297 }
299 }
300
301 if (args.at(PROPERTY_COLUMN_MAPPING_ALL).as<bool>()) {
302 output_registry->printPropertyColumnMap();
304 }
305
306 if (args.at(DUMP_CONFIG).as<bool>()) {
309 }
310
311 // Make sure the BLAS multithreading does not interfere
313
314 // Elements does not verify that the config-file exists. It will just not read it.
315 // We verify that it does exist here.
316 if (args.find("config-file") != args.end()) {
317 auto cfg_file = args.at("config-file").as<fs::path>();
318 if (cfg_file != "" && !fs::exists(cfg_file)) {
319 throw Elements::Exception() << "The configuration file '" << cfg_file << "' does not exist";
320 }
321 }
322
323 // Create the progress listener and printer ASAP
324 progress_printer_factory.configure(args);
325 auto progress_mediator = progress_printer_factory.createProgressMediator();
326
327 // Initialize the rest of the components
328 auto& config_manager = ConfigManager::getInstance(config_manager_id);
329 config_manager.initialize(args);
330
331 // Configure TileManager
332 auto memory_config = config_manager.getConfiguration<MemoryConfig>();
333 TileManager::getInstance()->setOptions(memory_config.getTileSize(),
334 memory_config.getTileSize(), memory_config.getTileMaxMemory());
335
336 CheckImages::getInstance().configure(config_manager);
337
338 task_factory_registry->configure(config_manager);
339 task_factory_registry->registerPropertyInstances(*output_registry);
340
341 segmentation_factory.configure(config_manager);
342 partition_factory.configure(config_manager);
343 grouping_factory.configure(config_manager);
344 deblending_factory.configure(config_manager);
345 measurement_factory.configure(config_manager);
346 output_factory.configure(config_manager);
347
348 if (args.at(PROPERTY_COLUMN_MAPPING).as<bool>()) {
349 output_registry->printPropertyColumnMap(config_manager.getConfiguration<OutputConfig>().getOutputProperties());
351 }
352
353 auto segmentation = segmentation_factory.createSegmentation();
354
355 // Multithreading
356 auto multithreading_config = config_manager.getConfiguration<MultiThreadingConfig>();
357 auto thread_pool = multithreading_config.getThreadPool();
358
359 // Rest of the stages
360 auto partition = partition_factory.getPartition();
361 auto source_grouping = grouping_factory.createGrouping();
362
363 std::shared_ptr<Deblending> deblending = deblending_factory.createDeblending();
364 std::shared_ptr<Measurement> measurement = measurement_factory.getMeasurement();
365 std::shared_ptr<Output> output = output_factory.createOutput();
366
367 // Prefetcher
369 if (thread_pool) {
370 auto prefetch = source_grouping->requiredProperties();
371 auto deblending_prefetch = deblending->requiredProperties();
372 prefetch.insert(deblending_prefetch.begin(), deblending_prefetch.end());
373 if (!prefetch.empty()) {
374 prefetcher = std::make_shared<Prefetcher>(thread_pool, multithreading_config.getMaxQueueSize());
375 prefetcher->requestProperties(prefetch);
376 }
377 }
378
379 // Link together the pipeline's steps
380 segmentation->setNextStage(partition);
381
382 if (prefetcher) {
383 partition->setNextStage(prefetcher);
384 prefetcher->setNextStage(source_grouping);
385 }
386 else {
387 partition->setNextStage(source_grouping);
388 }
389
390 source_grouping->setNextStage(deblending);
391 deblending->setNextStage(measurement);
392
393 if (config_manager.getConfiguration<OutputConfig>().getOutputUnsorted()) {
394 logger.info() << "Writing output following measure order";
395 measurement->setNextStage(output);
396 } else {
397 logger.info() << "Writing output following segmentation order";
398 auto sorter = std::make_shared<Sorter>();
399 measurement->setNextStage(sorter);
400 sorter->setNextStage(output);
401 }
402
403 segmentation->Observable<SegmentationProgress>::addObserver(progress_mediator->getSegmentationObserver());
404 segmentation->Observable<SourceInterface>::addObserver(progress_mediator->getDetectionObserver());
405 deblending->Observable<SourceGroupInterface>::addObserver(progress_mediator->getDeblendingObserver());
406 measurement->Observable<SourceGroupInterface>::addObserver(progress_mediator->getMeasurementObserver());
407
408 // Add observers for CheckImages
409 if (CheckImages::getInstance().getSegmentationImage(0) != nullptr) {
410 segmentation->Observable<SourceInterface>::addObserver(std::make_shared<DetectionIdCheckImage>());
411 }
412 if (CheckImages::getInstance().getPartitionImage(0) != nullptr) {
413 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<SourceIdCheckImage>());
414 }
415 if (CheckImages::getInstance().getGroupImage(0) != nullptr) {
416 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<GroupIdCheckImage>());
417 }
418 if (CheckImages::getInstance().getMoffatImage(0) != nullptr) {
419 measurement->Observable<SourceGroupInterface>::addObserver(std::make_shared<MoffatCheckImage>());
420 }
421 const auto& detection_frames = config_manager.getConfiguration<DetectionFrameConfig>().getDetectionFrames();
422
423 // Perform measurements (multi-threaded part)
424 measurement->startThreads();
425 size_t prev_writen_rows = 0;
426
427 if (detection_frames.size() > 0) {
428 size_t frame_number = 0;
429 for (auto& detection_frame : detection_frames) {
430 frame_number++;
431 try {
432 // Process the image
433 logger.info() << "Processing frame "
434 << frame_number << " / " << detection_frames.size() << " : " << detection_frame->getLabel();
435 segmentation->processFrame(detection_frame);
436 }
437 catch (const std::exception &e) {
438 logger.error() << "Failed to process the frame! " << e.what();
439 measurement->stopThreads();
441 }
442
443 if (prefetcher) {
444 prefetcher->synchronize();
445 }
446 measurement->synchronizeThreads();
447
448 size_t nb_writen_rows = output->flush();
449 output->nextPart();
450
451 logger.info() << (nb_writen_rows - prev_writen_rows) << " sources detected in frame, " << nb_writen_rows << " total";
452
453 prev_writen_rows = nb_writen_rows;
454 }
455 } else {
456 // Running detection-less
457
458 auto assoc_mode_config = config_manager.getConfiguration<AssocModeConfig>();
459 if (assoc_mode_config.getCatalogs().size() < 1) {
460 logger.error() << "No detection image and no assoc catalog";
461 measurement->stopThreads();
463 }
464
465 try {
466 // Process the catalog
467 logger.info() << "Processing assoc catalog (no detection image)\n";
468 segmentation->processFrame(nullptr);
469 }
470 catch (const std::exception &e) {
471 logger.error() << "Failed to process the assoc catalog!\n" << e.what();
472 measurement->stopThreads();
474 }
475
476 if (prefetcher) {
477 prefetcher->synchronize();
478 }
479 measurement->synchronizeThreads();
480
481 size_t nb_writen_rows = output->flush();
482 output->nextPart();
483
484 logger.info() << (nb_writen_rows - prev_writen_rows) << " sources detected in catalog, " << nb_writen_rows << " total";
485
486 prev_writen_rows = nb_writen_rows;
487 }
488
489 if (prefetcher) {
490 prefetcher->wait();
491 }
492 measurement->stopThreads();
493
494 // Those check images can only be added AFTER the processing of the detection frames
495 for (auto& detection_frame : detection_frames) {
496 CheckImages::getInstance().addFilteredCheckImage(detection_frame->getFilteredImage());
497 CheckImages::getInstance().addThresholdedCheckImage(detection_frame->getThresholdedImage());
498 CheckImages::getInstance().addSnrCheckImage(detection_frame->getSnrImage());
499 }
500
502 TileManager::getInstance()->flush();
503 progress_mediator->done();
504
505 if (prev_writen_rows > 0) {
506 logger.info() << "total " << prev_writen_rows << " sources detected";
507 } else {
508 logger.info() << "NO SOURCES DETECTED";
509 }
510
512 }
513};
514
515
517
518public:
520 m_plugin_path(plugin_path), m_plugin_list(plugin_list) {
521 }
522
523 virtual ~PluginOptionsMain() = default;
524
525 boost::program_options::options_description defineSpecificProgramOptions() override {
526 auto& config_manager = ConfigManager::getInstance(conf_man_id);
527 config_manager.registerConfiguration<PluginConfig>();
528 auto options = config_manager.closeRegistration();
529 // The following will consume any extra options in the configuration file
530 options.add_options()("*", po::value<std::vector<std::string>>());
531 return options;
532 }
533
535 auto& config_manager = ConfigManager::getInstance(conf_man_id);
536 config_manager.initialize(args);
537 auto& conf = config_manager.getConfiguration<PluginConfig>();
538 m_plugin_path = conf.getPluginPath();
539 m_plugin_list = conf.getPluginList();
541 }
542
543private:
544
548
549};
550
551
552static void forwardOptions(int argc, char *const *argv, std::vector<std::string>& plugin_options_input) {
553 for (int i = 0; i < argc; ++i) {
554 std::string option{argv[i]};
555 if (option == "--config-file") {
556 plugin_options_input.emplace_back("--config-file");
557 plugin_options_input.emplace_back(std::string{argv[i + 1]});
558 }
559 if (boost::starts_with(option, "--config-file=")) {
560 plugin_options_input.emplace_back(option);
561 }
562 if (option == "--plugin-directory") {
563 plugin_options_input.emplace_back("--plugin-directory");
564 plugin_options_input.emplace_back(std::string{argv[i + 1]});
565 }
566 if (boost::starts_with(option, "--plugin-directory=")) {
567 plugin_options_input.emplace_back(option);
568 }
569 if (option == "--plugin") {
570 plugin_options_input.emplace_back("--plugin");
571 plugin_options_input.emplace_back(std::string{argv[i + 1]});
572 }
573 if (boost::starts_with(option, "--plugin=")) {
574 plugin_options_input.emplace_back(option);
575 }
576 }
577}
578
579
580ELEMENTS_API int main(int argc, char* argv[]) {
581 std::string plugin_path {};
582 std::vector<std::string> plugin_list {};
583
584 // This adds the current directory as a valid location for the default "sourcextractor++.conf" configuration
585 Elements::TempEnv local_env;
586 if (local_env["ELEMENTS_CONF_PATH"].empty()) {
587 local_env["ELEMENTS_CONF_PATH"] = ".:/etc";
588 } else {
589 local_env["ELEMENTS_CONF_PATH"] = ".:" + local_env["ELEMENTS_CONF_PATH"] + ":/etc";
590 }
591
593
594 // Try to be reasonably graceful with unhandled exceptions
596
597 try {
598 // First we create a program which has a sole purpose to get the options for
599 // the plugin paths. Note that we do not want to have this helper program
600 // to handle any other options except of the plugin-directory and plugin, so
601 // we create a subset of the given options with only the necessary ones. We
602 // also turn off the the logging.
603 std::vector<int> masked_indices{};
604 std::vector<std::string> plugin_options_input{};
605 plugin_options_input.emplace_back("DummyProgram");
606 plugin_options_input.emplace_back("--log-level");
607 plugin_options_input.emplace_back("ERROR");
608 forwardOptions(argc, argv, plugin_options_input);
609
610 int argc_tmp = plugin_options_input.size();
611 std::vector<const char *> argv_tmp(argc_tmp);
612 for (unsigned int i = 0; i < plugin_options_input.size(); ++i) {
613 auto& option_str = plugin_options_input[i];
614 argv_tmp[i] = option_str.data();
615 }
616
617 CREATE_MANAGER_WITH_ARGS(plugin_options_program, PluginOptionsMain, plugin_path, plugin_list);
618 plugin_options_program.run(argc_tmp, const_cast<char **>(argv_tmp.data()));
619
620 CREATE_MANAGER_WITH_ARGS(main, SEMain, plugin_path, plugin_list);
621 Elements::ExitCode exit_code = main.run(argc, argv);
622 return static_cast<Elements::ExitCodeType>(exit_code);
623 }
624 catch (const std::exception &e) {
625 logger.fatal() << e.what();
627 }
628 catch (...) {
629 logger.fatal() << "Unknown exception type!";
630 logger.fatal() << "Please, report this as a bug";
632 }
633}
static long config_manager_id
static Elements::Logging logger
static const std::string PROPERTY_COLUMN_MAPPING
static void setupEnvironment(void)
static void disableBlasMultithreading()
static const std::string LIST_OUTPUT_PROPERTIES
static void forwardOptions(int argc, char *const *argv, std::vector< std::string > &plugin_options_input)
static const std::string PROPERTY_COLUMN_MAPPING_ALL
static const std::string DUMP_CONFIG
static long config_manager_id
ELEMENTS_API int main(int argc, char *argv[])
T at(T... args)
static Logging getLogger(const std::string &name="")
static void onTerminate() noexcept
static ConfigManager & getInstance(long id)
virtual void handleMessage(const std::shared_ptr< SourceGroupInterface > &group) override
std::list< std::shared_ptr< SourceGroupInterface > > m_list
boost::program_options::options_description defineSpecificProgramOptions() override
Elements::ExitCode mainMethod(std::map< std::string, boost::program_options::variable_value > &args) override
std::string & m_plugin_path
virtual ~PluginOptionsMain()=default
PluginOptionsMain(std::string &plugin_path, std::vector< std::string > &plugin_list)
std::vector< std::string > & m_plugin_list
DeblendingFactory deblending_factory
std::shared_ptr< OutputRegistry > output_registry
std::pair< po::options_description, po::positional_options_description > defineProgramArguments() override
Return the arguments that the program accepts.
po::options_description getConfigParameters()
PartitionFactory partition_factory
SegmentationFactory segmentation_factory
void printDefaults()
Print a configuration file populated with defaults.
std::shared_ptr< SourceGroupFactory > group_factory
ProgressReporterFactory progress_printer_factory
GroupingFactory grouping_factory
std::shared_ptr< SourceFactory > source_factory
std::shared_ptr< TaskFactoryRegistry > task_factory_registry
SEMain(const std::string &plugin_path, const std::vector< std::string > &plugin_list)
OutputFactory output_factory
Elements::ExitCode mainMethod(std::map< std::string, po::variable_value > &args) override
static void writeDefaultMultiple(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a multiple-value option.
bool config_initialized
static void writeDefault(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a simple option.
MeasurementFactory measurement_factory
PluginManager plugin_manager
std::shared_ptr< TaskProvider > task_provider
po::options_description config_parameters
std::list< std::shared_ptr< SourceWithOnDemandProperties > > m_list
virtual void handleMessage(const std::shared_ptr< SourceWithOnDemandProperties > &source) override
void addFilteredCheckImage(std::shared_ptr< Image< SeFloat > > filtered_image)
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void addSnrCheckImage(std::shared_ptr< Image< SeFloat > > snr_image)
static CheckImages & getInstance()
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
void addThresholdedCheckImage(std::shared_ptr< Image< SeFloat > > thresholded_image)
Provides combined detection frame.
const std::shared_ptr< Euclid::ThreadPool > & getThreadPool() const
Observer interface to be used with Observable to implement the Observer pattern.
Definition Observable.h:38
const std::vector< std::string > getOutputProperties()
PluginManager handles the loading of plugins and calls their registration function,...
The SegmentationFactory will provide a Segmentation implementation based on the current configuration...
static std::shared_ptr< TileManager > getInstance()
T data(T... args)
T emplace_back(T... args)
T end(T... args)
T endl(T... args)
T find(T... args)
#define ELEMENTS_API
#define CREATE_MANAGER_WITH_ARGS(MANAGER, ELEMENTS_PROGRAM,...)
T make_shared(T... args)
Environment TempEnv
std::underlying_type< ExitCode >::type ExitCodeType
long getUniqueManagerId() noexcept
static Elements::Logging logger
Definition conf.py:1
T partition(T... args)
T set_terminate(T... args)
T size(T... args)