diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_policy.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_policy.py index e46b7f49c646..c1bc9b3c672a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_policy.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_policy.py @@ -1,14 +1,52 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import logging from typing import Any, Optional from urllib.parse import urlparse from weakref import ReferenceType from azure.core.pipeline import PipelineResponse, policies -from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_REDIRECT_HEADER_NAME -from azure.monitor.opentelemetry.exporter._quickpulse._generated.livemetrics import LiveMetricsClient +from azure.monitor.opentelemetry.exporter._quickpulse._constants import ( + _QUICKPULSE_REDIRECT_HEADER_NAME, +) +from azure.monitor.opentelemetry.exporter._quickpulse._generated.livemetrics import ( + LiveMetricsClient, +) + +_logger = logging.getLogger(__name__) + +# Allowed domain suffixes for QuickPulse redirect targets. +# Only redirects to these trusted Azure Monitor domains are accepted. +_ALLOWED_REDIRECT_DOMAIN_SUFFIXES = ( + ".livediagnostics.monitor.azure.com", + ".monitor.azure.com", + ".services.visualstudio.com", + ".applicationinsights.azure.com", + ".monitor.azure.us", + ".applicationinsights.azure.us", + ".monitor.azure.cn", + ".applicationinsights.azure.cn", +) + + +def _is_redirect_target_allowed(netloc: str) -> bool: + """Validate that the redirect target host belongs to a known Azure Monitor domain. + + :param str netloc: The network location (host:port) from the parsed redirect URL. + :return: True if the host is in an allowed Azure Monitor domain, False otherwise. + :rtype: bool + """ + # Use urlparse to safely extract the hostname, which handles port stripping + # and detects userinfo (username/password) that could be used to spoof the host. + parsed = urlparse(f"//{netloc}") + if parsed.username is not None or parsed.password is not None: + return False + host = parsed.hostname + if host is None: + return False + return any(host.endswith(suffix) for suffix in _ALLOWED_REDIRECT_DOMAIN_SUFFIXES) # Quickpulse endpoint handles redirects via header instead of status codes @@ -27,6 +65,19 @@ def get_redirect_location(self, response: PipelineResponse) -> Optional[str]: if redirect_location: redirected_url = urlparse(redirect_location) if redirected_url.scheme and redirected_url.netloc: + # Only allow HTTPS redirects to trusted Azure Monitor domains + if redirected_url.scheme.lower() != "https": + _logger.warning( + "QuickPulse redirect rejected: non-HTTPS scheme '%s' in redirect target.", + redirected_url.scheme, + ) + return None + if not _is_redirect_target_allowed(redirected_url.netloc): + _logger.warning( + "QuickPulse redirect rejected: host '%s' is not in the allowed domain list.", + redirected_url.netloc, + ) + return None if self._qp_client_ref: qp_client = self._qp_client_ref() if qp_client and qp_client._client: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_policy.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_policy.py index db2bab1528b0..43dabb777481 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_policy.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/quickpulse/test_policy.py @@ -4,7 +4,10 @@ import unittest from unittest import mock -from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy +from azure.monitor.opentelemetry.exporter._quickpulse._policy import ( + _QuickpulseRedirectPolicy, + _is_redirect_target_allowed, +) # pylint: disable=line-too-long @@ -74,5 +77,135 @@ def test_get_redirect_location_no_client(self): pipeline_resp_mock.http_response = http_resp_mock policy = _QuickpulseRedirectPolicy() redirect_location = policy.get_redirect_location(pipeline_resp_mock) - self.assertEqual(redirect_location, "https://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc") + self.assertEqual( + redirect_location, + "https://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc", + ) self.assertIsNone(policy._qp_client_ref) + + def test_get_redirect_location_rejects_untrusted_host(self): + policy = _QuickpulseRedirectPolicy() + pipeline_resp_mock = mock.Mock() + http_resp_mock = mock.Mock() + headers = {"x-ms-qps-service-endpoint-redirect-v2": "https://evil.attacker.com/exfiltrate"} + http_resp_mock.headers = headers + pipeline_resp_mock.http_response = http_resp_mock + qp_client_mock = mock.Mock() + client_mock = mock.Mock() + client_mock._base_url = "https://original.livediagnostics.monitor.azure.com" + qp_client_mock._client = client_mock + qp_client_ref = weakref.ref(qp_client_mock) + policy._qp_client_ref = qp_client_ref + redirect_location = policy.get_redirect_location(pipeline_resp_mock) + # Redirect should be rejected and return None + self.assertIsNone(redirect_location) + # Base URL must not be changed + self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com") + + def test_get_redirect_location_rejects_http_scheme(self): + policy = _QuickpulseRedirectPolicy() + pipeline_resp_mock = mock.Mock() + http_resp_mock = mock.Mock() + headers = { + "x-ms-qps-service-endpoint-redirect-v2": "http://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc" + } + http_resp_mock.headers = headers + pipeline_resp_mock.http_response = http_resp_mock + qp_client_mock = mock.Mock() + client_mock = mock.Mock() + client_mock._base_url = "https://original.livediagnostics.monitor.azure.com" + qp_client_mock._client = client_mock + qp_client_ref = weakref.ref(qp_client_mock) + policy._qp_client_ref = qp_client_ref + redirect_location = policy.get_redirect_location(pipeline_resp_mock) + # HTTP downgrade should be rejected + self.assertIsNone(redirect_location) + self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com") + + def test_get_redirect_location_allows_visualstudio_domain(self): + policy = _QuickpulseRedirectPolicy() + pipeline_resp_mock = mock.Mock() + http_resp_mock = mock.Mock() + headers = { + "x-ms-qps-service-endpoint-redirect-v2": "https://rt.services.visualstudio.com/QuickPulseService.svc" + } + http_resp_mock.headers = headers + pipeline_resp_mock.http_response = http_resp_mock + qp_client_mock = mock.Mock() + client_mock = mock.Mock() + client_mock._base_url = "https://original.livediagnostics.monitor.azure.com" + qp_client_mock._client = client_mock + qp_client_ref = weakref.ref(qp_client_mock) + policy._qp_client_ref = qp_client_ref + redirect_location = policy.get_redirect_location(pipeline_resp_mock) + self.assertEqual( + redirect_location, + "https://rt.services.visualstudio.com/QuickPulseService.svc", + ) + self.assertEqual(client_mock._base_url, "https://rt.services.visualstudio.com") + + def test_get_redirect_location_rejects_spoofed_suffix(self): + """Attacker uses a domain that contains an allowed suffix but is not actually that domain.""" + policy = _QuickpulseRedirectPolicy() + pipeline_resp_mock = mock.Mock() + http_resp_mock = mock.Mock() + # Reject a host like "monitor.azure.com.evil.com": it starts with the allowed-looking + # "monitor.azure.com" string, but the actual hostname is a subdomain of "evil.com". + headers = {"x-ms-qps-service-endpoint-redirect-v2": "https://monitor.azure.com.evil.com/exfiltrate"} + http_resp_mock.headers = headers + pipeline_resp_mock.http_response = http_resp_mock + qp_client_mock = mock.Mock() + client_mock = mock.Mock() + client_mock._base_url = "https://original.livediagnostics.monitor.azure.com" + qp_client_mock._client = client_mock + qp_client_ref = weakref.ref(qp_client_mock) + policy._qp_client_ref = qp_client_ref + redirect_location = policy.get_redirect_location(pipeline_resp_mock) + self.assertIsNone(redirect_location) + self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com") + + def test_get_redirect_location_rejects_userinfo_bypass(self): + """Reject redirect URLs that use userinfo (@) to disguise the real host.""" + policy = _QuickpulseRedirectPolicy() + pipeline_resp_mock = mock.Mock() + http_resp_mock = mock.Mock() + # URL with userinfo: urlparse sees "evil.com" as the real host, not the allowed domain. + headers = { + "x-ms-qps-service-endpoint-redirect-v2": "https://eastus.livediagnostics.monitor.azure.com:443@evil.com/exfiltrate" + } + http_resp_mock.headers = headers + pipeline_resp_mock.http_response = http_resp_mock + qp_client_mock = mock.Mock() + client_mock = mock.Mock() + client_mock._base_url = "https://original.livediagnostics.monitor.azure.com" + qp_client_mock._client = client_mock + qp_client_ref = weakref.ref(qp_client_mock) + policy._qp_client_ref = qp_client_ref + redirect_location = policy.get_redirect_location(pipeline_resp_mock) + self.assertIsNone(redirect_location) + self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com") + + +class TestIsRedirectTargetAllowed(unittest.TestCase): + def test_allowed_domains(self): + self.assertTrue(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com")) + self.assertTrue(_is_redirect_target_allowed("global.livediagnostics.monitor.azure.com")) + self.assertTrue(_is_redirect_target_allowed("rt.services.visualstudio.com")) + self.assertTrue(_is_redirect_target_allowed("westus.in.applicationinsights.azure.com")) + self.assertTrue(_is_redirect_target_allowed("settings.monitor.azure.com")) + self.assertTrue(_is_redirect_target_allowed("eastus.monitor.azure.us")) + self.assertTrue(_is_redirect_target_allowed("eastus.monitor.azure.cn")) + + def test_allowed_domains_with_port(self): + self.assertTrue(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com:443")) + + def test_disallowed_domains(self): + self.assertFalse(_is_redirect_target_allowed("evil.attacker.com")) + self.assertFalse(_is_redirect_target_allowed("monitor.azure.com.evil.com")) + self.assertFalse(_is_redirect_target_allowed("localhost")) + self.assertFalse(_is_redirect_target_allowed("192.168.1.1")) + self.assertFalse(_is_redirect_target_allowed("attacker.com")) + + def test_disallowed_userinfo_bypass(self): + self.assertFalse(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com:443@evil.com")) + self.assertFalse(_is_redirect_target_allowed("user:pass@evil.com"))