Skip to content

Commit bfbf945

Browse files
authored
DPL: allow for configurable CCDB paths (#15337)
1 parent 70bfe0e commit bfbf945

6 files changed

Lines changed: 102 additions & 23 deletions

File tree

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include "Framework/Signpost.h"
2222
#include "Framework/DanglingEdgesContext.h"
2323
#include "Framework/ConfigContext.h"
24-
#include "Framework/ConfigContext.h"
24+
#include "Framework/ConfigParamsHelper.h"
2525
#include <arrow/array/builder_binary.h>
2626
#include <arrow/type.h>
2727
#include <arrow/type_fwd.h>
@@ -71,31 +71,45 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
7171
{
7272
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
7373
auto& dec = ic.services().get<DanglingEdgesContext>();
74+
// The effective default for each ccdb: option was already resolved at topology
75+
// time by ArrowSupport (consulting task Configurables) and registered on this
76+
// device's options. Here we just read the final value — honouring any further
77+
// runtime override supplied via CLI or JSON config.
78+
std::unordered_map<std::string, std::string> ccdbUrls;
79+
for (auto& input : dec.analysisCCDBInputs) {
80+
for (auto& m : input.metadata) {
81+
if (!m.name.starts_with("ccdb:") || ccdbUrls.count(m.name)) {
82+
continue;
83+
}
84+
std::string url = m.defaultValue.asString();
85+
if (ConfigParamsHelper::hasOption(spec.options, m.name)) {
86+
url = options.get<std::string>(m.name.c_str());
87+
}
88+
LOGP(info, "CCDB path resolved for {}: {}", m.name, url);
89+
ccdbUrls.emplace(m.name, std::move(url));
90+
}
91+
}
7492
std::vector<std::shared_ptr<arrow::Schema>> schemas;
75-
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
76-
7793
for (auto& input : dec.analysisCCDBInputs) {
94+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
7895
std::vector<std::shared_ptr<arrow::Field>> fields;
7996
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
8097
schemaMetadata->Append("outputBinding", input.binding);
81-
8298
for (auto& m : input.metadata) {
83-
// Save the list of input tables
8499
if (m.name.starts_with("input:")) {
85100
auto name = m.name.substr(6);
86101
schemaMetadata->Append("sourceTable", name);
87102
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
88103
continue;
89104
}
90-
// Ignore the non ccdb: entries
91105
if (!m.name.starts_with("ccdb:")) {
92106
continue;
93107
}
94-
// Create the schema of the output
95-
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
96-
metadata->Append("url", m.defaultValue.asString());
108+
auto fieldMetadata = std::make_shared<arrow::KeyValueMetadata>();
109+
auto it = ccdbUrls.find(m.name);
110+
fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
97111
auto columnName = m.name.substr(strlen("ccdb:"));
98-
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
112+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
99113
}
100114
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
101115
}

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ struct Builder {
172172

173173
std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc);
174174
};
175-
} // namespace o2::framework
175+
} // namespace o2::framework
176176

177177
namespace o2::soa
178178
{
@@ -394,7 +394,7 @@ constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
394394
return {};
395395
}
396396

397-
} // namespace
397+
} // namespace
398398

399399
template <TableRef R>
400400
constexpr auto tableRef2InputSpec()
@@ -463,7 +463,7 @@ constexpr auto tableRef2OutputRef()
463463
o2::aod::label<R>(),
464464
R.version};
465465
}
466-
} // namespace o2::soa
466+
} // namespace o2::soa
467467

468468
namespace o2::framework
469469
{
@@ -672,7 +672,7 @@ struct Spawns : decltype(transformBase<T>()) {
672672

673673
std::shared_ptr<typename T::table_t> table = nullptr;
674674
std::shared_ptr<extension_t> extension = nullptr;
675-
std::array<o2::framework::expressions::Projector, N> projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
675+
std::array<o2::framework::expressions::Projector, N> projectors = []<typename... C>(framework::pack<C...>)->std::array<expressions::Projector, sizeof...(C)>
676676
{
677677
return {{std::move(C::Projector())...}};
678678
}
@@ -1077,7 +1077,7 @@ concept is_partition = requires(T t) {
10771077
requires std::same_as<decltype(t.filter), expressions::Filter>;
10781078
requires std::same_as<decltype(t.mFiltered), std::unique_ptr<o2::soa::Filtered<typename T::content_t>>>;
10791079
};
1080-
} // namespace o2::framework
1080+
} // namespace o2::framework
10811081

10821082
namespace o2::soa
10831083
{
@@ -1100,6 +1100,6 @@ auto Attach(T const& table)
11001100
using output_t = Join<T, o2::soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
11011101
return output_t{{table.asArrowTable()}, table.offset()};
11021102
}
1103-
} // namespace o2::soa
1103+
} // namespace o2::soa
11041104

1105-
#endif // o2_framework_AnalysisHelpers_H_DEFINED
1105+
#endif // o2_framework_AnalysisHelpers_H_DEFINED

Framework/Core/include/Framework/Configurable.h

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,26 @@ struct Configurable : IP {
8383
template <typename T, ConfigParamKind K = ConfigParamKind::kGeneric>
8484
using MutableConfigurable = Configurable<T, K, ConfigurablePolicyMutable<T, K>>;
8585

86+
/// Convenience wrapper for overriding the CCDB path of a CCDB column declared
87+
/// with DECLARE_SOA_CCDB_COLUMN / DECLARE_SOA_CCDB_COLUMN_FULL.
88+
///
89+
/// The option name, default value, and help string are all derived automatically
90+
/// from the column type: name = "ccdb:" + Column::mLabel, default = Column::query.
91+
///
92+
/// Example:
93+
/// struct MyTask {
94+
/// ConfigurableCCDBPath<tofcalib::LHCphase> lhcPhasePath;
95+
/// };
96+
template <typename Column>
97+
struct ConfigurableCCDBPath : Configurable<std::string> {
98+
ConfigurableCCDBPath()
99+
: Configurable<std::string>{std::string{"ccdb:"} + Column::mLabel,
100+
std::string{Column::query},
101+
std::string{"CCDB path for "} + Column::mLabel + " (default: " + Column::query + ")"}
102+
{
103+
}
104+
};
105+
86106
template <typename T>
87107
concept is_configurable = requires(T t) {
88108
requires std::same_as<std::string, decltype(t.name)>;
@@ -93,11 +113,10 @@ concept is_configurable = requires(T t) {
93113
using ConfigurableAxis = Configurable<std::vector<double>, ConfigParamKind::kAxisSpec, ConfigurablePolicyConst<std::vector<double>, ConfigParamKind::kAxisSpec>>;
94114

95115
template <typename T>
96-
concept is_configurable_axis = is_configurable<T>&&
97-
requires()
98-
{
99-
T::kind == ConfigParamKind::kAxisSpec;
100-
};
116+
concept is_configurable_axis = is_configurable<T> &&
117+
requires() {
118+
T::kind == ConfigParamKind::kAxisSpec;
119+
};
101120

102121
template <typename T, typename... As>
103122
struct ProcessConfigurable : Configurable<bool, ConfigParamKind::kProcessFlag> {

Framework/Core/src/AnalysisHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,5 @@ std::shared_ptr<arrow::Table> Builder::materialize(ProcessingContext& pc)
201201
result = o2::soa::IndexBuilder::materialize(*builders.get(), std::move(tables), records, outputSchema, exclusive);
202202
return result;
203203
}
204+
204205
} // namespace o2::framework

Framework/Core/src/ArrowSupport.cxx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "Framework/ServiceRegistryHelpers.h"
3535
#include "Framework/Signpost.h"
3636
#include "Framework/DefaultsHelpers.h"
37+
#include "Framework/ConfigParamsHelper.h"
3738

3839
#include "CommonMessageBackendsHelpers.h"
3940
#include <Monitoring/Monitoring.h>
@@ -637,6 +638,34 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
637638
analysisCCDB->outputs.clear();
638639
analysisCCDB->inputs.clear();
639640
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
641+
// Register each ccdb: column path as an actual device option on the CCDB
642+
// device so it can be read from ConfigParamRegistry at runtime.
643+
// If any analysis task declared a Configurable<std::string> with the same
644+
// "ccdb:fXxx" name, prefer its default over the compile-time ::query value.
645+
// First encountered wins; log a warning if two tasks declare conflicting defaults.
646+
for (auto& input : dec.analysisCCDBInputs) {
647+
for (auto& m : input.metadata | std::views::filter(checks::has_params_with_name_starting("ccdb:"))) {
648+
ConfigParamSpec effective = m; // start with compile-time default
649+
bool foundFirst = false;
650+
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
651+
for (auto& opt : d.options) {
652+
if (opt.name == m.name) {
653+
if (!foundFirst) {
654+
effective = opt; // first task Configurable wins
655+
foundFirst = true;
656+
} else if (opt.defaultValue.asString() != effective.defaultValue.asString()) {
657+
LOGP(warn, "Task '{}' declares Configurable '{}' = '{}' which conflicts "
658+
"with an earlier value '{}'; earlier value will be used.",
659+
d.name, opt.name, opt.defaultValue.asString(),
660+
effective.defaultValue.asString());
661+
}
662+
break;
663+
}
664+
}
665+
}
666+
ConfigParamsHelper::addOptionIfMissing(analysisCCDB->options, effective);
667+
}
668+
}
640669
// load real AlgorithmSpec before deployment
641670
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
642671
}

Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,35 @@ struct DummyTimestampsTable {
5151
};
5252

5353
struct SimpleCCDBConsumer {
54+
ConfigurableCCDBPath<o2::aod::tofcalib::LHCphase> lhcPhasePath;
55+
5456
void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps)
5557
{
58+
LOGP(info, "LHCphase CCDB path configurable value: {}", lhcPhasePath.value);
5659
LOGP(info, "Looking at all the LHCphases associated to the timestamps");
5760
for (auto& object : ccdbObjectsForAllTimestamps) {
5861
std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl;
5962
}
6063
}
6164
};
6265

66+
struct AnotherCCDBConsumer {
67+
ConfigurableCCDBPath<o2::aod::tofcalib::LHCphase> lhcPhasePath;
68+
69+
void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps)
70+
{
71+
LOGP(info, "AnotherCCDBConsumer LHCphase CCDB path configurable value: {}", lhcPhasePath.value);
72+
for (auto& object : ccdbObjectsForAllTimestamps) {
73+
std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl;
74+
}
75+
}
76+
};
77+
6378
WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
6479
{
6580
return WorkflowSpec{
6681
adaptAnalysisTask<DummyTimestampsTable>(cfgc),
67-
adaptAnalysisTask<SimpleCCDBConsumer>(cfgc, TaskName{"simple-ccdb-cunsumer"}),
82+
adaptAnalysisTask<SimpleCCDBConsumer>(cfgc, TaskName{"simple-ccdb-consumer"}),
83+
adaptAnalysisTask<AnotherCCDBConsumer>(cfgc, TaskName{"another-ccdb-consumer"}),
6884
};
6985
}

0 commit comments

Comments
 (0)