diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index e968911fcca1..852613569801 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -349,7 +349,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + // enable_portable_runner is not documented and hence wont be set by default. This will be fixed in later versions. + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 23dcd034120a..529ae5c087cd 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { } } +func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "disable_portable_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "enable_streaming_java_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + func TestGetJobOptions_NoStagingLocation(t *testing.T) { resetGlobals() *stagingLocation = "" diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 0c23e6024dc6..58da4461593c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options): debug_options.add_experiment('use_unified_worker') debug_options.add_experiment('use_runner_v2') debug_options.add_experiment('use_portable_job_submission') + # enable_portable_runner is not added by default as it is not documented. + # This behavior will be fixed in later versions. def _check_and_add_missing_options(options): @@ -662,6 +664,8 @@ def _is_runner_v2_disabled(options): """Returns true if runner v2 is disabled.""" debug_options = options.view_as(DebugOptions) return ( + debug_options.lookup_experiment('disable_portable_runner') or + debug_options.lookup_experiment('enable_streaming_java_runner') or debug_options.lookup_experiment('disable_runner_v2') or debug_options.lookup_experiment('disable_runner_v2_until_2023') or debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index e1b8be6682f9..b3035d38c7c0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -41,6 +41,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.internal import names from apache_beam.runners.runner import PipelineState @@ -733,6 +734,25 @@ def test_explicit_streaming_no_unbounded(self): p.result.job.proto.type, apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) + def test_runner_v2_disabled_experiments_raise(self): + disable_experiments = [ + 'disable_portable_runner', + 'enable_streaming_java_runner', + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_runner_v2_until_v2.50', + 'disable_prime_runner_v2', + ] + for experiment in disable_experiments: + options = PipelineOptions([f'--experiments={experiment}']) + self.assertTrue( + _is_runner_v2_disabled(options), + f'Expected {experiment} to disable runner v2') + with self.assertRaisesRegex( + ValueError, + 'Disabling Runner V2 no longer supported'): + DataflowRunner().run_pipeline(None, options) + if __name__ == '__main__': unittest.main()