12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -228,17 +228,20 @@ def run_storage_write_test(
hamcrest_assert(p, bq_matcher)

def test_all_types(self):
self.skip_if_not_dataflow_runner()
Contributor

high

The method skip_if_not_dataflow_runner is called here, but it is not defined in this class or file, unlike in the other test files modified in this PR. Additionally, self._runner is not initialized in this class. This will lead to an AttributeError when the tests are executed. Please ensure the method and its required state are correctly implemented or inherited.
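
For illustration, a minimal sketch of what this class would need: a _runner attribute set in setUp plus the skip helper itself, mirroring the implementation added to the other test files in this PR. The surrounding setUp contents are assumptions, not the class's actual code:

  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    # ... existing setUp logic ...
    # Record the requested runner so tests can skip when it is not Dataflow.
    self._runner = self.test_pipeline.get_option('runner')

  def skip_if_not_dataflow_runner(self):
    # Skip if the Dataflow runner is not specified.
    if not self._runner or "dataflowrunner" not in self._runner.lower():
      self.skipTest(
          "Streaming with exactly-once route has the requirement "
          "`beam:requirement:pardo:on_window_expiration:v1`, "
          "which is currently only supported by the Dataflow runner")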

table_name = "all_types"
schema = self.ALL_TYPES_SCHEMA
self.run_storage_write_test(table_name, self.ELEMENTS, schema)

def test_with_at_least_once_semantics(self):
self.skip_if_not_dataflow_runner()
table_name = "with_at_least_once_semantics"
schema = self.ALL_TYPES_SCHEMA
self.run_storage_write_test(
table_name, self.ELEMENTS, schema, use_at_least_once=True)

def test_nested_records_and_lists(self):
self.skip_if_not_dataflow_runner()
table_name = "nested_records_and_lists"
schema = {
"fields": [{
@@ -281,6 +284,7 @@ def test_nested_records_and_lists(self):
self.run_storage_write_test(table_name, items, schema)

def test_write_with_beam_rows(self):
self.skip_if_not_dataflow_runner()
table = 'write_with_beam_rows'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

@@ -308,6 +312,7 @@ def test_write_with_beam_rows(self):
hamcrest_assert(p, bq_matcher)

def test_write_with_clustering(self):
self.skip_if_not_dataflow_runner()
table = 'write_with_clustering'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

@@ -338,6 +343,7 @@ def test_write_with_clustering(self):
hamcrest_assert(p, bq_matcher)

def test_write_with_beam_rows_cdc(self):
self.skip_if_not_dataflow_runner()
table = 'write_with_beam_rows_cdc'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

@@ -378,6 +384,7 @@ def test_write_with_beam_rows_cdc(self):
hamcrest_assert(p, bq_matcher)

def test_write_with_dicts_cdc(self):
self.skip_if_not_dataflow_runner()
table = 'write_with_dicts_cdc'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

@@ -460,6 +467,7 @@ def test_write_with_dicts_cdc(self):
hamcrest_assert(p, bq_matcher)

def test_write_to_dynamic_destinations(self):
self.skip_if_not_dataflow_runner()
base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
spec_with_project = '{}:{}'.format(self.project, base_table_spec)
tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS]
@@ -484,6 +492,7 @@ def test_write_to_dynamic_destinations(self):
hamcrest_assert(p, all_of(*bq_matchers))

def test_write_to_dynamic_destinations_with_beam_rows(self):
self.skip_if_not_dataflow_runner()
base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
spec_with_project = '{}:{}'.format(self.project, base_table_spec)
tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS]
@@ -570,11 +579,13 @@ def test_streaming_with_auto_sharding(self):
self.run_streaming(table_name=table)

def test_streaming_with_at_least_once(self):
self.skip_if_not_dataflow_runner()
table = 'streaming_with_at_least_once'
self.run_streaming(table_name=table, use_at_least_once=True)

def test_write_with_big_lake_configuration(self):
"""Test BigQuery Storage Write API with BigLake configuration."""
self.skip_if_not_dataflow_runner()
table = 'write_with_big_lake_config'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

Expand Down Expand Up @@ -609,6 +620,7 @@ def test_write_with_big_lake_configuration(self):
self.assert_iceberg_tables_created(table, big_lake_config['storageUri'])

def test_write_with_managed_transform(self):
self.skip_if_not_dataflow_runner()
table = 'write_with_managed_transform'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py
@@ -34,6 +34,7 @@
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.options.pipeline_options import PipelineOptions
Contributor

medium

This import is unnecessary if self.test_pipeline.get_option('runner') is used instead of manually instantiating PipelineOptions in setUp.
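
For illustration, a minimal sketch of the setUp this suggests, reading the runner straight from the TestPipeline so the PipelineOptions import can be dropped (the rest of the method is assumed unchanged):

  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.runner_name = type(self.test_pipeline.runner).__name__
    self.project = self.test_pipeline.get_option('project')
    # Read the runner directly from the test pipeline's options instead of
    # re-parsing them through PipelineOptions.
    self._runner = self.test_pipeline.get_option('runner')
    self.bigquery_client = BigQueryWrapper()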

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -54,8 +55,10 @@ class BigQueryGeographyIntegrationTests(unittest.TestCase):

def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self._runner = PipelineOptions(self.args).get_all_options()['runner']
Contributor

medium

Instead of manually parsing PipelineOptions, use self.test_pipeline.get_option('runner'). This is safer and more idiomatic. Furthermore, self.runner_name is already initialized on line 59 and could be used for the runner check, making self._runner and the additional self.args assignment on line 58 redundant.

Suggested change
-    self._runner = PipelineOptions(self.args).get_all_options()['runner']
+    self._runner = self.test_pipeline.get_option('runner')


self.bigquery_client = BigQueryWrapper()
self.dataset_id = '%s%d%s' % (
@@ -435,6 +438,7 @@ def test_geography_complex_geometries(self):
"indicating that jars have not been built")
def test_geography_storage_write_api(self):
"""Test GEOGRAPHY with Storage Write API method."""
self.skip_if_not_dataflow_runner()
table_name = 'geography_storage_write'
table_id = '{}.{}'.format(self.dataset_id, table_name)

@@ -485,6 +489,14 @@ def test_geography_storage_write_api(self):

hc.assert_that(p, hc.all_of(*pipeline_verifiers))

def skip_if_not_dataflow_runner(self) -> bool:
Contributor

medium

The skip_if_not_dataflow_runner method is duplicated in sdks/python/apache_beam/io/gcp/bigtableio_it_test.py. Consider refactoring this logic into a shared utility module or a common base class for integration tests to improve maintainability.
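
For illustration, one possible shape for that refactor: a small mixin holding the duplicated helper, which both test classes could inherit. The module path and class name are hypothetical:

  # Hypothetical shared module, e.g. apache_beam/io/gcp/tests/runner_skip_mixin.py
  class RequiresDataflowRunnerMixin:
    """Mixin for integration tests that must run on the Dataflow runner.

    Expects the inheriting test class to set self._runner in setUp(),
    e.g. via self.test_pipeline.get_option('runner').
    """
    def skip_if_not_dataflow_runner(self):
      runner = getattr(self, '_runner', None)
      if not runner or "dataflowrunner" not in runner.lower():
        self.skipTest(
            "Streaming with exactly-once route has the requirement "
            "`beam:requirement:pardo:on_window_expiration:v1`, "
            "which is currently only supported by the Dataflow runner")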

# skip if dataflow runner is not specified
if not self._runner or "dataflowrunner" not in self._runner.lower():
self.skipTest(
"Streaming with exactly-once route has the requirement "
"`beam:requirement:pardo:on_window_expiration:v1`, "
"which is currently only supported by the Dataflow runner")

@pytest.mark.it_postcommit
def test_geography_file_loads_method(self):
"""Test GEOGRAPHY with FILE_LOADS method."""
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -30,6 +30,7 @@

import apache_beam as beam
from apache_beam.io.gcp import bigtableio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -74,6 +75,7 @@ def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.project = self.test_pipeline.get_option('project')
self._runner = PipelineOptions(self.args).get_all_options()['runner']
Contributor

medium

Directly accessing 'runner' from the options dictionary can be unsafe. It is recommended to use self.test_pipeline.get_option('runner'), which handles missing options gracefully and is more consistent with Beam's test patterns.

Suggested change
-    self._runner = PipelineOptions(self.args).get_all_options()['runner']
+    self._runner = self.test_pipeline.get_option('runner')


instance_id = instance_prefix(self.INSTANCE)

@@ -138,7 +140,16 @@ def add_rows(self, num_rows, num_families, num_columns_per_family):

return cells

def skip_if_not_dataflow_runner(self) -> bool:
# skip if dataflow runner is not specified
if not self._runner or "dataflowrunner" not in self._runner.lower():
self.skipTest(
"Streaming with exactly-once route has the requirement "
"`beam:requirement:pardo:on_window_expiration:v1`, "
"which is currently only supported by the Dataflow runner")

def test_read_xlang(self):
self.skip_if_not_dataflow_runner()
Contributor

Also, the read test should not be affected by this requirement, so it should not need this skip.

# create rows and retrieve expected cells
expected_cells = self.add_rows(
num_rows=5, num_families=3, num_columns_per_family=4)