|
18 | 18 | _ESCAPED_PERCENT_MARKER = "__MSSQL_PYFORMAT_ESCAPED_PERCENT_PLACEHOLDER__" |
19 | 19 |
|
20 | 20 |
|
| 21 | +def _skip_quoted_context(sql: str, i: int, length: int) -> int: |
| 22 | + """ |
| 23 | + If position i starts a SQL quoted context, skip past it and return the new position. |
| 24 | + Returns -1 if no quoted context starts at position i. |
| 25 | +
|
| 26 | + Handles: |
| 27 | + - Single-line comments: -- ... (to end of line) |
| 28 | + - Multi-line comments: /* ... */ (to closing delimiter) |
| 29 | + - Single-quoted string literals: '...' (with '' escape handling) |
| 30 | + - Double-quoted identifiers: "..." |
| 31 | + - Bracketed identifiers: [...] |
| 32 | +
|
| 33 | + Args: |
| 34 | + sql: Full SQL query string |
| 35 | + i: Current scan position |
| 36 | + length: Length of sql (len(sql)) |
| 37 | +
|
| 38 | + Returns: |
| 39 | + New position after the quoted context, or -1 if position i |
| 40 | + does not start a quoted context. |
| 41 | + """ |
| 42 | + ch = sql[i] |
| 43 | + |
| 44 | + # Single-line comment: skip to end of line |
| 45 | + if ch == "-" and i + 1 < length and sql[i + 1] == "-": |
| 46 | + i += 2 |
| 47 | + while i < length and sql[i] != "\n": |
| 48 | + i += 1 |
| 49 | + return i |
| 50 | + |
| 51 | + # Multi-line comment: skip to closing */ |
| 52 | + if ch == "/" and i + 1 < length and sql[i + 1] == "*": |
| 53 | + i += 2 |
| 54 | + while i + 1 < length and not (sql[i] == "*" and sql[i + 1] == "/"): |
| 55 | + i += 1 |
| 56 | + return min(i + 2, length) # skip past */, or to end if unterminated |
| 57 | + |
| 58 | + # Single-quoted string literal: skip to closing ' |
| 59 | + # Handles escaped quotes ('') inside strings |
| 60 | + if ch == "'": |
| 61 | + i += 1 |
| 62 | + while i < length: |
| 63 | + if sql[i] == "'": |
| 64 | + if i + 1 < length and sql[i + 1] == "'": |
| 65 | + i += 2 # skip escaped quote |
| 66 | + continue |
| 67 | + break |
| 68 | + i += 1 |
| 69 | + return i + 1 # skip closing quote |
| 70 | + |
| 71 | + # Double-quoted identifier: skip to closing " |
| 72 | + if ch == '"': |
| 73 | + i += 1 |
| 74 | + while i < length and sql[i] != '"': |
| 75 | + i += 1 |
| 76 | + return i + 1 # skip closing quote |
| 77 | + |
| 78 | + # Bracketed identifier: skip to closing ] |
| 79 | + # Handles escaped brackets (]]) inside identifiers |
| 80 | + if ch == "[": |
| 81 | + i += 1 |
| 82 | + while i < length: |
| 83 | + if sql[i] == "]": |
| 84 | + if i + 1 < length and sql[i + 1] == "]": |
| 85 | + i += 2 # skip escaped bracket |
| 86 | + continue |
| 87 | + break |
| 88 | + i += 1 |
| 89 | + return min(i + 1, length) # skip closing bracket |
| 90 | + |
| 91 | + return -1 |
| 92 | + |
| 93 | + |
| 94 | +def _has_unquoted_question_marks(sql: str) -> bool: |
| 95 | + """ |
| 96 | + Check if SQL contains ? characters that are actual qmark parameter placeholders. |
| 97 | +
|
| 98 | + Uses _skip_quoted_context to skip ? characters that appear inside |
| 99 | + bracketed identifiers, string literals, quoted identifiers, and comments. |
| 100 | +
|
| 101 | + Args: |
| 102 | + sql: SQL query string to check |
| 103 | +
|
| 104 | + Returns: |
| 105 | + True if the SQL contains at least one unquoted/unbracketed ? character |
| 106 | +
|
| 107 | + Examples: |
| 108 | + >>> _has_unquoted_question_marks("SELECT * FROM t WHERE id = ?") |
| 109 | + True |
| 110 | +
|
| 111 | + >>> _has_unquoted_question_marks("SELECT [q?marks] FROM t") |
| 112 | + False |
| 113 | +
|
| 114 | + >>> _has_unquoted_question_marks("SELECT 'what?' FROM t") |
| 115 | + False |
| 116 | + """ |
| 117 | + i = 0 |
| 118 | + length = len(sql) |
| 119 | + |
| 120 | + while i < length: |
| 121 | + # Skip any quoted context (brackets, strings, comments) |
| 122 | + skipped = _skip_quoted_context(sql, i, length) |
| 123 | + if skipped >= 0: |
| 124 | + i = skipped |
| 125 | + continue |
| 126 | + |
| 127 | + # Unquoted question mark — this is a real placeholder |
| 128 | + if sql[i] == "?": |
| 129 | + return True |
| 130 | + |
| 131 | + i += 1 |
| 132 | + |
| 133 | + return False |
| 134 | + |
| 135 | + |
21 | 136 | def parse_pyformat_params(sql: str) -> List[str]: |
22 | 137 | """ |
23 | 138 | Extract %(name)s parameter names from SQL string. |
@@ -317,7 +432,9 @@ def detect_and_convert_parameters( |
317 | 432 | ) |
318 | 433 |
|
319 | 434 | # Check if SQL appears to have qmark placeholders |
320 | | - if "?" in sql and not parse_pyformat_params(sql): |
| 435 | + # Fast short-circuit: skip the O(n) context-aware scan if no ? exists at all |
| 436 | + # Then use context-aware check that ignores ? inside brackets, quotes, and comments |
| 437 | + if "?" in sql and _has_unquoted_question_marks(sql) and not parse_pyformat_params(sql): |
321 | 438 | logger.error( |
322 | 439 | "detect_and_convert_parameters: Parameter style mismatch - SQL has ? placeholders but received dict" |
323 | 440 | ) |
|
0 commit comments