diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml index bc60b8bde351..a34fa32b2614 100644 --- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml +++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml @@ -94,6 +94,7 @@ jobs: ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Stream.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_One_Hot_Encoding_Batch.txt # The env variables are created and populated in the test-arguments-action as "_test_arguments_" - name: get current time run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV @@ -214,3 +215,15 @@ jobs: -PpythonVersion=3.10 \ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \ '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}' + - name: run MLTransform One-Hot Encoding Batch + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.mltransform_one_hot_encoding_benchmark \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PbeamPythonExtra=ml_test \ + -PloadTest.requirementsTxtFile=apache_beam/ml/transforms/mltransform_tests_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --autoscaling_algorithm=NONE --metrics_table=mltransform_one_hot_encoding_batch --influx_measurement=mltransform_one_hot_encoding_batch --job_name=benchmark-tests-mltransform-one-hot-encoding-batch-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/mltransform/one_hot_output_${{env.NOW_UTC}} --artifact_location=gs://temp-storage-for-end-to-end-tests/mltransform/artifacts_${{env.NOW_UTC}}' diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_One_Hot_Encoding_Batch.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_One_Hot_Encoding_Batch.txt new file mode 100644 index 000000000000..27648d0c0fb0 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_MLTransform_One_Hot_Encoding_Batch.txt @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--region=us-central1 +--machine_type=n1-standard-2 +--num_workers=50 +--disk_size_gb=50 +--autoscaling_algorithm=NONE +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--sdk_location=container +--requirements_file=apache_beam/ml/transforms/mltransform_tests_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=mltransform_one_hot_encoding_batch +--input_options={} +--influx_measurement=mltransform_one_hot_encoding_batch +# Note: output_file and artifact_location are set in the workflow with unique timestamps +--input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt +--input_format=text +--categorical_columns=category +--num_records=1000000 +--runner=DataflowRunner diff --git a/.test-infra/tools/refresh_looker_metrics.py b/.test-infra/tools/refresh_looker_metrics.py index afd8ffa6f861..8ab648fed304 100644 --- a/.test-infra/tools/refresh_looker_metrics.py +++ b/.test-infra/tools/refresh_looker_metrics.py @@ -44,7 +44,8 @@ ("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch DistilBERT base uncased ("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma ("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch - ("106", ["355", "356", "357", "358", "359"]) # Table Row Inference Sklearn Streaming + ("106", ["355", "356", "357", "358", "359"]), # Table Row Inference Sklearn Streaming + ("108", ["365", "366", "367", "368", "369"]) # MLTransform One-Hot Encoding Batch ] def get_look(id: str) -> models.Look: diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py new file mode 100644 index 000000000000..5754d59b955d --- /dev/null +++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py @@ -0,0 +1,268 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Categorical encoding pipeline using MLTransform for batch processing. 
+
+This pipeline demonstrates MLTransform's ComputeAndApplyVocabulary transform
+for categorical feature encoding. It either reads input data from a file or
+generates synthetic test data, computes a vocabulary over the categorical
+columns, and converts each categorical value to an integer index.
+
+Example usage with input file:
+  python mltransform_one_hot_encoding.py \
+    --input_file=gs://bucket/input.jsonl \
+    --output_file=gs://bucket/output.jsonl \
+    --artifact_location=gs://bucket/artifacts \
+    --categorical_columns=category \
+    --runner=DataflowRunner \
+    --project=PROJECT \
+    --region=us-central1 \
+    --temp_location=gs://bucket/temp
+
+Example usage with synthetic data:
+  python mltransform_one_hot_encoding.py \
+    --output_file=gs://bucket/output.jsonl \
+    --categorical_columns=category \
+    --num_records=100000 \
+    --runner=DataflowRunner \
+    --project=PROJECT \
+    --region=us-central1 \
+    --temp_location=gs://bucket/temp
+"""
+
+import argparse
+import json
+import logging
+import tempfile
+from typing import Any
+
+import apache_beam as beam
+from apache_beam.ml.transforms.base import MLTransform
+from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
+from apache_beam.runners.runner import PipelineResult
+
+
+def parse_json_line(line: str) -> dict[str, Any]:
+  """Parse a JSON line into a dictionary."""
+  try:
+    return json.loads(line)
+  except json.JSONDecodeError as e:
+    raise ValueError(f"Failed to parse JSON line: {line[:200]}...") from e
+
+
+def parse_text_line(line: str,
+                    categorical_columns: list[str]) -> dict[str, Any]:
+  """Parse a plain text line into the first categorical column."""
+  text_value = line.strip()
+  if not text_value:
+    text_value = 'unknown'
+  return {categorical_columns[0]: text_value}
+
+
+def format_json_output(element: Any) -> str:
+  """Format output element as JSON string."""
+  def to_json_compatible(value: Any) -> Any:
+    """Recursively convert non-JSON types (e.g. numpy arrays/scalars)."""
+    if isinstance(value, dict):
+      return {k: to_json_compatible(v) for k, v in value.items()}
+    if isinstance(value, (list, tuple)):
+      return [to_json_compatible(v) for v in value]
+
+    # MLTransform outputs may include numpy scalar/ndarray values.
+ if hasattr(value, 'tolist'): + return to_json_compatible(value.tolist()) + if hasattr(value, 'item'): + try: + return to_json_compatible(value.item()) + except (TypeError, ValueError): + pass + return value + + if hasattr(element, 'as_dict'): + return json.dumps(to_json_compatible(element.as_dict())) + if hasattr(element, '_asdict'): + return json.dumps(to_json_compatible(element._asdict())) + return json.dumps(to_json_compatible(dict(element))) + + +def generate_synthetic_record(index: int, + categorical_columns: list[str]) -> dict[str, str]: + """Generate a deterministic synthetic record with categorical values.""" + categories = [ + 'electronics', + 'clothing', + 'food', + 'books', + 'sports', + 'home', + 'toys', + 'health', + 'automotive', + 'garden' + ] + colors = [ + 'red', + 'blue', + 'green', + 'yellow', + 'black', + 'white', + 'purple', + 'orange', + 'pink', + 'gray' + ] + sizes = ['small', 'medium', 'large', 'xlarge', 'tiny', 'huge'] + + record = {} + for col in categorical_columns: + if col.lower() in ['category', 'type', 'product']: + record[col] = categories[index % len(categories)] + elif col.lower() in ['color', 'colour']: + record[col] = colors[index % len(colors)] + elif col.lower() in ['size', 'dimension']: + record[col] = sizes[index % len(sizes)] + else: + # Default to categories for unknown columns + record[col] = categories[index % len(categories)] + return record + + +def run( + argv=None, + save_main_session=True, + test_pipeline=None) -> PipelineResult | None: + """Run the categorical encoding pipeline.""" + known_args, pipeline_args = parse_known_args(argv) + + categorical_columns = [ + col.strip() for col in known_args.categorical_columns.split(',') + ] + + if not categorical_columns or not categorical_columns[0]: + raise ValueError("At least one categorical column must be specified") + + if not known_args.output_file: + raise ValueError("--output_file is required") + + # Create artifact location if not provided + artifact_location = known_args.artifact_location + if not artifact_location: + artifact_location = tempfile.mkdtemp() + logging.info("Using temporary artifact location: %s", artifact_location) + + pipeline_options = beam.options.pipeline_options.PipelineOptions( + pipeline_args) + pipeline_options.view_as( + beam.options.pipeline_options.SetupOptions + ).save_main_session = save_main_session + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + # Use synthetic data or read from file + if known_args.input_file: + # Read and parse input data from file + if known_args.input_format == 'jsonl': + parse_input_fn = parse_json_line + else: + if len(categorical_columns) > 1: + logging.warning( + 'Input format is "text" but multiple categorical columns are ' + 'specified. 
Only the first column "%s" will be used for parsing.',
+            categorical_columns[0])
+      parse_input_fn = lambda line: parse_text_line(line, categorical_columns)
+    raw_data = (
+        pipeline
+        | 'ReadInput' >> beam.io.ReadFromText(known_args.input_file)
+        | 'ParseInput' >> beam.Map(parse_input_fn))
+  else:
+    # Generate synthetic data
+    num_records = known_args.num_records or 100000
+    logging.info("Generating %d synthetic records", num_records)
+
+    raw_data = (
+        pipeline
+        | 'GenerateSyntheticIndexes' >> beam.Create(range(num_records))
+        | 'BuildSyntheticRecord' >> beam.Map(
+            lambda idx: generate_synthetic_record(idx, categorical_columns)))
+
+  # Build MLTransform with ComputeAndApplyVocabulary
+  ml_transform = MLTransform(
+      write_artifact_location=artifact_location,
+  ).with_transform(
+      ComputeAndApplyVocabulary(
+          columns=categorical_columns, vocab_filename='vocab_onehot'))
+
+  # Apply MLTransform
+  transformed_data = (
+      raw_data
+      | 'ValidateAndFilterColumns' >> beam.Filter(
+          lambda element: all(col in element for col in categorical_columns))
+      | 'MLTransform' >> ml_transform
+      | 'FormatOutput' >> beam.Map(format_json_output))
+
+  # Write output
+  _ = (
+      transformed_data
+      | 'WriteToJSONL' >> beam.io.WriteToText(
+          known_args.output_file,
+          file_name_suffix='.jsonl',
+          shard_name_template=''))
+
+  result = pipeline.run()
+  return result
+
+
+def parse_known_args(argv):
+  """Parse command-line arguments."""
+  parser = argparse.ArgumentParser(
+      description='Categorical encoding pipeline using MLTransform')
+
+  parser.add_argument(
+      '--input_file',
+      help='Input JSONL file path (local or GCS). '
+      'If not provided, synthetic data will be generated.')
+  parser.add_argument(
+      '--input_format',
+      choices=['jsonl', 'text'],
+      default='jsonl',
+      help='Input file format for --input_file. Use jsonl for JSON lines '
+      'or text for plain text lines (default: jsonl).')
+  parser.add_argument(
+      '--output_file',
+      # Not marked required=True on purpose: run() validates this and raises
+      # ValueError, which is friendlier to callers and tests than argparse's
+      # SystemExit.
+      help='Output file prefix for encoded results (JSONL format). Required.')
+  parser.add_argument(
+      '--artifact_location',
+      help='GCS or local path to store MLTransform artifacts '
+      '(vocabulary files). If not provided, a temp location is used.')
+  parser.add_argument(
+      '--categorical_columns',
+      required=True,
+      help='Comma-separated list of categorical column names to encode')
+  parser.add_argument(
+      '--num_records',
+      type=int,
+      default=100000,
+      help='Number of synthetic records to generate if --input_file is not '
+      'provided (default: 100000)')
+
+  return parser.parse_known_args(argv)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
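Reviewer note (not part of the PR): despite the file name, ComputeAndApplyVocabulary emits vocabulary *indices*, not expanded one-hot vectors. A minimal local sketch of the encoding the example performs, assuming tensorflow_transform is installed and a DirectRunner environment with a temp artifact dir:

import tempfile

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'category': 'electronics'}, {'category': 'food'},
                     {'category': 'electronics'}])
      | MLTransform(write_artifact_location=tempfile.mkdtemp()).with_transform(
          ComputeAndApplyVocabulary(columns=['category']))
      # Each element comes out as a schema'd row whose 'category' field holds
      # the assigned vocabulary index, e.g. category=[0] -- an integer index,
      # not a one-hot vector. The exact index depends on TFT's vocabulary
      # ordering.
      | beam.Map(print))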
diff --git a/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding_test.py b/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding_test.py
new file mode 100644
index 000000000000..e0140e93e1bd
--- /dev/null
+++ b/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding_test.py
@@ -0,0 +1,254 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for mltransform_one_hot_encoding pipeline."""
+
+import json
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+from typing import Any
+
+import pytest
+
+import apache_beam as beam
+
+try:
+  from apache_beam.examples.ml_transform import mltransform_one_hot_encoding
+  from apache_beam.testing.test_pipeline import TestPipeline
+except ImportError:
+  raise unittest.SkipTest('tensorflow_transform is not installed.')
+
+
+def create_test_input_data() -> list[dict[str, Any]]:
+  """Create sample test data for one-hot encoding."""
+  return [
+      {
+          'category': 'electronics', 'color': 'red', 'size': 'small'
+      },
+      {
+          'category': 'clothing', 'color': 'blue', 'size': 'medium'
+      },
+      {
+          'category': 'electronics', 'color': 'green', 'size': 'large'
+      },
+      {
+          'category': 'food', 'color': 'red', 'size': 'small'
+      },
+      {
+          'category': 'clothing', 'color': 'blue', 'size': 'medium'
+      },
+  ]
+
+
+class OneHotEncodingPipelineTest(unittest.TestCase):
+  """Unit and integration tests for one-hot encoding pipeline."""
+  def setUp(self):
+    """Set up test fixtures."""
+    self.test_dir = tempfile.mkdtemp()
+    self.input_file = os.path.join(self.test_dir, 'input.jsonl')
+    self.output_prefix = os.path.join(self.test_dir, 'output')
+    self.artifact_location = os.path.join(self.test_dir, 'artifacts')
+
+    # Create test input file
+    test_data = create_test_input_data()
+    with open(self.input_file, 'w', encoding='utf-8') as f:
+      for record in test_data:
+        f.write(json.dumps(record) + '\n')
+
+  def tearDown(self):
+    """Clean up test fixtures."""
+    shutil.rmtree(self.test_dir, ignore_errors=True)
+
+  def test_parse_json_line_valid(self):
+    """Test parsing valid JSON lines."""
+    line = '{"category": "electronics", "color": "red"}'
+    result = mltransform_one_hot_encoding.parse_json_line(line)
+    self.assertEqual(result['category'], 'electronics')
+    self.assertEqual(result['color'], 'red')
+
+  def test_parse_json_line_invalid(self):
+    """Test parsing invalid JSON lines raises ValueError."""
+    with self.assertRaises(ValueError):
+      mltransform_one_hot_encoding.parse_json_line('not valid json')
+
+  def test_format_json_output_with_row(self):
+    """Test formatting beam.Row output as JSON."""
+    row = beam.Row(category='test', value=123)
+    result = mltransform_one_hot_encoding.format_json_output(row)
+    parsed = json.loads(result)
+    self.assertEqual(parsed['category'], 'test')
+    self.assertEqual(parsed['value'], 123)
+
+  def test_format_json_output_with_dict(self):
+    """Test formatting dict output as JSON."""
+    element = {'category': 'test', 'value': 123}
+    result = mltransform_one_hot_encoding.format_json_output(element)
+    parsed = json.loads(result)
+    self.assertEqual(parsed['category'], 'test')
+    self.assertEqual(parsed['value'], 123)
+
+  @pytest.mark.uses_tft
+  def test_end_to_end_pipeline_local(self):
+    """Integration test running the full pipeline locally."""
+    extra_opts = {
+        'input_file': self.input_file,
+        'output_file': self.output_prefix,
+        'artifact_location': self.artifact_location,
+        'categorical_columns': 'category,color,size',
+    }
+
+    with
TestPipeline() as pipeline: + mltransform_one_hot_encoding.run( + argv=pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=pipeline) + + # Verify output file exists + output_file = self.output_prefix + '.jsonl' + self.assertTrue( + os.path.exists(output_file), f"Output file not found: {output_file}") + + # Verify output content + with open(output_file, 'r', encoding='utf-8') as f: + lines = [line.strip() for line in f if line.strip()] + + self.assertEqual(len(lines), 5) + + # Parse and verify structure + for line in lines: + record = json.loads(line) + # Should have original columns plus one-hot encoded versions + self.assertIn('category', record) + self.assertIn('color', record) + self.assertIn('size', record) + + @pytest.mark.uses_tft + def test_pipeline_with_missing_columns(self): + """Test pipeline handles records with missing columns gracefully.""" + # Create input with some missing columns + mixed_data = [ + { + 'category': 'electronics', 'color': 'red', 'size': 'small' + }, + { + 'category': 'clothing', 'color': 'blue' + }, # missing size + { + 'category': 'food' + }, # missing color and size + ] + + input_file = os.path.join(self.test_dir, 'mixed_input.jsonl') + with open(input_file, 'w', encoding='utf-8') as f: + for record in mixed_data: + f.write(json.dumps(record) + '\n') + + extra_opts = { + 'input_file': input_file, + 'output_file': os.path.join(self.test_dir, 'mixed_output'), + 'artifact_location': os.path.join(self.test_dir, 'mixed_artifacts'), + 'categorical_columns': 'category,color,size', + } + + with TestPipeline() as pipeline: + mltransform_one_hot_encoding.run( + argv=pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=pipeline) + + # Only first record should be processed + output_file = os.path.join(self.test_dir, 'mixed_output.jsonl') + with open(output_file, 'r', encoding='utf-8') as f: + lines = [line.strip() for line in f if line.strip()] + + self.assertEqual(len(lines), 1) + record = json.loads(lines[0]) + self.assertEqual(record['category'], 'electronics') + + def test_cli_synthetic_data_no_input(self): + """Test pipeline works without input file using synthetic data.""" + # Should not raise error when input_file is missing (uses synthetic data) + with tempfile.TemporaryDirectory() as tmpdir: + output_file = os.path.join(tmpdir, 'output') + artifact_location = os.path.join(tmpdir, 'artifacts') + + with TestPipeline() as pipeline: + # Should work without input_file (uses synthetic data) + mltransform_one_hot_encoding.run( + argv=pipeline.get_full_options_as_args( + output_file=output_file, + artifact_location=artifact_location, + categorical_columns='category', + num_records=100), + test_pipeline=pipeline) + + def test_cli_validation_missing_output(self): + """Test CLI argument validation for missing output file.""" + with self.assertRaises(ValueError) as context: + mltransform_one_hot_encoding.run( + argv=['--input_file=/tmp/in.jsonl', '--categorical_columns=category']) + self.assertIn('output_file', str(context.exception).lower()) + + def test_cli_validation_empty_columns(self): + """Test CLI argument validation for empty columns.""" + with self.assertRaises(ValueError) as context: + mltransform_one_hot_encoding.run( + argv=[ + '--input_file=/tmp/in.jsonl', + '--output_file=/tmp/out.jsonl', + '--categorical_columns=' + ]) + self.assertIn('categorical', str(context.exception).lower()) + + +class OneHotEncodingCLITest(unittest.TestCase): + """Tests for CLI argument handling.""" + def test_parse_known_args_basic(self): + """Test basic 
argument parsing.""" + args, _ = mltransform_one_hot_encoding.parse_known_args([ + '--input_file=/tmp/in.jsonl', + '--output_file=/tmp/out.jsonl', + '--categorical_columns=category,color', + ]) + self.assertEqual(args.input_file, '/tmp/in.jsonl') + self.assertEqual(args.output_file, '/tmp/out.jsonl') + self.assertEqual(args.categorical_columns, 'category,color') + + def test_parse_known_args_with_artifact(self): + """Test argument parsing with artifact location.""" + args, _ = mltransform_one_hot_encoding.parse_known_args([ + '--input_file=gs://bucket/in.jsonl', + '--output_file=gs://bucket/out', + '--artifact_location=gs://bucket/artifacts', + '--categorical_columns=size,color', + ]) + self.assertEqual(args.artifact_location, 'gs://bucket/artifacts') + + def test_parse_known_args_multiple_columns(self): + """Test parsing multiple categorical columns.""" + args, _ = mltransform_one_hot_encoding.parse_known_args([ + '--input_file=in.jsonl', + '--output_file=out.jsonl', + '--categorical_columns=col1,col2,col3,col4', + ]) + columns = [c.strip() for c in args.categorical_columns.split(',')] + self.assertEqual(columns, ['col1', 'col2', 'col3', 'col4']) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt b/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt new file mode 100644 index 000000000000..9f37e070a606 --- /dev/null +++ b/sdks/python/apache_beam/ml/transforms/mltransform_tests_requirements.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Requirements for MLTransform tests on Dataflow workers. +# Keep this aligned with CloudML benchmark stack to avoid worker import errors. +dill==0.4.1 +tfx_bsl==1.16.1 +tensorflow-transform==1.16.0 +tensorflow>=2.16,<2.17 +numpy>=1.22.0,<2.0 +tensorflow-metadata>=1.16.1,<1.17.0 +pyarrow>=10,<11 +tensorflow-serving-api>=2.16.1,<2.20 +tf-keras>=2.16.0,<2.17 +google-cloud-monitoring>=2.27.0 diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_one_hot_encoding_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_one_hot_encoding_benchmark.py new file mode 100644 index 000000000000..e80fca633352 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/mltransform_one_hot_encoding_benchmark.py @@ -0,0 +1,197 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +"""Benchmark test for MLTransform One-Hot Encoding pipeline. + +This benchmark measures the performance of MLTransform for one-hot encoding +categorical features, including throughput, latency, and cost metrics on +Dataflow. +""" + +import logging + +from google.cloud import monitoring_v3 +from google.protobuf.duration_pb2 import Duration + +from apache_beam.examples.ml_transform import mltransform_one_hot_encoding +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import WorkerOptions +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark +from apache_beam.testing.load_tests.load_test import LoadTestOptions + + +class MLTransformOneHotEncodingOptions( + LoadTestOptions, + StandardOptions, + GoogleCloudOptions, + WorkerOptions, + DebugOptions, + SetupOptions, +): + """Pipeline options for MLTransform One-Hot Encoding benchmark.""" + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--input_file', + default='', + help='Input JSONL/text file path for benchmark data.') + parser.add_argument( + '--input_format', + choices=['jsonl', 'text'], + default='jsonl', + help='Input file format for input_file: jsonl or text.') + parser.add_argument( + '--output_file', + required=True, + help='Output file path for encoded results') + parser.add_argument( + '--artifact_location', + required=True, + help='GCS path to store MLTransform artifacts') + parser.add_argument( + '--categorical_columns', + default='category', + help='Comma-separated list of categorical column names to encode') + parser.add_argument( + '--num_records', + type=int, + default=100000, + help='Number of synthetic records to generate') + + +class MLTransformOneHotEncodingBenchmarkTest(DataflowCostBenchmark): + """Benchmark for MLTransform One-Hot Encoding on Dataflow. + + This benchmark measures: + - Throughput: Elements processed per second + - Latency: Time to process input records + - Cost: Estimated cost on Dataflow + + The pcollection is chosen to capture the output of the MLTransform + step where one-hot encoding is applied. 
+ """ + options_class = MLTransformOneHotEncodingOptions + + def __init__(self): + self.metrics_namespace = 'BeamML_MLTransform' + # Use the output of MLTransform step for throughput measurement + # This captures the processed data after vocabulary encoding + super().__init__( + metrics_namespace=self.metrics_namespace, + is_streaming=False, + pcollection='FormatOutput.out0') + + def test(self): + """Execute the one-hot encoding pipeline for benchmarking.""" + extra_opts = {} + + extra_opts['output_file'] = self.pipeline.get_option('output_file') + extra_opts['artifact_location'] = self.pipeline.get_option( + 'artifact_location') + extra_opts['categorical_columns'] = ( + self.pipeline.get_option('categorical_columns') or 'category') + + input_file = self.pipeline.get_option('input_file') + if input_file: + extra_opts['input_file'] = input_file + extra_opts['input_format'] = ( + self.pipeline.get_option('input_format') or 'jsonl') + else: + # Handle synthetic data generation + num_records = self.pipeline.get_option('num_records') + if num_records: + extra_opts['num_records'] = int(num_records) + + self.result = mltransform_one_hot_encoding.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + def _get_throughput_metrics( + self, + project: str, + job_id: str, + start_time: str, + end_time: str, + pcollection_name: str | None = None, + ) -> dict[str, float]: + """Get throughput metrics with runner-v2-friendly fallbacks.""" + candidate_pcollections = [] + if pcollection_name: + candidate_pcollections.append(pcollection_name) + candidate_pcollections.extend([ + self.pcollection, + 'MLTransform.out0', + 'FormatOutput.out0', + ]) + + # Deduplicate while preserving order. + seen = set() + unique_candidates = [] + for name in candidate_pcollections: + if name and name not in seen: + seen.add(name) + unique_candidates.append(name) + + for name in unique_candidates: + metrics = super()._get_throughput_metrics( + project, job_id, start_time, end_time, pcollection_name=name) + if (metrics.get('AvgThroughputBytes', 0) > 0 or + metrics.get('AvgThroughputElements', 0) > 0): + return metrics + + # Final fallback: aggregate job-level throughput without pcollection label. 
+    interval = monitoring_v3.TimeInterval(
+        start_time=start_time, end_time=end_time)
+    aggregation = monitoring_v3.Aggregation(
+        alignment_period=Duration(seconds=60),
+        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN)
+    requests = {
+        "Bytes": monitoring_v3.ListTimeSeriesRequest(
+            name=f"projects/{project}",
+            filter=(
+                'metric.type="dataflow.googleapis.com/job/estimated_byte_count" '
+                f'AND metric.labels.job_id="{job_id}"'),
+            interval=interval,
+            aggregation=aggregation),
+        "Elements": monitoring_v3.ListTimeSeriesRequest(
+            name=f"projects/{project}",
+            filter=(
+                'metric.type="dataflow.googleapis.com/job/element_count" '
+                f'AND metric.labels.job_id="{job_id}"'),
+            interval=interval,
+            aggregation=aggregation),
+    }
+
+    fallback_metrics = {}
+    for key, req in requests.items():
+      time_series = self.monitoring_client.list_time_series(request=req)
+      values = [
+          point.value.double_value for series in time_series
+          for point in series.points
+      ]
+      fallback_metrics[f"AvgThroughput{key}"] = (
+          sum(values) / len(values) if values else 0.0)
+    return fallback_metrics
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  MLTransformOneHotEncodingBenchmarkTest().run()
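Reviewer note on the measured PCollection (a hypothetical sketch, not part of the PR): 'FormatOutput.out0' carries exactly one JSON string per encoded record, so the benchmark's AvgThroughputElements corresponds to records per second. Assuming a local run of the example wrote /tmp/one_hot_output.jsonl, this checks that correspondence:

import json

# One JSON line is written per element of FormatOutput.out0, so counting
# lines counts the elements that the throughput metric is computed over.
with open('/tmp/one_hot_output.jsonl', encoding='utf-8') as f:
  records = [json.loads(line) for line in f if line.strip()]
print(f'{len(records)} encoded records')  # equals the pipeline element count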
diff --git a/website/www/site/content/en/performance/_index.md b/website/www/site/content/en/performance/_index.md
index 1624c58efe2e..5fc3c34f8600 100644
--- a/website/www/site/content/en/performance/_index.md
+++ b/website/www/site/content/en/performance/_index.md
@@ -59,3 +59,4 @@ See the following pages for performance measures recorded when running various B
 - [TensorFlow MNIST Image Classification](/performance/tensorflowmnist)
 - [VLLM Gemma Batch Completion Tesla T4 GPU](/performance/vllmgemmabatchtesla)
 - [Table Row Inference Sklearn Batch](/performance/tablerowinference)
+- [MLTransform One-Hot Encoding](/performance/mltransformonehot)
diff --git a/website/www/site/content/en/performance/mltransformonehot/_index.md b/website/www/site/content/en/performance/mltransformonehot/_index.md
new file mode 100644
index 000000000000..2d641bcfb66f
--- /dev/null
+++ b/website/www/site/content/en/performance/mltransformonehot/_index.md
@@ -0,0 +1,42 @@
+---
+title: "MLTransform One-Hot Encoding Performance"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# MLTransform One-Hot Encoding Performance
+
+**Pipeline**: MLTransform One-Hot Encoding for Categorical Features
+**Type**: Batch only
+**Host**: 50 × n1-standard-2 (2 vCPUs, 7.5 GB RAM)
+
+The following graphs show various metrics when running the MLTransform One-Hot
+Encoding pipeline using Apache Beam's MLTransform TFT integration.
+See the [glossary](/performance/glossary) for definitions.
+
+The full pipeline implementation is available
+[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml_transform/mltransform_one_hot_encoding.py).
+
+## What is the estimated cost to run the pipeline?
+
+{{< performance_looks io="mltransformonehot" read_or_write="write" section="cost" >}}
+
+## How have various metrics changed when running the pipeline for different Beam SDK versions?
+
+{{< performance_looks io="mltransformonehot" read_or_write="write" section="version" >}}
+
+## How have various metrics changed over time when running the pipeline?
+
+{{< performance_looks io="mltransformonehot" read_or_write="write" section="date" >}}
diff --git a/website/www/site/data/performance.yaml b/website/www/site/data/performance.yaml
index 8841bbb58ecb..81a7ee6d4702 100644
--- a/website/www/site/data/performance.yaml
+++ b/website/www/site/data/performance.yaml
@@ -283,3 +283,19 @@ looks:
         title: AvgThroughputBytesPerSec by Version
       - id: P7wKZy6tQFWbbDfm4HzfCJnsQrVgfGsJ
         title: AvgThroughputElementsPerSec by Version
+  mltransformonehot:
+    write:
+      folder: 108
+      cost:
+        - id: 37DYwfbr5y4gt7g7g7RzRSzGTjd4Jjbj
+          title: RunTime and EstimatedCost
+      date:
+        - id: trcXrBCyPjYj2jTGc3px3d72xMXPCmZb
+          title: AvgThroughputBytesPerSec by Date
+        - id: yczZ4G8rYcP3SHjtQXz7p4BdRhGcXydx
+          title: AvgThroughputElementsPerSec by Date
+      version:
+        - id: mHtwwXKndpJVPjnkcHqRZpfPzfzHtT38
+          title: AvgThroughputBytesPerSec by Version
+        - id: Cn5FsXkdy2ZXCxCJshSCxcjsTW3TXf3c
+          title: AvgThroughputElementsPerSec by Version
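Final reviewer note: the Looker wiring spans two files, and drift between them silently breaks the website graphs. A hypothetical cross-check (not part of the PR) that performance.yaml's five look entries under what appears to be Looker folder 108 stay in step with the ("108", [...]) pair added to refresh_looker_metrics.py, assuming it is run from the repo root and that performance.yaml keeps its existing top-level "looks:" mapping:

import yaml

with open('website/www/site/data/performance.yaml', encoding='utf-8') as f:
  perf = yaml.safe_load(f)

# performance.yaml nests look entries as looks -> <io> -> write ->
# {cost, date, version}; the refresh job's tuple lists five numeric look IDs.
section = perf['looks']['mltransformonehot']['write']
look_count = sum(len(section[k]) for k in ('cost', 'date', 'version'))
assert section['folder'] == 108  # unquoted YAML scalar parses as an int
assert look_count == 5  # mirrors ("108", ["365", ..., "369"]) in the refresh job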