Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Framework/Core/include/Framework/DataProcessingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "Framework/DataRelayer.h"
#include "Framework/AlgorithmSpec.h"
#include <atomic>
#include <functional>

namespace o2::framework
Expand All @@ -33,7 +34,7 @@ struct DataProcessorContext {
DataProcessorContext(DataProcessorContext const&) = delete;
DataProcessorContext() = default;

bool allDone = false;
std::atomic<bool> allDone = false;
/// Latest run number we processed globally for this DataProcessor.
int64_t lastRunNumberProcessed = -1;

Expand All @@ -43,7 +44,6 @@ struct DataProcessorContext {

// FIXME: move stuff here from the list below... ;-)
ServiceRegistry* registry = nullptr;
std::vector<DataRelayer::RecordAction> completed;
std::vector<ExpirationHandler> expirationHandlers;
AlgorithmSpec::InitCallback init;
AlgorithmSpec::ProcessCallback statefulProcess;
Expand Down
5 changes: 5 additions & 0 deletions Framework/Core/include/Framework/StreamContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#ifndef O2_FRAMEWORK_STREAMCONTEXT_H_
#define O2_FRAMEWORK_STREAMCONTEXT_H_

#include "Framework/DataRelayer.h"
#include "Framework/ServiceHandle.h"
#include "ProcessingContext.h"
#include "ServiceSpec.h"
Expand Down Expand Up @@ -64,6 +65,10 @@ struct StreamContext {
// the callback will be called for all of them.
std::vector<ServiceStartStreamHandle> preStartStreamHandles;

/// Per-stream list of actions ready to be dispatched. Populated by
/// getReadyToProcess() and consumed by tryDispatchComputation().
std::vector<DataRelayer::RecordAction> completed;

// Information on wether or not all the required routes have been created.
// This is used to check if the LifetimeTimeframe routes were all created
// for a given iteration.
Expand Down
29 changes: 16 additions & 13 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
// 99 is to execute DPL callbacks last
this->SubscribeToStateChange("99-dpl", stateWatcher);

// One task for now.
mStreams.resize(1);
mHandles.resize(1);
auto* poolSizeEnv = getenv("DPL_THREADPOOL_SIZE");
// 0 (or unset): synchronous execution on the main thread.
// N > 0: N concurrent async streams; I/O runs on the main thread while
// computation runs on N pool threads.
size_t numStreams = poolSizeEnv ? std::max(0, std::atoi(poolSizeEnv)) : 0;
mStreams.resize(std::max(numStreams, 1UL));
mHandles.resize(std::max(numStreams, 1UL));

ServiceRegistryRef ref{mServiceRegistry};

Expand Down Expand Up @@ -1210,10 +1214,8 @@ void DataProcessingDevice::Run()
O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");

bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
if (dplEnableMultithreding) {
setenv("UV_THREADPOOL_SIZE", "1", 1);
}
auto* poolSizeEnv = getenv("DPL_THREADPOOL_SIZE");
bool dplEnableMultithreding = poolSizeEnv && std::atoi(poolSizeEnv) > 0;

while (state.transitionHandling != TransitionHandlingState::Expired) {
if (state.nextFairMQState.empty() == false) {
Expand Down Expand Up @@ -1634,6 +1636,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
void DataProcessingDevice::doRun(ServiceRegistryRef ref)
{
auto& context = ref.get<DataProcessorContext>();
auto& streamContext = ref.get<StreamContext>();
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
auto& state = ref.get<DeviceState>();
auto& spec = ref.get<DeviceSpec const>();
Expand All @@ -1642,9 +1645,9 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
return;
}

context.completed.clear();
context.completed.reserve(16);
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
streamContext.completed.clear();
streamContext.completed.reserve(16);
if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) {
state.lastActiveDataProcessor.store(&context);
}
DanglingContext danglingContext{*context.registry};
Expand All @@ -1658,8 +1661,8 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
state.lastActiveDataProcessor = &context;
}

context.completed.clear();
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
streamContext.completed.clear();
if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) {
state.lastActiveDataProcessor = &context;
}

Expand All @@ -1685,7 +1688,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)

bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;

while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
while (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed) && shouldProcess) {
relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
}

Expand Down