Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,52 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import logging
from typing import Any, Optional
from urllib.parse import urlparse
from weakref import ReferenceType

from azure.core.pipeline import PipelineResponse, policies

from azure.monitor.opentelemetry.exporter._quickpulse._constants import _QUICKPULSE_REDIRECT_HEADER_NAME
from azure.monitor.opentelemetry.exporter._quickpulse._generated.livemetrics import LiveMetricsClient
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
_QUICKPULSE_REDIRECT_HEADER_NAME,
)
from azure.monitor.opentelemetry.exporter._quickpulse._generated.livemetrics import (
LiveMetricsClient,
)

_logger = logging.getLogger(__name__)

# Allowed domain suffixes for QuickPulse redirect targets.
# Only redirects to these trusted Azure Monitor domains are accepted.
_ALLOWED_REDIRECT_DOMAIN_SUFFIXES = (
".livediagnostics.monitor.azure.com",
".monitor.azure.com",
".services.visualstudio.com",
".applicationinsights.azure.com",
".monitor.azure.us",
".applicationinsights.azure.us",
".monitor.azure.cn",
".applicationinsights.azure.cn",
)


def _is_redirect_target_allowed(netloc: str) -> bool:
"""Validate that the redirect target host belongs to a known Azure Monitor domain.

:param str netloc: The network location (host:port) from the parsed redirect URL.
:return: True if the host is in an allowed Azure Monitor domain, False otherwise.
:rtype: bool
"""
# Use urlparse to safely extract the hostname, which handles port stripping
# and detects userinfo (username/password) that could be used to spoof the host.
parsed = urlparse(f"//{netloc}")
if parsed.username is not None or parsed.password is not None:
return False
host = parsed.hostname
if host is None:
return False
return any(host.endswith(suffix) for suffix in _ALLOWED_REDIRECT_DOMAIN_SUFFIXES)


# Quickpulse endpoint handles redirects via header instead of status codes
Expand All @@ -27,6 +65,19 @@ def get_redirect_location(self, response: PipelineResponse) -> Optional[str]:
if redirect_location:
redirected_url = urlparse(redirect_location)
if redirected_url.scheme and redirected_url.netloc:
# Only allow HTTPS redirects to trusted Azure Monitor domains
if redirected_url.scheme.lower() != "https":
_logger.warning(
"QuickPulse redirect rejected: non-HTTPS scheme '%s' in redirect target.",
redirected_url.scheme,
)
return None
if not _is_redirect_target_allowed(redirected_url.netloc):
_logger.warning(
"QuickPulse redirect rejected: host '%s' is not in the allowed domain list.",
redirected_url.netloc,
)
return None
if self._qp_client_ref:
qp_client = self._qp_client_ref()
if qp_client and qp_client._client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import unittest
from unittest import mock

from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy
from azure.monitor.opentelemetry.exporter._quickpulse._policy import (
_QuickpulseRedirectPolicy,
_is_redirect_target_allowed,
)


# pylint: disable=line-too-long
Expand Down Expand Up @@ -74,5 +77,135 @@ def test_get_redirect_location_no_client(self):
pipeline_resp_mock.http_response = http_resp_mock
policy = _QuickpulseRedirectPolicy()
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
self.assertEqual(redirect_location, "https://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc")
self.assertEqual(
redirect_location,
"https://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc",
)
self.assertIsNone(policy._qp_client_ref)

def test_get_redirect_location_rejects_untrusted_host(self):
policy = _QuickpulseRedirectPolicy()
pipeline_resp_mock = mock.Mock()
http_resp_mock = mock.Mock()
headers = {"x-ms-qps-service-endpoint-redirect-v2": "https://evil.attacker.com/exfiltrate"}
http_resp_mock.headers = headers
pipeline_resp_mock.http_response = http_resp_mock
qp_client_mock = mock.Mock()
client_mock = mock.Mock()
client_mock._base_url = "https://original.livediagnostics.monitor.azure.com"
qp_client_mock._client = client_mock
qp_client_ref = weakref.ref(qp_client_mock)
policy._qp_client_ref = qp_client_ref
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
# Redirect should be rejected and return None
self.assertIsNone(redirect_location)
# Base URL must not be changed
self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com")

def test_get_redirect_location_rejects_http_scheme(self):
policy = _QuickpulseRedirectPolicy()
pipeline_resp_mock = mock.Mock()
http_resp_mock = mock.Mock()
headers = {
"x-ms-qps-service-endpoint-redirect-v2": "http://eastus.livediagnostics.monitor.azure.com/QuickPulseService.svc"
}
http_resp_mock.headers = headers
pipeline_resp_mock.http_response = http_resp_mock
qp_client_mock = mock.Mock()
client_mock = mock.Mock()
client_mock._base_url = "https://original.livediagnostics.monitor.azure.com"
qp_client_mock._client = client_mock
qp_client_ref = weakref.ref(qp_client_mock)
policy._qp_client_ref = qp_client_ref
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
# HTTP downgrade should be rejected
self.assertIsNone(redirect_location)
self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com")

def test_get_redirect_location_allows_visualstudio_domain(self):
policy = _QuickpulseRedirectPolicy()
pipeline_resp_mock = mock.Mock()
http_resp_mock = mock.Mock()
headers = {
"x-ms-qps-service-endpoint-redirect-v2": "https://rt.services.visualstudio.com/QuickPulseService.svc"
}
http_resp_mock.headers = headers
pipeline_resp_mock.http_response = http_resp_mock
qp_client_mock = mock.Mock()
client_mock = mock.Mock()
client_mock._base_url = "https://original.livediagnostics.monitor.azure.com"
qp_client_mock._client = client_mock
qp_client_ref = weakref.ref(qp_client_mock)
policy._qp_client_ref = qp_client_ref
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
self.assertEqual(
redirect_location,
"https://rt.services.visualstudio.com/QuickPulseService.svc",
)
self.assertEqual(client_mock._base_url, "https://rt.services.visualstudio.com")

def test_get_redirect_location_rejects_spoofed_suffix(self):
"""Attacker uses a domain that contains an allowed suffix but is not actually that domain."""
policy = _QuickpulseRedirectPolicy()
pipeline_resp_mock = mock.Mock()
http_resp_mock = mock.Mock()
# Reject a host like "monitor.azure.com.evil.com": it starts with the allowed-looking
# "monitor.azure.com" string, but the actual hostname is a subdomain of "evil.com".
headers = {"x-ms-qps-service-endpoint-redirect-v2": "https://monitor.azure.com.evil.com/exfiltrate"}
http_resp_mock.headers = headers
pipeline_resp_mock.http_response = http_resp_mock
qp_client_mock = mock.Mock()
client_mock = mock.Mock()
client_mock._base_url = "https://original.livediagnostics.monitor.azure.com"
qp_client_mock._client = client_mock
qp_client_ref = weakref.ref(qp_client_mock)
policy._qp_client_ref = qp_client_ref
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
self.assertIsNone(redirect_location)
self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com")

def test_get_redirect_location_rejects_userinfo_bypass(self):
"""Reject redirect URLs that use userinfo (@) to disguise the real host."""
policy = _QuickpulseRedirectPolicy()
pipeline_resp_mock = mock.Mock()
http_resp_mock = mock.Mock()
# URL with userinfo: urlparse sees "evil.com" as the real host, not the allowed domain.
headers = {
"x-ms-qps-service-endpoint-redirect-v2": "https://eastus.livediagnostics.monitor.azure.com:443@evil.com/exfiltrate"
}
http_resp_mock.headers = headers
pipeline_resp_mock.http_response = http_resp_mock
qp_client_mock = mock.Mock()
client_mock = mock.Mock()
client_mock._base_url = "https://original.livediagnostics.monitor.azure.com"
qp_client_mock._client = client_mock
qp_client_ref = weakref.ref(qp_client_mock)
policy._qp_client_ref = qp_client_ref
redirect_location = policy.get_redirect_location(pipeline_resp_mock)
self.assertIsNone(redirect_location)
self.assertEqual(client_mock._base_url, "https://original.livediagnostics.monitor.azure.com")


class TestIsRedirectTargetAllowed(unittest.TestCase):
def test_allowed_domains(self):
self.assertTrue(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com"))
self.assertTrue(_is_redirect_target_allowed("global.livediagnostics.monitor.azure.com"))
self.assertTrue(_is_redirect_target_allowed("rt.services.visualstudio.com"))
self.assertTrue(_is_redirect_target_allowed("westus.in.applicationinsights.azure.com"))
self.assertTrue(_is_redirect_target_allowed("settings.monitor.azure.com"))
self.assertTrue(_is_redirect_target_allowed("eastus.monitor.azure.us"))
self.assertTrue(_is_redirect_target_allowed("eastus.monitor.azure.cn"))

def test_allowed_domains_with_port(self):
self.assertTrue(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com:443"))

def test_disallowed_domains(self):
self.assertFalse(_is_redirect_target_allowed("evil.attacker.com"))
self.assertFalse(_is_redirect_target_allowed("monitor.azure.com.evil.com"))
self.assertFalse(_is_redirect_target_allowed("localhost"))
self.assertFalse(_is_redirect_target_allowed("192.168.1.1"))
self.assertFalse(_is_redirect_target_allowed("attacker.com"))

def test_disallowed_userinfo_bypass(self):
self.assertFalse(_is_redirect_target_allowed("eastus.livediagnostics.monitor.azure.com:443@evil.com"))
self.assertFalse(_is_redirect_target_allowed("user:pass@evil.com"))
Loading