Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/library/bluetooth.rst
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,19 @@ Pairing and bonding

On successful pairing, the ``_IRQ_ENCRYPTION_UPDATE`` event will be raised.

.. method:: BLE.gap_unpair([addr_type, addr, /])

Remove bond information from persistent storage. If *addr_type* and *addr*
are provided, removes the bond for that specific peer. If called with no
arguments, removes all bonds. The address can be obtained from the
``_IRQ_CENTRAL_CONNECT`` or ``_IRQ_PERIPHERAL_CONNECT`` events.

When removing a specific peer, raises ``OSError(ENOENT)`` if no matching
bond exists.

**Note:** The peer should be disconnected before calling ``gap_unpair``.
Behaviour when the peer is still connected is backend-dependent.

.. method:: BLE.gap_passkey(conn_handle, action, passkey, /)

Respond to a ``_IRQ_PASSKEY_ACTION`` event for the specified *conn_handle*
Expand Down
21 changes: 21 additions & 0 deletions extmod/btstack/modbluetooth_btstack.c
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,27 @@ int mp_bluetooth_gap_pair(uint16_t conn_handle) {
return 0;
}

int mp_bluetooth_gap_unpair(uint8_t addr_type, const uint8_t *addr) {
DEBUG_printf("mp_bluetooth_gap_unpair: addr=%p\n", addr);
int count = le_device_db_max_count();
for (int i = 0; i < count; i++) {
int entry_type;
bd_addr_t entry_addr;
sm_key_t irk;
le_device_db_info(i, &entry_type, entry_addr, irk);
if (entry_type == (int)BD_ADDR_TYPE_UNKNOWN) {
continue;
}
if (addr == NULL || ((int)addr_type == entry_type && memcmp(addr, entry_addr, BD_ADDR_LEN) == 0)) {
le_device_db_remove(i);
if (addr != NULL) {
return 0;
}
}
}
return (addr == NULL) ? 0 : MP_ENOENT;
}

int mp_bluetooth_gap_passkey(uint16_t conn_handle, uint8_t action, mp_int_t passkey) {
DEBUG_printf("mp_bluetooth_gap_passkey: conn_handle=%d action=%d passkey=%d\n", conn_handle, action, (int)passkey);
return MP_EOPNOTSUPP;
Expand Down
31 changes: 31 additions & 0 deletions extmod/modbluetooth.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@
#error pairing and bonding require synchronous modbluetooth events
#endif

// Ensure QSTRs for pairing/bonding features are always extracted even when feature is disabled.
// These are used in config() and method tables when MICROPY_PY_BLUETOOTH_ENABLE_PAIRING_BONDING=1.
// Force QSTR extraction for conditional features - must be outside #ifndef NO_QSTR
static inline void __attribute__((unused)) _force_qstr_extraction(void) {
(void)MP_QSTR_bond;
(void)MP_QSTR_mitm;
(void)MP_QSTR_io;
(void)MP_QSTR_le_secure;
(void)MP_QSTR_gap_pair;
(void)MP_QSTR_gap_passkey;
(void)MP_QSTR_gap_unpair;
}

// NimBLE can have fragmented data for GATTC events, so requires reassembly.
#define MICROPY_PY_BLUETOOTH_USE_GATTC_EVENT_DATA_REASSEMBLY MICROPY_BLUETOOTH_NIMBLE

Expand Down Expand Up @@ -717,6 +730,23 @@ static mp_obj_t bluetooth_ble_gap_pair(mp_obj_t self_in, mp_obj_t conn_handle_in
}
static MP_DEFINE_CONST_FUN_OBJ_2(bluetooth_ble_gap_pair_obj, bluetooth_ble_gap_pair);

static mp_obj_t bluetooth_ble_gap_unpair(size_t n_args, const mp_obj_t *args) {
(void)args[0]; // self
if (n_args == 1) {
// No args: clear all bonds.
return bluetooth_handle_errno(mp_bluetooth_gap_unpair(0, NULL));
}
// Two args: addr_type, addr.
uint8_t addr_type = mp_obj_get_int(args[1]);
mp_buffer_info_t bufinfo = {0};
mp_get_buffer_raise(args[2], &bufinfo, MP_BUFFER_READ);
if (bufinfo.len != 6) {
mp_raise_ValueError(MP_ERROR_TEXT("invalid addr"));
}
return bluetooth_handle_errno(mp_bluetooth_gap_unpair(addr_type, bufinfo.buf));
}
static MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(bluetooth_ble_gap_unpair_obj, 1, 3, bluetooth_ble_gap_unpair);

static mp_obj_t bluetooth_ble_gap_passkey(size_t n_args, const mp_obj_t *args) {
uint16_t conn_handle = mp_obj_get_int(args[1]);
uint8_t action = mp_obj_get_int(args[2]);
Expand Down Expand Up @@ -945,6 +975,7 @@ static const mp_rom_map_elem_t bluetooth_ble_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_gap_disconnect), MP_ROM_PTR(&bluetooth_ble_gap_disconnect_obj) },
#if MICROPY_PY_BLUETOOTH_ENABLE_PAIRING_BONDING
{ MP_ROM_QSTR(MP_QSTR_gap_pair), MP_ROM_PTR(&bluetooth_ble_gap_pair_obj) },
{ MP_ROM_QSTR(MP_QSTR_gap_unpair), MP_ROM_PTR(&bluetooth_ble_gap_unpair_obj) },
{ MP_ROM_QSTR(MP_QSTR_gap_passkey), MP_ROM_PTR(&bluetooth_ble_gap_passkey_obj) },
#endif
// GATT Server
Expand Down
3 changes: 3 additions & 0 deletions extmod/modbluetooth.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ int mp_bluetooth_gap_pair(uint16_t conn_handle);

// Respond to a pairing request.
int mp_bluetooth_gap_passkey(uint16_t conn_handle, uint8_t action, mp_int_t passkey);

// Remove bond for addr, or all bonds if addr is NULL.
int mp_bluetooth_gap_unpair(uint8_t addr_type, const uint8_t *addr);
#endif // MICROPY_PY_BLUETOOTH_ENABLE_PAIRING_BONDING

#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
Expand Down
39 changes: 39 additions & 0 deletions extmod/nimble/modbluetooth_nimble.c
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,45 @@ int mp_bluetooth_gap_pair(uint16_t conn_handle) {
return ble_hs_err_to_errno(ble_gap_security_initiate(conn_handle));
}

int mp_bluetooth_gap_unpair(uint8_t addr_type, const uint8_t *addr) {
DEBUG_printf("mp_bluetooth_gap_unpair: addr=%p\n", addr);

// Cannot use ble_gap_unpair(), ble_store_util_delete_peer(), or
// ble_store_clear() directly:
//
// All of these ultimately call ble_store_util_delete_all() which loops
// calling store_delete_cb until it returns non-zero. Our callback invokes
// the Python _IRQ_SET_SECRET handler which returns True (success)
// unconditionally — NimBLE has no internal store to deplete, so the loop
// never terminates if a Python IRQ handler is registered.
//
// Instead, notify the Python handler directly to delete bond data,
// bypassing ble_store_util_delete_all's broken loop.

if (addr == NULL) {
// Use a zeroed address (BLE_ADDR_ANY) to signal "delete all" to the
// Python handler.
ble_addr_t any = {0};
mp_bluetooth_gap_on_set_secret(BLE_STORE_OBJ_TYPE_OUR_SEC, (const uint8_t *)&any, sizeof(ble_addr_t), NULL, 0);
mp_bluetooth_gap_on_set_secret(BLE_STORE_OBJ_TYPE_PEER_SEC, (const uint8_t *)&any, sizeof(ble_addr_t), NULL, 0);
return 0;
}

ble_addr_t ble_addr;
ble_addr.type = addr_type;
memcpy(ble_addr.val, addr, 6);

// Remove from controller resolving list (best-effort, ignore errors).
ble_hs_pvcy_remove_entry(addr_type, addr);

// Notify Python handler to delete persistent bond data.
// Track whether either secret type had a matching bond.
bool deleted = false;
deleted |= mp_bluetooth_gap_on_set_secret(BLE_STORE_OBJ_TYPE_OUR_SEC, (const uint8_t *)&ble_addr, sizeof(ble_addr_t), NULL, 0);
deleted |= mp_bluetooth_gap_on_set_secret(BLE_STORE_OBJ_TYPE_PEER_SEC, (const uint8_t *)&ble_addr, sizeof(ble_addr_t), NULL, 0);
return deleted ? 0 : MP_ENOENT;
}

int mp_bluetooth_gap_passkey(uint16_t conn_handle, uint8_t action, mp_int_t passkey) {
struct ble_sm_io io = {0};

Expand Down
1 change: 1 addition & 0 deletions tests/multi_bluetooth/ble_gap_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,5 @@ def instance1():
ble = bluetooth.BLE()
ble.config(mitm=True, le_secure=True, bond=False)
ble.active(1)
ble.gap_unpair() # Clear stale bonds/CCC from persistent storage
ble.irq(irq)
1 change: 1 addition & 0 deletions tests/multi_bluetooth/ble_gap_pair_bond.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,5 @@ def instance1():
ble = bluetooth.BLE()
ble.config(mitm=True, le_secure=True, bond=True)
ble.active(1)
ble.gap_unpair() # Clear stale bonds/CCC from persistent storage
ble.irq(irq)
140 changes: 140 additions & 0 deletions tests/multi_bluetooth/ble_gap_unpair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Test BLE gap_unpair: clear-all (no args) and per-address bond removal.
#
# Both instances call gap_unpair() with no args at startup to clear all bonds,
# then pair with bonding, disconnect, and unpair the specific peer by address.

from micropython import const
import time, machine, bluetooth

if not hasattr(bluetooth.BLE, "gap_unpair"):
print("SKIP")
raise SystemExit

TIMEOUT_MS = 5000

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_SET_SECRET = const(30)

_FLAG_READ = const(0x0002)
_FLAG_READ_ENCRYPTED = const(0x0200)

SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, _FLAG_READ | _FLAG_READ_ENCRYPTED)
SERVICE = (SERVICE_UUID, (CHAR,))

waiting_events = {}


def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
print("_IRQ_CENTRAL_CONNECT")
waiting_events[event] = (data[0], data[1], bytes(data[2]))
elif event == _IRQ_CENTRAL_DISCONNECT:
print("_IRQ_CENTRAL_DISCONNECT")
elif event == _IRQ_GATTS_READ_REQUEST:
# Don't print here - print after wait_for_event for consistent ordering.
pass
elif event == _IRQ_PERIPHERAL_CONNECT:
print("_IRQ_PERIPHERAL_CONNECT")
waiting_events[event] = (data[0], data[1], bytes(data[2]))
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("_IRQ_PERIPHERAL_DISCONNECT")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
if data[-1] == CHAR_UUID:
print("_IRQ_GATTC_CHARACTERISTIC_RESULT", data[-1])
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
print("_IRQ_GATTC_CHARACTERISTIC_DONE")
elif event == _IRQ_GATTC_READ_RESULT:
print("_IRQ_GATTC_READ_RESULT", bytes(data[-1]))
elif event == _IRQ_ENCRYPTION_UPDATE:
print("_IRQ_ENCRYPTION_UPDATE", data[1], data[2], data[3])
elif event == _IRQ_SET_SECRET:
return True

if event not in waiting_events:
waiting_events[event] = None


def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))


# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services((SERVICE,))
ble.gatts_write(char_handle, "encrypted")

print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect and pair.
conn_handle, addr_type, addr = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)
wait_for_event(_IRQ_GATTS_READ_REQUEST, TIMEOUT_MS)
print("_IRQ_GATTS_READ_REQUEST")
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)

# Unpair the central by address (tests per-address removal).
print("gap_unpair_addr")
ble.gap_unpair(addr_type, addr)
print("gap_unpair_addr done")
finally:
ble.active(0)


# Acting in central role.
def instance1():
multitest.next()
try:
# Connect, pair, read encrypted char, disconnect.
print("gap_connect")
ble.gap_connect(*BDADDR)
conn_handle, addr_type, addr = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)

ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)

print("gap_pair")
ble.gap_pair(conn_handle)
wait_for_event(_IRQ_ENCRYPTION_UPDATE, TIMEOUT_MS)

print("gattc_read")
ble.gattc_read(conn_handle, value_handle)
wait_for_event(_IRQ_GATTC_READ_RESULT, TIMEOUT_MS)

print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)

# Unpair the peripheral by address (tests per-address removal on central).
print("gap_unpair_addr")
ble.gap_unpair(addr_type, addr)
print("gap_unpair_addr done")
finally:
ble.active(0)


ble = bluetooth.BLE()
ble.config(mitm=True, le_secure=True, bond=True)
ble.active(1)
ble.gap_unpair() # Clear all bonds at start (tests no-arg form).
ble.irq(irq)
21 changes: 21 additions & 0 deletions tests/multi_bluetooth/ble_gap_unpair.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--- instance0 ---
gap_advertise
_IRQ_CENTRAL_CONNECT
_IRQ_ENCRYPTION_UPDATE 1 0 1
_IRQ_GATTS_READ_REQUEST
_IRQ_CENTRAL_DISCONNECT
gap_unpair_addr
gap_unpair_addr done
--- instance1 ---
gap_connect
_IRQ_PERIPHERAL_CONNECT
_IRQ_GATTC_CHARACTERISTIC_RESULT UUID('00000000-1111-2222-3333-444444444444')
_IRQ_GATTC_CHARACTERISTIC_DONE
gap_pair
_IRQ_ENCRYPTION_UPDATE 1 0 1
gattc_read
_IRQ_GATTC_READ_RESULT b'encrypted'
gap_disconnect: True
_IRQ_PERIPHERAL_DISCONNECT
gap_unpair_addr
gap_unpair_addr done
1 change: 1 addition & 0 deletions tests/multi_bluetooth/ble_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,5 @@ def instance1():

ble = bluetooth.BLE()
ble.active(1)
ble.gap_unpair() # Clear stale bonds/CCC from persistent storage
ble.irq(irq)
Loading