diff --git a/CMakeLists.txt b/CMakeLists.txt index cc82669f..4468f9ed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -691,6 +691,11 @@ install(FILES ug31.pdf DESTINATION share/DSView RENAME ug31.pdf) install(DIRECTORY libsigrokdecode4DSL/decoders DESTINATION share/libsigrokdecode4DSL) install(DIRECTORY lang DESTINATION share/DSView) +#=============================================================================== +#= dsview-cli (headless command-line tool) +#------------------------------------------------------------------------------- +add_subdirectory(cli) + #=============================================================================== #= Packaging (handled by CPack) #------------------------------------------------------------------------------- @@ -705,6 +710,11 @@ set(CPACK_SOURCE_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}-${DS_VERSION_MAJOR}.${DS_VERSION_MINOR}.${DS_VERSION_MICRO}") set(CPACK_SOURCE_GENERATOR "TGZ") + +SET(CPACK_GENERATOR "DEB") +SET(CPACK_DEBIAN_PACKAGE_MAINTAINER "Nishanth Menon ") +SET(CPACK_DEBIAN_PACKAGE_DEPENDS "git, gcc, g++, make, cmake, qtbase5-dev, qt5-qmake, libglib2.0-dev, zlib1g-dev, libusb-1.0-0-dev, libboost-dev, libfftw3-dev, python3-dev, libudev-dev, pkg-config") + include(CPack) #=============================================================================== diff --git a/cli/CLAUDE.md b/cli/CLAUDE.md new file mode 100644 index 00000000..e83e3486 --- /dev/null +++ b/cli/CLAUDE.md @@ -0,0 +1,462 @@ +# DSView CLI -- Claude Code Instructions + +This file is read automatically by Claude Code when working in this directory. + +## What this directory provides + +Seven MCP tools (registered under the `dsview` server) that let Claude +control DreamSourceLab logic analyzers and oscilloscopes over USB: + +| Tool | One-line purpose | +|------|-----------------| +| `scan_devices` | List all connected instruments | +| `device_info` | Get model, version, channel count, channel modes, device mode | +| `capture_logic` | Record a logic capture with named channels and trigger | +| `capture_dso` | Record an oscilloscope capture with vdiv, coupling, probe config | +| `signal_summary` | High-level activity report (logic or voltage stats) | +| `decode_capture` | Per-sample waveform and edge list (auto-detects mode) | +| `decode_analog` | Per-sample voltage waveform and statistics for DSO captures | + +--- + +## When to use logic analyzer vs oscilloscope + +**Use Logic Analyzer (`capture_logic`)** for: +- Protocol decoding: I2C addresses/data, SPI transactions, UART frames +- Monitoring many signals at once (up to 16 channels) +- Digital state verification: GPIO toggling, interrupt timing, bus arbitration +- Any task where you need **protocol content** (what was sent on the bus) +- Export to `.sr` format for sigrok protocol decoders + +**Use Oscilloscope (`capture_dso`)** for: +- Measuring voltage levels, amplitude, Vpp, DC offset +- Analog timing compliance: rise/fall time at VIL/VIH thresholds +- I2C/SPI setup time, hold time, bus frequency from analog waveforms +- Signal integrity: overshoot, undershoot, ringing, noise margin +- RC charge curves on open-drain buses (I2C pull-up characterization) +- Power rail analysis: ripple measurement with AC coupling + +**Key distinction**: Protocol decoders (sigrok) work on **digital** signals +from the logic analyzer -- they tell you *what* was communicated. Analog +timing compliance requires the **oscilloscope** -- it tells you *how well* +the electrical signals meet spec (rise/fall time at VIL/VIH, setup/hold +time, signal integrity). For complete bus analysis, use **both** instruments. + +--- + +## Device mode detection + +DreamSourceLab instruments operate in one of three modes: + +| Mode | `mode_name` | Capture tool | Decode tool | +|------|-------------|--------------|-------------| +| Logic analyzer | `LOGIC` | `capture_logic()` | `decode_capture()` | +| Oscilloscope (buffered) | `DSO` | `capture_dso()` | `decode_analog()` | +| Oscilloscope (streaming) | `ANALOG` | `capture_dso()` | `decode_analog()` | + +**Always check `device_info()` first** -- the `mode_name` field tells you +which capture tool to use. Using the wrong tool will fail. + +`decode_capture()` auto-detects DSO files and redirects to `decode_analog()`. + +--- + +## Logic analyzer workflow + +``` +scan_devices() # 1. discover what is plugged in +device_info(device_index=N) # 2. confirm mode_name='LOGIC', read channel_modes +capture_logic(...) # 3. record digital signals +signal_summary(file_path) # 4. quick overview (transitions, frequency) +decode_capture(file_path, ...) # 5. detailed waveform / edge analysis +``` + +**Never call `capture_logic` before checking `device_info`.** +The `channel_modes` field in `device_info` output is the authoritative +table of how many channels can be active at each maximum samplerate. + +--- + +## Oscilloscope workflow + +``` +scan_devices() # 1. discover instruments +device_info(device_index=N) # 2. confirm mode_name='DSO', see analog_channels +capture_dso(...) # 3. record analog waveform +signal_summary(file_path) # 4. voltage stats (min/max/Vpp/Vrms/freq) +decode_analog(file_path, ...) # 5. per-sample voltage waveform +``` + +**Never call `capture_dso` before checking `device_info`.** +The `analog_channels` field shows current vdiv, coupling, and probe settings. + +--- + +## Voltage division and probe factor (DSO) + +The oscilloscope uses an 8-bit ADC (0-255). The voltage conversion formula: + +``` +full_scale_mV = vdiv_mV * probe_factor * 10 (10 vertical divisions) +voltage_mV = (hw_offset - raw_sample) * full_scale_mV / 255 +``` + +### vdiv (voltage per division) + +Sets the vertical scale. Available options: 10, 20, 50, 100, 200, 500, 1000, 2000 mV. + +Choose vdiv so the expected signal fills most of the screen: +- 3.3V logic signal: `vdiv=500` gives +/-2.5V range +- 1.8V signal: `vdiv=200` gives +/-1.0V range +- 12V power rail: `vdiv=2000` with 10x probe gives +/-100V range + +```python +capture_dso(vdiv="500", ...) # both channels at 500 mV/div +capture_dso(vdiv="0:500,1:1000", ...) # CH0=500, CH1=1000 mV/div +``` + +### probe_factor + +Must match your physical probe attenuation: 1 (1x), 2 (2x), 10 (10x), 20 (20x). + +```python +capture_dso(probe_factor="10", ...) # both channels 10x probe +capture_dso(probe_factor="0:1,1:10", ...) # CH0=1x, CH1=10x +``` + +--- + +## Coupling modes (DSO) + +| Mode | Description | Use when | +|------|-------------|----------| +| `DC` | Full signal passes through | Measuring DC levels, digital signals | +| `AC` | DC component is blocked | Measuring AC ripple on a DC rail | + +```python +capture_dso(coupling="DC", ...) # both channels DC +capture_dso(coupling="0:DC,1:AC", ...) # CH0=DC, CH1=AC +``` + +--- + +## DSO trigger modes + +| `trigger_type` | Description | +|----------------|-------------| +| `none` | Free-run / auto trigger (default) | +| `rising` | Trigger on rising edge | +| `falling` | Trigger on falling edge | + +- `trigger_channel`: 0 or 1 (must be an enabled channel), -1 for auto +- `trigger_pos`: 0-100, percentage of window before trigger (50 = centre) + +```python +capture_dso( + trigger_channel=0, + trigger_type="rising", + trigger_pos=10, # 10% pre-trigger, 90% post-trigger + ... +) +``` + +--- + +## Channel modes (channels vs max samplerate) + +The DSLogic U3Pro16 (SuperSpeed USB) channel mode table from `device_info`: + +| Active channels | Max samplerate | +|:-:|:-:| +| 16 | 125 MHz | +| 12 | 250 MHz | +| 6 | 500 MHz | +| 3 | 1 GHz | + +Other DreamSourceLab models have different tables -- always read +`channel_modes` from `device_info` for the attached device. + +Example: to capture at 500 MHz you must limit to 6 or fewer active channels: +```python +capture_logic(channels="0-5", samplerate="500M", ...) +``` + +--- + +## Capture duration + +Instead of specifying `num_samples`, you can specify `duration` with +a time suffix. The sample count is computed automatically from +`samplerate * duration`. + +Supported suffixes: `s`, `ms`, `us`, `ns`. + +Works with both `capture_logic()` and `capture_dso()`. + +```python +# Logic: Capture 100 ms of I2C at 10 MHz +capture_logic( + channels="0,1", + channel_names="SDA,SCL", + samplerate="10M", + duration="100ms", +) + +# DSO: Capture 50 ms at 10 MHz +capture_dso( + channels="0", + channel_names="VOUT", + samplerate="10M", + duration="50ms", + vdiv="500", +) +``` + +When `duration` is provided, `num_samples` is ignored. The result +includes `requested_duration` and `computed_num_samples` fields. + +--- + +## Multi-device support + +When multiple instruments are connected simultaneously: + +1. `scan_devices()` returns each with a unique `index`. +2. Pass `device_index=N` to `device_info`, `capture_logic`, or `capture_dso`. +3. Each device is independent -- you can run captures on different devices + within the same session. + +Example with a logic analyzer and an oscilloscope: +``` +scan_devices() +# returns [{index:0, name:"DSLogic U3Pro16"}, {index:1, name:"DSCope U3P100"}] + +device_info(device_index=0) # mode_name='LOGIC' +capture_logic(device_index=0, ...) + +device_info(device_index=1) # mode_name='DSO' +capture_dso(device_index=1, ...) +``` + +--- + +## Naming channels + +Always provide `channel_names` when the signals have meaningful roles. +The names are stored in the `.meta.json` sidecar and appear in all +subsequent analysis tool outputs. + +```python +# Logic analyzer +capture_logic( + channels="0,1,2,3", + channel_names="SDA,SCL,TX,RX", + ... +) + +# Oscilloscope +capture_dso( + channels="0,1", + channel_names="VOUT,GND_REF", + ... +) +``` + +Names must be in the **same order** as the channel indices in `channels`. + +--- + +## Voltage threshold (logic analyzer only) + +Pro devices (e.g. DSLogic U3Pro16) support variable input voltage +thresholds via `voltage_threshold` in `capture_logic()`. The default +is 1.0V (set by the device firmware). + +Common values: +- `0.8` -- LVCMOS 1.2V/1.5V signals +- `1.0` -- default, suitable for 1.8V and above +- `1.2` -- 2.5V LVCMOS +- `1.5` -- 3.3V LVCMOS +- `2.5` -- 5V TTL + +Check `device_info()` output for the current `vth` value. + +```python +capture_logic( + channels="0,1", + channel_names="SDA,SCL", + voltage_threshold=1.8, + ... +) +``` + +--- + +## Triggers (logic analyzer) + +- `trigger_channel` must be one of the physical indices listed in `channels`. +- `trigger_type`: `none` (free-run), `rising`, `falling`, `high`, `low`. +- `trigger_pos` (0-100): percentage of samples captured **before** the + trigger event. Default 50 centres the trigger in the window. + Use a low value (e.g. 10) to see mostly post-trigger data. + +--- + +## Output files + +Every `capture_logic` and `capture_dso` call produces two files: + +``` +.bin raw binary sample data +.bin.meta.json samplerate, channel map, trigger config, mode +``` + +The metadata sidecar includes a `"mode"` field (`"logic"`, `"dso"`, or +`"analog"`) so analysis tools automatically choose the right decoder. + +Pass `out_file` explicitly when you want a predictable path, otherwise a +temp file under `/tmp/dsview_*.bin` is created automatically. + +Both `signal_summary` and `decode_capture`/`decode_analog` require the +`.bin` path; they locate the sidecar automatically. + +--- + +## Output format + +Tool results are **TOON** (Terse Object-Oriented Notation) by default, +which is more compact than JSON. Use `toon.decode(result)` in Python +to convert back to a dict when you need to inspect values programmatically. + +--- + +## Export formats + +| Format | Logic | DSO | Notes | +|--------|:-----:|:---:|-------| +| `bin` | Yes | Yes | Native binary + .meta.json (default) | +| `csv` | Yes | Yes | Logic: 0/1 values. DSO: voltage in mV with time column | +| `sr` | Yes | Yes | Sigrok session ZIP. DSO: 32-bit float voltages | +| `vcd` | Yes | Yes* | VCD is digital. DSO signals thresholded at midpoint | +| `sigrok-binary` | Yes | Yes | Raw bytes without header | + +*DSO VCD export thresholds analog signals at the ADC midpoint to produce +digital waveforms. Use CSV for actual voltage values. + +--- + +## Protocol decoding with sigrok + +For protocol-level analysis (I2C, SPI, UART, etc.), use the sigrok MCP +server's `decode_protocol` tool on exported captures. The recommended +workflow: + +1. Capture with `out_format="sr"` (or `"vcd"`) and meaningful channel names. +2. Use `signal_summary()` to confirm channels have activity. +3. Use sigrok's `decode_protocol()` for protocol analysis. + +```python +# Step 1: Capture I2C with .sr export +result = capture_logic( + channels="0,1", + channel_names="SDA,SCL", + samplerate="10M", + out_format="sr", +) + +# Step 2: Decode with sigrok +decode_protocol( + input_file=result["export_file"], + protocol_decoders="i2c:scl=SCL:sda=SDA", +) +``` + +Common protocol decoder strings: +- **I2C**: `i2c:scl=SCL:sda=SDA` +- **SPI**: `spi:clk=CLK:mosi=MOSI:miso=MISO:cs=CS` +- **UART**: `uart:rx=RX:baudrate=115200` +- **1-Wire**: `onewire_link:owr=OWR` + +The `capture_logic()` return value includes a `decode_hint` field with +auto-detected protocol suggestions based on channel names. + +**Samplerate guidelines for protocol decoding:** +- I2C (100kHz-400kHz bus): 1-10 MHz samplerate +- SPI (1-50 MHz clock): 10-100 MHz samplerate (>= 4x clock frequency) +- UART (9600-115200 baud): 1-10 MHz samplerate +- UART (1-3 Mbaud): 10-50 MHz samplerate + +--- + +## Samplerate optimisation (Nyquist hints) + +`signal_summary()` automatically analyses captured signals and provides +samplerate recommendations based on Nyquist sampling theory. This works +for both logic and DSO captures. + +- **Per-channel fields** (when frequency is detected): + - `min_samplerate_hz` -- 4x measured frequency (absolute minimum) + - `rec_samplerate_hz` -- 10x measured frequency (recommended) + - `oversampling_ratio` -- current samplerate / signal frequency + - `hint` -- textual recommendation when oversampling > 20x + +- **Top-level `samplerate_hint`** when the capture is heavily oversampled. + +**Recommended workflow for optimising capture duration:** +1. Do an initial capture at a generous samplerate (e.g. 10 MHz) +2. Run `signal_summary()` to see actual signal frequencies +3. Check the `samplerate_hint` -- if oversampled, re-capture at the + recommended rate to get a proportionally longer capture window +4. Use `device_info()` `capture_budget` table to see max duration at + the recommended rate + +Example: if `signal_summary()` reports an I2C bus at ~400 kHz captured +at 100 MHz (250x oversample), it recommends 4 MHz (10x Nyquist). +Switching to 4 MHz allows 25x longer captures at the same memory depth. + +--- + +## Build / environment + +The tools require the compiled `dsview-cli` binary. Build from the +DSView root directory: + +```sh +cd /path/to/DSView +mkdir -p build && cd build && cmake .. && make -j$(nproc) +``` + +This builds both the DSView GUI and `dsview-cli`. The binary is placed +in `build.dir/dsview-cli` alongside the DSView GUI binary. + +Firmware files live in `DSView/res/` and are located automatically +at runtime relative to the binary location. The system install path +`/usr/share/DSView/res/` is used as a fallback. + +--- + +## Key implementation files + +| File | Role | +|------|------| +| `cli/dsview_mcp.py` | MCP server (Python, FastMCP) -- registered in `~/.claude.json` | +| `cli/dsview_cli.c` | C bridge -- `scan` / `info` / `capture` subcommands, JSON output | +| `cli/CMakeLists.txt` | Builds `dsview-cli`; links against DSView's libsigrok4DSL sources | +| `DSView/res/` | FPGA firmware loaded via libsigrok4DSL on device open | + +--- + +## MCP server registration + +Update your `~/.claude.json` to point to the new location: + +```json +{ + "mcpServers": { + "dsview": { + "command": "python3", + "args": ["/path/to/DSView/cli/dsview_mcp.py"] + } + } +} +``` diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt new file mode 100644 index 00000000..f60a9462 --- /dev/null +++ b/cli/CMakeLists.txt @@ -0,0 +1,80 @@ +## +## cli/CMakeLists.txt - Build dsview-cli (headless command-line bridge) +## +## This target reuses libsigrok4DSL and common sources from the parent +## DSView tree but does NOT require Qt, Boost, or FFTW. +## + +# The parent-scope source lists use paths relative to CMAKE_SOURCE_DIR. +# CMake resolves source paths relative to the current source directory, +# so we must prepend CMAKE_SOURCE_DIR to each entry. +set(_cli_lib_sources "") +foreach(_src ${libsigrok4DSL_SOURCES}) + list(APPEND _cli_lib_sources "${CMAKE_SOURCE_DIR}/${_src}") +endforeach() + +set(_cli_common_sources "") +foreach(_src ${common_SOURCES}) + list(APPEND _cli_common_sources "${CMAKE_SOURCE_DIR}/${_src}") +endforeach() + +add_executable(dsview-cli + dsview_cli.c + ${_cli_lib_sources} + ${_cli_common_sources} +) + +target_include_directories(dsview-cli PRIVATE + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/libsigrok4DSL + ${CMAKE_SOURCE_DIR}/libsigrokdecode4DSL + ${CMAKE_SOURCE_DIR}/common + ${GLIB_INCLUDE_DIRS} + ${LIBUSB_1_INCLUDE_DIRS} + ${ZLIB_INCLUDE_DIRS} +) + +# Python3 include dirs - handle both find_package(Python3) and find_package(PythonLibs) +if(Python3_FOUND) + target_include_directories(dsview-cli PRIVATE ${Python3_INCLUDE_DIRS}) +elseif(PYTHONLIBS_FOUND) + target_include_directories(dsview-cli PRIVATE ${PYTHON_INCLUDE_DIRS}) +endif() + +target_compile_definitions(dsview-cli PRIVATE + HAVE_LIBUSB_1_0=1 + HAVE_DSL_DEVICE=1 + HAVE_LA_DEMO=1 +) + +if(CMAKE_SYSTEM_NAME MATCHES "Linux") + target_compile_definitions(dsview-cli PRIVATE _DEFAULT_SOURCE) +endif() + +target_compile_options(dsview-cli PRIVATE + -std=c99 + -Wno-unused-result + -Wno-unused-variable + -Wno-unused-function + -Wno-implicit-function-declaration +) + +target_link_libraries(dsview-cli + -lz + -lglib-2.0 + -lm + ${CMAKE_THREAD_LIBS_INIT} + ${LIBUSB_1_LIBRARIES} + ${PY_LIB} +) + +# Place dsview-cli alongside the DSView GUI binary +set_target_properties(dsview-cli PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_SOURCE_DIR}/build.dir" +) + +install(TARGETS dsview-cli DESTINATION ${CMAKE_INSTALL_BINDIR}) + +# Install MCP server and Python dependencies alongside the binary +install(FILES dsview_mcp.py requirements.txt + DESTINATION share/DSView/cli) diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 00000000..d05b53e4 --- /dev/null +++ b/cli/README.md @@ -0,0 +1,159 @@ +# DSView CLI -- Headless MCP Agent + +A headless command-line bridge and [Model Context Protocol][mcp] (MCP) server +for DreamSourceLab USB instruments (DSLogic logic analyzers, DSCope +oscilloscopes). This lets AI coding assistants such as [Claude Code][cc] +directly control the hardware for automated signal capture, protocol +decoding, and analog timing analysis. + +[mcp]: https://modelcontextprotocol.io/ +[cc]: https://docs.anthropic.com/en/docs/claude-code + +## What is included + +| File | Purpose | +|------|---------| +| `dsview_cli.c` | C bridge -- `scan`, `info`, `capture` subcommands (JSON output) | +| `dsview_mcp.py` | Python MCP server wrapping the CLI (7 tools for Claude) | +| `CMakeLists.txt` | Builds `dsview-cli`; links against DSView's libsigrok4DSL | +| `requirements.txt` | Python runtime dependencies | +| `CLAUDE.md` | Context file read automatically by Claude Code | + +## Prerequisites + +### Build dependencies + +Everything the main DSView GUI needs, minus the Qt/Boost/FFTW +libraries (the CLI does not use them): + +``` +sudo apt install build-essential cmake pkg-config \ + libglib2.0-dev libusb-1.0-0-dev zlib1g-dev python3-dev +``` + +### Python dependencies + +``` +pip install -r cli/requirements.txt +``` + +This installs: + +- `mcp` (>= 1.8.1) -- the MCP SDK (FastMCP is bundled inside) +- `python-toon` (>= 0.1.3) -- compact serializer (optional, falls back to JSON) + +## Building + +From the DSView repository root: + +```bash +mkdir -p build && cd build +cmake .. +make dsview-cli -j$(nproc) +``` + +The binary is placed in `build.dir/dsview-cli` alongside the DSView GUI. + +To build everything (GUI + CLI): + +```bash +make -j$(nproc) +``` + +### Verify the build + +```bash +./build.dir/dsview-cli scan +``` + +This should print JSON listing connected DreamSourceLab devices (or an +empty list if none are plugged in). + +## Registering the MCP server + +### Claude Code + +Add the following to `~/.claude.json` (create the file if it does not +exist): + +```json +{ + "mcpServers": { + "dsview": { + "command": "python3", + "args": ["/absolute/path/to/DSView/cli/dsview_mcp.py"] + } + } +} +``` + +Replace the path with the actual location of your DSView checkout. +Restart Claude Code after editing the config. + +### Other MCP clients + +Any MCP-compatible client can use the server. The transport is +**stdio** -- launch `python3 cli/dsview_mcp.py` and communicate over +stdin/stdout using the MCP JSON-RPC protocol. + +## Available MCP tools + +| Tool | Description | +|------|-------------| +| `scan_devices` | List all connected DreamSourceLab instruments | +| `device_info` | Query model, channels, samplerate limits, analog config | +| `capture_logic` | Record digital signals with named channels and trigger | +| `capture_dso` | Record analog waveforms with vdiv, coupling, probe config | +| `signal_summary` | Per-channel activity report (transitions, voltage stats) | +| `decode_capture` | Per-sample digital waveform and edge list | +| `decode_analog` | Per-sample voltage waveform and statistics | + +## Quick start + +Once the MCP server is registered, ask Claude Code: + +``` +> Scan for connected instruments +> Show me the device info for the logic analyzer +> Capture 100ms of I2C on channels 0,1 at 10MHz and decode it +> Capture an analog waveform on channel 0 at 100MHz with 500mV/div +``` + +### Typical logic analyzer workflow + +1. `scan_devices()` -- discover connected instruments +2. `device_info(device_index=N)` -- confirm mode is LOGIC, read channel modes +3. `capture_logic(channels="0,1", channel_names="SDA,SCL", samplerate="10M", out_format="sr")` -- capture +4. `signal_summary(file_path)` -- quick overview +5. Use sigrok for protocol decode: `sigrok-cli -i file.sr -P i2c:scl=SCL:sda=SDA` + +### Typical oscilloscope workflow + +1. `scan_devices()` -- discover instruments +2. `device_info(device_index=N)` -- confirm mode is DSO +3. `capture_dso(channels="0,1", channel_names="SCL,SDA", samplerate="100M", vdiv="500")` -- capture +4. `signal_summary(file_path)` -- voltage statistics (min/max/Vpp/Vrms) +5. `decode_analog(file_path)` -- per-sample voltage waveform + +## When to use logic analyzer vs oscilloscope + +**Logic analyzer** (`capture_logic`): protocol decoding (I2C, SPI, UART), +monitoring many digital signals, verifying bus transactions. Tells you +*what* was communicated. + +**Oscilloscope** (`capture_dso`): voltage measurements, rise/fall time, +setup/hold time at VIL/VIH thresholds, signal integrity analysis. Tells +you *how well* the electrical signals meet spec. + +For complete bus analysis, use both instruments simultaneously (they +operate independently via `device_index`). + +## Firmware + +DSView instruments require FPGA firmware files at runtime. These are +located automatically from `DSView/res/` (relative to the binary) or +from the system install path `/usr/share/DSView/res/`. + +## License + +Same as DSView -- GPLv3+. See the top-level [LICENSE](../COPYING) file. diff --git a/cli/dsview_cli.c b/cli/dsview_cli.c new file mode 100644 index 00000000..a9b613ea --- /dev/null +++ b/cli/dsview_cli.c @@ -0,0 +1,1378 @@ +/* + * dsview_cli.c - Command-line bridge to libsigrok4DSL for DreamSourceLab devices. + * + * Subcommands: + * scan - list connected devices as JSON + * info - print device capabilities as JSON + * capture - capture logic data to binary file, print JSON status + * + * Trigger values passed to ds_trigger_probe_set(): + * 'X' = don't care 'R' = rising 'F' = falling '1' = high '0' = low + */ + +/* Feature-test macro: expose POSIX + GNU extensions (readlink, clock_gettime, + * CLOCK_REALTIME, strcasecmp, ETIMEDOUT). Must precede any #include. */ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "libsigrok4DSL/libsigrok.h" +#include "libsigrok4DSL/libsigrok-internal.h" + +/* ------------------------------------------------------------------------- + * Constants + * ------------------------------------------------------------------------- */ + +#define MAX_CH 16 + +/* ------------------------------------------------------------------------- + * Global capture state + * ------------------------------------------------------------------------- */ + +static FILE *g_capture_file = NULL; +static pthread_mutex_t g_state_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t g_state_cond = PTHREAD_COND_INITIALIZER; +static volatile int g_capture_done = 0; +static volatile int g_capture_error = 0; +static uint64_t g_sample_bytes = 0; +static int g_unitsize = 2; /* bytes per sample, set from channel count */ +static int g_dev_mode = 0; /* LOGIC=0, DSO=1, ANALOG=2 */ +static uint64_t g_limit_samples = 0; /* requested sample count for DSO stop */ +static int g_hw_nch = 0; /* hw mode channel count (FPGA data packing) */ + +/* Leftover buffer for cross-to-parallel conversion. + * USB transfers and receive_transfer() truncation can deliver data + * that is not a whole multiple of nch*8 bytes (one cross-format group). + * We buffer partial groups here and prepend them to the next callback. */ +static uint8_t g_cross_leftover[MAX_CH * 8]; /* max group = 16 * 8 = 128 */ +static size_t g_cross_leftover_len = 0; + +/* ------------------------------------------------------------------------- + * Channel / trigger config (filled by argument parsing) + * ------------------------------------------------------------------------- */ + +static int g_enabled_chs[MAX_CH]; /* physical channel indices */ +static int g_n_enabled_chs = 0; /* how many are in the list */ +static char g_ch_names[MAX_CH][64]; /* parallel name array */ + +static int g_trig_ch = -1; /* -1 = no trigger */ +static char g_trig_type[16] = "none"; /* rising/falling/high/low */ +static int g_trig_pos = 50; /* pre-trigger % */ +static double g_vth = -1.0; /* -1 = don't change default */ + +/* DSO / analog oscilloscope config */ +static uint64_t g_vdiv[2] = { 0, 0 }; /* mV per div, 0 = don't change */ +static int g_coupling[2] = { -1, -1 }; /* 0=DC, 1=AC, -1=default */ +static uint64_t g_probe_factor[2] = { 0, 0 }; /* 1,2,10,20; 0 = don't change */ + +/* ------------------------------------------------------------------------- + * Library callbacks + * ------------------------------------------------------------------------- */ + +static void event_callback(int event) +{ + switch (event) { + case DS_EV_COLLECT_TASK_START: + break; + case DS_EV_COLLECT_TASK_END: + case DS_EV_COLLECT_TASK_END_BY_DETACHED: + pthread_mutex_lock(&g_state_mutex); + g_capture_done = 1; + pthread_cond_signal(&g_state_cond); + pthread_mutex_unlock(&g_state_mutex); + break; + case DS_EV_COLLECT_TASK_END_BY_ERROR: + pthread_mutex_lock(&g_state_mutex); + g_capture_done = 1; + g_capture_error = 1; + pthread_cond_signal(&g_state_cond); + pthread_mutex_unlock(&g_state_mutex); + break; + default: + break; + } +} + +/* + * Convert one complete cross-format group to parallel format and write it. + * + * One group = nch * 8 input bytes -> 64 parallel samples of unitsize bytes. + * The DSLogic FPGA sends LA_CROSS_DATA: data interleaved at 8-byte (64-bit) + * boundaries per enabled channel. With N enabled channels, each group is: + * [ch0 uint64][ch1 uint64]...[chN-1 uint64] + * Each uint64 holds 64 consecutive sample bits for that one channel. + * + * Parallel format (what sigrok / our Python layer expects): + * Each sample is unitsize bytes, with bit K = channel K value. + */ +static void convert_one_group(const uint8_t *gp, int nch, int unitsize, + FILE *fp, uint64_t *written) +{ + uint8_t out[64 * 2]; /* max unitsize=2, 64 samples per group */ + + memset(out, 0, (size_t)(64 * unitsize)); + + for (int ch = 0; ch < nch; ch++) { + /* 8 bytes (64 bits) for this channel in this group */ + const uint8_t *ch_bytes = gp + ch * 8; + int bit = ch; /* bit position in the output sample */ + + for (int b = 0; b < 64; b++) { + int byte_idx = b / 8; + int bit_idx = b % 8; + if (ch_bytes[byte_idx] & (1 << bit_idx)) { + out[b * unitsize + bit / 8] + |= (uint8_t)(1 << (bit % 8)); + } + } + } + fwrite(out, (size_t)unitsize, 64, fp); + *written += (uint64_t)(64 * unitsize); +} + +/* + * Convert cross-format logic data to parallel format, handling partial + * groups across callbacks. + * + * USB transfers and receive_transfer() truncation (dsl.c:2403) can + * deliver data that is not a whole multiple of nch*8 bytes. We buffer + * leftover bytes in g_cross_leftover[] and prepend them to the next + * callback's data before processing complete groups. + */ +static void cross_to_parallel(const uint8_t *src, size_t src_len, + int nch, int unitsize, FILE *fp, + uint64_t *written) +{ + size_t grp_in = (size_t)nch * 8; + const uint8_t *p = src; + size_t remain = src_len; + + /* If we have leftover bytes from the previous callback, try to + * complete the partial group by appending from the new data. */ + if (g_cross_leftover_len > 0) { + size_t need = grp_in - g_cross_leftover_len; + if (remain >= need) { + memcpy(g_cross_leftover + g_cross_leftover_len, + p, need); + convert_one_group(g_cross_leftover, nch, unitsize, + fp, written); + p += need; + remain -= need; + g_cross_leftover_len = 0; + } else { + /* Still not enough for a full group -- accumulate */ + memcpy(g_cross_leftover + g_cross_leftover_len, + p, remain); + g_cross_leftover_len += remain; + return; + } + } + + /* Process all complete groups in the current buffer */ + while (remain >= grp_in) { + convert_one_group(p, nch, unitsize, fp, written); + p += grp_in; + remain -= grp_in; + } + + /* Save any remaining bytes for the next callback */ + if (remain > 0) { + memcpy(g_cross_leftover, p, remain); + g_cross_leftover_len = remain; + } +} + +static void datafeed_callback(const struct sr_dev_inst *sdi, + const struct sr_datafeed_packet *packet) +{ + (void)sdi; + if (packet->type == SR_DF_LOGIC && g_capture_file) { + const struct sr_datafeed_logic *logic = + (const struct sr_datafeed_logic *)packet->payload; + if (logic && logic->data && logic->length > 0) { + if (logic->format == LA_CROSS_DATA && + g_hw_nch > 0) { + cross_to_parallel( + (const uint8_t *)logic->data, + (size_t)logic->length, + g_hw_nch, + g_unitsize, + g_capture_file, + &g_sample_bytes); + } else { + fwrite(logic->data, 1, + (size_t)logic->length, + g_capture_file); + g_sample_bytes += logic->length; + } + } + } + /* DSO mode: interleaved 8-bit samples [ch0_s0, ch1_s0, ch0_s1, ...] */ + if (packet->type == SR_DF_DSO && g_capture_file) { + const struct sr_datafeed_dso *dso = + (const struct sr_datafeed_dso *)packet->payload; + unsigned nch = (unsigned)(g_n_enabled_chs > 0 ? g_n_enabled_chs : 1); + if (dso && dso->data && dso->num_samples > 0 && !g_capture_done) { + /* Only write up to g_limit_samples per channel */ + uint64_t nsamp = (uint64_t) dso->num_samples; + uint64_t have = g_sample_bytes / nch; + uint64_t want = g_limit_samples ? g_limit_samples : nsamp; + if (have < want) { + uint64_t remaining = want - have; + uint64_t take = (nsamp < remaining) ? nsamp : remaining; + size_t len = (size_t)take * nch; + fwrite(dso->data, 1, len, g_capture_file); + g_sample_bytes += len; + } + /* Signal done once we have enough */ + have = g_sample_bytes / nch; + if (have >= want) { + pthread_mutex_lock(&g_state_mutex); + g_capture_done = 1; + pthread_cond_signal(&g_state_cond); + pthread_mutex_unlock(&g_state_mutex); + } + } + } + /* ANALOG (DAQ) mode: same interleaved 8-bit format */ + if (packet->type == SR_DF_ANALOG && g_capture_file) { + const struct sr_datafeed_analog *analog = + (const struct sr_datafeed_analog *)packet->payload; + unsigned nch = (unsigned)(g_n_enabled_chs > 0 ? g_n_enabled_chs : 1); + if (analog && analog->data && analog->num_samples > 0 && !g_capture_done) { + uint64_t nsamp = (uint64_t) analog->num_samples; + uint64_t have = g_sample_bytes / nch; + uint64_t want = g_limit_samples ? g_limit_samples : nsamp; + if (have < want) { + uint64_t remaining = want - have; + uint64_t take = (nsamp < remaining) ? nsamp : remaining; + size_t len = (size_t)take * nch; + fwrite(analog->data, 1, len, g_capture_file); + g_sample_bytes += len; + } + have = g_sample_bytes / nch; + if (have >= want) { + pthread_mutex_lock(&g_state_mutex); + g_capture_done = 1; + pthread_cond_signal(&g_state_cond); + pthread_mutex_unlock(&g_state_mutex); + } + } + } +} + +/* ------------------------------------------------------------------------- + * Library init + * ------------------------------------------------------------------------- */ + +static void get_paths(char *fw_dir, size_t fw_sz, char *ud_dir, size_t ud_sz) +{ + const char *home = getenv("HOME"); + if (!home) + home = "/tmp"; + + char exe_path[512]; + ssize_t len = readlink("/proc/self/exe", exe_path, sizeof(exe_path) - 1); + if (len > 0) { + exe_path[len] = '\0'; + char *slash = strrchr(exe_path, '/'); + if (slash) { + *slash = '\0'; + /* Primary: DSView/res/ relative to the binary's directory. + * When built in-tree, the binary is at build.dir/dsview-cli + * so ../DSView/res/ points to the firmware. + * When built in build/cli/, ../../../DSView/res/ is the path, + * but we try the shorter path first. */ + snprintf(fw_dir, fw_sz, "%s/../DSView/res", exe_path); + + /* Check if primary path exists, fall back to source tree layout */ + if (access(fw_dir, R_OK) != 0) { + snprintf(fw_dir, fw_sz, "%s/../../DSView/res", exe_path); + } + } + } else { + /* Fallback: system install path */ + snprintf(fw_dir, fw_sz, "/usr/share/DSView/res"); + } + + /* Final fallback: system install path if nothing found yet */ + if (access(fw_dir, R_OK) != 0) { + snprintf(fw_dir, fw_sz, "/usr/share/DSView/res"); + } + + snprintf(ud_dir, ud_sz, "%s/.config/dsview-cli", home); + g_mkdir_with_parents(ud_dir, 0755); +} + +static int lib_init(void) +{ + char fw_dir[512], ud_dir[512]; + get_paths(fw_dir, sizeof(fw_dir), ud_dir, sizeof(ud_dir)); + + ds_set_firmware_resource_dir(fw_dir); + ds_set_user_data_dir(ud_dir); + ds_set_event_callback(event_callback); + ds_set_datafeed_callback(datafeed_callback); + + int ret = ds_lib_init(); + if (ret != SR_OK) + fprintf(stderr, "{\"error\": \"ds_lib_init failed: %d\"}\n", ret); + return ret; +} + +/* ------------------------------------------------------------------------- + * Device activation with exclusive-access check + * ------------------------------------------------------------------------- */ + +static const char *device_error_hint(int err_code) +{ + switch (err_code) { + case SR_ERR_DEVICE_IS_EXCLUSIVE: + return "device is in use by another application " + "(DSView GUI or another dsview-cli instance)"; + case SR_ERR_FIRMWARE_NOT_EXIST: + return "firmware file not found"; + case SR_ERR_DEVICE_FIRMWARE_VERSION_LOW: + return "device firmware version too low, update via DSView GUI"; + case SR_ERR_DEVICE_USB_IO_ERROR: + return "USB I/O error"; + case SR_ERR_DEVICE_NO_DRIVER: + return "no driver for this device"; + default: + return NULL; + } +} + +static int activate_device(int dev_index, struct ds_device_base_info *list, int count) +{ + if (dev_index < 0 || dev_index >= count) { + fprintf(stderr, "device index %d out of range (0-%d)\n", + dev_index, count - 1); + return -1; + } + + ds_device_handle requested_handle = list[dev_index].handle; + + int ret = ds_active_device_by_index(dev_index); + if (ret != SR_OK) { + int last_err = ds_get_last_error(); + const char *hint = device_error_hint(last_err); + if (hint) + fprintf(stderr, "failed to activate device %d: %s\n", + dev_index, hint); + else + fprintf(stderr, "failed to activate device %d (code %d)\n", + dev_index, last_err); + return -1; + } + + /* The library may silently fall back to the Demo Device when the + * requested USB device is exclusively held by another process. + * Detect this by comparing the activated handle with what we asked for. */ + struct ds_device_full_info activated; + memset(&activated, 0, sizeof(activated)); + ds_get_actived_device_info(&activated); + + if (activated.handle != requested_handle) { + fprintf(stderr, "device %d (\"%s\") is in use by another application " + "(DSView GUI or another dsview-cli instance); " + "library fell back to \"%s\"\n", + dev_index, list[dev_index].name, activated.name); + return -2; + } + + return 0; +} + +/* ------------------------------------------------------------------------- + * Metadata sidecar (.bin.meta.json written alongside the capture file) + * ------------------------------------------------------------------------- */ + +/* Per-channel DSO metadata, filled during capture for sidecar output */ +static uint64_t g_ch_vdiv[MAX_CH]; /* actual vdiv (mV) per ch */ +static uint64_t g_ch_vfactor[MAX_CH]; /* actual probe factor */ +static uint8_t g_ch_coupling[MAX_CH]; /* 0=DC, 1=AC */ +static uint16_t g_ch_hw_offset[MAX_CH]; /* hardware ADC offset */ +static uint8_t g_ch_bits[MAX_CH]; /* bits per sample (8) */ + +static void write_metadata(const char *bin_path, + uint64_t samplerate, uint64_t n_samples, int unitsize) +{ + char meta_path[600]; + snprintf(meta_path, sizeof(meta_path), "%s.meta.json", bin_path); + + FILE *f = fopen(meta_path, "w"); + if (!f) + return; + + int trig_en = (g_trig_ch >= 0 && strcmp(g_trig_type, "none") != 0); + const char *mode_name = (g_dev_mode == DSO) ? "dso" : + (g_dev_mode == ANALOG) ? "analog" : "logic"; + + fprintf(f, "{\n"); + fprintf(f, " \"mode\": \"%s\",\n", mode_name); + fprintf(f, " \"samplerate\": %llu,\n", (unsigned long long)samplerate); + fprintf(f, " \"samples\": %llu,\n", (unsigned long long)n_samples); + fprintf(f, " \"unitsize\": %d,\n", unitsize); + fprintf(f, " \"channel_map\": [\n"); + for (int i = 0; i < g_n_enabled_chs; i++) { + int phys = g_enabled_chs[i]; + const char *ch_type = (g_dev_mode == DSO || g_dev_mode == ANALOG) + ? "dso" : "logic"; + fprintf(f, " {\"seq\": %d, \"phys\": %d, \"name\": \"%s\", " + "\"type\": \"%s\"", + i, phys, g_ch_names[i][0] ? g_ch_names[i] : "", ch_type); + if (g_dev_mode == DSO || g_dev_mode == ANALOG) { + const char *coup = (g_ch_coupling[i] == 1) ? "AC" : "DC"; + fprintf(f, ", \"vdiv_mV\": %llu, \"probe_factor\": %llu, " + "\"coupling\": \"%s\", \"hw_offset\": %u, \"bits\": %u", + (unsigned long long)g_ch_vdiv[i], + (unsigned long long)g_ch_vfactor[i], + coup, + (unsigned)g_ch_hw_offset[i], (unsigned)g_ch_bits[i]); + } + fprintf(f, "}%s\n", (i < g_n_enabled_chs - 1) ? "," : ""); + } + fprintf(f, " ],\n"); + fprintf(f, " \"trigger\": {\n"); + fprintf(f, " \"enabled\": %s", trig_en ? "true" : "false"); + if (trig_en) { + fprintf(f, ",\n \"channel\": %d", g_trig_ch); + fprintf(f, ",\n \"type\": \"%s\"", g_trig_type); + } + fprintf(f, ",\n \"pos_pct\": %d\n", g_trig_pos); + fprintf(f, " }\n"); + fprintf(f, "}\n"); + fclose(f); +} + +/* ------------------------------------------------------------------------- + * Trigger setup + * ------------------------------------------------------------------------- */ + +static void apply_trigger(void) +{ + ds_trigger_reset(); + + if (g_trig_ch < 0 || strcmp(g_trig_type, "none") == 0) { + ds_trigger_set_en(0); + return; + } + + char t0; + if (!strcmp(g_trig_type, "rising")) + t0 = 'R'; + else if (!strcmp(g_trig_type, "falling")) + t0 = 'F'; + else if (!strcmp(g_trig_type, "high")) + t0 = '1'; + else if (!strcmp(g_trig_type, "low")) + t0 = '0'; + else + t0 = 'X'; + + ds_trigger_set_en(1); + ds_trigger_set_stage(1); + ds_trigger_set_pos((uint16_t) g_trig_pos); + /* second condition ('X') = don't care */ + ds_trigger_probe_set((uint16_t) g_trig_ch, (unsigned char)t0, 'X'); +} + +/* ------------------------------------------------------------------------- + * Channel setup helper + * ------------------------------------------------------------------------- */ + +static int setup_channels(int total_ch) +{ + /* Enable ALL channels within the hardware mode range, matching + * the DSView GUI behaviour. The FPGA packs cross-format data + * for every enabled channel. Disabling a subset within the mode + * causes the FPGA packing to differ from what cross_to_parallel() + * expects (nch = g_hw_nch = mode channel count). + * + * Channels beyond the mode range are disabled. + * User-requested channels get their names set. */ + for (int ch = 0; ch < total_ch; ch++) { + gboolean en = (ch < g_hw_nch) ? TRUE : FALSE; + ds_enable_device_channel_index(ch, en); + + /* Set name for user-requested channels */ + int name_idx = -1; + for (int j = 0; j < g_n_enabled_chs; j++) { + if (g_enabled_chs[j] == ch) { + name_idx = j; + break; + } + } + if (name_idx >= 0 && g_ch_names[name_idx][0]) + ds_set_device_channel_name(ch, g_ch_names[name_idx]); + } + return SR_OK; +} + +/* ------------------------------------------------------------------------- + * scan + * ------------------------------------------------------------------------- */ + +static int cmd_scan(void) +{ + if (lib_init() != SR_OK) + return 1; + + ds_reload_device_list(); + g_usleep(800000); + + struct ds_device_base_info *list = NULL; + int count = 0; + ds_get_device_list(&list, &count); + + printf("[\n"); + for (int i = 0; i < count; i++) { + /* Escape quotes in name */ + char safe[200]; + int j = 0; + for (const char *s = list[i].name; *s && j < 197; s++) { + if (*s == '"') + safe[j++] = '\\'; + safe[j++] = *s; + } + safe[j] = '\0'; + printf(" {\"index\": %d, \"handle\": %llu, \"name\": \"%s\"}%s\n", + i, (unsigned long long)list[i].handle, safe, + (i < count - 1) ? "," : ""); + } + printf("]\n"); + + g_free(list); + ds_lib_exit(); + return 0; +} + +/* ------------------------------------------------------------------------- + * info + * ------------------------------------------------------------------------- */ + +static int cmd_info(int dev_index) +{ + if (lib_init() != SR_OK) + return 1; + + ds_reload_device_list(); + g_usleep(800000); + + struct ds_device_base_info *list = NULL; + int count = 0; + ds_get_device_list(&list, &count); + + if (count == 0) { + printf("{\"error\": \"no devices found\"}\n"); + g_free(list); + ds_lib_exit(); + return 1; + } + + int act_ret = activate_device(dev_index, list, count); + if (act_ret != 0) { + if (act_ret == -2) + printf + ("{\"error\": \"device %d is in use by another application\"}\n", + dev_index); + else + printf("{\"error\": \"failed to activate device %d\"}\n", + dev_index); + g_free(list); + ds_lib_exit(); + return 1; + } + + int init_status = 0; + for (int i = 0; i < 100; i++) { + ds_get_actived_device_init_status(&init_status); + if (init_status) + break; + g_usleep(100000); + } + + struct ds_device_full_info info; + memset(&info, 0, sizeof(info)); + ds_get_actived_device_info(&info); + int n_ch = info.di ? (int)g_slist_length(info.di->channels) : 0; + + GVariant *v = NULL; + uint64_t samplerate = 0, limit_samples = 0; + double vth = -1.0; + if (ds_get_actived_device_config(NULL, NULL, SR_CONF_SAMPLERATE, &v) == SR_OK + && v) { + samplerate = g_variant_get_uint64(v); + g_variant_unref(v); + v = NULL; + } + if (ds_get_actived_device_config(NULL, NULL, SR_CONF_LIMIT_SAMPLES, &v) == SR_OK + && v) { + limit_samples = g_variant_get_uint64(v); + g_variant_unref(v); + v = NULL; + } + if (ds_get_actived_device_config(NULL, NULL, SR_CONF_VTH, &v) == SR_OK && v) { + vth = g_variant_get_double(v); + g_variant_unref(v); + v = NULL; + } + + /* Channel modes: each entry describes channels-in-use vs max samplerate */ + GVariant *cm_data = NULL; + char cm_json[2048] = "[]"; + if (ds_get_actived_device_config_list(NULL, SR_CONF_CHANNEL_MODE, &cm_data) == + SR_OK && cm_data) { + struct sr_list_item *modes = + (struct sr_list_item *)(uintptr_t) g_variant_get_uint64(cm_data); + g_variant_unref(cm_data); + int pos = 0; + pos += snprintf(cm_json + pos, sizeof(cm_json) - (size_t)pos, "["); + int first = 1; + for (int i = 0; modes[i].id >= 0; i++) { + /* escape any quotes in description */ + char safe_desc[256]; + int j = 0; + const char *src = modes[i].name ? modes[i].name : ""; + for (; *src && j < 253; src++) { + if (*src == '"') + safe_desc[j++] = '\\'; + safe_desc[j++] = *src; + } + safe_desc[j] = '\0'; + if (!first) + pos += + snprintf(cm_json + pos, sizeof(cm_json) - (size_t)pos, + ", "); + pos += + snprintf(cm_json + pos, sizeof(cm_json) - (size_t)pos, + "{\"id\": %d, \"desc\": \"%s\"}", modes[i].id, + safe_desc); + first = 0; + } + snprintf(cm_json + pos, sizeof(cm_json) - (size_t)pos, "]"); + } + + /* Library / DSView version */ + const char *lib_ver = sr_get_lib_version_string(); + + int dev_mode = ds_get_actived_device_mode(); + const char *mode_name = (dev_mode == DSO) ? "DSO" : + (dev_mode == ANALOG) ? "ANALOG" : "LOGIC"; + + printf("{\n"); + printf(" \"index\": %d,\n", dev_index); + printf(" \"handle\": %llu,\n", (unsigned long long)info.handle); + printf(" \"name\": \"%s\",\n", info.name); + printf(" \"driver\": \"%s\",\n", info.driver_name); + printf(" \"dsview_version\": \"1.3.2\",\n"); + printf(" \"libsigrok4dsl_version\": \"%s\",\n", lib_ver ? lib_ver : "unknown"); + printf(" \"channels\": %d,\n", n_ch); + printf(" \"channel_range\": [0, %d],\n", n_ch - 1); + printf(" \"mode\": %d,\n", dev_mode); + printf(" \"mode_name\": \"%s\",\n", mode_name); + printf(" \"samplerate\": %llu,\n", (unsigned long long)samplerate); + printf(" \"limit_samples\": %llu,\n", (unsigned long long)limit_samples); + + if (dev_mode == DSO || dev_mode == ANALOG) { + /* Per-channel analog info */ + printf(" \"analog_channels\": [\n"); + GSList *channels = ds_get_actived_device_channels(); + int ch_idx = 0; + for (GSList * l = channels; l; l = l->next, ch_idx++) { + struct sr_channel *ch = (struct sr_channel *)l->data; + const char *coup_str = + (ch->coupling == SR_AC_COUPLING) ? "AC" : "DC"; + printf(" {\"index\": %d, \"name\": \"%s\", \"enabled\": %s, " "\"vdiv_mV\": %llu, \"probe_factor\": %llu, " "\"coupling\": \"%s\", \"bits\": %u, \"hw_offset\": %u}%s\n", (int)ch->index, ch->name ? ch->name : "", ch->enabled ? "true" : "false", (unsigned long long)ch->vdiv, (unsigned long long)ch->vfactor, coup_str, (unsigned)ch->bits, (unsigned)ch->offset, /* use software offset; hw_offset only valid after acq */ + (l->next) ? "," : ""); + } + printf(" ],\n"); + printf(" \"vdiv_options\": [10, 20, 50, 100, 200, 500, 1000, 2000],\n"); + printf(" \"coupling_options\": [\"DC\", \"AC\"],\n"); + printf(" \"probe_factor_options\": [1, 2, 10, 20],\n"); + printf(" \"trigger_types\": [\"none\", \"rising\", \"falling\"],\n"); + } else { + printf(" \"vth\": %.2f,\n", vth); + printf + (" \"trigger_types\": [\"none\", \"rising\", \"falling\", \"high\", \"low\"],\n"); + } + + printf(" \"channel_modes\": %s\n", cm_json); + printf("}\n"); + + g_free(list); + ds_lib_exit(); + return 0; +} + +/* ------------------------------------------------------------------------- + * capture + * ------------------------------------------------------------------------- */ + +static int cmd_capture(int dev_index, uint64_t samplerate, uint64_t limit_samples, + const char *outfile) +{ + g_capture_done = g_capture_error = 0; + g_sample_bytes = 0; + g_cross_leftover_len = 0; + g_hw_nch = 0; + memset(g_ch_vdiv, 0, sizeof(g_ch_vdiv)); + memset(g_ch_vfactor, 0, sizeof(g_ch_vfactor)); + memset(g_ch_coupling, 0, sizeof(g_ch_coupling)); + memset(g_ch_hw_offset, 0, sizeof(g_ch_hw_offset)); + memset(g_ch_bits, 0, sizeof(g_ch_bits)); + + if (lib_init() != SR_OK) + return 1; + + ds_reload_device_list(); + g_usleep(800000); + + struct ds_device_base_info *list = NULL; + int count = 0; + ds_get_device_list(&list, &count); + + if (count == 0) { + printf("{\"success\": false, \"error\": \"no devices found\"}\n"); + g_free(list); + ds_lib_exit(); + return 1; + } + + int act_ret = activate_device(dev_index, list, count); + if (act_ret != 0) { + if (act_ret == -2) + printf("{\"success\": false, \"error\": \"device %d is in use by " + "another application\"}\n", dev_index); + else + printf("{\"success\": false, \"error\": \"failed to activate " + "device %d\"}\n", dev_index); + g_free(list); + ds_lib_exit(); + return 1; + } + + /* Device list no longer needed after activation */ + g_free(list); + list = NULL; + + /* Wait for device init */ + int init_status = 0; + for (int i = 0; i < 100; i++) { + ds_get_actived_device_init_status(&init_status); + if (init_status) + break; + g_usleep(100000); + } + if (!init_status) { + printf("{\"success\": false, \"error\": \"device init timed out\"}\n"); + ds_lib_exit(); + return 1; + } + + /* Detect device mode */ + g_dev_mode = ds_get_actived_device_mode(); + int is_dso = (g_dev_mode == DSO || g_dev_mode == ANALOG); + + /* LOGIC mode: select the best hardware channel mode for the number + * of requested channels. The DSLogic FPGA must be told which + * channel mode to use -- simply enabling/disabling individual + * channels is not enough. Without this, the FPGA stays in the + * default 16-channel mode and packs data for 16 channels even if + * only 2 are requested, causing misaligned bit extraction. */ + int ch_mode_num = 0; /* channel count of the selected hw mode */ + if (!is_dso) { + GVariant *cm_data = NULL; + if (ds_get_actived_device_config_list(NULL, + SR_CONF_CHANNEL_MODE, &cm_data) == SR_OK && cm_data) { + struct sr_list_item *modes = + (struct sr_list_item *)(uintptr_t) g_variant_get_uint64(cm_data); + g_variant_unref(cm_data); + + /* The hw mode must cover ALL requested channel indices. + * For contiguous channels 0-6 (count=7), max index is 6, + * so min_hw_channels = 7. For non-contiguous channels + * like 0,3,7 (count=3), max index is 7, so + * min_hw_channels = 8. The mode must have >= this many + * channels because the FPGA addresses channels by index. */ + int max_ch_idx = 0; + for (int i = 0; i < g_n_enabled_chs; i++) { + if (g_enabled_chs[i] > max_ch_idx) + max_ch_idx = g_enabled_chs[i]; + } + int min_hw_chs = max_ch_idx + 1; + if (min_hw_chs < g_n_enabled_chs) + min_hw_chs = g_n_enabled_chs; + + /* Find the mode with the smallest channel count + * that can still accommodate all requested channels. + * Parse channel count from desc: "Use N Channels ..." */ + int best_id = -1; + int best_nch = 9999; + for (int i = 0; modes[i].id >= 0; i++) { + int nch = 0; + const char *p = modes[i].name; + if (p) { + /* Look for "N Channel" pattern */ + const char *u = strstr(p, "se "); + if (u) { + nch = atoi(u + 3); + } else { + /* Try "Channels 0~N" pattern */ + const char *t = strstr(p, "0~"); + if (t) + nch = atoi(t + 2) + 1; + } + } + if (nch >= min_hw_chs && nch < best_nch) { + best_nch = nch; + best_id = modes[i].id; + } + } + if (best_id >= 0) { + GVariant *v = g_variant_new_int16((int16_t)best_id); + if (ds_set_actived_device_config(NULL, NULL, + SR_CONF_CHANNEL_MODE, v) != SR_OK) + fprintf(stderr, + "Warning: set channel mode %d failed\n", + best_id); + else + ch_mode_num = best_nch; + } + } + } + + /* Store the hw channel mode count globally. The FPGA packs cross- + * format data for ALL enabled channels. Like the DSView GUI, we + * keep all channels in the mode enabled (not just user-requested + * ones) so the FPGA packing matches our expectation exactly. */ + g_hw_nch = ch_mode_num > 0 ? ch_mode_num : g_n_enabled_chs; + + /* bytes-per-sample: DSO = 1 byte per channel (8-bit ADC), + * LOGIC = use the hw channel mode's channel count for unitsize */ + if (is_dso) + g_unitsize = 1; /* 8-bit ADC, 1 byte per channel per sample */ + else + g_unitsize = (g_hw_nch <= 8) ? 1 : 2; + + /* Samplerate -- sr_config_set() takes ownership of the GVariant + * (ref_sink + unref), so we must NOT unref after the call. */ + GVariant *v = g_variant_new_uint64(samplerate); + if (ds_set_actived_device_config(NULL, NULL, SR_CONF_SAMPLERATE, v) != SR_OK) + fprintf(stderr, "Warning: set samplerate returned error\n"); + + /* Limit samples */ + v = g_variant_new_uint64(limit_samples); + if (ds_set_actived_device_config(NULL, NULL, SR_CONF_LIMIT_SAMPLES, v) != SR_OK) + fprintf(stderr, "Warning: set limit_samples returned error\n"); + + /* For DSO/ANALOG: datafeed callback uses this to stop after enough data */ + g_limit_samples = is_dso ? limit_samples : 0; + + /* Logic-analyzer-specific: Voltage threshold */ + double vth_actual = -1.0; + if (!is_dso) { + if (g_vth >= 0.0) { + if (g_vth > 5.0) + fprintf(stderr, + "Warning: VTH %.2f exceeds 5.0V maximum\n", + g_vth); + v = g_variant_new_double(g_vth); + if (ds_set_actived_device_config(NULL, NULL, SR_CONF_VTH, v) != + SR_OK) + fprintf(stderr, "Warning: set VTH returned error\n"); + } + v = NULL; + if (ds_get_actived_device_config(NULL, NULL, SR_CONF_VTH, &v) == SR_OK + && v) { + vth_actual = g_variant_get_double(v); + g_variant_unref(v); + v = NULL; + } + } + + /* DSO buffered mode requires both channels enabled; force-enable both + * if user only requested one, adding a default name for the extra. */ + if (is_dso && g_n_enabled_chs < 2) { + int need[2] = { 0, 1 }; + int found[2] = { 0, 0 }; + for (int j = 0; j < g_n_enabled_chs; j++) + if (g_enabled_chs[j] < 2) + found[g_enabled_chs[j]] = 1; + g_n_enabled_chs = 0; + for (int j = 0; j < 2; j++) { + g_enabled_chs[g_n_enabled_chs] = need[j]; + if (!found[need[j]] && !g_ch_names[g_n_enabled_chs][0]) + snprintf(g_ch_names[g_n_enabled_chs], + sizeof(g_ch_names[0]), "CH%d", need[j]); + g_n_enabled_chs++; + } + } + + /* Channel enable / name. + * + * After the channel mode change, dsl_adjust_probes() may have + * resized the probe list (e.g. 16 -> 8 for "Use 8 Channels"). + * Re-query the channel count and enable ALL channels within the + * hw mode range (matching DSView GUI behaviour). The FPGA packs + * cross-format data for every enabled channel -- selectively + * disabling channels within the mode causes the data packing to + * differ from what cross_to_parallel() expects. + * dsl_en_ch_num() will return g_hw_nch after setup_channels(). */ + struct ds_device_full_info finfo; + memset(&finfo, 0, sizeof(finfo)); + ds_get_actived_device_info(&finfo); + int total_ch = finfo.di ? (int)g_slist_length(finfo.di->channels) : MAX_CH; + setup_channels(total_ch); + + /* DSO-specific: apply per-channel analog config (vdiv, coupling, probe) */ + if (is_dso) { + GSList *channels = ds_get_actived_device_channels(); + for (GSList * l = channels; l; l = l->next) { + struct sr_channel *ch = (struct sr_channel *)l->data; + int idx = (int)ch->index; + if (idx < 0 || idx >= 2) + continue; + + if (g_vdiv[idx] > 0) { + v = g_variant_new_uint64(g_vdiv[idx]); + ds_set_actived_device_config(ch, NULL, SR_CONF_PROBE_VDIV, + v); + } + if (g_coupling[idx] >= 0) { + v = g_variant_new_byte((uint8_t) g_coupling[idx]); + ds_set_actived_device_config(ch, NULL, + SR_CONF_PROBE_COUPLING, v); + } + if (g_probe_factor[idx] > 0) { + v = g_variant_new_uint64(g_probe_factor[idx]); + ds_set_actived_device_config(ch, NULL, + SR_CONF_PROBE_FACTOR, v); + } + } + + /* Read back actual per-channel settings for metadata */ + channels = ds_get_actived_device_channels(); + int seq = 0; + for (GSList * l = channels; l; l = l->next) { + struct sr_channel *ch = (struct sr_channel *)l->data; + if (!ch->enabled) + continue; + if (seq < MAX_CH) { + g_ch_vdiv[seq] = ch->vdiv; + g_ch_vfactor[seq] = ch->vfactor; + g_ch_coupling[seq] = ch->coupling; + g_ch_hw_offset[seq] = ch->offset; /* use software offset (128 for 8-bit ADC); hw_offset only valid after acq start */ + g_ch_bits[seq] = ch->bits; + seq++; + } + } + } + + /* Trigger setup */ + if (is_dso) { + /* DSO trigger: use SR_CONF_TRIGGER_SOURCE and SR_CONF_TRIGGER_SLOPE */ + uint8_t trig_src; + if (g_trig_ch < 0 || strcmp(g_trig_type, "none") == 0) + trig_src = DSO_TRIGGER_AUTO; + else if (g_trig_ch == 0) + trig_src = DSO_TRIGGER_CH0; + else + trig_src = DSO_TRIGGER_CH1; + + v = g_variant_new_byte(trig_src); + ds_set_actived_device_config(NULL, NULL, SR_CONF_TRIGGER_SOURCE, v); + + if (g_trig_ch >= 0 && strcmp(g_trig_type, "none") != 0) { + uint8_t slope = DSO_TRIGGER_RISING; + if (!strcmp(g_trig_type, "falling")) + slope = DSO_TRIGGER_FALLING; + v = g_variant_new_byte(slope); + ds_set_actived_device_config(NULL, NULL, SR_CONF_TRIGGER_SLOPE, + v); + } + + /* Trigger position for DSO (percentage) */ + v = g_variant_new_byte((uint8_t) g_trig_pos); + ds_set_actived_device_config(NULL, NULL, SR_CONF_HORIZ_TRIGGERPOS, v); + } else { + /* Logic-analyzer trigger */ + apply_trigger(); + } + + /* Open output file -- 12-byte header: [uint64 samplerate][uint32 n_channels] */ + g_capture_file = fopen(outfile, "wb"); + if (!g_capture_file) { + printf("{\"success\": false, \"error\": \"cannot open: %s\"}\n", outfile); + ds_lib_exit(); + return 1; + } + uint64_t hdr_sr = samplerate; + /* Write the hw mode channel count so the Python layer computes + * the correct unitsize for the parallel-format data we produce. */ + uint32_t hdr_ch = is_dso ? (uint32_t)g_n_enabled_chs + : (uint32_t)g_hw_nch; + fwrite(&hdr_sr, sizeof(hdr_sr), 1, g_capture_file); + fwrite(&hdr_ch, sizeof(hdr_ch), 1, g_capture_file); + + /* Start */ + pthread_mutex_lock(&g_state_mutex); + g_capture_done = 0; + + if (ds_start_collect() != SR_OK) { + pthread_mutex_unlock(&g_state_mutex); + fclose(g_capture_file); + g_capture_file = NULL; + printf("{\"success\": false, \"error\": \"ds_start_collect failed\"}\n"); + ds_lib_exit(); + return 1; + } + + /* Wait for DS_EV_COLLECT_TASK_END (120 s max) */ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 120; + while (!g_capture_done) { + if (pthread_cond_timedwait(&g_state_cond, &g_state_mutex, &ts) == + ETIMEDOUT) { + g_capture_error = 1; + break; + } + } + pthread_mutex_unlock(&g_state_mutex); + + ds_stop_collect(); + fclose(g_capture_file); + g_capture_file = NULL; + + /* For DSO, n_samples = total_bytes / n_enabled_channels (1 byte per ch) */ + uint64_t n_samples; + if (is_dso) + n_samples = (g_n_enabled_chs > 0) ? + (g_sample_bytes / (uint64_t) g_n_enabled_chs) : 0; + else + n_samples = (g_unitsize > 0) ? + (g_sample_bytes / (uint64_t) g_unitsize) : 0; + + /* Write sidecar metadata */ + if (!g_capture_error) + write_metadata(outfile, samplerate, n_samples, g_unitsize); + + /* Build channel_map JSON inline */ + char ch_map[2048]; + int pos = 0; + pos += snprintf(ch_map + pos, sizeof(ch_map) - (size_t)pos, "["); + for (int i = 0; i < g_n_enabled_chs; i++) { + if (is_dso) { + const char *coup = (g_ch_coupling[i] == 1) ? "AC" : "DC"; + pos += snprintf(ch_map + pos, sizeof(ch_map) - (size_t)pos, + "{\"seq\":%d,\"phys\":%d,\"name\":\"%s\"," + "\"type\":\"dso\"," + "\"vdiv_mV\":%llu,\"probe_factor\":%llu," + "\"coupling\":\"%s\",\"hw_offset\":%u,\"bits\":%u}%s", + i, g_enabled_chs[i], + g_ch_names[i][0] ? g_ch_names[i] : "", + (unsigned long long)g_ch_vdiv[i], + (unsigned long long)g_ch_vfactor[i], + coup, + (unsigned)g_ch_hw_offset[i], + (unsigned)g_ch_bits[i], + (i < g_n_enabled_chs - 1) ? "," : ""); + } else { + pos += snprintf(ch_map + pos, sizeof(ch_map) - (size_t)pos, + "{\"seq\":%d,\"phys\":%d,\"name\":\"%s\"}%s", + i, g_enabled_chs[i], + g_ch_names[i][0] ? g_ch_names[i] : "", + (i < g_n_enabled_chs - 1) ? "," : ""); + } + } + pos += snprintf(ch_map + pos, sizeof(ch_map) - (size_t)pos, "]"); + + int trig_en = (g_trig_ch >= 0 && strcmp(g_trig_type, "none") != 0); + const char *mode_str = is_dso ? (g_dev_mode == DSO ? "dso" : "analog") : "logic"; + + if (g_capture_error) { + printf("{\"success\": false, \"error\": \"capture error or timeout\"," + " \"samples\": %llu}\n", (unsigned long long)n_samples); + ds_lib_exit(); + return 1; + } + + printf("{\"success\": true," + " \"mode\": \"%s\"," + " \"samples\": %llu," + " \"samplerate\": %llu," + " \"unitsize\": %d,", + mode_str, + (unsigned long long)n_samples, (unsigned long long)samplerate, g_unitsize); + + if (!is_dso) + printf(" \"vth\": %.2f,", vth_actual); + + printf(" \"channel_map\": %s," + " \"trigger\": {\"enabled\": %s, \"channel\": %d," + " \"type\": \"%s\", \"pos_pct\": %d}," + " \"file\": \"%s\"," + " \"meta\": \"%s.meta.json\"}\n", + ch_map, + trig_en ? "true" : "false", g_trig_ch, g_trig_type, g_trig_pos, + outfile, outfile); + + ds_lib_exit(); + return 0; +} + +/* ------------------------------------------------------------------------- + * Argument parsing + * ------------------------------------------------------------------------- */ + +static uint64_t parse_si(const char *s) +{ + char *end; + double val = strtod(s, &end); + if (*end == 'k' || *end == 'K') + val *= 1e3; + else if (*end == 'M' || *end == 'm') + val *= 1e6; + else if (*end == 'G' || *end == 'g') + val *= 1e9; + return (uint64_t) val; +} + +/* Parse "CH:VALUE" into array[CH] = VALUE, e.g. "0:500" -> arr[0]=500 */ +static int parse_ch_uint64(const char *s, uint64_t arr[2]) +{ + int ch; + uint64_t val; + if (sscanf(s, "%d:%llu", &ch, (unsigned long long *)&val) == 2) { + if (ch < 0 || ch > 1) { + fprintf(stderr, "Error: channel index %d out of range (0-1)\n", + ch); + return -1; + } + arr[ch] = val; + return 0; + } + fprintf(stderr, "Error: expected CH:VALUE format, got '%s'\n", s); + return -1; +} + +/* Parse "CH:MODE" for coupling, e.g. "0:DC" -> arr[0]=0, "1:AC" -> arr[1]=1 */ +static int parse_ch_coupling(const char *s, int arr[2]) +{ + char mode[16]; + int ch; + if (sscanf(s, "%d:%15s", &ch, mode) == 2) { + if (ch < 0 || ch > 1) { + fprintf(stderr, "Error: channel index %d out of range (0-1)\n", + ch); + return -1; + } + if (!strcasecmp(mode, "DC")) + arr[ch] = 0; + else if (!strcasecmp(mode, "AC")) + arr[ch] = 1; + else { + fprintf(stderr, "Error: unknown coupling '%s' (use DC or AC)\n", + mode); + return -1; + } + return 0; + } + fprintf(stderr, "Error: expected CH:MODE format, got '%s'\n", s); + return -1; +} + +/* Parse "0,1,4,7" into g_enabled_chs / g_n_enabled_chs */ +static void parse_channels(const char *s) +{ + char buf[256]; + strncpy(buf, s, sizeof(buf) - 1); + char *tok = strtok(buf, ","); + while (tok && g_n_enabled_chs < MAX_CH) { + int ch = atoi(tok); + if (ch >= 0 && ch < MAX_CH) + g_enabled_chs[g_n_enabled_chs++] = ch; + tok = strtok(NULL, ","); + } +} + +/* Parse "SDA,SCL,TX,RX" into g_ch_names (parallel to g_enabled_chs) */ +static void parse_names(const char *s) +{ + char buf[512]; + strncpy(buf, s, sizeof(buf) - 1); + char *tok = strtok(buf, ","); + int i = 0; + while (tok && i < MAX_CH) { + strncpy(g_ch_names[i], tok, sizeof(g_ch_names[i]) - 1); + i++; + tok = strtok(NULL, ","); + } +} + +static void usage(const char *prog) +{ + fprintf(stderr, + "Usage:\n" + " %s scan\n" + " List connected DreamSourceLab devices.\n\n" + " %s info [-d|--dev N]\n" + " Show capabilities (channels, samplerates, trigger types).\n\n" + " %s capture [options] -o|--out FILE\n" + " -d, --dev N Device index (default 0)\n" + " -s, --samplerate RATE e.g. 1M, 10M, 100M (default 1M)\n" + " -n, --samples COUNT e.g. 100k, 1M (default 1M)\n" + " -c, --enable-chs LIST Comma-sep channel indices to enable (default 0-15)\n" + " e.g. -c 0,1,4,7\n" + " -N, --ch-names LIST Comma-sep names matching --enable-chs order\n" + " e.g. -N SDA,SCL,TX,RX\n" + " -t, --trig-ch N Channel to trigger on (default -1 = free-run)\n" + " -T, --trig-type TYPE rising|falling|high|low|none (default none)\n" + " -p, --trig-pos PCT Pre-trigger %% 0-100 (default 50)\n" + " -V, --vth VOLTS Voltage threshold 0.0-5.0 (logic analyzers only)\n" + " -o, --out FILE Output binary file path\n" + "\n DSO/oscilloscope options (DSCope devices):\n" + " --vdiv CH:VAL Voltage per division in mV. CH is 0 or 1.\n" + " e.g. --vdiv 0:500 --vdiv 1:1000\n" + " Valid: 10,20,50,100,200,500,1000,2000\n" + " --coupling CH:MODE DC or AC coupling. CH is 0 or 1.\n" + " e.g. --coupling 0:DC --coupling 1:AC\n" + " --probe CH:FACTOR Probe attenuation factor. CH is 0 or 1.\n" + " e.g. --probe 0:1 --probe 1:10\n" + " Valid: 1,2,10,20\n" + "\n -h, --help Show this help message\n", + prog, prog, prog); +} + +/* ------------------------------------------------------------------------- + * main + * ------------------------------------------------------------------------- */ + +int main(int argc, char **argv) +{ + if (argc < 2) { + usage(argv[0]); + return 1; + } + const char *cmd = argv[1]; + + /* Defaults */ + int dev_index = 0; + uint64_t samplerate = 1000000ULL; + uint64_t limit_samples = 1000000ULL; + char outfile[512] = "/tmp/dsview_capture.bin"; + + /* Default: all 16 channels */ + for (int i = 0; i < MAX_CH; i++) { + g_enabled_chs[i] = i; + g_ch_names[i][0] = '\0'; + } + g_n_enabled_chs = MAX_CH; + + /* Long-only options use IDs >= 256 to avoid clashing with short opts */ + enum { OPT_VDIV = 256, OPT_COUPLING, OPT_PROBE }; + + static const struct option long_options[] = { + { "dev", required_argument, NULL, 'd' }, + { "samplerate", required_argument, NULL, 's' }, + { "samples", required_argument, NULL, 'n' }, + { "enable-chs", required_argument, NULL, 'c' }, + { "ch-names", required_argument, NULL, 'N' }, + { "trig-ch", required_argument, NULL, 't' }, + { "trig-type", required_argument, NULL, 'T' }, + { "trig-pos", required_argument, NULL, 'p' }, + { "out", required_argument, NULL, 'o' }, + { "vth", required_argument, NULL, 'V' }, + { "vdiv", required_argument, NULL, OPT_VDIV }, + { "coupling", required_argument, NULL, OPT_COUPLING }, + { "probe", required_argument, NULL, OPT_PROBE }, + { "help", no_argument, NULL, 'h' }, + { NULL, 0, NULL, 0 } + }; + + optind = 2; /* skip argv[0] (program) and argv[1] (subcommand) */ + int opt; + while ((opt = getopt_long(argc, argv, "d:s:n:c:N:t:T:p:o:V:h", + long_options, NULL)) != -1) { + switch (opt) { + case 'd': + dev_index = atoi(optarg); + break; + case 's': + samplerate = parse_si(optarg); + break; + case 'n': + limit_samples = parse_si(optarg); + break; + case 'c': + g_n_enabled_chs = 0; + memset(g_ch_names, 0, sizeof(g_ch_names)); + parse_channels(optarg); + break; + case 'N': + parse_names(optarg); + break; + case 't': + g_trig_ch = atoi(optarg); + break; + case 'T': + strncpy(g_trig_type, optarg, sizeof(g_trig_type) - 1); + break; + case 'p': + g_trig_pos = atoi(optarg); + break; + case 'o': + strncpy(outfile, optarg, sizeof(outfile) - 1); + break; + case 'V': + g_vth = atof(optarg); + if (g_vth < 0.0) { + fprintf(stderr, "Error: --vth must be >= 0 (got %.2f)\n", + g_vth); + return 1; + } + break; + case OPT_VDIV: + if (parse_ch_uint64(optarg, g_vdiv) != 0) + return 1; + break; + case OPT_COUPLING: + if (parse_ch_coupling(optarg, g_coupling) != 0) + return 1; + break; + case OPT_PROBE: + if (parse_ch_uint64(optarg, g_probe_factor) != 0) + return 1; + break; + case 'h': + usage(argv[0]); + return 0; + default: + usage(argv[0]); + return 1; + } + } + + if (!strcmp(cmd, "scan")) { + return cmd_scan(); + } else if (!strcmp(cmd, "info")) { + return cmd_info(dev_index); + } else if (!strcmp(cmd, "capture")) { + return cmd_capture(dev_index, samplerate, limit_samples, outfile); + } else { + fprintf(stderr, "Unknown command: %s\n\n", cmd); + usage(argv[0]); + return 1; + } +} diff --git a/cli/dsview_mcp.py b/cli/dsview_mcp.py new file mode 100644 index 00000000..7991227f --- /dev/null +++ b/cli/dsview_mcp.py @@ -0,0 +1,1865 @@ +#!/usr/bin/env python3 +"""DSView MCP Server + +Exposes DreamSourceLab instruments (DSLogic, DScope) as MCP tools so that +Claude can scan devices, capture logic-analyzer data, and inspect signals. + +Build dsview-cli first (from the DSView root): + mkdir -p build && cd build && cmake .. && make -j$(nproc) +""" + +import json +import math +import os +import struct +import subprocess +import tempfile +import zipfile +from pathlib import Path + +try: + import toon + _HAS_TOON = True +except ImportError: + _HAS_TOON = False + +from mcp.server.fastmcp import FastMCP + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- + +SCRIPT_DIR = Path(__file__).parent.resolve() +DSVIEW_ROOT = SCRIPT_DIR.parent + +# Search for the dsview-cli binary in likely build output locations +_CLI_CANDIDATES = [ + # in-tree build (EXECUTABLE_OUTPUT_PATH) + DSVIEW_ROOT / "build.dir" / "dsview-cli", + DSVIEW_ROOT / "build" / "cli" / "dsview-cli", # out-of-source: build/cli/ + DSVIEW_ROOT / "build" / "dsview-cli", # out-of-source: build/ + # system install (Debian package) + Path("/usr/bin/dsview-cli"), + Path("/usr/local/bin/dsview-cli"), # local install +] + +CLI_BINARY = None +for _candidate in _CLI_CANDIDATES: + if _candidate.exists(): + CLI_BINARY = _candidate + break +if CLI_BINARY is None: + CLI_BINARY = _CLI_CANDIDATES[0] # default for error messages + +mcp = FastMCP( + "dsview", + instructions=( + "Control DreamSourceLab USB logic analyzers (DSLogic) and oscilloscopes (DScope). " + "Multiple devices can be connected simultaneously; use device_index to select.\n\n" + "== When to Use Logic Analyzer vs Oscilloscope ==\n\n" + "Use LOGIC ANALYZER (capture_logic) when:\n" + "- Decoding digital protocols: I2C, SPI, UART, JTAG, 1-Wire, SDIO\n" + "- Checking bus transactions: addresses, data bytes, ACK/NACK, framing\n" + "- Monitoring many signals simultaneously (up to 16 channels)\n" + "- Verifying digital state machines, GPIO toggling, interrupt timing\n" + "- You need protocol-level decoding via sigrok (export .sr format)\n" + "- The signals are digital (0/1) and you only care about logic states\n\n" + "Use OSCILLOSCOPE / DSO (capture_dso) when:\n" + "- Measuring analog signal characteristics: voltage levels, amplitude, Vpp\n" + "- Measuring timing at specific voltage thresholds (rise/fall time,\n" + " setup/hold time, propagation delay)\n" + "- Checking signal integrity: overshoot, undershoot, ringing, noise\n" + "- Verifying I2C/SPI timing compliance against spec (requires analog\n" + " waveforms to measure at VIL/VIH thresholds, not just logic 0/1)\n" + "- Measuring AC ripple on power rails (use AC coupling)\n" + "- Characterizing RC rise times on open-drain buses (I2C, 1-Wire)\n" + "- You need actual voltage measurements, not just protocol content\n\n" + "KEY DISTINCTION: Protocol decoders (sigrok) work on digital signals\n" + "from the logic analyzer. Analog timing compliance (rise/fall time at\n" + "VIL/VIH, setup/hold time, signal integrity) requires the oscilloscope.\n" + "For a complete bus analysis, use BOTH: logic analyzer for protocol\n" + "content, oscilloscope for electrical timing compliance.\n\n" + "== Logic Analyzer Workflow (DSLogic) ==\n" + "1. scan_devices -- find all plugged-in devices (index, name, handle)\n" + "2. device_info -- learn channel count, samplerate limits, channel modes\n" + " channel_modes shows channels-in-use vs max samplerate\n" + "3. capture_logic -- record digital signals with named channels and trigger\n" + "4. signal_summary -- quick overview of which channels are active\n" + "5. decode_capture -- view per-channel waveform + edge list\n\n" + "== Oscilloscope Workflow (DScope) ==\n" + "1. scan_devices -- find all plugged-in devices\n" + "2. device_info -- check mode_name='DSO', see analog_channels with\n" + " vdiv/coupling/probe_factor, and vdiv_options\n" + "3. capture_dso -- record analog waveforms (configure vdiv, coupling, probe)\n" + "4. signal_summary -- voltage statistics: min/max/Vpp/Vrms/DC offset/frequency\n" + "5. decode_analog -- per-sample voltage waveform and statistics\n\n" + "Output format: TOON (compact) when available, else JSON.\n\n" + "Best practices:\n" + "- ALWAYS call device_info() before capture to check channel_modes and device mode.\n" + " mode_name='LOGIC' -> use capture_logic(), mode_name='DSO' -> use capture_dso().\n" + "- Use meaningful channel_names -- they propagate to all exports and analysis.\n" + "- For protocol decoding (I2C, SPI, UART, etc.), export with out_format='vcd' or\n" + " out_format='sr', then use sigrok's decode_protocol tool.\n" + "- For analog timing analysis (rise/fall time, setup/hold time, signal integrity),\n" + " use capture_dso + decode_analog. Process voltage waveforms with threshold\n" + " crossings at VIL/VIH to extract timing parameters.\n" + "- Set voltage_threshold to match target logic levels (capture_logic only).\n" + "- For DSO: set vdiv to match expected signal amplitude. The full-scale range\n" + " is vdiv * probe_factor * 10 divisions. Use decode_analog() to see voltages.\n" + "- DSO coupling: DC passes the full signal, AC blocks the DC component.\n" + "- DSO probe_factor: set to match your probe (1x, 10x, etc.).\n" + "- Use fewer channels for higher samplerates (see channel_modes).\n" + "- signal_summary() works for both logic and DSO captures, providing\n" + " appropriate metrics for each mode.\n\n" + "Samplerate selection guide (Nyquist + practical oversampling):\n" + " The minimum samplerate is 2x the signal frequency (Nyquist theorem), but\n" + " protocol decoders need enough samples per bit to resolve edges reliably.\n" + " Use 4x the bus clock as the MINIMUM; 8-10x is recommended when possible.\n" + " Higher oversampling wastes capture memory, reducing the observable duration.\n" + " ALWAYS pick the LOWEST samplerate that gives reliable decoding.\n\n" + " Protocol Bus clock/baud Min samplerate Recommended Max duration*\n" + " I2C standard 100 kHz 400 kHz 1 MHz ~16 s\n" + " I2C fast 400 kHz 1.6 MHz 4 MHz ~4 s\n" + " I2C fast+ 1 MHz 4 MHz 10 MHz ~1.6 s\n" + " SPI 1 MHz 1 MHz 4 MHz 10 MHz ~1.6 s\n" + " SPI 10 MHz 10 MHz 40 MHz 100 MHz ~160 ms\n" + " SPI 25 MHz 25 MHz 100 MHz 250 MHz ~64 ms\n" + " UART 9600 9.6 kHz 38.4 kHz 100 kHz ~160 s\n" + " UART 115200 115.2 kHz 460 kHz 1 MHz ~16 s\n" + " UART 1 Mbaud 1 MHz 4 MHz 10 MHz ~1.6 s\n" + " 1-Wire 16 kHz 64 kHz 500 kHz ~32 s\n" + " JTAG 10 MHz 10 MHz 40 MHz 100 MHz ~160 ms\n" + " SDIO 25 MHz 25 MHz 100 MHz 250 MHz ~64 ms\n" + " (* max duration assumes 16M sample device memory; check device_info)\n\n" + " IMPORTANT: signal_summary() returns per-channel Nyquist-based hints:\n" + " min_samplerate_hz (4x), rec_samplerate_hz (10x), oversampling_ratio,\n" + " and a top-level samplerate_hint when oversampling is excessive (>20x).\n" + " Use these hints to re-capture at an optimal rate for longer duration.\n\n" + "DSO voltage conversion formula:\n" + " range_mV = vdiv_mV * probe_factor * 10 (10 vertical divisions)\n" + " voltage_mV = (hw_offset - raw_sample) * range_mV / 255\n" + " where raw_sample is 0-255 (8-bit ADC)\n" + ), +) + +# --------------------------------------------------------------------------- +# Output format helper +# --------------------------------------------------------------------------- + + +def _dumps(obj, indent=None): + """Serialize obj to TOON string if toon is available, else JSON.""" + if _HAS_TOON: + return toon.encode(obj) + return json.dumps(obj, indent=indent) + + +def _loads(s): + """Deserialize from TOON or JSON string.""" + if _HAS_TOON: + try: + return toon.decode(s) + except Exception: + pass + return json.loads(s) + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _run_cli(*args, timeout=30): + """Run dsview-cli, return (stdout, stderr, returncode).""" + if not CLI_BINARY.exists(): + msg = (f"dsview-cli not found at {CLI_BINARY}. " + f"Build from DSView root: mkdir -p build && cd build && cmake .. && make") + return "", msg, -1 + cmd = [str(CLI_BINARY)] + [str(a) for a in args] + try: + r = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout) + return r.stdout, r.stderr, r.returncode + except subprocess.TimeoutExpired: + return "", f"dsview-cli timed out after {timeout}s", -1 + except Exception as exc: + return "", str(exc), -1 + + +def _parse_json(stdout, stderr, rc, context=""): + text = stdout.strip() + if not text: + return {"error": stderr.strip() or f"{context}: no output (rc={rc})"} + try: + return json.loads(text) + except json.JSONDecodeError: + return {"error": f"invalid JSON: {text[:200]}"} + + +def _expand_channels(spec: str, max_ch: int = 16) -> list[int]: + """ + Convert a channel spec string to a sorted list of channel indices. + "all" -> [0..max_ch-1] + "0-7" -> [0,1,2,3,4,5,6,7] + "0,2,4,6" -> [0,2,4,6] + "8" -> [0..7] (treated as a count) + """ + spec = spec.strip().lower() + if spec == "all": + return list(range(max_ch)) + if "-" in spec and "," not in spec: + lo, hi = spec.split("-", 1) + return list(range(int(lo), int(hi) + 1)) + if "," in spec: + return sorted(int(x) for x in spec.split(",") if x.strip().isdigit()) + # plain integer -> treat as count + n = int(spec) + return list(range(min(n, max_ch))) + + +def _parse_si(s: str) -> float: + """Parse an SI-suffixed string to a numeric value. + + Supports both rate-style ('1M' = 1e6) and duration-style + ('500ms' = 0.5, '100us' = 1e-4) suffixes. + + Examples: + '1M' -> 1_000_000.0 + '10k' -> 10_000.0 + '500M' -> 500_000_000.0 + '1G' -> 1_000_000_000.0 + '200ms' -> 0.2 + '100us' -> 0.0001 + '50ns' -> 5e-8 + '1s' -> 1.0 + '0.5' -> 0.5 + """ + s = s.strip() + # Duration suffixes (must check multi-char before single-char) + _DURATION_SUFFIXES = [ + ("ms", 1e-3), + ("us", 1e-6), + ("ns", 1e-9), + ("ps", 1e-12), + ("s", 1.0), + ] + s_lower = s.lower() + for suffix, mult in _DURATION_SUFFIXES: + if s_lower.endswith(suffix): + return float(s[:len(s) - len(suffix)]) * mult + + # Rate-style SI suffixes (single char) + _SI_SUFFIXES = { + "k": 1e3, "K": 1e3, + "m": 1e6, "M": 1e6, + "g": 1e9, "G": 1e9, + } + if s and s[-1] in _SI_SUFFIXES: + return float(s[:-1]) * _SI_SUFFIXES[s[-1]] + + return float(s) + + +def _load_meta(bin_path: str) -> dict: + """Load the .meta.json sidecar if it exists.""" + meta_path = bin_path + ".meta.json" + try: + with open(meta_path) as f: + return json.load(f) + except Exception: + return {} + + +# --------------------------------------------------------------------------- +# Export format helpers +# --------------------------------------------------------------------------- + +def _fmt_samplerate(hz: int) -> str: + """Format samplerate as human-readable string for SR metadata.""" + for unit, divisor in (("GHz", 1_000_000_000), ("MHz", 1_000_000), + ("kHz", 1_000)): + if hz >= divisor and hz % divisor == 0: + return f"{hz // divisor} {unit}" + return f"{hz} Hz" + + +def _fmt_rate(hz: float) -> str: + """Format a frequency (Hz) as a compact human-readable string.""" + abs_hz = abs(hz) + if abs_hz >= 1_000_000_000: + return "%.3g GHz" % (hz / 1e9) + if abs_hz >= 1_000_000: + return "%.3g MHz" % (hz / 1e6) + if abs_hz >= 1_000: + return "%.3g kHz" % (hz / 1e3) + return "%.3g Hz" % hz + + +def _write_sigrok_binary(bin_path: str, out_path: str, meta: dict) -> None: + """Write sigrok binary (our .bin minus 12-byte header).""" + with open(bin_path, "rb") as f: + f.seek(12) + data = f.read() + with open(out_path, "wb") as f: + f.write(data) + + +def _write_sr(bin_path: str, out_path: str, meta: dict) -> None: + """Write sigrok session ZIP (.sr) from a .bin capture. + + For logic captures: standard sigrok logic format with probe mapping. + For DSO/analog captures: sigrok analog format with per-channel data files. + """ + mode = meta.get("mode", "logic") + if mode in ("dso", "analog"): + _write_sr_dso(bin_path, out_path, meta) + else: + _write_sr_logic(bin_path, out_path, meta) + + +def _write_sr_logic(bin_path: str, out_path: str, meta: dict) -> None: + """Write sigrok session ZIP for logic captures.""" + ch_map = meta.get("channel_map", []) + samplerate = meta.get("samplerate", 1_000_000) + unitsize = meta.get("unitsize", 1) + n_probes = len(ch_map) + + lines = [ + "[global]", + "sigrok version=0.5.2", + "", + "[device 1]", + "capturefile=logic-1", + "total probes=%d" % n_probes, + "samplerate=%s" % _fmt_samplerate(samplerate), + "total analog=0", + ] + for i, entry in enumerate(ch_map, start=1): + name = entry.get("name") or ("CH%d" % entry.get("phys", i - 1)) + lines.append("probe%d=%s" % (i, name)) + lines.append("unitsize=%d" % unitsize) + lines.append("") + metadata_bytes = "\n".join(lines).encode() + + with open(bin_path, "rb") as f: + f.seek(12) + logic_data = f.read() + + with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("version", "2") + zf.writestr("metadata", metadata_bytes) + zf.writestr("logic-1-1", logic_data) + + +def _write_sr_dso(bin_path: str, out_path: str, meta: dict) -> None: + """Write sigrok session ZIP for DSO/analog captures. + + Sigrok analog format: each channel gets its own data file + (analog-1-N-1) with 32-bit float samples in native byte order. + """ + ch_map = meta.get("channel_map", []) + samplerate = meta.get("samplerate", 1_000_000) + n_ch = len(ch_map) + + with open(bin_path, "rb") as f: + f.seek(12) + raw = f.read() + + n_samples = len(raw) // n_ch if n_ch > 0 else 0 + + lines = [ + "[global]", + "sigrok version=0.5.2", + "", + "[device 1]", + "total probes=0", + "samplerate=%s" % _fmt_samplerate(samplerate), + "total analog=%d" % n_ch, + ] + for i, entry in enumerate(ch_map, start=1): + name = entry.get("name") or ("CH%d" % entry.get("phys", i - 1)) + lines.append("analog%d=%s" % (i, name)) + lines.append("") + metadata_bytes = "\n".join(lines).encode() + + with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("version", "2") + zf.writestr("metadata", metadata_bytes) + + # Write per-channel analog data as 32-bit floats (voltage in V) + for seq, entry in enumerate(ch_map): + vdiv = entry.get("vdiv_mV", 1000) + pfact = entry.get("probe_factor", 1) + hw_off = entry.get("hw_offset", 128) + + floats = [] + for i in range(n_samples): + sample = raw[i * n_ch + seq] + v_mV = _raw_to_voltage_mV(sample, vdiv, pfact, hw_off) + floats.append(v_mV / 1000.0) # sigrok uses volts + + float_data = struct.pack("<%df" % len(floats), *floats) + zf.writestr("analog-1-%d-1" % (seq + 1), float_data) + + +_VCD_IDS = [chr(c) for c in range(33, 127) if c != 36] # skip '$' + + +def _write_vcd(bin_path: str, out_path: str, meta: dict) -> None: + """Write VCD file from a .bin capture. + + For logic captures: standard VCD with 1-bit wire signals. + For DSO captures: VCD is inherently digital, so analog signals are + thresholded at the midpoint (hw_offset) to produce 0/1 waveforms. + Use CSV export for actual voltage values. + """ + import datetime + ch_map = meta.get("channel_map", []) + samplerate = meta.get("samplerate", 1_000_000) + unitsize = meta.get("unitsize", 1) + mode = meta.get("mode", "logic") + is_dso = mode in ("dso", "analog") + + # Compute a VCD timescale so that the inter-sample period is always + # an integer >= 1. Walk from the largest unit down to picoseconds + # and pick the first where period is a whole number >= 1. + _VCD_UNITS = [ + ("s", 1), + ("ms", 1_000), + ("us", 1_000_000), + ("ns", 1_000_000_000), + ("ps", 1_000_000_000_000), + ] + ts_unit = "ps" + period = 1 + if samplerate > 0: + for unit_name, divisor in _VCD_UNITS: + p = divisor / samplerate + if p >= 1.0 and p == int(p): + ts_unit = unit_name + period = int(p) + break + else: + ts_unit = "ps" + period = max(1, round(1_000_000_000_000 / samplerate)) + + with open(bin_path, "rb") as f: + f.seek(12) + raw = f.read() + + if is_dso: + n_ch = len(ch_map) + n_samples = len(raw) // n_ch if n_ch > 0 else 0 + + # Build entries with threshold info + entries = [] + for i, entry in enumerate(ch_map): + name = entry.get("name") or ("CH%d" % entry.get("phys", i)) + ident = _VCD_IDS[i % len(_VCD_IDS)] + hw_off = entry.get("hw_offset", 128) + entries.append((ident, i, name, hw_off)) + + with open(out_path, "w") as f: + now = datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y") + f.write("$date %s $end\n" % now) + f.write("$version dsview-cli (DSO thresholded) $end\n") + f.write("$comment analog thresholded at hw_offset $end\n") + f.write("$timescale 1 %s $end\n" % ts_unit) + f.write("$scope module dso $end\n") + for ident, _, name, _ in entries: + f.write("$var wire 1 %s %s $end\n" % (ident, name)) + f.write("$upscope $end\n") + f.write("$enddefinitions $end\n") + + prev = {} + for s in range(n_samples): + changes = [] + for ident, seq, _, hw_off in entries: + sample = raw[s * n_ch + seq] + b = 1 if sample >= hw_off else 0 + if prev.get(ident) != b: + changes.append("%d%s" % (b, ident)) + prev[ident] = b + if changes: + f.write("#%d %s\n" % (s * period, " ".join(changes))) + f.write("#%d\n" % (n_samples * period)) + else: + # Logic mode + n_samples = len(raw) // unitsize + + entries = [] + for i, entry in enumerate(ch_map): + bit = entry.get("phys", entry.get("seq", i)) + name = entry.get("name") or ("CH%d" % bit) + ident = _VCD_IDS[i % len(_VCD_IDS)] + entries.append((ident, bit, name)) + + with open(out_path, "w") as f: + now = datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y") + f.write("$date %s $end\n" % now) + f.write("$version dsview-cli $end\n") + f.write("$timescale 1 %s $end\n" % ts_unit) + f.write("$scope module logic $end\n") + for ident, _, name in entries: + f.write("$var wire 1 %s %s $end\n" % (ident, name)) + f.write("$upscope $end\n") + f.write("$enddefinitions $end\n") + + prev = {} + for s in range(n_samples): + off = s * unitsize + val = raw[off] if unitsize == 1 else struct.unpack_from("> bit) & 1 + if prev.get(ident) != b: + changes.append("%d%s" % (b, ident)) + prev[ident] = b + if changes: + f.write("#%d %s\n" % (s * period, " ".join(changes))) + f.write("#%d\n" % (n_samples * period)) + + +def _write_csv(bin_path: str, out_path: str, meta: dict) -> None: + """Write CSV from a .bin capture. + + For logic captures: sigrok-compatible CSV with 0/1 bit values. + For DSO/analog captures: time + voltage columns in mV. + """ + ch_map = meta.get("channel_map", []) + samplerate = meta.get("samplerate", 1_000_000) + unitsize = meta.get("unitsize", 1) + mode = meta.get("mode", "logic") + + with open(bin_path, "rb") as f: + f.seek(12) + raw = f.read() + + if mode in ("dso", "analog"): + _write_csv_dso(raw, out_path, meta, ch_map, samplerate) + else: + _write_csv_logic(raw, out_path, ch_map, samplerate, unitsize) + + +def _write_csv_logic(raw: bytes, out_path: str, ch_map: list, + samplerate: int, unitsize: int) -> None: + """Write sigrok-compatible logic CSV with 0/1 values.""" + names = [] + bits = [] + for i, entry in enumerate(ch_map): + bit = entry.get("phys", entry.get("seq", i)) + name = entry.get("name") or ("CH%d" % bit) + names.append(name) + bits.append(bit) + + n_samples = len(raw) // unitsize + + with open(out_path, "w") as f: + f.write("; Channels (%d/%d): %s\n" + % (len(names), len(names), ", ".join(names))) + f.write("; Samplerate: %d Hz\n" % samplerate) + f.write(",".join(names) + "\n") + for s in range(n_samples): + off = s * unitsize + val = raw[off] if unitsize == 1 else struct.unpack_from("> b) & 1) for b in bits) + "\n") + + +def _write_csv_dso(raw: bytes, out_path: str, meta: dict, ch_map: list, + samplerate: int) -> None: + """Write DSO CSV with time and voltage columns in mV.""" + n_ch = len(ch_map) + if n_ch == 0: + return + n_samples = len(raw) // n_ch + + # Build channel info + ch_info = [] + for seq, entry in enumerate(ch_map): + label = entry.get("name") or ("CH%d" % entry.get("phys", seq)) + ch_info.append({ + "label": label, + "vdiv": entry.get("vdiv_mV", 1000), + "pfact": entry.get("probe_factor", 1), + "hw_off": entry.get("hw_offset", 128), + }) + + names = [ci["label"] + "_mV" for ci in ch_info] + time_step = 1.0 / samplerate if samplerate > 0 else 0.0 + + with open(out_path, "w") as f: + f.write("; DSO capture -- voltages in millivolts\n") + f.write("; Samplerate: %d Hz\n" % samplerate) + for ci in ch_info: + f.write("; %s: vdiv=%d mV, probe_factor=%d, coupling=%s\n" + % (ci["label"], ci["vdiv"], ci["pfact"], "DC")) + f.write("Time_s," + ",".join(names) + "\n") + for s in range(n_samples): + t = s * time_step + vals = [] + for seq, ci in enumerate(ch_info): + sample = raw[s * n_ch + seq] + v = _raw_to_voltage_mV(sample, ci["vdiv"], ci["pfact"], + ci["hw_off"]) + vals.append("%.2f" % v) + f.write("%.9f,%s\n" % (t, ",".join(vals))) + + +_VALID_FORMATS = {"bin", "sigrok-binary", "vcd", "csv", "sr"} + + +# --------------------------------------------------------------------------- +# MCP tools +# --------------------------------------------------------------------------- + +@mcp.tool() +def scan_devices() -> str: + """Scan for connected DreamSourceLab USB devices. + + Returns a list of all detected devices with index, handle, and name. + Multiple devices can be connected simultaneously; use the index value + in other tools (device_info, capture_logic) to address a specific unit. + + Call this first to discover what is plugged in and how many devices exist. + """ + stdout, stderr, rc = _run_cli("scan", timeout=10) + data = _parse_json(stdout, stderr, rc, "scan") + if isinstance(data, list): + result = {"count": len(data), "devices": data} + result["note"] = ( + "Use device_index=N in other tools to address a specific device. " + "Call device_info(N) to learn the instrument's capabilities." + ) + return _dumps(result) + return _dumps(data) + + +@mcp.tool() +def device_info(device_index: int = 0) -> str: + """Get full capabilities of a connected DreamSourceLab instrument. + + Returns: + - name: exact device model (e.g. 'DSLogic U3Pro16' or 'DSCope U3P100') + - mode_name: device mode -- 'LOGIC', 'DSO', or 'ANALOG' + - dsview_version: DSView software version (e.g. '1.3.2') + - libsigrok4dsl_version: library version string + - channels: total physical channel count + - channel_range: valid channel indices [min, max] + - samplerate: current samplerate in Hz + - channel_modes: list of available channel configurations, each showing + how many channels can be used at what maximum samplerate. + - trigger_types: supported trigger conditions + - limit_samples: current sample count limit + + For LOGIC mode: + - vth: current voltage threshold in volts (Pro devices) + + For DSO/ANALOG mode: + - analog_channels: per-channel info with vdiv_mV, probe_factor, coupling, + hw_offset, bits + - vdiv_options: available voltage-per-division settings [10..2000 mV] + - coupling_options: ['DC', 'AC'] + - probe_factor_options: [1, 2, 10, 20] + + IMPORTANT: check mode_name to determine which capture tool to use: + - mode_name='LOGIC' -> use capture_logic() + - mode_name='DSO' or 'ANALOG' -> use capture_dso() + + Args: + device_index: Device index from scan_devices (default 0). + Use scan_devices first if multiple units are connected. + """ + stdout, stderr, rc = _run_cli("info", "-d", device_index, timeout=15) + data = _parse_json(stdout, stderr, rc, "info") + if isinstance(data, dict) and "channel_modes" in data: + # Add human-readable summary of the channels-vs-samplerate tradeoff + modes = data["channel_modes"] + if modes: + data["channel_modes_summary"] = ( + "More active channels = lower max samplerate. " + f"Available configurations: {len(modes)}" + ) + + if not isinstance(data, dict): + return _dumps(data) + + mode_name = data.get("mode_name", "LOGIC") + + # Add mode-specific guidance + if mode_name in ("DSO", "ANALOG"): + data["usage_hint"] = ( + "This is an oscilloscope (mode=%s). Use capture_dso() to " + "record analog waveforms, then decode_analog() to view " + "voltage values. Set vdiv to match expected signal " + "amplitude. Full-scale range = vdiv * probe_factor * 10 " + "divisions." % mode_name + ) + # Add voltage-conversion formula hint + data["voltage_formula"] = ( + "voltage_mV = (hw_offset - raw_sample) * vdiv_mV * " + "probe_factor * 10 / 255" + ) + else: + data["usage_hint"] = ( + "This is a logic analyzer (mode=LOGIC). Use capture_logic() " + "to record digital signals. Set voltage_threshold to match " + "your target logic levels." + ) + + # Compute a capture-budget table so Claude can see the + # samplerate-vs-duration tradeoff for this device. + if data.get("limit_samples"): + limit = data["limit_samples"] + _BUDGET_RATES = [ + 100_000, 500_000, + 1_000_000, 4_000_000, 10_000_000, 25_000_000, + 50_000_000, 100_000_000, 250_000_000, 500_000_000, + 1_000_000_000, + ] + budget = [] + for rate in _BUDGET_RATES: + dur_s = limit / rate + if dur_s < 0.001: + dur_str = "%.1f us" % (dur_s * 1_000_000) + elif dur_s < 1.0: + dur_str = "%.1f ms" % (dur_s * 1_000) + else: + dur_str = "%.1f s" % dur_s + budget.append({ + "samplerate": _fmt_samplerate(rate), + "max_duration": dur_str, + }) + data["capture_budget"] = budget + + capture_tool = "capture_dso" if mode_name in ( + "DSO", "ANALOG") else "capture_logic" + data["capture_budget_note"] = ( + "Max capture duration at each samplerate given the device " + "sample memory (%d samples). Use the lowest samplerate that " + "meets the Nyquist requirement (>= 4x signal frequency) to " + "maximize capture duration. Use 'duration' parameter in " + "%s() to specify capture time directly." % (limit, capture_tool) + ) + + return _dumps(data) + + +@mcp.tool() +def capture_logic( + device_index: int = 0, + samplerate: str = "1M", + num_samples: str = "1M", + duration: str = "", + channels: str = "all", + channel_names: str = "", + trigger_channel: int = -1, + trigger_type: str = "none", + trigger_pos: int = 50, + out_file: str = "", + out_format: str = "bin", + voltage_threshold: float = -1.0, + timeout: int = 180, +) -> str: + """Capture logic-analyzer data from a DreamSourceLab device. + + IMPORTANT: check device_info first to see which channel_mode is active + and what samplerate is achievable for your channel count. Fewer channels + allow higher samplerates (e.g. 6 channels -> 500 MHz on DSLogic U3Pro16). + + Args: + device_index: Device index from scan_devices (default 0). + Use different indices for multiple connected devices. + samplerate: Sample rate with SI suffix: 100k, 1M, 10M, 100M, 500M. + num_samples: Samples to capture: 10k, 100k, 1M, 10M. + Ignored when duration is provided. + duration: Capture duration with time suffix: '100ms', '1s', '500us'. + When provided, num_samples is computed as samplerate * duration. + Supported suffixes: s, ms, us, ns. + Examples: '1s' = 1 second, '500ms' = 500 milliseconds, + '100us' = 100 microseconds. + Leave empty to use num_samples instead. + channels: Which physical channels to record. + Formats: "all" | "0-7" | "0,2,4,6" | "8" (count). + channel_names: Comma-separated signal names in the SAME ORDER as + the enabled channels. e.g. "SDA,SCL" for channels="0,1" + or "D0,D1,D2,D3,CLK,CS,MOSI,MISO" for channels="0-7". + Leave empty to use default CH0..CHn labels. + trigger_channel: Physical channel index to trigger on (-1 = free-run). + Must be one of the enabled channels. + trigger_type: Trigger condition: none | rising | falling | high | low. + trigger_pos: Percentage of samples BEFORE the trigger point (0-100). + 50 = trigger at centre of capture window. + out_file: Full path for the .bin output. Auto-generated if empty. + A .meta.json sidecar is always written alongside it. + out_format: Output format for export. Default 'bin' (native .bin + .meta.json). + 'sigrok-binary' - raw bytes without header (use with sigrok-mcp-server + -I binary:numchannels=N:samplerate=M) + 'vcd' - Value Change Dump (IEEE standard, channel names preserved) + 'csv' - sigrok-compatible CSV (use with -I csv:samplerate=M) + 'sr' - sigrok session ZIP (.sr), native sigrok-mcp-server format + Native .bin + .meta.json are always written alongside the export. + voltage_threshold: Input voltage threshold in volts (0.0-5.0). + Controls the logic-level switching point for Pro devices + that support variable thresholds (e.g. DSLogic U3Pro16). + Default -1.0 = use device default (typically 1.0V). + Common values: 0.8 (LVCMOS), 1.0, 1.2, 1.5, 1.8, 2.5, 3.3. + timeout: Maximum seconds to wait for dsview-cli to complete (default 180). + Increase for long captures (e.g. waiting for an infrequent trigger). + The process is killed and an error returned if the timeout expires. + + Returns: + Capture status, channel map, trigger settings, and a 32-sample waveform + preview. + """ + # Validate export format + if out_format not in _VALID_FORMATS: + return _dumps({"error": f"invalid out_format {out_format!r}. " + f"Valid: {sorted(_VALID_FORMATS)}"}) + + # Validate voltage threshold + if voltage_threshold >= 0.0 and voltage_threshold > 5.0: + return _dumps({ + "error": f"voltage_threshold {voltage_threshold} exceeds 5.0V maximum", + "hint": "Common values: 0.8 (LVCMOS), 1.0 (default), 1.5 (3.3V), 2.5 (5V TTL)", + }) + + # Convert duration to num_samples if provided + if duration.strip(): + try: + rate_hz = _parse_si(samplerate) + dur_s = _parse_si(duration) + except (ValueError, TypeError) as exc: + return _dumps( + {"error": f"cannot parse samplerate/duration: {exc}"}) + if dur_s <= 0: + return _dumps( + {"error": f"duration must be positive, got {duration!r}"}) + if rate_hz <= 0: + return _dumps( + {"error": f"samplerate must be positive, got {samplerate!r}"}) + computed = round(rate_hz * dur_s) + if computed < 1: + return _dumps({ + "error": f"duration {duration!r} at samplerate {samplerate!r} " + f"yields < 1 sample ({rate_hz * dur_s:.4g})", + "hint": "increase duration or samplerate", + }) + num_samples = str(computed) + + # Resolve output path + if not out_file: + fd, base_path = tempfile.mkstemp(prefix="dsview_") + os.close(fd) + out_file = base_path + ".bin" + elif out_file.endswith(".bin"): + base_path = out_file[:-4] + else: + base_path = out_file + out_file = base_path + ".bin" + + # Expand channel spec to a sorted list of indices + try: + ch_list = _expand_channels(channels) + except (ValueError, TypeError): + return _dumps({"error": f"invalid channels spec: {channels!r}"}) + + if not ch_list: + return _dumps({"error": "no channels selected"}) + + # Validate trigger channel is in the enabled set (if specified) + if trigger_channel >= 0 and trigger_channel not in ch_list: + return _dumps({ + "error": f"trigger_channel {trigger_channel} is not in enabled channels {ch_list}", + "hint": "trigger_channel must be one of the enabled channel indices", + }) + + if trigger_type not in ("none", "rising", "falling", "high", "low"): + return _dumps({"error": f"invalid trigger_type: {trigger_type!r}. " + "Use: none|rising|falling|high|low"}) + + if not 0 <= trigger_pos <= 100: + return _dumps( + {"error": f"trigger_pos must be 0-100, got {trigger_pos}"}) + + # Build CLI arguments + enable_str = ",".join(str(c) for c in ch_list) + + cli_args = [ + "capture", + "-d", device_index, + "-s", samplerate, + "-n", num_samples, + "-c", enable_str, + "-t", trigger_channel, + "-T", trigger_type, + "-p", trigger_pos, + "-o", out_file, + ] + + if channel_names.strip(): + cli_args += ["-N", channel_names.strip()] + + if voltage_threshold >= 0.0: + cli_args += ["-V", str(voltage_threshold)] + + stdout, stderr, rc = _run_cli(*cli_args, timeout=timeout) + result = _parse_json(stdout, stderr, rc, "capture") + + if not isinstance(result, dict) or not result.get("success"): + return _dumps(result) + + # Include computed duration info when duration was specified + if duration.strip() and result.get("success"): + result["requested_duration"] = duration.strip() + result["computed_num_samples"] = num_samples + + # Enrich with file size and 32-sample preview + cap_path = Path(out_file) + if cap_path.exists(): + result["file_size_bytes"] = cap_path.stat().st_size + unitsize = result.get("unitsize", 1) + ch_map = result.get("channel_map", []) + + try: + with open(out_file, "rb") as f: + f.seek(12) # skip 12-byte header + raw = f.read(unitsize * 32) + + n_got = len(raw) // unitsize + preview = [] + for i in range(n_got): + off = i * unitsize + val = raw[off] if unitsize == 1 else struct.unpack_from("> bit) & 1 + preview.append(row) + + result["sample_preview"] = { + "note": f"First {n_got} samples (columns = signal names)", + "samples": preview, + } + except Exception as exc: + result["preview_error"] = str(exc) + + # Export to requested format + if out_format != "bin" and isinstance( + result, dict) and result.get("success"): + meta = _load_meta(out_file) + ext_map = { + "sigrok-binary": ".sigrok.bin", + "vcd": ".vcd", + "csv": ".csv", + "sr": ".sr", + } + export_path = base_path + ext_map[out_format] + try: + if out_format == "sigrok-binary": + _write_sigrok_binary(out_file, export_path, meta) + n_ch = len(meta.get("channel_map", [])) + sr = meta.get("samplerate", 0) + result["export_file"] = export_path + result["sigrok_format_string"] = f"binary:numchannels={n_ch}:samplerate={sr}" + elif out_format == "vcd": + _write_vcd(out_file, export_path, meta) + result["export_file"] = export_path + elif out_format == "csv": + _write_csv(out_file, export_path, meta) + sr = meta.get("samplerate", 0) + result["export_file"] = export_path + result["sigrok_format_string"] = f"csv:samplerate={sr}" + elif out_format == "sr": + _write_sr(out_file, export_path, meta) + result["export_file"] = export_path + result["sigrok_format_string"] = "srzip" + except Exception as exc: + result["export_error"] = str(exc) + + # Add protocol-decode hint when export format supports sigrok decoding + if out_format in ("vcd", "sr", "csv", + "sigrok-binary") and result.get("success"): + ch_map = result.get("channel_map", []) + ch_names = [e.get("name", "") for e in ch_map if e.get("name")] + hint_parts = [ + "Use sigrok decode_protocol tool for protocol analysis.", + ] + # Build example decoder strings based on channel names + name_set = {n.upper() for n in ch_names} + if {"SDA", "SCL"} <= name_set: + hint_parts.append( + "I2C detected: decode_protocol(input_file=, " + "protocol_decoders='i2c:scl=SCL:sda=SDA')" + ) + if {"MOSI", "MISO", "CLK", "CS"} <= name_set or { + "MOSI", "SCK", "CS"} <= name_set: + hint_parts.append( + "SPI detected: decode_protocol(input_file=, " + "protocol_decoders='spi:clk=CLK:mosi=MOSI:miso=MISO:cs=CS')" + ) + if {"TX"} <= name_set or {"RX"} <= name_set: + hint_parts.append( + "UART detected: decode_protocol(input_file=, " + "protocol_decoders='uart:rx=RX:baudrate=115200') " + "-- adjust baudrate as needed" + ) + result["decode_hint"] = " ".join(hint_parts) + + return _dumps(result) + + +# --------------------------------------------------------------------------- +# DSO voltage helpers +# --------------------------------------------------------------------------- + +def _raw_to_voltage_mV(raw: int, vdiv_mV: int, probe_factor: int, + hw_offset: int) -> float: + """Convert raw 8-bit ADC sample to millivolts. + + Formula: voltage_mV = (hw_offset - raw) * vdiv * probe_factor * 10 / 255 + DS_CONF_DSO_VDIVS = 10 (number of vertical divisions). + """ + range_mV = vdiv_mV * probe_factor * 10 + return (hw_offset - raw) * range_mV / 255.0 + + +def _dso_preview(out_file: str, ch_map: list, max_samples: int = 32) -> list: + """Read a few DSO samples and convert to voltage for preview.""" + n_ch = len(ch_map) + if n_ch == 0: + return [] + try: + with open(out_file, "rb") as f: + f.seek(12) # skip header + raw = f.read(n_ch * max_samples) + except Exception: + return [] + + n_got = len(raw) // n_ch + preview = [] + for i in range(n_got): + row = {} + for seq, entry in enumerate(ch_map): + label = entry.get("name") or ("CH%d" % entry.get("phys", seq)) + sample = raw[i * n_ch + seq] + vdiv = entry.get("vdiv_mV", 1000) + pfact = entry.get("probe_factor", 1) + hw_off = entry.get("hw_offset", 128) + row[label] = round( + _raw_to_voltage_mV(sample, vdiv, pfact, hw_off), 2 + ) + preview.append(row) + return preview + + +@mcp.tool() +def capture_dso( + device_index: int = 0, + samplerate: str = "1M", + num_samples: str = "10k", + duration: str = "", + channels: str = "all", + channel_names: str = "", + vdiv: str = "", + coupling: str = "", + probe_factor: str = "", + trigger_channel: int = -1, + trigger_type: str = "none", + trigger_pos: int = 50, + out_file: str = "", + out_format: str = "bin", + timeout: int = 180, +) -> str: + """Capture analog waveform data from a DreamSourceLab oscilloscope. + + Use this for DScope devices (mode_name='DSO' or 'ANALOG' in device_info). + For logic analyzers (mode_name='LOGIC'), use capture_logic() instead. + + IMPORTANT: call device_info() first to confirm mode_name is 'DSO' or + 'ANALOG' and to see the current analog_channels settings. + + Args: + device_index: Device index from scan_devices (default 0). + samplerate: Sample rate with SI suffix: 100k, 1M, 10M, 100M. + num_samples: Samples to capture: 10k, 100k, 1M, 10M. + Ignored when duration is provided. + duration: Capture duration with time suffix: '100ms', '1s', '500us'. + When provided, num_samples is computed automatically. + channels: Which analog channels to record. + "all" = both channels, "0" = CH0 only, "1" = CH1 only, + "0,1" = both channels. + channel_names: Comma-separated signal names in same order as channels. + e.g. "VOUT,GND" for channels="0,1". + vdiv: Voltage per division in mV. Sets the vertical scale. + Formats: "500" (both channels), "0:500,1:1000" (per-channel). + Full-scale range = vdiv * probe_factor * 10 divisions. + Options: 10, 20, 50, 100, 200, 500, 1000, 2000 mV. + coupling: Input coupling mode. + "DC" (both channels), "AC" (both), or "0:DC,1:AC". + DC passes the full signal; AC blocks the DC component. + probe_factor: Probe attenuation factor. + "1" (both channels), "10" (both), or "0:1,1:10". + Must match your physical probe: 1x, 2x, 10x, or 20x. + trigger_channel: Analog channel to trigger on (-1 = free-run/auto). + Must be 0 or 1 (one of the enabled channels). + trigger_type: Trigger slope: none | rising | falling. + DSO trigger is edge-based (slope detection). + trigger_pos: Percentage of capture window before trigger (0-100). + 50 = trigger at centre. 10 = mostly post-trigger data. + out_file: Full path for .bin output. Auto-generated if empty. + out_format: Output format: 'bin' (default), 'csv' (voltage values), + 'sr' (sigrok session), 'vcd', 'sigrok-binary'. + timeout: Maximum seconds to wait for dsview-cli to complete (default 180). + Increase for long captures (e.g. waiting for an infrequent trigger). + The process is killed and an error returned if the timeout expires. + + Returns: + Capture status, channel_map with analog metadata (vdiv, coupling, + probe_factor, hw_offset), file path, and voltage preview of first + 32 samples. + """ + # Validate export format + if out_format not in _VALID_FORMATS: + return _dumps({"error": "invalid out_format %r. Valid: %s" + % (out_format, sorted(_VALID_FORMATS))}) + + # Convert duration to num_samples if provided + if duration.strip(): + try: + rate_hz = _parse_si(samplerate) + dur_s = _parse_si(duration) + except (ValueError, TypeError) as exc: + return _dumps( + {"error": "cannot parse samplerate/duration: %s" % exc}) + if dur_s <= 0: + return _dumps( + {"error": "duration must be positive, got %r" % duration}) + if rate_hz <= 0: + return _dumps( + {"error": "samplerate must be positive, got %r" % samplerate}) + computed = round(rate_hz * dur_s) + if computed < 1: + return _dumps({ + "error": "duration %r at samplerate %r yields < 1 sample" + % (duration, samplerate), + "hint": "increase duration or samplerate", + }) + num_samples = str(computed) + + # Resolve output path + if not out_file: + fd, base_path = tempfile.mkstemp(prefix="dsview_dso_") + os.close(fd) + out_file = base_path + ".bin" + elif out_file.endswith(".bin"): + base_path = out_file[:-4] + else: + base_path = out_file + out_file = base_path + ".bin" + + # Expand channel spec for DSO. + # _expand_channels treats a bare integer as a *count*, but for DSO a + # bare "0" or "1" (or any valid index < max_ch) means a specific + # channel index. Normalise to comma-separated form so the parser + # takes the list-of-indices branch instead. + _dso_max_ch = 2 # DSCope U3P100; adjust if needed + _ch_spec = channels.strip() + if _ch_spec.isdigit() and int(_ch_spec) < _dso_max_ch: + _ch_spec = _ch_spec + "," # "0," -> parsed as list [0] + try: + ch_list = _expand_channels(_ch_spec, max_ch=_dso_max_ch) + except (ValueError, TypeError): + return _dumps({"error": "invalid channels spec: %r" % channels}) + + if not ch_list: + return _dumps({"error": "no channels selected"}) + + for c in ch_list: + if c not in (0, 1): + return _dumps({ + "error": "DSO channel index %d out of range. " + "DSCope has 2 channels: 0 and 1." % c, + }) + + # Validate trigger + if trigger_channel >= 0 and trigger_channel not in ch_list: + return _dumps({ + "error": "trigger_channel %d not in enabled channels %s" + % (trigger_channel, ch_list), + "hint": "trigger_channel must be 0 or 1", + }) + if trigger_type not in ("none", "rising", "falling"): + return _dumps({ + "error": "invalid trigger_type: %r. DSO supports: none|rising|falling" + % trigger_type, + }) + if not 0 <= trigger_pos <= 100: + return _dumps( + {"error": "trigger_pos must be 0-100, got %d" % trigger_pos}) + + # Build CLI arguments + enable_str = ",".join(str(c) for c in ch_list) + + cli_args = [ + "capture", + "-d", device_index, + "-s", samplerate, + "-n", num_samples, + "-c", enable_str, + "-t", trigger_channel, + "-T", trigger_type, + "-p", trigger_pos, + "-o", out_file, + ] + + if channel_names.strip(): + cli_args += ["-N", channel_names.strip()] + + # DSO-specific per-channel options + if vdiv.strip(): + for part in _parse_dso_option(vdiv, ch_list): + cli_args += ["--vdiv", part] + + if coupling.strip(): + for part in _parse_dso_coupling_option(coupling, ch_list): + cli_args += ["--coupling", part] + + if probe_factor.strip(): + for part in _parse_dso_option(probe_factor, ch_list): + cli_args += ["--probe", part] + + stdout, stderr, rc = _run_cli(*cli_args, timeout=timeout) + result = _parse_json(stdout, stderr, rc, "capture_dso") + + if not isinstance(result, dict) or not result.get("success"): + return _dumps(result) + + # Include computed duration info + if duration.strip() and result.get("success"): + result["requested_duration"] = duration.strip() + result["computed_num_samples"] = num_samples + + # Enrich with voltage preview + cap_path = Path(out_file) + if cap_path.exists(): + result["file_size_bytes"] = cap_path.stat().st_size + ch_map = result.get("channel_map", []) + preview = _dso_preview(out_file, ch_map, max_samples=32) + if preview: + result["sample_preview"] = { + "note": "First %d samples (voltage in mV)" % len(preview), + "samples": preview, + } + + # Export to requested format + if out_format != "bin" and isinstance( + result, dict) and result.get("success"): + meta = _load_meta(out_file) + ext_map = { + "sigrok-binary": ".sigrok.bin", + "vcd": ".vcd", + "csv": ".csv", + "sr": ".sr", + } + export_path = base_path + ext_map[out_format] + try: + if out_format == "csv": + _write_csv(out_file, export_path, meta) + result["export_file"] = export_path + result["csv_note"] = "CSV contains voltage values in mV" + elif out_format == "sr": + _write_sr(out_file, export_path, meta) + result["export_file"] = export_path + elif out_format == "vcd": + _write_vcd(out_file, export_path, meta) + result["export_file"] = export_path + elif out_format == "sigrok-binary": + _write_sigrok_binary(out_file, export_path, meta) + result["export_file"] = export_path + except Exception as exc: + result["export_error"] = str(exc) + + # Add DSO-specific hints + if result.get("success"): + ch_map = result.get("channel_map", []) + hints = [] + for entry in ch_map: + vd = entry.get("vdiv_mV", 0) + pf = entry.get("probe_factor", 1) + if vd: + full_range = vd * pf * 10 + label = entry.get("name") or ("CH%d" % entry.get("phys", 0)) + hints.append( + "%s: vdiv=%dmV x%d probe = +/-%.1fV full-scale" + % (label, vd, pf, full_range / 1000.0) + ) + if hints: + result["scale_info"] = hints + result["decode_hint"] = ( + "Use decode_analog(file_path) to view voltage waveform. " + "Use signal_summary(file_path) for Vpp/Vrms/frequency statistics." + ) + + return _dumps(result) + + +def _parse_dso_option(spec: str, ch_list: list) -> list: + """Parse a DSO per-channel option like '500' or '0:500,1:1000'. + + Returns a list of 'CH:VALUE' strings suitable for CLI --vdiv / --probe. + """ + spec = spec.strip() + if ":" in spec: + # Already in CH:VALUE format, possibly comma-separated + return [p.strip() for p in spec.split(",") if p.strip()] + # Plain value -- apply to all enabled channels + return ["%d:%s" % (ch, spec) for ch in ch_list] + + +def _parse_dso_coupling_option(spec: str, ch_list: list) -> list: + """Parse coupling spec: 'DC', 'AC', or '0:DC,1:AC'. + + Returns list of 'CH:MODE' strings for CLI --coupling. + """ + spec = spec.strip() + if ":" in spec: + return [p.strip() for p in spec.split(",") if p.strip()] + # Plain mode -- apply to all enabled channels + return ["%d:%s" % (ch, spec) for ch in ch_list] + + +@mcp.tool() +def decode_analog( + file_path: str, + start_sample: int = 0, + num_samples: int = 256, +) -> str: + """Read and decode analog voltage samples from a DSO capture .bin file. + + Converts raw 8-bit ADC samples to millivolt values using the calibration + data stored in the .meta.json sidecar (vdiv, probe_factor, hw_offset). + + For each channel returns: + - Voltage waveform (mV) for the requested sample range + - Statistics: min, max, Vpp, Vrms, DC offset (mean) + - Approximate frequency (zero-crossing based) + + Args: + file_path: Path to the .bin file from capture_dso. + start_sample: First sample to decode (0 = start of capture). + num_samples: Number of samples to return (max 1024). + + Returns: + Per-channel voltage waveforms and statistics. + """ + if not Path(file_path).exists(): + return _dumps({"error": "file not found: %s" % file_path}) + + num_samples = min(num_samples, 1024) + meta = _load_meta(file_path) + + if not meta: + return _dumps( + {"error": "no .meta.json sidecar found for %s" % file_path}) + + mode = meta.get("mode", "logic") + if mode not in ("dso", "analog"): + return _dumps({ + "error": "file mode is '%s', not DSO/analog. " + "Use decode_capture() for logic captures." % mode, + }) + + samplerate = meta.get("samplerate", 1_000_000) + ch_map = meta.get("channel_map", []) + n_ch = len(ch_map) + + if n_ch == 0: + return _dumps({"error": "no channels in metadata"}) + + header_size = 12 + # DSO data: interleaved 8-bit, 1 byte per channel per sample + byte_offset = header_size + start_sample * n_ch + + try: + with open(file_path, "rb") as f: + f.seek(byte_offset) + raw = f.read(num_samples * n_ch) + except Exception as exc: + return _dumps({"error": str(exc)}) + + n_got = len(raw) // n_ch + if n_got == 0: + return _dumps({"error": "no sample data at offset %d" % start_sample}) + + sample_period_ns = int(1e9 / samplerate) if samplerate > 0 else 0 + + # Build per-channel voltage arrays and statistics + channels_result = {} + for seq, entry in enumerate(ch_map): + label = entry.get("name") or ("CH%d" % entry.get("phys", seq)) + vdiv = entry.get("vdiv_mV", 1000) + pfact = entry.get("probe_factor", 1) + hw_off = entry.get("hw_offset", 128) + + voltages = [] + for i in range(n_got): + sample = raw[i * n_ch + seq] + v = _raw_to_voltage_mV(sample, vdiv, pfact, hw_off) + voltages.append(round(v, 2)) + + # Statistics + v_min = min(voltages) + v_max = max(voltages) + v_pp = v_max - v_min + v_mean = sum(voltages) / len(voltages) + v_rms = math.sqrt(sum(v * v for v in voltages) / len(voltages)) + + # Frequency estimation via zero-crossing (relative to mean) + crossings = 0 + for i in range(1, len(voltages)): + if ((voltages[i - 1] - v_mean) * (voltages[i] - v_mean)) < 0: + crossings += 1 + + freq = None + if crossings >= 2 and samplerate > 0: + freq = round((crossings / 2.0) / (n_got / samplerate), 1) + + ch_result = { + "vdiv_mV": vdiv, + "probe_factor": pfact, + "coupling": entry.get("coupling", "DC"), + "hw_offset": hw_off, + "full_scale_mV": vdiv * pfact * 10, + "stats": { + "min_mV": round(v_min, 2), + "max_mV": round(v_max, 2), + "vpp_mV": round(v_pp, 2), + "vrms_mV": round(v_rms, 2), + "dc_offset_mV": round(v_mean, 2), + }, + "voltages_mV": voltages, + } + if freq is not None: + ch_result["stats"]["approx_freq_hz"] = freq + if samplerate > 0 and freq > 0: + ch_result["stats"]["min_samplerate_hz"] = round(freq * 4) + ch_result["stats"]["rec_samplerate_hz"] = round(freq * 10) + + channels_result[label] = ch_result + + return _dumps({ + "file": file_path, + "mode": mode, + "samplerate": samplerate, + "start_sample": start_sample, + "samples_decoded": n_got, + "sample_period_ns": sample_period_ns, + "trigger": meta.get("trigger", {}), + "channels": channels_result, + }) + + +@mcp.tool() +def decode_capture( + file_path: str, + start_sample: int = 0, + num_samples: int = 256, +) -> str: + """Read and decode samples from a previously captured .bin file. + + Automatically reads channel names and sample rate from the companion + .meta.json sidecar written by capture_logic or capture_dso. + + For logic captures: returns per-signal digital waveform and edge list. + For DSO captures: automatically delegates to decode_analog() for + voltage waveforms. + + Args: + file_path: Path to the .bin file from capture_logic or capture_dso. + start_sample: First sample to decode (0 = from trigger point). + num_samples: Samples to return (max 1024). + + Returns: + Per-signal waveform data and an edge (transition) list. + """ + if not Path(file_path).exists(): + return _dumps({"error": "file not found: %s" % file_path}) + + num_samples = min(num_samples, 1024) + meta = _load_meta(file_path) + + # Auto-redirect DSO captures to decode_analog + if meta.get("mode") in ("dso", "analog"): + return decode_analog(file_path, start_sample, num_samples) + + samplerate = meta.get("samplerate", 1_000_000) + ch_map = meta.get("channel_map", []) + unitsize = meta.get("unitsize", 2) + header_size = 12 + + # If no meta, read header from file + if not meta: + try: + with open(file_path, "rb") as f: + hdr = f.read(header_size) + if len(hdr) == header_size: + samplerate, n_ch_hdr = struct.unpack(" 8 else 1 + except Exception: + pass + + if not ch_map: + return _dumps( + {"error": "cannot determine channel map; no .meta.json sidecar"}) + + try: + with open(file_path, "rb") as f: + f.seek(header_size + start_sample * unitsize) + raw = f.read(num_samples * unitsize) + except Exception as exc: + return _dumps({"error": str(exc)}) + + n_got = len(raw) // unitsize + sample_period_ns = int(1e9 / samplerate) if samplerate > 0 else 0 + + # Build per-channel sample lists + signals: dict[str, list[int]] = {} + labels: dict[str, int] = {} # label -> bit position in word + for entry in ch_map: + bit = entry.get("phys", entry.get("seq", 0)) + label = entry.get("name") or f"CH{bit}" + signals[label] = [] + labels[label] = bit + + timestamps_ns = [] + for i in range(n_got): + off = i * unitsize + val = raw[off] if unitsize == 1 else struct.unpack_from("> bit) & 1) + + # Edge detection + edges: dict[str, list] = {} + for label, samples in signals.items(): + ch_edges = [] + for i in range(1, len(samples)): + if samples[i] != samples[i - 1]: + ch_edges.append({ + "sample": start_sample + i, + "time_ns": timestamps_ns[i], + "edge": "rising" if samples[i] == 1 else "falling", + }) + if ch_edges: + edges[label] = ch_edges[:20] # cap at 20 per channel + + return _dumps({ + "file": file_path, + "samplerate": samplerate, + "start_sample": start_sample, + "samples_decoded": n_got, + "sample_period_ns": sample_period_ns, + "trigger": meta.get("trigger", {}), + "signals": signals, + "edges": edges, + }) + + +@mcp.tool() +def signal_summary(file_path: str) -> str: + """Summarise signal activity across an entire capture file. + + Works for both logic analyzer and oscilloscope (DSO/analog) captures. + Automatically detects the capture mode from the .meta.json sidecar. + + For logic captures, reports per signal: + - Percentage of time high vs low + - Number of transitions (edges) + - Approximate frequency (if periodic) + - Whether the channel is idle (stuck high or low) + - Samplerate optimisation hints (Nyquist-based) + + For DSO/analog captures, reports per channel: + - Min/Max voltage (mV) + - Peak-to-peak voltage (Vpp) + - RMS voltage + - DC offset (mean) + - Approximate frequency (zero-crossing based) + - Whether channel appears idle (constant voltage, Vpp < 5 mV) + - Samplerate optimisation hints (Nyquist-based) + + A top-level ``samplerate_hint`` is included when the capture is + heavily oversampled relative to the fastest observed signal. + + Args: + file_path: Path to a .bin file produced by capture_logic or capture_dso. + """ + if not Path(file_path).exists(): + return _dumps({"error": "file not found: %s" % file_path}) + + meta = _load_meta(file_path) + samplerate = meta.get("samplerate", 1_000_000) + ch_map = meta.get("channel_map", []) + unitsize = meta.get("unitsize", 2) + mode = meta.get("mode", "logic") + header_sz = 12 + max_samp = 1_000_000 + + if not meta: + try: + with open(file_path, "rb") as f: + hdr = f.read(header_sz) + if len(hdr) == header_sz: + samplerate, n_ch_hdr = struct.unpack(" 8 else 1 + except Exception: + pass + + if not ch_map: + return _dumps({"error": "cannot determine channel map"}) + + # Dispatch to DSO-specific analysis + if mode in ("dso", "analog"): + return _signal_summary_dso(file_path, meta, samplerate, ch_map, + header_sz, max_samp) + + # --- Logic mode analysis --- + labels = {} + for entry in ch_map: + bit = entry.get("phys", entry.get("seq", 0)) + label = entry.get("name") or ("CH%d" % bit) + labels[label] = bit + + try: + with open(file_path, "rb") as f: + f.seek(header_sz) + raw = f.read(unitsize * max_samp) + except Exception as exc: + return _dumps({"error": str(exc)}) + + n_samples = len(raw) // unitsize + if n_samples == 0: + return _dumps({"error": "no sample data in file"}) + + high_count = {lbl: 0 for lbl in labels} + transitions = {lbl: 0 for lbl in labels} + prev = {lbl: None for lbl in labels} + + for i in range(n_samples): + off = i * unitsize + val = raw[off] if unitsize == 1 else struct.unpack_from("> bit) & 1 + high_count[label] += b + if prev[label] is not None and b != prev[label]: + transitions[label] += 1 + prev[label] = b + + summary = {} + max_signal_freq = 0.0 + for label, bit in labels.items(): + pct = 100.0 * high_count[label] / n_samples + tr = transitions[label] + freq = None + if tr >= 2 and samplerate > 0: + freq = round((tr / 2.0) / (n_samples / samplerate), 1) + + entry_d: dict = { + "pct_high": round(pct, 1), + "transitions": tr, + } + if high_count[label] == n_samples: + entry_d["state"] = "always HIGH (idle)" + elif high_count[label] == 0: + entry_d["state"] = "always LOW (idle)" + elif freq is not None: + entry_d["approx_freq_hz"] = freq + max_signal_freq = max(max_signal_freq, freq) + if samplerate > 0 and freq > 0: + oversample = samplerate / freq + min_rate = freq * 4 + rec_rate = freq * 10 + entry_d["min_samplerate_hz"] = round(min_rate) + entry_d["rec_samplerate_hz"] = round(rec_rate) + entry_d["oversampling_ratio"] = round(oversample, 1) + if oversample > 20: + entry_d["hint"] = ( + "Oversampled %.0fx. " + "Recommended %s (10x) -- " + "would allow %.0fx longer capture." + % (oversample, _fmt_rate(rec_rate), + samplerate / rec_rate) + ) + + summary[label] = entry_d + + result: dict = { + "file": file_path, + "mode": "logic", + "samples_analysed": n_samples, + "samplerate": samplerate, + "trigger": meta.get("trigger", {}), + "signals": summary, + } + if max_signal_freq > 0 and samplerate > 0: + rec = max_signal_freq * 10 + oversample = samplerate / max_signal_freq + if oversample > 20: + result["samplerate_hint"] = ( + "Fastest signal ~%s. Current samplerate %s (%.0fx oversample). " + "Recommended: %s (10x Nyquist) -- " + "would allow %.0fx longer capture at same memory depth." + % (_fmt_rate(max_signal_freq), _fmt_rate(samplerate), + oversample, _fmt_rate(rec), samplerate / rec) + ) + + return _dumps(result) + + +def _signal_summary_dso(file_path: str, meta: dict, samplerate: int, + ch_map: list, header_sz: int, + max_samp: int) -> str: + """Analog/DSO signal summary -- voltage statistics per channel.""" + n_ch = len(ch_map) + # DSO: interleaved 8-bit samples, 1 byte per channel + read_bytes = n_ch * max_samp + + try: + with open(file_path, "rb") as f: + f.seek(header_sz) + raw = f.read(read_bytes) + except Exception as exc: + return _dumps({"error": str(exc)}) + + n_samples = len(raw) // n_ch + if n_samples == 0: + return _dumps({"error": "no sample data in file"}) + + summary = {} + max_signal_freq = 0.0 + + for seq, ch_entry in enumerate(ch_map): + label = ch_entry.get("name") or ("CH%d" % ch_entry.get("phys", seq)) + vdiv = ch_entry.get("vdiv_mV", 1000) + pfact = ch_entry.get("probe_factor", 1) + hw_off = ch_entry.get("hw_offset", 128) + + # Convert raw samples to voltages and compute stats in one pass + v_sum = 0.0 + v_sq_sum = 0.0 + v_min = float("inf") + v_max = float("-inf") + prev_v = None + crossings = 0 + + # First pass: compute mean for zero-crossing reference + raw_sum = 0 + for i in range(n_samples): + raw_sum += raw[i * n_ch + seq] + raw_mean = raw_sum / n_samples + mean_v = _raw_to_voltage_mV(int(round(raw_mean)), vdiv, pfact, hw_off) + + # Second pass: full statistics + for i in range(n_samples): + sample = raw[i * n_ch + seq] + v = _raw_to_voltage_mV(sample, vdiv, pfact, hw_off) + v_sum += v + v_sq_sum += v * v + if v < v_min: + v_min = v + if v > v_max: + v_max = v + if prev_v is not None: + if (prev_v - mean_v) * (v - mean_v) < 0: + crossings += 1 + prev_v = v + + v_mean = v_sum / n_samples + v_rms = math.sqrt(v_sq_sum / n_samples) + v_pp = v_max - v_min + + ch_result: dict = { + "vdiv_mV": vdiv, + "probe_factor": pfact, + "coupling": ch_entry.get("coupling", "DC"), + "full_scale_mV": vdiv * pfact * 10, + "min_mV": round(v_min, 2), + "max_mV": round(v_max, 2), + "vpp_mV": round(v_pp, 2), + "vrms_mV": round(v_rms, 2), + "dc_offset_mV": round(v_mean, 2), + } + + # Idle detection: Vpp < 5 mV means effectively constant + if v_pp < 5.0: + ch_result["state"] = "idle (constant ~%.1f mV)" % v_mean + + # Frequency estimation + freq = None + if crossings >= 2 and samplerate > 0: + freq = round((crossings / 2.0) / (n_samples / samplerate), 1) + + if freq is not None: + ch_result["approx_freq_hz"] = freq + max_signal_freq = max(max_signal_freq, freq) + if samplerate > 0 and freq > 0: + oversample = samplerate / freq + min_rate = freq * 4 + rec_rate = freq * 10 + ch_result["min_samplerate_hz"] = round(min_rate) + ch_result["rec_samplerate_hz"] = round(rec_rate) + ch_result["oversampling_ratio"] = round(oversample, 1) + if oversample > 20: + ch_result["hint"] = ( + "Oversampled %.0fx. " + "Recommended %s (10x) -- " + "would allow %.0fx longer capture." + % (oversample, _fmt_rate(rec_rate), + samplerate / rec_rate) + ) + + summary[label] = ch_result + + result: dict = { + "file": file_path, + "mode": meta.get("mode", "dso"), + "samples_analysed": n_samples, + "samplerate": samplerate, + "trigger": meta.get("trigger", {}), + "signals": summary, + } + + if max_signal_freq > 0 and samplerate > 0: + rec = max_signal_freq * 10 + oversample = samplerate / max_signal_freq + if oversample > 20: + result["samplerate_hint"] = ( + "Fastest signal ~%s. Current samplerate %s (%.0fx oversample). " + "Recommended: %s (10x Nyquist) -- " + "would allow %.0fx longer capture at same memory depth." + % (_fmt_rate(max_signal_freq), _fmt_rate(samplerate), + oversample, _fmt_rate(rec), samplerate / rec) + ) + + return _dumps(result) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + mcp.run() diff --git a/cli/requirements.txt b/cli/requirements.txt new file mode 100644 index 00000000..7576254c --- /dev/null +++ b/cli/requirements.txt @@ -0,0 +1,9 @@ +# Python runtime dependencies for dsview_mcp.py +# All stdlib modules (json, os, struct, subprocess, tempfile, pathlib) need no entry. + +# MCP SDK -- FastMCP is bundled inside mcp since v1.0; do NOT pip install fastmcp separately +mcp>=1.8.1 + +# TOON serializer -- compact token-efficient output for LLM context (optional but recommended) +# Falls back to JSON if not installed. +python-toon>=0.1.3