From 3d4383105fcdd891f56303340012bc5839db4d5c Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 20 Mar 2026 09:00:21 +0100 Subject: [PATCH] Revert "DPL: Better detection for injected workflows (#15130)" This reverts commit 20be6e73f6aaf46a30d3e2e21df455891c1f2167. --- Framework/Core/src/ArrowSupport.cxx | 8 +-- Framework/Core/src/WorkflowHelpers.cxx | 15 +--- run/o2sim_hepmc_publisher.cxx | 94 +++++++++++++------------- run/o2sim_kine_publisher.cxx | 3 +- run/o2sim_mctracks_to_aod.cxx | 12 ++-- 5 files changed, 58 insertions(+), 74 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 81acc26b1b097..c5cc021a53478 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -680,12 +680,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() workflow.erase(reader); } else { // load reader algorithm before deployment - auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { - return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { - return DataSpecUtils::match(output, "TFN", "TFNumber", 0); - }); - }); - if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected + auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; }); + if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx)); } // otherwise the algorithm was set in injectServiceDevices } diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 2ef3df9426fde..abe566e239618 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -411,17 +411,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // add the reader if (aodReader.outputs.empty() == false) { - auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { - return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) { - return DataSpecUtils::match(output, "TFN", "TFNumber", 0); - }); - }); - if (tfnsource == workflow.end()) { + auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; }); + if (mctracks2aod == workflow.end()) { // add normal reader aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"}); aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"}); } else { - // AODs are being injected the tfnsource is the entry point, add error-handler reader + // AODs are being injected on-the-fly, add error-handler reader aodReader.algorithm = AlgorithmSpec{ adaptStateful( [](DeviceSpec const& spec) { @@ -704,11 +700,6 @@ void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext cons return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false; - - it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool { - return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF")); - }); - dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false; } } diff --git a/run/o2sim_hepmc_publisher.cxx b/run/o2sim_hepmc_publisher.cxx index f255b4a3a4f62..bf40abacb134f 100644 --- a/run/o2sim_hepmc_publisher.cxx +++ b/run/o2sim_hepmc_publisher.cxx @@ -37,9 +37,7 @@ struct O2simHepmcPublisher { int tfCounter = 0; std::shared_ptr hepMCReader; bool eos = false; - - std::vector*> mctracks_vector; - std::vector mcheader_vector; + std::vector mcTracks; void init(o2::framework::InitContext& /*ic*/) { @@ -52,19 +50,13 @@ struct O2simHepmcPublisher { LOGP(fatal, "Cannot open HEPMC kine file {}", (std::string)hepmcFileName); } // allocate the memory upfront to prevent reallocations later - mctracks_vector.reserve(aggregate); - mcheader_vector.reserve(aggregate); + mcTracks.reserve(1e3 * aggregate); } void run(o2::framework::ProcessingContext& pc) { HepMC3::GenEvent event; - auto batch = maxEvents > 0 ? std::min((int)aggregate, (int)maxEvents - eventCounter) : (int)aggregate; - for (auto i = 0; i < batch; ++i) { - mctracks_vector.push_back(&pc.outputs().make>(Output{"MC", "MCTRACKS", 0})); - auto& mctracks = mctracks_vector.back(); - mcheader_vector.push_back(&pc.outputs().make(Output{"MC", "MCHEADER", 0})); - auto& mcheader = mcheader_vector.back(); + for (auto i = 0; i < (int)aggregate; ++i) { // read next entry hepMCReader->read_event(event); if (hepMCReader->failed()) { @@ -74,60 +66,61 @@ struct O2simHepmcPublisher { } // create O2 MCHeader and MCtracks vector out of HEPMC event - mcheader->SetEventID(event.event_number()); - mcheader->SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz()); + o2::dataformats::MCEventHeader mcHeader; + mcHeader.SetEventID(event.event_number()); + mcHeader.SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz()); auto xsecInfo = event.cross_section(); if (xsecInfo != nullptr) { - mcheader->putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events()); - mcheader->putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events()); - mcheader->putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec()); - mcheader->putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err()); + mcHeader.putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events()); + mcHeader.putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events()); + mcHeader.putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec()); + mcHeader.putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err()); } auto scale = event.attribute(MCInfoKeys::eventScale); if (scale != nullptr) { - mcheader->putInfo(MCInfoKeys::eventScale, (float)scale->value()); + mcHeader.putInfo(MCInfoKeys::eventScale, (float)scale->value()); } auto nMPI = event.attribute(MCInfoKeys::mpi); if (nMPI != nullptr) { - mcheader->putInfo(MCInfoKeys::mpi, nMPI->value()); + mcHeader.putInfo(MCInfoKeys::mpi, nMPI->value()); } auto sid = event.attribute(MCInfoKeys::processCode); auto scode = event.attribute(MCInfoKeys::processID); // default pythia8 hepmc3 interface uses signal_process_id if (sid != nullptr) { - mcheader->putInfo(MCInfoKeys::processCode, sid->value()); + mcHeader.putInfo(MCInfoKeys::processCode, sid->value()); } else if (scode != nullptr) { - mcheader->putInfo(MCInfoKeys::processCode, scode->value()); + mcHeader.putInfo(MCInfoKeys::processCode, scode->value()); } auto pdfInfo = event.pdf_info(); if (pdfInfo != nullptr) { - mcheader->putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]); - mcheader->putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]); - mcheader->putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]); - mcheader->putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]); - mcheader->putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]); - mcheader->putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]); - mcheader->putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale); - mcheader->putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]); - mcheader->putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]); + mcHeader.putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]); + mcHeader.putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]); + mcHeader.putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]); + mcHeader.putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]); + mcHeader.putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]); + mcHeader.putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]); + mcHeader.putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale); + mcHeader.putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]); + mcHeader.putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]); } auto heavyIon = event.heavy_ion(); if (heavyIon != nullptr) { - mcheader->putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard); - mcheader->putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj); - mcheader->putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ); - mcheader->putInfo(MCInfoKeys::nColl, heavyIon->Ncoll); - mcheader->putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions); - mcheader->putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions); - mcheader->putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions); - mcheader->putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n); - mcheader->putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p); - mcheader->putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n); - mcheader->putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p); - mcheader->putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter); - mcheader->putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle); - mcheader->putInfo("eccentricity", (float)heavyIon->eccentricity); - mcheader->putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN); - mcheader->putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality); + mcHeader.putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard); + mcHeader.putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj); + mcHeader.putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ); + mcHeader.putInfo(MCInfoKeys::nColl, heavyIon->Ncoll); + mcHeader.putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions); + mcHeader.putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions); + mcHeader.putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions); + mcHeader.putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n); + mcHeader.putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p); + mcHeader.putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n); + mcHeader.putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p); + mcHeader.putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter); + mcHeader.putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle); + mcHeader.putInfo("eccentricity", (float)heavyIon->eccentricity); + mcHeader.putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN); + mcHeader.putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality); } auto particles = event.particles(); @@ -138,7 +131,7 @@ struct O2simHepmcPublisher { auto has_children = children.size() > 0; auto p = particle->momentum(); auto v = particle->production_vertex(); - mctracks->emplace_back( + mcTracks.emplace_back( particle->pid(), has_parents ? parents.front()->id() : -1, has_parents ? parents.back()->id() : -1, has_children ? children.front()->id() : -1, has_children ? children.back()->id() : -1, @@ -146,13 +139,18 @@ struct O2simHepmcPublisher { v->position().x(), v->position().y(), v->position().z(), v->position().t(), 0); } + + // add to the message + pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcHeader); + pc.outputs().snapshot(Output{"MC", "MCTRACKS", 0}, mcTracks); + mcTracks.clear(); ++eventCounter; } // report number of TFs injected for the rate limiter to work ++tfCounter; pc.services().get().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL)); - if (eos || (maxEvents > 0 && eventCounter >= maxEvents)) { + if (eos || (maxEvents > 0 && eventCounter == maxEvents)) { pc.services().get().endOfStream(); pc.services().get().readyToQuit(QuitRequest::Me); } diff --git a/run/o2sim_kine_publisher.cxx b/run/o2sim_kine_publisher.cxx index 5920743c3fafa..cfbea6ae02a5f 100644 --- a/run/o2sim_kine_publisher.cxx +++ b/run/o2sim_kine_publisher.cxx @@ -40,8 +40,7 @@ struct O2simKinePublisher { void run(o2::framework::ProcessingContext& pc) { - auto batch = std::min((int)aggregate, nEvents - eventCounter); - for (auto i = 0; i < batch; ++i) { + for (auto i = 0; i < std::min((int)aggregate, nEvents - eventCounter); ++i) { auto mcevent = mcKinReader->getMCEventHeader(0, eventCounter); auto mctracks = mcKinReader->getTracks(0, eventCounter); pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcevent); diff --git a/run/o2sim_mctracks_to_aod.cxx b/run/o2sim_mctracks_to_aod.cxx index d95a3b33cc38f..124e8aa7b3e42 100644 --- a/run/o2sim_mctracks_to_aod.cxx +++ b/run/o2sim_mctracks_to_aod.cxx @@ -70,7 +70,7 @@ struct MctracksToAod { /** Run the conversion */ void run(o2::framework::ProcessingContext& pc) { - LOG(detail) << "=== Running extended MC AOD exporter ==="; + LOG(debug) << "=== Running extended MC AOD exporter ==="; using namespace o2::aodmchelpers; using McHeader = o2::dataformats::MCEventHeader; using McTrack = o2::MCTrack; @@ -94,13 +94,13 @@ struct MctracksToAod { // TODO: include BC simulation auto bcCounter = 0UL; size_t offset = 0; - LOG(detail) << "--- Loop over " << nParts << " parts ---"; + LOG(debug) << "--- Loop over " << nParts << " parts ---"; for (auto i = 0U; i < nParts; ++i) { auto record = mSampler.generateCollisionTime(); auto header = pc.inputs().get("mcheader", i); auto tracks = pc.inputs().get("mctracks", i); - LOG(detail) << "Updating collision table"; + LOG(debug) << "Updating collision table"; auto genID = updateMCCollisions(mCollisions.cursor, bcCounter, record.timeInBCNS * 1.e-3, @@ -108,12 +108,12 @@ struct MctracksToAod { 0, i); - LOG(detail) << "Updating HepMC tables"; + LOG(debug) << "Updating HepMC tables"; updateHepMCXSection(mXSections.cursor, bcCounter, genID, *header); updateHepMCPdfInfo(mPdfInfos.cursor, bcCounter, genID, *header); updateHepMCHeavyIon(mHeavyIons.cursor, bcCounter, genID, *header); - LOG(detail) << "Updating particles table"; + LOG(debug) << "Updating particles table"; TrackToIndex preselect; offset = updateParticles(mParticles.cursor, bcCounter, @@ -123,7 +123,7 @@ struct MctracksToAod { (bool)filt, false); - LOG(detail) << "Increment BC counter"; + LOG(debug) << "Increment BC counter"; bcCounter++; }