Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.maxmind.geoip2.DatabaseReader;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;
Expand Down Expand Up @@ -45,11 +44,10 @@ public DatabaseReaderFactory(GreenbidsRealTimeDataProperties properties, Vertx v
}

@Override
public void initialize(Promise<Void> initializePromise) {
downloadAndExtract()
public Future<Void> initialize() {
return downloadAndExtract()
.onSuccess(databaseReaderRef::set)
.<Void>mapEmpty()
.onComplete(initializePromise);
.mapEmpty();
}

private Future<DatabaseReader> downloadAndExtract() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
Expand Down Expand Up @@ -97,9 +96,9 @@ public AgmaAnalyticsReporter(AgmaAnalyticsProperties agmaAnalyticsProperties,
}

@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
vertx.setPeriodic(bufferTimeoutMs, ignored -> sendEvents(buffer.pollAll()));
initializePromise.complete();
return Future.succeededFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.prebid.server.analytics.reporter.pubstack;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -116,9 +115,9 @@ public String name() {
}

@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
vertx.setPeriodic(configurationRefreshDelay, id -> fetchRemoteConfig());
fetchRemoteConfig().onComplete(initializePromise);
return fetchRemoteConfig();
}

void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.iab.openrtb.request.BidRequest;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -68,7 +67,7 @@ public CurrencyConversionService(ExternalConversionProperties externalConversion
* Must be called on Vertx event loop thread.
*/
@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
if (externalConversionProperties != null) {
final Long refreshPeriod = externalConversionProperties.getRefreshPeriodMs();
final Long defaultTimeout = externalConversionProperties.getDefaultTimeoutMs();
Expand All @@ -84,7 +83,7 @@ public void initialize(Promise<Void> initializePromise) {
externalConversionProperties.getMetrics().createCurrencyRatesGauge(this::isRatesStale);
}

initializePromise.tryComplete();
return Future.succeededFuture();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.prebid.server.settings.service;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.execution.timeout.Timeout;
Expand Down Expand Up @@ -104,12 +103,12 @@ public DatabasePeriodicRefreshService(String initQuery,
}

@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
getAll();
if (refreshPeriod > 0) {
vertx.setPeriodic(refreshPeriod, aLong -> refresh());
}
initializePromise.tryComplete();
return Future.succeededFuture();
}

private void getAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.prebid.server.exception.PreBidException;
import org.prebid.server.json.DecodeException;
Expand Down Expand Up @@ -91,13 +90,13 @@ public HttpPeriodicRefreshService(String refreshUrl,
}

@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
getAll();
if (refreshPeriod > 0) {
vertx.setPeriodic(refreshPeriod, aLong -> refresh());
}

initializePromise.tryComplete();
return Future.succeededFuture();
}

private void getAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.prebid.server.auction.model.Tuple2;
import org.prebid.server.log.Logger;
Expand Down Expand Up @@ -73,15 +72,13 @@ public S3PeriodicRefreshService(S3AsyncClient asyncClient,
}

@Override
public void initialize(Promise<Void> initializePromise) {
fetchStoredDataResult(clock.millis(), MetricName.initialize)
.<Void>mapEmpty()
.onComplete(initializePromise);

public Future<Void> initialize() {
if (refreshPeriod > 0) {
logger.info("Starting s3 periodic refresh for " + cacheType + " every " + refreshPeriod + " s");
vertx.setPeriodic(refreshPeriod, ignored -> fetchStoredDataResult(clock.millis(), MetricName.update));
}

return fetchStoredDataResult(clock.millis(), MetricName.initialize).mapEmpty();
}

private Future<StoredDataResult<String>> fetchStoredDataResult(long startTime, MetricName metricName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.prebid.server.util.system;

import io.vertx.core.Promise;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import org.prebid.server.vertx.Initializable;
import oshi.SystemInfo;
Expand Down Expand Up @@ -31,10 +31,10 @@ public CpuLoadAverageStats(Vertx vertx, long measurementIntervalMillis) {
}

@Override
public void initialize(Promise<Void> initializePromise) {
public Future<Void> initialize() {
measureCpuLoad();
vertx.setPeriodic(measurementIntervalMillis, timerId -> measureCpuLoad());
initializePromise.tryComplete();
return Future.succeededFuture();
}

private void measureCpuLoad() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/prebid/server/vertx/Initializable.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.prebid.server.vertx;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;

/**
* Denotes components requiring initialization after they have been created.
Expand All @@ -12,5 +12,5 @@
@FunctionalInterface
public interface Initializable {

void initialize(Promise<Void> initializePromise);
Future<Void> initialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import org.prebid.server.vertx.CloseableAdapter;
import org.prebid.server.vertx.Initializable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class DaemonVerticle extends AbstractVerticle {
Expand All @@ -33,31 +31,26 @@ public DaemonVerticle(List<Initializable> initializables, List<ScheduledReporter

@Override
public void start(Promise<Void> startPromise) {
all(initializables, initializable -> initializable::initialize).onComplete(startPromise);
all(initializables, Initializable::initialize, true).onComplete(startPromise);
}

@Override
public void stop(Promise<Void> stopPromise) {
all(closeables, closeable -> closeable::close).onComplete(stopPromise);
all(closeables, closeable -> Future.future(closeable::close), false).onComplete(stopPromise);
}

private static <E> Future<Void> all(Collection<E> entries,
Function<E, Consumer<Promise<Void>>> entryToPromiseConsumerMapper) {
private static <E> Future<Void> all(
Collection<E> entries,
Function<E, Future<Void>> entryToFutureMapper,
boolean start) {

final List<Future<Void>> entriesFutures = new ArrayList<>();

for (E entry : entries) {
final Promise<Void> entryPromise = Promise.promise();
entriesFutures.add(entryPromise.future());

entryToPromiseConsumerMapper.apply(entry).accept(entryPromise);
}

return Future.all(entriesFutures)
return Future.all(entries.stream().map(entryToFutureMapper).toList())
.onSuccess(r -> logger.info(
"Successfully started {} instance on thread: {}",
"Successfully {} {} instance on thread: {}",
start ? "started" : "stopped",
DaemonVerticle.class.getSimpleName(),
Thread.currentThread().getName()))
.mapEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void initializeShouldFetchConfigAndSetPeriodicTimerForConfigUpdate() thro
Future.succeededFuture(HttpClientResponse.of(200, null, mapper.writeValueAsString(pubstackConfig))));

// when
pubstackAnalyticsReporter.initialize(Promise.promise());
pubstackAnalyticsReporter.initialize();

// then
verify(vertx).setPeriodic(anyLong(), any());
Expand All @@ -109,17 +108,16 @@ public void initializeShouldFailUpdateSendBuffersAndSetTimerWhenEndpointFromRemo
throws JsonProcessingException {

// given
final Promise<Void> promise = Promise.promise();
final PubstackConfig pubstackConfig = PubstackConfig.of("newScopeId", "invalid",
Collections.singletonMap(EventType.auction, true));
given(httpClient.get(anyString(), anyLong())).willReturn(
Future.succeededFuture(HttpClientResponse.of(200, null, mapper.writeValueAsString(pubstackConfig))));

// when
pubstackAnalyticsReporter.initialize(promise);
final Future<Void> result = pubstackAnalyticsReporter.initialize();

// then
assertThatThrownBy(() -> promise.future().await(5, TimeUnit.SECONDS))
assertThatThrownBy(() -> result.await(5, TimeUnit.SECONDS))
.hasMessage("[pubstack] Failed to create event report url for endpoint: invalid")
.isInstanceOf(PreBidException.class);
verify(auctionHandler).reportEvents();
Expand All @@ -138,8 +136,8 @@ public void initializeShouldNotUpdateEventsIfFetchedConfigIsSameAsPrevious() thr
Future.succeededFuture(HttpClientResponse.of(200, null, mapper.writeValueAsString(pubstackConfig))));

// when
pubstackAnalyticsReporter.initialize(Promise.promise());
pubstackAnalyticsReporter.initialize(Promise.promise());
pubstackAnalyticsReporter.initialize();
pubstackAnalyticsReporter.initialize();

// then
verify(httpClient, times(2)).get(anyString(), anyLong());
Expand All @@ -157,7 +155,7 @@ public void initializeShouldNotSendEventsAndUpdateConfigsWhenResponseStatusIsNot
Future.succeededFuture(HttpClientResponse.of(400, null, null)));

// when
pubstackAnalyticsReporter.initialize(Promise.promise());
pubstackAnalyticsReporter.initialize();

// then
verify(vertx).setPeriodic(anyLong(), any());
Expand All @@ -173,7 +171,7 @@ public void initializeShouldNotSendEventsAndUpdateConfigsWhenCantParseResponse()
Future.succeededFuture(HttpClientResponse.of(200, null, "{\"endpoint\" : {}}")));

// when
pubstackAnalyticsReporter.initialize(Promise.promise());
pubstackAnalyticsReporter.initialize();

// then
verify(vertx).setPeriodic(anyLong(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.iab.openrtb.request.BidRequest;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -397,7 +396,7 @@ private CurrencyConversionService createInitializedService(String url,
clock,
jacksonMapper));

currencyService.initialize(Promise.promise());
currencyService.initialize();

return currencyService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -156,7 +155,7 @@ private void createAndInitService(long refresh) {
metrics,
clock);

databasePeriodicRefreshService.initialize(Promise.promise());
databasePeriodicRefreshService.initialize();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -174,7 +173,7 @@ private static void createAndInitService(CacheNotificationListener<String> notif

final HttpPeriodicRefreshService httpPeriodicRefreshService = new HttpPeriodicRefreshService(
url, refreshPeriod, timeout, notificationListener, vertx, httpClient, jacksonMapper);
httpPeriodicRefreshService.initialize(Promise.promise());
httpPeriodicRefreshService.initialize();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.prebid.server.settings.service;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
Expand Down Expand Up @@ -169,8 +168,6 @@ private Future<Void> createAndInitService(long refreshPeriod) {
metrics,
vertx);

final Promise<Void> init = Promise.promise();
s3PeriodicRefreshService.initialize(init);
return init.future();
return s3PeriodicRefreshService.initialize();
}
}