-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Adds backlog reporting support for non-FnAPI-based SDFs. #38346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a8ed2f2
71ffc9a
2f06938
43222da
dcb239b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| { | ||
| "comment": "Modify this file in a trivial way to cause this test suite to run!", | ||
| "modification": 2, | ||
| "modification": 1, | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -278,8 +278,109 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { | |
| if (residual == null) { | ||
| return new Result(null, cont, null, null); | ||
| } | ||
| final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residualForGetSize = residual; | ||
| // For a list of all DoFnInvoker arguments, see DoFn.java. | ||
| double backlogBytes = | ||
| invoker.invokeGetSize( | ||
| new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { | ||
| @Override | ||
| public String getErrorContext() { | ||
| return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName() | ||
| + "/GetSize"; | ||
| } | ||
|
|
||
| @Override | ||
| public InputT element(DoFn<InputT, OutputT> doFn) { | ||
| return element.getValue(); | ||
| } | ||
|
|
||
| @Override | ||
| public Object restriction() { | ||
| return residualForGetSize.getKey(); | ||
| } | ||
|
|
||
| @Override | ||
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { | ||
| return element.getTimestamp(); | ||
| } | ||
|
|
||
| @Override | ||
| public RestrictionTracker<?, ?> restrictionTracker() { | ||
| return invoker.invokeNewTracker( | ||
| new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { | ||
| @Override | ||
| public String getErrorContext() { | ||
| return OutputAndTimeBoundedSplittableProcessElementInvoker.class | ||
| .getSimpleName() | ||
| + "/NewTracker"; | ||
| } | ||
|
|
||
| @Override | ||
| public InputT element(DoFn<InputT, OutputT> doFn) { | ||
| return element.getValue(); | ||
| } | ||
|
|
||
| @Override | ||
| public Object restriction() { | ||
| return residualForGetSize.getKey(); | ||
| } | ||
|
|
||
| @Override | ||
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { | ||
| return element.getTimestamp(); | ||
| } | ||
|
|
||
| @Override | ||
| public BoundedWindow window() { | ||
| throw new IllegalStateException( | ||
| "Attempting to access window outside of a windowed context"); | ||
| } | ||
|
|
||
| @Override | ||
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Attempting to access PaneInfo outside of a windowed context"); | ||
| } | ||
|
|
||
| @Override | ||
| public PipelineOptions pipelineOptions() { | ||
| return pipelineOptions; | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public BoundedWindow window() { | ||
| throw new IllegalStateException( | ||
| "Attempting to access window outside of a windowed context"); | ||
| } | ||
|
|
||
| @Override | ||
| public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Attempting to access PaneInfo outside of a windowed context"); | ||
| } | ||
|
|
||
| @Override | ||
| public PipelineOptions pipelineOptions() { | ||
| return pipelineOptions; | ||
| } | ||
|
|
||
| @Override | ||
| public Object sideInput(String tagId) { | ||
| PCollectionView<?> view = sideInputMapping.get(tagId); | ||
| if (view == null) { | ||
| throw new IllegalArgumentException("calling getSideInput() with unknown view"); | ||
| } | ||
| return processContext.sideInput(view); | ||
| } | ||
| }); | ||
| return new Result( | ||
| residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue()); | ||
| residual.getKey(), | ||
| cont, | ||
| residual.getValue().getKey(), | ||
| residual.getValue().getValue(), | ||
| backlogBytes); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we expect to get this information only during finishBundle?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not during finishBundle, but rather in processElement after we've finished processing the restriction. The idea is that we want to get the backlog (work remaining) of the residual restriction after we've finished processing the bundle. I don't think you can call tryClaim or otherwise change the restriction in finishBundle (since it requires keyed state and whatnot).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we do any form of validation on the value returned here before sending it to the runner? For example, ignore it if it is negative or zero (that would indicate a wrong implementation, but we probably don't want to pass that to the runner).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I decided to put the validation in the Dataflow execution context; that way, other runners can decide how they want to handle these edge cases. Thoughts? |
||
| } | ||
|
|
||
| private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -95,6 +95,30 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| private static class GetSizeFn extends DoFn<Void, String> { | ||||||
| @ProcessElement | ||||||
| public ProcessContinuation process( | ||||||
| ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) { | ||||||
| for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { | ||||||
| c.output(String.valueOf(i)); | ||||||
| if (i == 2) { | ||||||
| return resume(); | ||||||
| } | ||||||
| } | ||||||
| return stop(); | ||||||
| } | ||||||
|
|
||||||
| @GetInitialRestriction | ||||||
| public OffsetRange getInitialRestriction() { | ||||||
| return new OffsetRange(0, 10); | ||||||
| } | ||||||
|
|
||||||
| @GetSize | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems like implementing GetSize is optional:
Have we considered the case where this is not implemented? Also, let's add a test for this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If GetSize isn't defined, we call this default GetSize method, which calls getProgress on the restriction tracker: Line 530 in b60082d
I have an SdfWithoutGetSize ParDo in SplittableParDoProcessFnTest.java that tests that this works. |
||||||
| public double getSize(@Restriction OffsetRange range) { | ||||||
| return range.getTo() - range.getFrom(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result runTest( | ||||||
| int totalNumOutputs, | ||||||
| Duration sleepBeforeFirstClaim, | ||||||
|
|
@@ -103,11 +127,12 @@ private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.R | |||||
| throws Exception { | ||||||
| SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput); | ||||||
| OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs); | ||||||
| return runTest(fn, initialRestriction); | ||||||
| return runTest(fn, initialRestriction, Duration.standardSeconds(3)); | ||||||
| } | ||||||
|
|
||||||
| private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result runTest( | ||||||
| DoFn<Void, String> fn, OffsetRange initialRestriction) throws Exception { | ||||||
| DoFn<Void, String> fn, OffsetRange initialRestriction, Duration checkpointDuration) | ||||||
| throws Exception { | ||||||
| SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void> invoker = | ||||||
| new OutputAndTimeBoundedSplittableProcessElementInvoker<>( | ||||||
| fn, | ||||||
|
|
@@ -122,7 +147,7 @@ public <OutputT> void output(TupleTag<OutputT> tag, WindowedValue<OutputT> outpu | |||||
| NullSideInputReader.empty(), | ||||||
| Executors.newSingleThreadScheduledExecutor(), | ||||||
| 1000, | ||||||
| Duration.standardSeconds(3), | ||||||
| checkpointDuration, | ||||||
| () -> { | ||||||
| throw new UnsupportedOperationException("BundleFinalizer not configured for test."); | ||||||
| }); | ||||||
|
|
@@ -215,7 +240,7 @@ public OffsetRange getInitialRestriction( | |||||
| } | ||||||
| }; | ||||||
| e.expectMessage("Output is not allowed before tryClaim()"); | ||||||
| runTest(brokenFn, new OffsetRange(0, 5)); | ||||||
| runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
|
|
@@ -235,6 +260,18 @@ public OffsetRange getInitialRestriction( | |||||
| } | ||||||
| }; | ||||||
| e.expectMessage("Output is not allowed after a failed tryClaim()"); | ||||||
| runTest(brokenFn, new OffsetRange(0, 5)); | ||||||
| runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
| public void testBacklogBytes() throws Exception { | ||||||
| GetSizeFn fn = new GetSizeFn(); | ||||||
| OffsetRange initialRestriction = new OffsetRange(0, 10); | ||||||
| // Set a high checkpoint duration to prevent flakiness caused by early checkpointing. | ||||||
| SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result res = | ||||||
| runTest(fn, initialRestriction, Duration.standardMinutes(3)); | ||||||
| // GetSizeFn claims 3 elements and then takes a checkpoint. | ||||||
| assertEquals(7.0, res.getBacklogBytes(), 0.001); | ||||||
| assertEquals(new OffsetRange(3, 10), res.getResidualRestriction()); | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about creating a util to get information from the residual instead of creating an inline class here? We should probably also refactor other similar places, if there are any.