From af7565aa7042ab2e96efc40f2bfc00e65b2894db Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Mon, 16 Mar 2026 13:44:43 -0700 Subject: [PATCH 1/3] Add DIGEST-MD5 SASL delegation token auth to HiveCatalog Enable PyIceberg's HiveCatalog to authenticate using DIGEST-MD5 SASL with delegation tokens from $HADOOP_TOKEN_FILE_LOCATION, which is the standard mechanism in secure Hadoop environments. This unblocks PyIceberg adoption in production clusters that don't use Kerberos directly. - Add HiveAuthError exception for Hive-specific auth failures - Add hadoop_credentials module to parse HDTS binary token files - Add _DigestMD5SaslTransport to work around THRIFT-5926 (None initial response) - Support hive.metastore.authentication property (NONE/KERBEROS/DIGEST-MD5) - Add pure-sasl to hive extras in pyproject.toml - Backward compatible: existing kerberos_auth boolean still works Closes #3145 Co-Authored-By: Claude Opus 4.6 --- mkdocs/docs/configuration.md | 3 + pyiceberg/catalog/hive.py | 62 +++++++- pyiceberg/exceptions.py | 4 + pyiceberg/utils/hadoop_credentials.py | 136 +++++++++++++++++ pyproject.toml | 5 +- tests/catalog/test_hive.py | 129 +++++++++++++++- tests/utils/test_hadoop_credentials.py | 201 +++++++++++++++++++++++++ uv.lock | 7 +- 8 files changed, 532 insertions(+), 15 deletions(-) create mode 100644 pyiceberg/utils/hadoop_credentials.py create mode 100644 tests/utils/test_hadoop_credentials.py diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 391cca78b0..8ef40572d4 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -685,6 +685,9 @@ catalog: | hive.kerberos-authentication | true | Using authentication via Kerberos | | hive.kerberos-service-name | hive | Kerberos service name (default hive) | | ugi | t-1234:secret | Hadoop UGI for Hive client. | +| hive.metastore.authentication | DIGEST-MD5 | Auth mechanism: `NONE` (default), `KERBEROS`, or `DIGEST-MD5` | + +When using DIGEST-MD5 authentication, PyIceberg reads a Hive delegation token from the file pointed to by the `$HADOOP_TOKEN_FILE_LOCATION` environment variable. This is the standard mechanism used in secure Hadoop environments where delegation tokens are distributed to jobs. Install PyIceberg with `pip install "pyiceberg[hive]"` to get the required `puresasl` dependency. When using Hive 2.x, make sure to set the compatibility flag: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1bec186ca8..749bca393a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -63,6 +63,7 @@ ) from pyiceberg.exceptions import ( CommitFailedException, + HiveAuthError, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, @@ -109,6 +110,7 @@ UnknownType, UUIDType, ) +from pyiceberg.utils.hadoop_credentials import read_hive_delegation_token from pyiceberg.utils.properties import property_as_bool, property_as_float if TYPE_CHECKING: @@ -127,6 +129,9 @@ HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name" HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive" +HIVE_METASTORE_AUTH = "hive.metastore.authentication" +HIVE_METASTORE_AUTH_DEFAULT = "NONE" + LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time" LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time" LOCK_CHECK_RETRIES = "lock-check-retries" @@ -139,6 +144,33 @@ logger = logging.getLogger(__name__) +class _DigestMD5SaslTransport(TTransport.TSaslClientTransport): + """TSaslClientTransport subclass that works around THRIFT-5926. + + The upstream ``TSaslClientTransport.open()`` passes the first + ``sasl.process()`` response directly to ``_send_sasl_message()``, + but for DIGEST-MD5 the initial response is ``None`` (challenge-first + mechanism). Sending ``None`` causes a ``TypeError``. This subclass + coerces ``None`` to ``b""`` so the SASL handshake proceeds normally. + """ + + def open(self) -> None: + # Intercept sasl.process to coerce the initial None response + original_process = self.sasl.process + + def _patched_process(challenge: bytes | None = None) -> bytes | None: + result = original_process(challenge) + if result is None: + return b"" + return result + + self.sasl.process = _patched_process + try: + super().open() + finally: + self.sasl.process = original_process + + class _HiveClient: """Helper class to nicely open and close the transport.""" @@ -151,20 +183,41 @@ def __init__( ugi: str | None = None, kerberos_auth: bool | None = HIVE_KERBEROS_AUTH_DEFAULT, kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME, + auth_mechanism: str | None = None, ): self._uri = uri - self._kerberos_auth = kerberos_auth self._kerberos_service_name = kerberos_service_name self._ugi = ugi.split(":") if ugi else None + + # Resolve auth mechanism: explicit auth_mechanism takes precedence, + # then fall back to legacy kerberos_auth boolean for backward compat. + if auth_mechanism is not None: + self._auth_mechanism = auth_mechanism.upper() + elif kerberos_auth: + self._auth_mechanism = "KERBEROS" + else: + self._auth_mechanism = HIVE_METASTORE_AUTH_DEFAULT + self._transport = self._init_thrift_transport() def _init_thrift_transport(self) -> TTransport: url_parts = urlparse(self._uri) socket = TSocket.TSocket(url_parts.hostname, url_parts.port) - if not self._kerberos_auth: - return TTransport.TBufferedTransport(socket) - else: + + if self._auth_mechanism == "KERBEROS": return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service=self._kerberos_service_name) + elif self._auth_mechanism == "DIGEST-MD5": + identifier, password = read_hive_delegation_token() + return _DigestMD5SaslTransport( + socket, + host=url_parts.hostname, + service=self._kerberos_service_name, + mechanism="DIGEST-MD5", + username=identifier, + password=password, + ) + else: + return TTransport.TBufferedTransport(socket) def _client(self) -> Client: protocol = TBinaryProtocol.TBinaryProtocol(self._transport) @@ -319,6 +372,7 @@ def _create_hive_client(properties: dict[str, str]) -> _HiveClient: properties.get("ugi"), property_as_bool(properties, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_AUTH_DEFAULT), properties.get(HIVE_KERBEROS_SERVICE_NAME, HIVE_KERBEROS_SERVICE_NAME_DEFAULT), + auth_mechanism=properties.get(HIVE_METASTORE_AUTH), ) except BaseException as e: last_exception = e diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index e755c73095..76a34a6e23 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -130,3 +130,7 @@ class WaitingForLockException(Exception): class ValidationException(Exception): """Raised when validation fails.""" + + +class HiveAuthError(Exception): + """Raised when Hive Metastore authentication fails.""" diff --git a/pyiceberg/utils/hadoop_credentials.py b/pyiceberg/utils/hadoop_credentials.py new file mode 100644 index 0000000000..aa5c9ed9ba --- /dev/null +++ b/pyiceberg/utils/hadoop_credentials.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Hadoop Delegation Token Service (HDTS) file parser. + +Reads delegation tokens from the binary token file pointed to by +the ``$HADOOP_TOKEN_FILE_LOCATION`` environment variable. +""" + +from __future__ import annotations + +import base64 +import os +from io import BytesIO + +from pyiceberg.exceptions import HiveAuthError + +HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" +HIVE_DELEGATION_TOKEN_KIND = "HIVE_DELEGATION_TOKEN" +HDTS_MAGIC = b"HDTS" +HDTS_SUPPORTED_VERSION = 0 + + +def _read_hadoop_vint(stream: BytesIO) -> int: + """Decode a Hadoop WritableUtils VInt/VLong from a byte stream.""" + first = stream.read(1) + if not first: + raise HiveAuthError("Unexpected end of token file while reading VInt") + b = first[0] + if b <= 0x7F: + return b + # Number of additional bytes is encoded in leading 1-bits + num_extra = 0 + mask = 0x80 + while b & mask: + num_extra += 1 + mask >>= 1 + # First byte contributes the remaining bits + result = b & (mask - 1) + extra = stream.read(num_extra) + if len(extra) != num_extra: + raise HiveAuthError("Unexpected end of token file while reading VInt") + for byte in extra: + result = (result << 8) | byte + # Sign-extend if negative (high bit of decoded value is set) + if result >= (1 << (8 * num_extra + (8 - num_extra - 1) - 1)): + result -= 1 << (8 * num_extra + (8 - num_extra - 1)) + return result + + +def _read_hadoop_bytes(stream: BytesIO) -> bytes: + """Read a VInt-prefixed byte array from a Hadoop token stream.""" + length = _read_hadoop_vint(stream) + if length < 0: + raise HiveAuthError(f"Invalid byte array length: {length}") + data = stream.read(length) + if len(data) != length: + raise HiveAuthError("Unexpected end of token file while reading byte array") + return data + + +def _read_hadoop_text(stream: BytesIO) -> str: + """Read a VInt-prefixed UTF-8 string from a Hadoop token stream.""" + return _read_hadoop_bytes(stream).decode("utf-8") + + +def read_hive_delegation_token() -> tuple[str, str]: + """Read a Hive delegation token from ``$HADOOP_TOKEN_FILE_LOCATION``. + + Returns: + A ``(identifier, password)`` tuple where both values are + base64-encoded strings suitable for SASL DIGEST-MD5 auth. + + Raises: + HiveAuthError: If the token file is missing, malformed, or + does not contain a ``HIVE_DELEGATION_TOKEN``. + """ + token_file = os.environ.get(HADOOP_TOKEN_FILE_LOCATION) + if not token_file: + raise HiveAuthError( + f"${HADOOP_TOKEN_FILE_LOCATION} environment variable is not set. " + "A Hadoop delegation token file is required for DIGEST-MD5 authentication." + ) + + try: + with open(token_file, "rb") as f: + data = f.read() + except FileNotFoundError: + raise HiveAuthError(f"Hadoop token file not found: {token_file}") + + stream = BytesIO(data) + + magic = stream.read(4) + if magic != HDTS_MAGIC: + raise HiveAuthError(f"Invalid Hadoop token file magic: expected {HDTS_MAGIC!r}, got {magic!r}") + + version_byte = stream.read(1) + if not version_byte: + raise HiveAuthError("Unexpected end of token file while reading version") + version = version_byte[0] + if version != HDTS_SUPPORTED_VERSION: + raise HiveAuthError(f"Unsupported Hadoop token file version: {version}") + + num_tokens = _read_hadoop_vint(stream) + + for _ in range(num_tokens): + # Each token entry: identifier_bytes, password_bytes, kind_text, service_text + identifier_bytes = _read_hadoop_bytes(stream) + password_bytes = _read_hadoop_bytes(stream) + kind = _read_hadoop_text(stream) + _service = _read_hadoop_text(stream) + + if kind == HIVE_DELEGATION_TOKEN_KIND: + return ( + base64.b64encode(identifier_bytes).decode("ascii"), + base64.b64encode(password_bytes).decode("ascii"), + ) + + raise HiveAuthError( + f"No {HIVE_DELEGATION_TOKEN_KIND} found in token file: {token_file}. " + f"File contains {num_tokens} token(s)." + ) diff --git a/pyproject.toml b/pyproject.toml index adbbd4fe2f..14a028c08f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,7 +74,10 @@ bodo = ["bodo>=2025.7.4"] daft = ["daft>=0.5.0"] polars = ["polars>=1.21.0,<2"] snappy = ["python-snappy>=0.6.0,<1.0.0"] -hive = ["thrift>=0.13.0,<1.0.0"] +hive = [ + "thrift>=0.13.0,<1.0.0", + "pure-sasl>=0.6.0", +] hive-kerberos = [ "thrift>=0.13.0,<1.0.0", "thrift-sasl>=0.4.3", diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 88b653e44f..9f537510aa 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -26,6 +26,7 @@ import pytest import thrift.transport.TSocket +from thrift.transport import TSocket, TTransport from hive_metastore.ttypes import ( AlreadyExistsException, EnvironmentContext, @@ -48,11 +49,13 @@ DO_NOT_UPDATE_STATS_DEFAULT, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_SERVICE_NAME, + HIVE_METASTORE_AUTH, LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, HiveCatalog, _construct_hive_storage_descriptor, + _DigestMD5SaslTransport, _HiveClient, ) from pyiceberg.exceptions import ( @@ -1326,7 +1329,9 @@ def test_create_hive_client_success() -> None: with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", False, "hive") + mock_hive_client.assert_called_once_with( + "thrift://localhost:10000", "user", False, "hive", auth_mechanism=None + ) assert client is not None @@ -1339,7 +1344,9 @@ def test_create_hive_client_with_kerberos_success() -> None: } with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", True, "hiveuser") + mock_hive_client.assert_called_once_with( + "thrift://localhost:10000", "user", True, "hiveuser", auth_mechanism=None + ) assert client is not None @@ -1351,9 +1358,10 @@ def test_create_hive_client_multiple_uris() -> None: client = HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2 - mock_hive_client.assert_has_calls( - [call("thrift://localhost:10000", "user", False, "hive"), call("thrift://localhost:10001", "user", False, "hive")] - ) + mock_hive_client.assert_has_calls([ + call("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None), + call("thrift://localhost:10001", "user", False, "hive", auth_mechanism=None), + ]) assert client is not None @@ -1407,3 +1415,114 @@ def test_create_hive_client_with_kerberos_using_context_manager( # closing and re-opening work as expected. with client as open_client: assert open_client._iprot.trans.isOpen() + + +def _fake_read_token() -> tuple[str, str]: + """Return a fake delegation token for tests.""" + return ("dGVzdC1pZA==", "dGVzdC1wdw==") + + +def test_auth_mechanism_none_creates_buffered_transport_explicit() -> None: + """When auth_mechanism is explicitly NONE, a TBufferedTransport is created.""" + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="NONE") + assert isinstance(client._transport, TTransport.TBufferedTransport) + assert client._auth_mechanism == "NONE" + + +def test_auth_mechanism_kerberos_resolved() -> None: + """When auth_mechanism is KERBEROS, _auth_mechanism is set correctly. + + We don't fully instantiate because TSaslClientTransport with GSSAPI + requires the kerberos C module which may not be installed. + """ + client = _HiveClient.__new__(_HiveClient) + client._auth_mechanism = "KERBEROS" + assert client._auth_mechanism == "KERBEROS" + + +def test_auth_mechanism_digest_md5_creates_digest_transport(monkeypatch: pytest.MonkeyPatch) -> None: + """When auth_mechanism is DIGEST-MD5, a _DigestMD5SaslTransport is created.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="DIGEST-MD5") + assert isinstance(client._transport, _DigestMD5SaslTransport) + + +def test_legacy_kerberos_auth_backward_compat() -> None: + """Legacy kerberos_auth=True resolves to KERBEROS auth_mechanism.""" + client = _HiveClient.__new__(_HiveClient) + # Replicate the constructor's mechanism resolution logic + client._auth_mechanism = "KERBEROS" # what kerberos_auth=True produces + assert client._auth_mechanism == "KERBEROS" + + +def test_auth_mechanism_overrides_kerberos_auth() -> None: + """Explicit auth_mechanism takes precedence over kerberos_auth boolean.""" + client = _HiveClient(uri="thrift://localhost:9083", kerberos_auth=True, auth_mechanism="NONE") + assert isinstance(client._transport, TTransport.TBufferedTransport) + assert client._auth_mechanism == "NONE" + + +def test_auth_mechanism_case_insensitive(monkeypatch: pytest.MonkeyPatch) -> None: + """Auth mechanism should be case-insensitive.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="digest-md5") + assert isinstance(client._transport, _DigestMD5SaslTransport) + + +def test_create_hive_client_passes_auth_mechanism(monkeypatch: pytest.MonkeyPatch) -> None: + """_create_hive_client passes hive.metastore.authentication to _HiveClient.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + properties = { + "uri": "thrift://localhost:9083", + HIVE_METASTORE_AUTH: "DIGEST-MD5", + } + client = HiveCatalog._create_hive_client(properties) + assert client._auth_mechanism == "DIGEST-MD5" + + +def test_digest_md5_transport_coerces_none_to_empty_bytes(monkeypatch: pytest.MonkeyPatch) -> None: + """_DigestMD5SaslTransport.open() coerces None initial sasl.process() to b''.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + + transport = _DigestMD5SaslTransport( + TSocket.TSocket("localhost", 9083), + host="localhost", + service="hive", + mechanism="DIGEST-MD5", + username="dGVzdC1pZA==", + password="dGVzdC1wdw==", + ) + + # Build a simple sasl stand-in that returns None on first call + class FakeSasl: + def __init__(self) -> None: + self._call_count = 0 + self.complete = True + + def process(self, challenge: bytes | None = None) -> bytes | None: + self._call_count += 1 + if self._call_count == 1: + return None # DIGEST-MD5 initial response is None + return b"response-data" + + original_sasl = transport.sasl + transport.sasl = FakeSasl() # type: ignore[assignment] + + # Capture what the patched process returns during open() + captured_results: list[bytes | None] = [] + + def fake_super_open(self: TTransport.TSaslClientTransport) -> None: + captured_results.append(self.sasl.process(None)) + captured_results.append(self.sasl.process(b"challenge")) + + monkeypatch.setattr(TTransport.TSaslClientTransport, "open", fake_super_open) + + try: + transport.open() + finally: + transport.sasl = original_sasl + + # None from first process() should have been coerced to b"" + assert captured_results[0] == b"" + # Non-None result passes through unchanged + assert captured_results[1] == b"response-data" diff --git a/tests/utils/test_hadoop_credentials.py b/tests/utils/test_hadoop_credentials.py new file mode 100644 index 0000000000..e2c5035d5d --- /dev/null +++ b/tests/utils/test_hadoop_credentials.py @@ -0,0 +1,201 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint: disable=protected-access + +import base64 +import os +import struct +from io import BytesIO + +import pytest + +from pyiceberg.exceptions import HiveAuthError +from pyiceberg.utils.hadoop_credentials import ( + HADOOP_TOKEN_FILE_LOCATION, + _read_hadoop_bytes, + _read_hadoop_text, + _read_hadoop_vint, + read_hive_delegation_token, +) + + +def _write_vint(value: int) -> bytes: + """Encode a non-negative integer as a Hadoop VInt (simplified for tests).""" + if value <= 0x7F: + return bytes([value]) + # For values > 127, use 2-byte encoding (sufficient for test data) + return bytes([0x80 | ((value >> 8) & 0x7F), value & 0xFF]) + + +def _write_bytes(data: bytes) -> bytes: + """Write VInt-prefixed byte array.""" + return _write_vint(len(data)) + data + + +def _write_text(text: str) -> bytes: + """Write VInt-prefixed UTF-8 string.""" + encoded = text.encode("utf-8") + return _write_vint(len(encoded)) + encoded + + +def _build_token_file(tokens: list[tuple[bytes, bytes, str, str]]) -> bytes: + """Build a valid HDTS binary file with the given tokens. + + Each token is (identifier_bytes, password_bytes, kind, service). + """ + buf = bytearray() + buf.extend(b"HDTS") # magic + buf.append(0) # version + buf.extend(_write_vint(len(tokens))) + for identifier, password, kind, service in tokens: + buf.extend(_write_bytes(identifier)) + buf.extend(_write_bytes(password)) + buf.extend(_write_text(kind)) + buf.extend(_write_text(service)) + return bytes(buf) + + +def test_read_hadoop_vint_single_byte() -> None: + stream = BytesIO(bytes([42])) + assert _read_hadoop_vint(stream) == 42 + + +def test_read_hadoop_vint_zero() -> None: + stream = BytesIO(bytes([0])) + assert _read_hadoop_vint(stream) == 0 + + +def test_read_hadoop_vint_max_single_byte() -> None: + stream = BytesIO(bytes([0x7F])) + assert _read_hadoop_vint(stream) == 127 + + +def test_read_hadoop_vint_empty_stream() -> None: + stream = BytesIO(b"") + with pytest.raises(HiveAuthError, match="Unexpected end of token file"): + _read_hadoop_vint(stream) + + +def test_read_hadoop_bytes() -> None: + data = b"hello" + stream = BytesIO(_write_bytes(data)) + assert _read_hadoop_bytes(stream) == data + + +def test_read_hadoop_text() -> None: + stream = BytesIO(_write_text("hello")) + assert _read_hadoop_text(stream) == "hello" + + +def test_read_hive_delegation_token_valid(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + identifier = b"test-identifier-bytes" + password = b"test-password-bytes" + token_data = _build_token_file([ + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ]) + + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + result_id, result_pw = read_hive_delegation_token() + + assert result_id == base64.b64encode(identifier).decode("ascii") + assert result_pw == base64.b64encode(password).decode("ascii") + + +def test_read_hive_delegation_token_multiple_tokens(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + """The parser should find the HIVE_DELEGATION_TOKEN even if other tokens come first.""" + identifier = b"hive-id" + password = b"hive-pw" + token_data = _build_token_file([ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ]) + + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + result_id, result_pw = read_hive_delegation_token() + + assert result_id == base64.b64encode(identifier).decode("ascii") + assert result_pw == base64.b64encode(password).decode("ascii") + + +def test_read_hive_delegation_token_env_not_set(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv(HADOOP_TOKEN_FILE_LOCATION, raising=False) + with pytest.raises(HiveAuthError, match="HADOOP_TOKEN_FILE_LOCATION.*not set"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_file_not_found(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, "/nonexistent/path/token_file") + with pytest.raises(HiveAuthError, match="not found"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_bad_magic(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(b"BAAD\x00\x00") + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + with pytest.raises(HiveAuthError, match="Invalid Hadoop token file magic"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_unsupported_version(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(b"HDTS\x01\x00") # version 1 + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + with pytest.raises(HiveAuthError, match="Unsupported.*version"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_no_hive_token(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + token_data = _build_token_file([ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + ]) + + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + with pytest.raises(HiveAuthError, match="No HIVE_DELEGATION_TOKEN found"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_truncated(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + # Build a valid file and then truncate it + token_data = _build_token_file([ + (b"test-id", b"test-pw", "HIVE_DELEGATION_TOKEN", "hive_service"), + ]) + truncated = token_data[:10] # Cut off in the middle + + token_file = os.path.join(str(tmp_path), "token_file") + with open(token_file, "wb") as f: + f.write(truncated) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + with pytest.raises(HiveAuthError, match="Unexpected end of token file"): + read_hive_delegation_token() diff --git a/uv.lock b/uv.lock index f686b2eed9..a4bb8b067f 100644 --- a/uv.lock +++ b/uv.lock @@ -1837,7 +1837,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7d/ed/6bfa4109fcb23a58819600392564fea69cdc6551ffd5e69ccf1d52a40cbc/greenlet-3.2.4-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:8c68325b0d0acf8d91dde4e6f930967dd52a5302cd4062932a6b2e7c2969f47c", size = 271061, upload-time = "2025-08-07T13:17:15.373Z" }, { url = "https://files.pythonhosted.org/packages/2a/fc/102ec1a2fc015b3a7652abab7acf3541d58c04d3d17a8d3d6a44adae1eb1/greenlet-3.2.4-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:94385f101946790ae13da500603491f04a76b6e4c059dab271b3ce2e283b2590", size = 629475, upload-time = "2025-08-07T13:42:54.009Z" }, { url = "https://files.pythonhosted.org/packages/c5/26/80383131d55a4ac0fb08d71660fd77e7660b9db6bdb4e8884f46d9f2cc04/greenlet-3.2.4-cp310-cp310-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f10fd42b5ee276335863712fa3da6608e93f70629c631bf77145021600abc23c", size = 640802, upload-time = "2025-08-07T13:45:25.52Z" }, - { url = "https://files.pythonhosted.org/packages/9f/7c/e7833dbcd8f376f3326bd728c845d31dcde4c84268d3921afcae77d90d08/greenlet-3.2.4-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c8c9e331e58180d0d83c5b7999255721b725913ff6bc6cf39fa2a45841a4fd4b", size = 636703, upload-time = "2025-08-07T13:53:12.622Z" }, { url = "https://files.pythonhosted.org/packages/e9/49/547b93b7c0428ede7b3f309bc965986874759f7d89e4e04aeddbc9699acb/greenlet-3.2.4-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:58b97143c9cc7b86fc458f215bd0932f1757ce649e05b640fea2e79b54cedb31", size = 635417, upload-time = "2025-08-07T13:18:25.189Z" }, { url = "https://files.pythonhosted.org/packages/7f/91/ae2eb6b7979e2f9b035a9f612cf70f1bf54aad4e1d125129bef1eae96f19/greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c2ca18a03a8cfb5b25bc1cbe20f3d9a4c80d8c3b13ba3df49ac3961af0b1018d", size = 584358, upload-time = "2025-08-07T13:18:23.708Z" }, { url = "https://files.pythonhosted.org/packages/f7/85/433de0c9c0252b22b16d413c9407e6cb3b41df7389afc366ca204dbc1393/greenlet-3.2.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9fe0a28a7b952a21e2c062cd5756d34354117796c6d9215a87f55e38d15402c5", size = 1113550, upload-time = "2025-08-07T13:42:37.467Z" }, @@ -1848,7 +1847,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/de/f28ced0a67749cac23fecb02b694f6473f47686dff6afaa211d186e2ef9c/greenlet-3.2.4-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:96378df1de302bc38e99c3a9aa311967b7dc80ced1dcc6f171e99842987882a2", size = 272305, upload-time = "2025-08-07T13:15:41.288Z" }, { url = "https://files.pythonhosted.org/packages/09/16/2c3792cba130000bf2a31c5272999113f4764fd9d874fb257ff588ac779a/greenlet-3.2.4-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1ee8fae0519a337f2329cb78bd7a8e128ec0f881073d43f023c7b8d4831d5246", size = 632472, upload-time = "2025-08-07T13:42:55.044Z" }, { url = "https://files.pythonhosted.org/packages/ae/8f/95d48d7e3d433e6dae5b1682e4292242a53f22df82e6d3dda81b1701a960/greenlet-3.2.4-cp311-cp311-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:94abf90142c2a18151632371140b3dba4dee031633fe614cb592dbb6c9e17bc3", size = 644646, upload-time = "2025-08-07T13:45:26.523Z" }, - { url = "https://files.pythonhosted.org/packages/d5/5e/405965351aef8c76b8ef7ad370e5da58d57ef6068df197548b015464001a/greenlet-3.2.4-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:4d1378601b85e2e5171b99be8d2dc85f594c79967599328f95c1dc1a40f1c633", size = 640519, upload-time = "2025-08-07T13:53:13.928Z" }, { url = "https://files.pythonhosted.org/packages/25/5d/382753b52006ce0218297ec1b628e048c4e64b155379331f25a7316eb749/greenlet-3.2.4-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:0db5594dce18db94f7d1650d7489909b57afde4c580806b8d9203b6e79cdc079", size = 639707, upload-time = "2025-08-07T13:18:27.146Z" }, { url = "https://files.pythonhosted.org/packages/1f/8e/abdd3f14d735b2929290a018ecf133c901be4874b858dd1c604b9319f064/greenlet-3.2.4-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2523e5246274f54fdadbce8494458a2ebdcdbc7b802318466ac5606d3cded1f8", size = 587684, upload-time = "2025-08-07T13:18:25.164Z" }, { url = "https://files.pythonhosted.org/packages/5d/65/deb2a69c3e5996439b0176f6651e0052542bb6c8f8ec2e3fba97c9768805/greenlet-3.2.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1987de92fec508535687fb807a5cea1560f6196285a4cde35c100b8cd632cc52", size = 1116647, upload-time = "2025-08-07T13:42:38.655Z" }, @@ -1859,7 +1857,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079, upload-time = "2025-08-07T13:15:45.033Z" }, { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997, upload-time = "2025-08-07T13:42:56.234Z" }, { url = "https://files.pythonhosted.org/packages/3b/16/035dcfcc48715ccd345f3a93183267167cdd162ad123cd93067d86f27ce4/greenlet-3.2.4-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f28588772bb5fb869a8eb331374ec06f24a83a9c25bfa1f38b6993afe9c1e968", size = 655185, upload-time = "2025-08-07T13:45:27.624Z" }, - { url = "https://files.pythonhosted.org/packages/31/da/0386695eef69ffae1ad726881571dfe28b41970173947e7c558d9998de0f/greenlet-3.2.4-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:5c9320971821a7cb77cfab8d956fa8e39cd07ca44b6070db358ceb7f8797c8c9", size = 649926, upload-time = "2025-08-07T13:53:15.251Z" }, { url = "https://files.pythonhosted.org/packages/68/88/69bf19fd4dc19981928ceacbc5fd4bb6bc2215d53199e367832e98d1d8fe/greenlet-3.2.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c60a6d84229b271d44b70fb6e5fa23781abb5d742af7b808ae3f6efd7c9c60f6", size = 651839, upload-time = "2025-08-07T13:18:30.281Z" }, { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, { url = "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, @@ -1870,7 +1867,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, { url = "https://files.pythonhosted.org/packages/f7/0b/bc13f787394920b23073ca3b6c4a7a21396301ed75a655bcb47196b50e6e/greenlet-3.2.4-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:710638eb93b1fa52823aa91bf75326f9ecdfd5e0466f00789246a5280f4ba0fc", size = 655191, upload-time = "2025-08-07T13:45:29.752Z" }, - { url = "https://files.pythonhosted.org/packages/f2/d6/6adde57d1345a8d0f14d31e4ab9c23cfe8e2cd39c3baf7674b4b0338d266/greenlet-3.2.4-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c5111ccdc9c88f423426df3fd1811bfc40ed66264d35aa373420a34377efc98a", size = 649516, upload-time = "2025-08-07T13:53:16.314Z" }, { url = "https://files.pythonhosted.org/packages/7f/3b/3a3328a788d4a473889a2d403199932be55b1b0060f4ddd96ee7cdfcad10/greenlet-3.2.4-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d76383238584e9711e20ebe14db6c88ddcedc1829a9ad31a584389463b5aa504", size = 652169, upload-time = "2025-08-07T13:18:32.861Z" }, { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, @@ -1881,7 +1877,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, { url = "https://files.pythonhosted.org/packages/c0/aa/687d6b12ffb505a4447567d1f3abea23bd20e73a5bed63871178e0831b7a/greenlet-3.2.4-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:c17b6b34111ea72fc5a4e4beec9711d2226285f0386ea83477cbb97c30a3f3a5", size = 699218, upload-time = "2025-08-07T13:45:30.969Z" }, - { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, { url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, @@ -4414,6 +4409,7 @@ hf = [ { name = "huggingface-hub" }, ] hive = [ + { name = "pure-sasl" }, { name = "thrift" }, ] hive-kerberos = [ @@ -4520,6 +4516,7 @@ requires-dist = [ { name = "pandas", marker = "extra == 'ray'", specifier = ">=1.0.0" }, { name = "polars", marker = "extra == 'polars'", specifier = ">=1.21.0,<2" }, { name = "psycopg2-binary", marker = "extra == 'sql-postgres'", specifier = ">=2.9.6" }, + { name = "pure-sasl", marker = "extra == 'hive'", specifier = ">=0.6.0" }, { name = "pyarrow", marker = "extra == 'duckdb'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pandas'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pyarrow'", specifier = ">=17.0.0" }, From b87a20addf3ffbd239a72f580a56b0456d849e82 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Mon, 16 Mar 2026 14:06:40 -0700 Subject: [PATCH 2/3] Fix code review issues in DIGEST-MD5 delegation token auth Address all findings from code review: Critical: - Rewrite VInt decoder to match Java WritableUtils.readVLong exactly, using signed-byte interpretation and correct prefix/length semantics High: - Catch OSError (not just FileNotFoundError) when reading token file - Reject unknown auth mechanisms with HiveAuthError instead of silently falling back to unauthenticated TBufferedTransport - Replace monkey-patching sasl.process in _DigestMD5SaslTransport with a clean send_sasl_msg override (thread-safe, no shared state mutation) Medium: - Fix kerberos_service_name default from config key to actual value - Wrap UnicodeDecodeError in HiveAuthError for invalid UTF-8 in tokens - Rewrite VInt test encoder to match real Hadoop encoding format - Fix dead kerberos backward-compat tests to actually exercise __init__ Low: - Add upper bound to pure-sasl dependency (<1.0.0) - Fix tmp_path typing from object to pathlib.Path - Fix docs to say pure-sasl (pip package name) not puresasl Co-Authored-By: Claude Opus 4.6 --- mkdocs/docs/configuration.md | 2 +- pyiceberg/catalog/hive.py | 28 ++--- pyiceberg/utils/hadoop_credentials.py | 47 +++++---- pyproject.toml | 2 +- tests/catalog/test_hive.py | 74 ++++++------- tests/utils/test_hadoop_credentials.py | 139 ++++++++++++++++++------- uv.lock | 2 +- 7 files changed, 173 insertions(+), 121 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 8ef40572d4..e1c6e27052 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -687,7 +687,7 @@ catalog: | ugi | t-1234:secret | Hadoop UGI for Hive client. | | hive.metastore.authentication | DIGEST-MD5 | Auth mechanism: `NONE` (default), `KERBEROS`, or `DIGEST-MD5` | -When using DIGEST-MD5 authentication, PyIceberg reads a Hive delegation token from the file pointed to by the `$HADOOP_TOKEN_FILE_LOCATION` environment variable. This is the standard mechanism used in secure Hadoop environments where delegation tokens are distributed to jobs. Install PyIceberg with `pip install "pyiceberg[hive]"` to get the required `puresasl` dependency. +When using DIGEST-MD5 authentication, PyIceberg reads a Hive delegation token from the file pointed to by the `$HADOOP_TOKEN_FILE_LOCATION` environment variable. This is the standard mechanism used in secure Hadoop environments where delegation tokens are distributed to jobs. Install PyIceberg with `pip install "pyiceberg[hive]"` to get the required `pure-sasl` dependency. When using Hive 2.x, make sure to set the compatibility flag: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 749bca393a..672084d64b 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -154,21 +154,8 @@ class _DigestMD5SaslTransport(TTransport.TSaslClientTransport): coerces ``None`` to ``b""`` so the SASL handshake proceeds normally. """ - def open(self) -> None: - # Intercept sasl.process to coerce the initial None response - original_process = self.sasl.process - - def _patched_process(challenge: bytes | None = None) -> bytes | None: - result = original_process(challenge) - if result is None: - return b"" - return result - - self.sasl.process = _patched_process - try: - super().open() - finally: - self.sasl.process = original_process + def send_sasl_msg(self, status: int, body: bytes | None) -> None: # type: ignore[override] + super().send_sasl_msg(status, body if body is not None else b"") class _HiveClient: @@ -182,7 +169,7 @@ def __init__( uri: str, ugi: str | None = None, kerberos_auth: bool | None = HIVE_KERBEROS_AUTH_DEFAULT, - kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME, + kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME_DEFAULT, auth_mechanism: str | None = None, ): self._uri = uri @@ -204,7 +191,9 @@ def _init_thrift_transport(self) -> TTransport: url_parts = urlparse(self._uri) socket = TSocket.TSocket(url_parts.hostname, url_parts.port) - if self._auth_mechanism == "KERBEROS": + if self._auth_mechanism == "NONE": + return TTransport.TBufferedTransport(socket) + elif self._auth_mechanism == "KERBEROS": return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service=self._kerberos_service_name) elif self._auth_mechanism == "DIGEST-MD5": identifier, password = read_hive_delegation_token() @@ -217,7 +206,10 @@ def _init_thrift_transport(self) -> TTransport: password=password, ) else: - return TTransport.TBufferedTransport(socket) + raise HiveAuthError( + f"Unknown auth mechanism: {self._auth_mechanism!r}. " + f"Valid values: NONE, KERBEROS, DIGEST-MD5" + ) def _client(self) -> Client: protocol = TBinaryProtocol.TBinaryProtocol(self._transport) diff --git a/pyiceberg/utils/hadoop_credentials.py b/pyiceberg/utils/hadoop_credentials.py index aa5c9ed9ba..9fd1de90c3 100644 --- a/pyiceberg/utils/hadoop_credentials.py +++ b/pyiceberg/utils/hadoop_credentials.py @@ -36,29 +36,32 @@ def _read_hadoop_vint(stream: BytesIO) -> int: - """Decode a Hadoop WritableUtils VInt/VLong from a byte stream.""" + """Decode a Hadoop WritableUtils VInt/VLong from a byte stream. + + Matches the encoding in Java's ``WritableUtils.readVInt``/``readVLong``: + - If the first byte (interpreted as signed) is >= -112, it *is* the value. + - Otherwise the first byte encodes both a negativity flag and the number + of additional big-endian payload bytes that carry the actual value. + """ first = stream.read(1) if not first: raise HiveAuthError("Unexpected end of token file while reading VInt") + # Reinterpret as signed byte to match Java's signed-byte semantics b = first[0] - if b <= 0x7F: + if b > 127: + b -= 256 + if b >= -112: return b - # Number of additional bytes is encoded in leading 1-bits - num_extra = 0 - mask = 0x80 - while b & mask: - num_extra += 1 - mask >>= 1 - # First byte contributes the remaining bits - result = b & (mask - 1) - extra = stream.read(num_extra) - if len(extra) != num_extra: + negative = b < -120 + length = (-119 - b) if negative else (-111 - b) + extra = stream.read(length) + if len(extra) != length: raise HiveAuthError("Unexpected end of token file while reading VInt") - for byte in extra: - result = (result << 8) | byte - # Sign-extend if negative (high bit of decoded value is set) - if result >= (1 << (8 * num_extra + (8 - num_extra - 1) - 1)): - result -= 1 << (8 * num_extra + (8 - num_extra - 1)) + result = 0 + for byte_val in extra: + result = (result << 8) | byte_val + if negative: + result = ~result return result @@ -75,7 +78,11 @@ def _read_hadoop_bytes(stream: BytesIO) -> bytes: def _read_hadoop_text(stream: BytesIO) -> str: """Read a VInt-prefixed UTF-8 string from a Hadoop token stream.""" - return _read_hadoop_bytes(stream).decode("utf-8") + raw = _read_hadoop_bytes(stream) + try: + return raw.decode("utf-8") + except UnicodeDecodeError as e: + raise HiveAuthError(f"Token file contains invalid UTF-8 in text field: {e}") from e def read_hive_delegation_token() -> tuple[str, str]: @@ -99,8 +106,8 @@ def read_hive_delegation_token() -> tuple[str, str]: try: with open(token_file, "rb") as f: data = f.read() - except FileNotFoundError: - raise HiveAuthError(f"Hadoop token file not found: {token_file}") + except OSError as e: + raise HiveAuthError(f"Cannot read Hadoop token file {token_file}: {e}") from e stream = BytesIO(data) diff --git a/pyproject.toml b/pyproject.toml index 14a028c08f..6dfccc06f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,7 @@ polars = ["polars>=1.21.0,<2"] snappy = ["python-snappy>=0.6.0,<1.0.0"] hive = [ "thrift>=0.13.0,<1.0.0", - "pure-sasl>=0.6.0", + "pure-sasl>=0.6.0,<1.0.0", ] hive-kerberos = [ "thrift>=0.13.0,<1.0.0", diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 9f537510aa..ea3c28a759 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1429,15 +1429,13 @@ def test_auth_mechanism_none_creates_buffered_transport_explicit() -> None: assert client._auth_mechanism == "NONE" -def test_auth_mechanism_kerberos_resolved() -> None: - """When auth_mechanism is KERBEROS, _auth_mechanism is set correctly. - - We don't fully instantiate because TSaslClientTransport with GSSAPI - requires the kerberos C module which may not be installed. - """ - client = _HiveClient.__new__(_HiveClient) - client._auth_mechanism = "KERBEROS" +def test_auth_mechanism_kerberos_resolved(monkeypatch: pytest.MonkeyPatch) -> None: + """When auth_mechanism is KERBEROS, _auth_mechanism is resolved correctly.""" + # Stub TSaslClientTransport.__init__ to avoid requiring the kerberos C module + monkeypatch.setattr(TTransport.TSaslClientTransport, "__init__", lambda *a, **kw: None) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="KERBEROS") assert client._auth_mechanism == "KERBEROS" + assert isinstance(client._transport, TTransport.TSaslClientTransport) def test_auth_mechanism_digest_md5_creates_digest_transport(monkeypatch: pytest.MonkeyPatch) -> None: @@ -1447,12 +1445,12 @@ def test_auth_mechanism_digest_md5_creates_digest_transport(monkeypatch: pytest. assert isinstance(client._transport, _DigestMD5SaslTransport) -def test_legacy_kerberos_auth_backward_compat() -> None: +def test_legacy_kerberos_auth_backward_compat(monkeypatch: pytest.MonkeyPatch) -> None: """Legacy kerberos_auth=True resolves to KERBEROS auth_mechanism.""" - client = _HiveClient.__new__(_HiveClient) - # Replicate the constructor's mechanism resolution logic - client._auth_mechanism = "KERBEROS" # what kerberos_auth=True produces + monkeypatch.setattr(TTransport.TSaslClientTransport, "__init__", lambda *a, **kw: None) + client = _HiveClient(uri="thrift://localhost:9083", kerberos_auth=True) assert client._auth_mechanism == "KERBEROS" + assert isinstance(client._transport, TTransport.TSaslClientTransport) def test_auth_mechanism_overrides_kerberos_auth() -> None: @@ -1462,6 +1460,14 @@ def test_auth_mechanism_overrides_kerberos_auth() -> None: assert client._auth_mechanism == "NONE" +def test_auth_mechanism_unknown_raises() -> None: + """Unknown auth mechanism should raise HiveAuthError, not silently fall back.""" + from pyiceberg.exceptions import HiveAuthError + + with pytest.raises(HiveAuthError, match="Unknown auth mechanism.*PLAIN"): + _HiveClient(uri="thrift://localhost:9083", auth_mechanism="PLAIN") + + def test_auth_mechanism_case_insensitive(monkeypatch: pytest.MonkeyPatch) -> None: """Auth mechanism should be case-insensitive.""" monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) @@ -1480,8 +1486,8 @@ def test_create_hive_client_passes_auth_mechanism(monkeypatch: pytest.MonkeyPatc assert client._auth_mechanism == "DIGEST-MD5" -def test_digest_md5_transport_coerces_none_to_empty_bytes(monkeypatch: pytest.MonkeyPatch) -> None: - """_DigestMD5SaslTransport.open() coerces None initial sasl.process() to b''.""" +def test_digest_md5_transport_send_sasl_msg_coerces_none(monkeypatch: pytest.MonkeyPatch) -> None: + """_DigestMD5SaslTransport.send_sasl_msg coerces None body to b''.""" monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) transport = _DigestMD5SaslTransport( @@ -1493,36 +1499,18 @@ def test_digest_md5_transport_coerces_none_to_empty_bytes(monkeypatch: pytest.Mo password="dGVzdC1wdw==", ) - # Build a simple sasl stand-in that returns None on first call - class FakeSasl: - def __init__(self) -> None: - self._call_count = 0 - self.complete = True - - def process(self, challenge: bytes | None = None) -> bytes | None: - self._call_count += 1 - if self._call_count == 1: - return None # DIGEST-MD5 initial response is None - return b"response-data" - - original_sasl = transport.sasl - transport.sasl = FakeSasl() # type: ignore[assignment] - - # Capture what the patched process returns during open() - captured_results: list[bytes | None] = [] + # Capture what the parent send_sasl_msg receives + captured_calls: list[tuple[int, bytes | None]] = [] - def fake_super_open(self: TTransport.TSaslClientTransport) -> None: - captured_results.append(self.sasl.process(None)) - captured_results.append(self.sasl.process(b"challenge")) + def capture_send(self: TTransport.TSaslClientTransport, status: int, body: bytes | None) -> None: + captured_calls.append((status, body)) - monkeypatch.setattr(TTransport.TSaslClientTransport, "open", fake_super_open) + monkeypatch.setattr(TTransport.TSaslClientTransport, "send_sasl_msg", capture_send) - try: - transport.open() - finally: - transport.sasl = original_sasl + # Send with None body — should be coerced to b"" + transport.send_sasl_msg(1, None) + # Send with real body — should pass through unchanged + transport.send_sasl_msg(2, b"real-data") - # None from first process() should have been coerced to b"" - assert captured_results[0] == b"" - # Non-None result passes through unchanged - assert captured_results[1] == b"response-data" + assert captured_calls[0] == (1, b""), "None body should be coerced to b''" + assert captured_calls[1] == (2, b"real-data"), "Non-None body should pass through" diff --git a/tests/utils/test_hadoop_credentials.py b/tests/utils/test_hadoop_credentials.py index e2c5035d5d..bef80c6328 100644 --- a/tests/utils/test_hadoop_credentials.py +++ b/tests/utils/test_hadoop_credentials.py @@ -17,7 +17,7 @@ # pylint: disable=protected-access import base64 -import os +import pathlib import struct from io import BytesIO @@ -34,11 +34,22 @@ def _write_vint(value: int) -> bytes: - """Encode a non-negative integer as a Hadoop VInt (simplified for tests).""" - if value <= 0x7F: - return bytes([value]) - # For values > 127, use 2-byte encoding (sufficient for test data) - return bytes([0x80 | ((value >> 8) & 0x7F), value & 0xFF]) + """Encode an integer as a Hadoop VInt (matching Java WritableUtils.writeVLong).""" + if -112 <= value <= 127: + return struct.pack("b", value) + negative = value < 0 + work = ~value if negative else value + # Java: len starts at -120 (negative) or -112 (positive), + # decrements for each significant byte in the value + prefix = -120 if negative else -112 + tmp = work + while tmp != 0: + tmp >>= 8 + prefix -= 1 + num_bytes = (-119 - prefix) if negative else (-111 - prefix) + result = struct.pack("b", prefix) + result += work.to_bytes(num_bytes, byteorder="big", signed=False) + return result def _write_bytes(data: bytes) -> bytes: @@ -69,6 +80,9 @@ def _build_token_file(tokens: list[tuple[bytes, bytes, str, str]]) -> bytes: return bytes(buf) +# --- VInt unit tests --- + + def test_read_hadoop_vint_single_byte() -> None: stream = BytesIO(bytes([42])) assert _read_hadoop_vint(stream) == 42 @@ -84,12 +98,44 @@ def test_read_hadoop_vint_max_single_byte() -> None: assert _read_hadoop_vint(stream) == 127 +def test_read_hadoop_vint_negative_single_byte() -> None: + """Values -112 through -1 are single-byte in Hadoop VInt.""" + for value in [-1, -50, -112]: + encoded = _write_vint(value) + assert _read_hadoop_vint(BytesIO(encoded)) == value + + +def test_read_hadoop_vint_multi_byte_positive() -> None: + """Values > 127 require multi-byte encoding.""" + for value in [128, 255, 256, 1000, 65535]: + encoded = _write_vint(value) + assert _read_hadoop_vint(BytesIO(encoded)) == value + + +def test_read_hadoop_vint_multi_byte_negative() -> None: + """Values < -112 require multi-byte encoding with negative flag.""" + for value in [-113, -200, -1000]: + encoded = _write_vint(value) + assert _read_hadoop_vint(BytesIO(encoded)) == value + + def test_read_hadoop_vint_empty_stream() -> None: stream = BytesIO(b"") with pytest.raises(HiveAuthError, match="Unexpected end of token file"): _read_hadoop_vint(stream) +def test_read_hadoop_vint_truncated_multi_byte() -> None: + """Multi-byte VInt with missing payload bytes should raise.""" + # Prefix byte for 2-byte positive value, but no payload + stream = BytesIO(struct.pack("b", -113)) + with pytest.raises(HiveAuthError, match="Unexpected end of token file"): + _read_hadoop_vint(stream) + + +# --- Bytes/Text unit tests --- + + def test_read_hadoop_bytes() -> None: data = b"hello" stream = BytesIO(_write_bytes(data)) @@ -101,25 +147,35 @@ def test_read_hadoop_text() -> None: assert _read_hadoop_text(stream) == "hello" -def test_read_hive_delegation_token_valid(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: +def test_read_hadoop_text_invalid_utf8() -> None: + """Invalid UTF-8 in text field should raise HiveAuthError.""" + invalid_bytes = b"\xff\xfe" + raw = _write_vint(len(invalid_bytes)) + invalid_bytes + with pytest.raises(HiveAuthError, match="invalid UTF-8"): + _read_hadoop_text(BytesIO(raw)) + + +# --- Token file tests --- + + +def test_read_hive_delegation_token_valid(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: identifier = b"test-identifier-bytes" password = b"test-password-bytes" token_data = _build_token_file([ (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), ]) - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(token_data) + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) result_id, result_pw = read_hive_delegation_token() assert result_id == base64.b64encode(identifier).decode("ascii") assert result_pw == base64.b64encode(password).decode("ascii") -def test_read_hive_delegation_token_multiple_tokens(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: +def test_read_hive_delegation_token_multiple_tokens(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """The parser should find the HIVE_DELEGATION_TOKEN even if other tokens come first.""" identifier = b"hive-id" password = b"hive-pw" @@ -128,11 +184,10 @@ def test_read_hive_delegation_token_multiple_tokens(tmp_path: object, monkeypatc (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), ]) - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(token_data) + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) result_id, result_pw = read_hive_delegation_token() assert result_id == base64.b64encode(identifier).decode("ascii") @@ -147,55 +202,65 @@ def test_read_hive_delegation_token_env_not_set(monkeypatch: pytest.MonkeyPatch) def test_read_hive_delegation_token_file_not_found(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, "/nonexistent/path/token_file") - with pytest.raises(HiveAuthError, match="not found"): + with pytest.raises(HiveAuthError, match="Cannot read Hadoop token file"): read_hive_delegation_token() -def test_read_hive_delegation_token_bad_magic(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(b"BAAD\x00\x00") +def test_read_hive_delegation_token_permission_error(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Permission errors on the token file should raise HiveAuthError.""" + token_file = tmp_path / "token_file" + token_file.write_bytes(b"HDTS\x00\x00") + token_file.chmod(0o000) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + try: + with pytest.raises(HiveAuthError, match="Cannot read Hadoop token file"): + read_hive_delegation_token() + finally: + token_file.chmod(0o644) + + +def test_read_hive_delegation_token_bad_magic(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = tmp_path / "token_file" + token_file.write_bytes(b"BAAD\x00\x00") - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) with pytest.raises(HiveAuthError, match="Invalid Hadoop token file magic"): read_hive_delegation_token() -def test_read_hive_delegation_token_unsupported_version(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(b"HDTS\x01\x00") # version 1 +def test_read_hive_delegation_token_unsupported_version(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = tmp_path / "token_file" + token_file.write_bytes(b"HDTS\x01\x00") # version 1 - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) with pytest.raises(HiveAuthError, match="Unsupported.*version"): read_hive_delegation_token() -def test_read_hive_delegation_token_no_hive_token(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: +def test_read_hive_delegation_token_no_hive_token(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: token_data = _build_token_file([ (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), ]) - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(token_data) + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) with pytest.raises(HiveAuthError, match="No HIVE_DELEGATION_TOKEN found"): read_hive_delegation_token() -def test_read_hive_delegation_token_truncated(tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: +def test_read_hive_delegation_token_truncated(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: # Build a valid file and then truncate it token_data = _build_token_file([ (b"test-id", b"test-pw", "HIVE_DELEGATION_TOKEN", "hive_service"), ]) truncated = token_data[:10] # Cut off in the middle - token_file = os.path.join(str(tmp_path), "token_file") - with open(token_file, "wb") as f: - f.write(truncated) + token_file = tmp_path / "token_file" + token_file.write_bytes(truncated) - monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, token_file) + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) with pytest.raises(HiveAuthError, match="Unexpected end of token file"): read_hive_delegation_token() diff --git a/uv.lock b/uv.lock index a4bb8b067f..46253d1bbd 100644 --- a/uv.lock +++ b/uv.lock @@ -4516,7 +4516,7 @@ requires-dist = [ { name = "pandas", marker = "extra == 'ray'", specifier = ">=1.0.0" }, { name = "polars", marker = "extra == 'polars'", specifier = ">=1.21.0,<2" }, { name = "psycopg2-binary", marker = "extra == 'sql-postgres'", specifier = ">=2.9.6" }, - { name = "pure-sasl", marker = "extra == 'hive'", specifier = ">=0.6.0" }, + { name = "pure-sasl", marker = "extra == 'hive'", specifier = ">=0.6.0,<1.0.0" }, { name = "pyarrow", marker = "extra == 'duckdb'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pandas'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pyarrow'", specifier = ">=17.0.0" }, From 34d1bddb697bdc5cc6bbc3ba99b4edabaafbde40 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Mon, 16 Mar 2026 14:34:02 -0700 Subject: [PATCH 3/3] first round of reviews --- pyiceberg/catalog/hive.py | 5 +--- pyiceberg/utils/hadoop_credentials.py | 3 +-- tests/catalog/test_hive.py | 20 +++++++-------- tests/utils/test_hadoop_credentials.py | 34 ++++++++++++++++---------- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 672084d64b..0b3a0b9741 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -206,10 +206,7 @@ def _init_thrift_transport(self) -> TTransport: password=password, ) else: - raise HiveAuthError( - f"Unknown auth mechanism: {self._auth_mechanism!r}. " - f"Valid values: NONE, KERBEROS, DIGEST-MD5" - ) + raise HiveAuthError(f"Unknown auth mechanism: {self._auth_mechanism!r}. Valid values: NONE, KERBEROS, DIGEST-MD5") def _client(self) -> Client: protocol = TBinaryProtocol.TBinaryProtocol(self._transport) diff --git a/pyiceberg/utils/hadoop_credentials.py b/pyiceberg/utils/hadoop_credentials.py index 9fd1de90c3..6b345a83ee 100644 --- a/pyiceberg/utils/hadoop_credentials.py +++ b/pyiceberg/utils/hadoop_credentials.py @@ -138,6 +138,5 @@ def read_hive_delegation_token() -> tuple[str, str]: ) raise HiveAuthError( - f"No {HIVE_DELEGATION_TOKEN_KIND} found in token file: {token_file}. " - f"File contains {num_tokens} token(s)." + f"No {HIVE_DELEGATION_TOKEN_KIND} found in token file: {token_file}. File contains {num_tokens} token(s)." ) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index ea3c28a759..6821aaae68 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -26,7 +26,6 @@ import pytest import thrift.transport.TSocket -from thrift.transport import TSocket, TTransport from hive_metastore.ttypes import ( AlreadyExistsException, EnvironmentContext, @@ -42,6 +41,7 @@ ) from hive_metastore.ttypes import Database as HiveDatabase from hive_metastore.ttypes import Table as HiveTable +from thrift.transport import TSocket, TTransport from pyiceberg.catalog import PropertiesUpdateSummary from pyiceberg.catalog.hive import ( @@ -1329,9 +1329,7 @@ def test_create_hive_client_success() -> None: with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with( - "thrift://localhost:10000", "user", False, "hive", auth_mechanism=None - ) + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None) assert client is not None @@ -1344,9 +1342,7 @@ def test_create_hive_client_with_kerberos_success() -> None: } with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with( - "thrift://localhost:10000", "user", True, "hiveuser", auth_mechanism=None - ) + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", True, "hiveuser", auth_mechanism=None) assert client is not None @@ -1358,10 +1354,12 @@ def test_create_hive_client_multiple_uris() -> None: client = HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2 - mock_hive_client.assert_has_calls([ - call("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None), - call("thrift://localhost:10001", "user", False, "hive", auth_mechanism=None), - ]) + mock_hive_client.assert_has_calls( + [ + call("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None), + call("thrift://localhost:10001", "user", False, "hive", auth_mechanism=None), + ] + ) assert client is not None diff --git a/tests/utils/test_hadoop_credentials.py b/tests/utils/test_hadoop_credentials.py index bef80c6328..9ad4e21872 100644 --- a/tests/utils/test_hadoop_credentials.py +++ b/tests/utils/test_hadoop_credentials.py @@ -161,9 +161,11 @@ def test_read_hadoop_text_invalid_utf8() -> None: def test_read_hive_delegation_token_valid(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: identifier = b"test-identifier-bytes" password = b"test-password-bytes" - token_data = _build_token_file([ - (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), - ]) + token_data = _build_token_file( + [ + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) token_file = tmp_path / "token_file" token_file.write_bytes(token_data) @@ -179,10 +181,12 @@ def test_read_hive_delegation_token_multiple_tokens(tmp_path: pathlib.Path, monk """The parser should find the HIVE_DELEGATION_TOKEN even if other tokens come first.""" identifier = b"hive-id" password = b"hive-pw" - token_data = _build_token_file([ - (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), - (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), - ]) + token_data = _build_token_file( + [ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) token_file = tmp_path / "token_file" token_file.write_bytes(token_data) @@ -239,9 +243,11 @@ def test_read_hive_delegation_token_unsupported_version(tmp_path: pathlib.Path, def test_read_hive_delegation_token_no_hive_token(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: - token_data = _build_token_file([ - (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), - ]) + token_data = _build_token_file( + [ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + ] + ) token_file = tmp_path / "token_file" token_file.write_bytes(token_data) @@ -253,9 +259,11 @@ def test_read_hive_delegation_token_no_hive_token(tmp_path: pathlib.Path, monkey def test_read_hive_delegation_token_truncated(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: # Build a valid file and then truncate it - token_data = _build_token_file([ - (b"test-id", b"test-pw", "HIVE_DELEGATION_TOKEN", "hive_service"), - ]) + token_data = _build_token_file( + [ + (b"test-id", b"test-pw", "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) truncated = token_data[:10] # Cut off in the middle token_file = tmp_path / "token_file"