From 0da6df3c5d3f9af7282d35a40e3e5b456380489f Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 29 Apr 2026 18:59:32 +0400 Subject: [PATCH 1/3] Skip tests --- .../io/external/xlang_bigqueryio_it_test.py | 12 ++++++++++++ .../apache_beam/io/gcp/bigquery_geography_it_test.py | 9 +++++++++ sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 9 +++++++++ 3 files changed, 30 insertions(+) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index bc012bd7be9d..c62c8d534d55 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -228,17 +228,20 @@ def run_storage_write_test( hamcrest_assert(p, bq_matcher) def test_all_types(self): + self.skip_if_not_dataflow_runner() table_name = "all_types" schema = self.ALL_TYPES_SCHEMA self.run_storage_write_test(table_name, self.ELEMENTS, schema) def test_with_at_least_once_semantics(self): + self.skip_if_not_dataflow_runner() table_name = "with_at_least_once_semantics" schema = self.ALL_TYPES_SCHEMA self.run_storage_write_test( table_name, self.ELEMENTS, schema, use_at_least_once=True) def test_nested_records_and_lists(self): + self.skip_if_not_dataflow_runner() table_name = "nested_records_and_lists" schema = { "fields": [{ @@ -281,6 +284,7 @@ def test_nested_records_and_lists(self): self.run_storage_write_test(table_name, items, schema) def test_write_with_beam_rows(self): + self.skip_if_not_dataflow_runner() table = 'write_with_beam_rows' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) @@ -308,6 +312,7 @@ def test_write_with_beam_rows(self): hamcrest_assert(p, bq_matcher) def test_write_with_clustering(self): + self.skip_if_not_dataflow_runner() table = 'write_with_clustering' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) @@ -338,6 +343,7 @@ def test_write_with_clustering(self): hamcrest_assert(p, bq_matcher) def test_write_with_beam_rows_cdc(self): + self.skip_if_not_dataflow_runner() table = 'write_with_beam_rows_cdc' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) @@ -378,6 +384,7 @@ def test_write_with_beam_rows_cdc(self): hamcrest_assert(p, bq_matcher) def test_write_with_dicts_cdc(self): + self.skip_if_not_dataflow_runner() table = 'write_with_dicts_cdc' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) @@ -460,6 +467,7 @@ def test_write_with_dicts_cdc(self): hamcrest_assert(p, bq_matcher) def test_write_to_dynamic_destinations(self): + self.skip_if_not_dataflow_runner() base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) spec_with_project = '{}:{}'.format(self.project, base_table_spec) tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] @@ -484,6 +492,7 @@ def test_write_to_dynamic_destinations(self): hamcrest_assert(p, all_of(*bq_matchers)) def test_write_to_dynamic_destinations_with_beam_rows(self): + self.skip_if_not_dataflow_runner() base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) spec_with_project = '{}:{}'.format(self.project, base_table_spec) tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] @@ -570,11 +579,13 @@ def test_streaming_with_auto_sharding(self): self.run_streaming(table_name=table) def test_streaming_with_at_least_once(self): + self.skip_if_not_dataflow_runner() table = 'streaming_with_at_least_once' self.run_streaming(table_name=table, use_at_least_once=True) def test_write_with_big_lake_configuration(self): """Test BigQuery Storage Write API with BigLake configuration.""" + self.skip_if_not_dataflow_runner() table = 'write_with_big_lake_config' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) @@ -609,6 +620,7 @@ def test_write_with_big_lake_configuration(self): self.assert_iceberg_tables_created(table, big_lake_config['storageUri']) def test_write_with_managed_transform(self): + self.skip_if_not_dataflow_runner() table = 'write_with_managed_transform' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index 1136d909f739..19d040b206f9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -435,6 +435,7 @@ def test_geography_complex_geometries(self): "indicating that jars have not been built") def test_geography_storage_write_api(self): """Test GEOGRAPHY with Storage Write API method.""" + self.skip_if_not_dataflow_runner() table_name = 'geography_storage_write' table_id = '{}.{}'.format(self.dataset_id, table_name) @@ -485,6 +486,14 @@ def test_geography_storage_write_api(self): hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + def skip_if_not_dataflow_runner(self) -> bool: + # skip if dataflow runner is not specified + if not self._runner or "dataflowrunner" not in self._runner.lower(): + self.skipTest( + "Streaming with exactly-once route has the requirement " + "`beam:requirement:pardo:on_window_expiration:v1`, " + "which is currently only supported by the Dataflow runner") + @pytest.mark.it_postcommit def test_geography_file_loads_method(self): """Test GEOGRAPHY with FILE_LOADS method.""" diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 27b910ad5f08..0a43544e974d 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -138,7 +138,16 @@ def add_rows(self, num_rows, num_families, num_columns_per_family): return cells + def skip_if_not_dataflow_runner(self) -> bool: + # skip if dataflow runner is not specified + if not self._runner or "dataflowrunner" not in self._runner.lower(): + self.skipTest( + "Streaming with exactly-once route has the requirement " + "`beam:requirement:pardo:on_window_expiration:v1`, " + "which is currently only supported by the Dataflow runner") + def test_read_xlang(self): + self.skip_if_not_dataflow_runner() # create rows and retrieve expected cells expected_cells = self.add_rows( num_rows=5, num_families=3, num_columns_per_family=4) From bf3a2c7f7bb7a978079ba42ab1df58ce4e3f27f4 Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 29 Apr 2026 20:24:10 +0400 Subject: [PATCH 2/3] Add _runner in tests --- sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py | 2 ++ sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index 19d040b206f9..0aae7a4184dc 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -34,6 +34,7 @@ from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -56,6 +57,7 @@ def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.runner_name = type(self.test_pipeline.runner).__name__ self.project = self.test_pipeline.get_option('project') + self._runner = PipelineOptions(self.args).get_all_options()['runner'] self.bigquery_client = BigQueryWrapper() self.dataset_id = '%s%d%s' % ( diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 0a43544e974d..f8f79a9a2b5a 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -30,6 +30,7 @@ import apache_beam as beam from apache_beam.io.gcp import bigtableio +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -74,6 +75,7 @@ def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() self.project = self.test_pipeline.get_option('project') + self._runner = PipelineOptions(self.args).get_all_options()['runner'] instance_id = instance_prefix(self.INSTANCE) From 64159c0ce6c705db961002d76f7db44849796d7b Mon Sep 17 00:00:00 2001 From: Vitaly Terentyev Date: Wed, 29 Apr 2026 21:19:21 +0400 Subject: [PATCH 3/3] Fix args --- sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index 0aae7a4184dc..6ca4a99c1ddb 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -55,6 +55,7 @@ class BigQueryGeographyIntegrationTests(unittest.TestCase): def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) + self.args = self.test_pipeline.get_full_options_as_args() self.runner_name = type(self.test_pipeline.runner).__name__ self.project = self.test_pipeline.get_option('project') self._runner = PipelineOptions(self.args).get_all_options()['runner']