Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/rayforce.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ bool ray_sym_ensure_cap(uint32_t needed);
ray_err_t ray_sym_save(const char* path);
ray_err_t ray_sym_load(const char* path);

/* ===== Environment API =====
*
* Thread-safety: the environment is shared global state. Concurrent calls
* to ray_env_get() and ray_env_set() require external synchronization by
* the caller. */

Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ray_env_set uses a global spinlock, but ray_env_get reads the same global g_env without any locking (see src/lang/env.c). Now that these are part of the public header, please document the thread-safety contract (e.g., “environment API is not safe for concurrent get/set without external synchronization”, or add locking to ray_env_get if concurrent access is intended).

Suggested change
/*
* Thread-safety: the environment API operates on shared global state.
* Calls to ray_env_get() and ray_env_set() are not safe to use concurrently
* with each other without external synchronization provided by the caller.
*/

Copilot uses AI. Check for mistakes.
ray_t* ray_env_get(int64_t sym_id);
ray_err_t ray_env_set(int64_t sym_id, ray_t* val);

/* ===== Table API ===== */

ray_t* ray_table_new(int64_t ncols);
Expand Down
267 changes: 221 additions & 46 deletions src/ops/datalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void dl_program_free(dl_program_t* prog) {
ray_release(prog->rels[i].table);
if (prog->rels[i].prov_col && !RAY_IS_ERR(prog->rels[i].prov_col))
ray_release(prog->rels[i].prov_col);
if (prog->rels[i].prov_src_offsets && !RAY_IS_ERR(prog->rels[i].prov_src_offsets))
ray_release(prog->rels[i].prov_src_offsets);
if (prog->rels[i].prov_src_data && !RAY_IS_ERR(prog->rels[i].prov_src_data))
ray_release(prog->rels[i].prov_src_data);
}
ray_free(dl_prog_block(prog));
}
Expand Down Expand Up @@ -1327,8 +1331,123 @@ static bool dl_row_in_table(ray_t* tbl, int64_t row, ray_t* ref) {
return false;
}

/* Build source provenance for one IDB relation in CSR format.
*
* For each derived row, extracts head variable bindings from the firing rule
* and scans each positive body atom's relation for rows consistent with those
* bindings. Results are stored as two parallel vectors on the relation:
*
* prov_src_offsets — I64[nrows+1]: offsets[i] = start index in prov_src_data
* for derived row i. offsets[nrows] = total entry count.
* prov_src_data — I64[total]: each entry = (rel_idx << 32) | row_idx,
* packed reference to the contributing source row.
* Row indices are truncated to 32 bits (max ~4 billion rows
* per relation).
*
* Body-only variables (not appearing in the head) are unconstrained during
* source lookup, so the entry set may be a superset of the true proof. */
static void dl_build_source_prov(dl_program_t* prog, dl_rel_t* rel,
int64_t nrows, int64_t* pd) {
ray_t* off_vec = ray_vec_new(RAY_I64, nrows + 1);
if (!off_vec || RAY_IS_ERR(off_vec)) return;
off_vec->len = nrows + 1;
int64_t* off = (int64_t*)ray_data(off_vec);

int64_t buf_cap = (nrows < 16) ? 64 : nrows * 4;
ray_t* buf_block = ray_alloc((size_t)buf_cap * sizeof(int64_t));
if (!buf_block) { ray_release(off_vec); return; }
int64_t* buf = (int64_t*)ray_data(buf_block);
int64_t buf_len = 0;

for (int64_t row = 0; row < nrows; row++) {
off[row] = buf_len;
if (pd[row] < 0) continue;

dl_rule_t* rule = &prog->rules[pd[row]];

int64_t var_vals[DL_MAX_ARITY * DL_MAX_BODY];
bool var_set [DL_MAX_ARITY * DL_MAX_BODY];
memset(var_set, 0, sizeof(var_set));

/* Extract head variable bindings from this derived row */
for (int h = 0; h < rule->head_arity; h++) {
int v = rule->head_vars[h];
if (v == DL_CONST) continue;
ray_t* col = ray_table_get_col_idx(rel->table, h);
if (!col) continue;
var_vals[v] = ((int64_t*)ray_data(col))[row];
var_set[v] = true;
}

/* For each positive body atom, find matching source rows */
for (int b = 0; b < rule->n_body; b++) {
dl_body_t* body = &rule->body[b];
if (body->type != DL_POS) continue;

int bri = dl_find_rel(prog, body->pred);
if (bri < 0) continue;
dl_rel_t* brel = &prog->rels[bri];
int64_t bnrows = ray_table_nrows(brel->table);

for (int64_t br = 0; br < bnrows; br++) {
bool match = true;
for (int c = 0; c < body->arity; c++) {
ray_t* bcol = ray_table_get_col_idx(brel->table, c);
if (!bcol) { match = false; break; }
int64_t cell = ((int64_t*)ray_data(bcol))[br];
int v = body->vars[c];
if (v == DL_CONST) {
if (cell != body->const_vals[c]) { match = false; break; }
} else if (var_set[v]) {
if (cell != var_vals[v]) { match = false; break; }
}
/* body-only variable: unconstrained, always matches */
}
if (!match) continue;

if (buf_len >= buf_cap) {
int64_t new_cap = buf_cap * 2;
ray_t* new_block = ray_alloc((size_t)new_cap * sizeof(int64_t));
if (!new_block) goto oom;
memcpy(ray_data(new_block), buf, (size_t)buf_len * sizeof(int64_t));
ray_free(buf_block);
buf_block = new_block;
buf = (int64_t*)ray_data(new_block);
buf_cap = new_cap;
}
buf[buf_len++] = ((int64_t)bri << 32) | (int64_t)(uint32_t)br;
}
Comment on lines +1416 to +1419
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The packed source reference uses (rel_idx << 32) | (uint32_t)row_idx, which silently truncates row_idx for relations with >2^32-1 rows. If that limit is intentional, it should be enforced (return error / skip provenance) and documented; otherwise consider a different encoding that preserves full 64-bit row indices.

Copilot uses AI. Check for mistakes.
}
}

Comment on lines +1408 to +1422
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In dl_build_source_prov, the goto done path (when growing buf_block fails) exits early but leaves off[row+1..nrows-1] uninitialized. That produces an invalid CSR offsets vector that callers may index into. Consider treating allocation failure as a hard failure: free any partially built buffers and set both prov_src_offsets/prov_src_data to NULL (or at least fill remaining offsets with the final buf_len before returning).

Copilot uses AI. Check for mistakes.
/* Success path: finalize CSR */
off[nrows] = buf_len;
{
ray_t* data_vec = ray_vec_new(RAY_I64, buf_len > 0 ? buf_len : 1);
if (!data_vec || RAY_IS_ERR(data_vec)) goto oom;
data_vec->len = buf_len;
if (buf_len > 0)
memcpy(ray_data(data_vec), buf, (size_t)buf_len * sizeof(int64_t));
ray_free(buf_block);

if (rel->prov_src_offsets) ray_release(rel->prov_src_offsets);
if (rel->prov_src_data) ray_release(rel->prov_src_data);
rel->prov_src_offsets = off_vec;
rel->prov_src_data = data_vec;
return;
}

oom:
/* Allocation failed — discard partial results, leave both fields NULL */
ray_free(buf_block);
ray_release(off_vec);
if (rel->prov_src_offsets) { ray_release(rel->prov_src_offsets); rel->prov_src_offsets = NULL; }
if (rel->prov_src_data) { ray_release(rel->prov_src_data); rel->prov_src_data = NULL; }
}

/* Build provenance for all IDB relations.
* For each rule, compile with final tables and mark matching tuples. */
* For each rule, compile with final tables and mark matching tuples.
* Then build deep source provenance (CSR offsets + packed source refs). */
static void dl_build_provenance(dl_program_t* prog) {
for (int ri = 0; ri < prog->n_rels; ri++) {
dl_rel_t* rel = &prog->rels[ri];
Expand Down Expand Up @@ -1376,6 +1495,8 @@ static void dl_build_provenance(dl_program_t* prog) {

if (rel->prov_col) ray_release(rel->prov_col);
rel->prov_col = prov;

dl_build_source_prov(prog, rel, nrows, pd);
}
}

Expand Down Expand Up @@ -1630,18 +1751,20 @@ ray_t* dl_get_provenance(dl_program_t* prog, const char* pred_name) {
return prog->rels[idx].prov_col;
}

/* Stub: deep provenance source offsets (not yet implemented in C engine).
* Returns NULL; Rust side treats this as "no deep provenance available". */
ray_t* dl_get_provenance_src_offsets(dl_program_t* prog, const char* pred_name) {
(void)prog; (void)pred_name;
return NULL;
if (!prog || !pred_name) return NULL;
if (!(prog->flags & DL_FLAG_PROVENANCE)) return NULL;
int idx = dl_find_rel(prog, pred_name);
if (idx < 0) return NULL;
return prog->rels[idx].prov_src_offsets;
}

/* Stub: deep provenance source data (not yet implemented in C engine).
* Returns NULL; Rust side treats this as "no deep provenance available". */
ray_t* dl_get_provenance_src_data(dl_program_t* prog, const char* pred_name) {
(void)prog; (void)pred_name;
return NULL;
if (!prog || !pred_name) return NULL;
if (!(prog->flags & DL_FLAG_PROVENANCE)) return NULL;
int idx = dl_find_rel(prog, pred_name);
if (idx < 0) return NULL;
return prog->rels[idx].prov_src_data;
}

/* ── Builtins ── */
Expand Down Expand Up @@ -2311,79 +2434,92 @@ static ray_t* dl_parse_body_clause(dl_rule_t* rule, ray_t* clause,
return ray_error("type", "rule/query: unrecognized body clause form");
}

/* (rule (head-name ?v1 ?v2 ...) clause1 clause2 ...)
* Special form: args are NOT evaluated.
* Parses the head and body into a dl_rule_t and stores it globally. */
ray_t* ray_rule_fn(ray_t** args, int64_t n) {
if (n < 2)
return ray_error("arity", "rule expects at least a head and one body clause");

/* First arg: head -- must be a list (head-name ?v1 ?v2 ...) */
ray_t* head = args[0];
/* Parse head + body clauses into out (shared by rule and query inline rules). */
static ray_t* dl_parse_rule_from_head_and_body(dl_rule_t* out, ray_t* head,
ray_t** body_args, int64_t n_body,
dl_var_map_t* vars) {
if (!is_list(head) || ray_len(head) < 1)
return ray_error("type", "rule: head must be (name ?var ...)");

ray_t** hd = (ray_t**)ray_data(head);
int64_t hlen = ray_len(head);

/* Head name */
if (hd[0]->type != -RAY_SYM)
return ray_error("type", "rule: head name must be a symbol");

ray_t* head_name_str = ray_sym_str(hd[0]->i64);
if (!head_name_str)
return ray_error("type", "rule: cannot resolve head name");

/* _ is reserved as wildcard -- cannot be a rule predicate name */
if (ray_str_len(head_name_str) == 1 && ray_str_ptr(head_name_str)[0] == '_')
return ray_error("domain", "rule: _ is reserved as wildcard");

if (g_dl_n_rules >= DL_MAX_RULES)
return ray_error("domain", "rule: too many rules (max 128)");

/* Build variable map */
dl_var_map_t vars;
memset(&vars, 0, sizeof(vars));

int head_arity = (int)(hlen - 1);
dl_rule_t rule;
dl_rule_init(&rule, ray_str_ptr(head_name_str), head_arity);
dl_rule_init(out, ray_str_ptr(head_name_str), head_arity);

/* Head variables */
for (int i = 0; i < head_arity; i++) {
ray_t* harg = hd[i + 1];
if (is_dl_var(harg)) {
int vi = dl_var_get_or_create(&vars, harg->i64);
dl_rule_head_var(&rule, i, vi);
int vi = dl_var_get_or_create(vars, harg->i64);
dl_rule_head_var(out, i, vi);
} else if (harg->type == -RAY_I64) {
dl_rule_head_const(&rule, i, harg->i64);
dl_rule_head_const(out, i, harg->i64);
} else if (harg->type == -RAY_SYM) {
dl_rule_head_const(&rule, i, harg->i64);
dl_rule_head_const(out, i, harg->i64);
} else {
return ray_error("type", "rule: head arguments must be ?variables or constants");
}
}

/* Body clauses */
for (int64_t i = 1; i < n; i++) {
ray_t* err = dl_parse_body_clause(&rule, args[i], &vars);
for (int64_t i = 0; i < n_body; i++) {
ray_t* err = dl_parse_body_clause(out, body_args[i], vars);
if (err) return err;
}

rule.n_vars = vars.n;
out->n_vars = vars->n;
return NULL;
}

/* One inline rule: ((head-name ?a ...) body1 body2 ...) */
static ray_t* dl_parse_inline_rule(dl_rule_t* out, ray_t* rule_list) {
if (!is_list(rule_list) || ray_len(rule_list) < 1)
return ray_error("type", "query: each (rules ...) entry must be a non-empty list");

ray_t** re = (ray_t**)ray_data(rule_list);
int64_t rlen = ray_len(rule_list);
dl_var_map_t vars;
memset(&vars, 0, sizeof(vars));
return dl_parse_rule_from_head_and_body(out, re[0], &re[1], rlen - 1, &vars);
}

/* (rule (head-name ?v1 ?v2 ...) clause1 clause2 ...)
* Special form: args are NOT evaluated.
* Parses the head and body into a dl_rule_t and stores it globally. */
ray_t* ray_rule_fn(ray_t** args, int64_t n) {
if (n < 2)
return ray_error("arity", "rule expects at least a head and one body clause");

if (g_dl_n_rules >= DL_MAX_RULES)
return ray_error("domain", "rule: too many rules (max 128)");

dl_var_map_t vars;
memset(&vars, 0, sizeof(vars));
dl_rule_t rule;
ray_t* perr = dl_parse_rule_from_head_and_body(&rule, args[0], &args[1], n - 1, &vars);
if (perr) return perr;

/* Store globally */
memcpy(&g_dl_rules[g_dl_n_rules++], &rule, sizeof(dl_rule_t));
return ray_bool(true);
}

/* (query db (find ?a ?b ...) (where clause1 clause2 ...))
/* (query db (find ?a ?b ...) (where clause1 clause2 ...) [(rules ...)])
* Optional fourth arg (rules ...) supplies inline rules only (globals ignored).
* Special form: db is evaluated, find/where are NOT evaluated.
* Creates a temporary dl_program_t, registers the EAV table,
* copies global rules, builds a synthetic query rule, and evaluates. */
* copies global rules (unless inline rules), builds a synthetic query rule, and evaluates. */
ray_t* ray_query_fn(ray_t** args, int64_t n) {
if (n < 3)
return ray_error("arity", "query expects: db (find ...) (where ...)");
if (n < 3 || n > 4)
return ray_error("arity", "query expects: db (find ...) (where ...) [(rules ...)]");

/* Evaluate db (first arg) */
ray_t* db = ray_eval(args[0]);
Expand Down Expand Up @@ -2441,6 +2577,27 @@ ray_t* ray_query_fn(ray_t** args, int64_t n) {
return ray_error("type", "query: expected (where ...) as third argument");
}

/* Optional 4th arg must be (rules ...) — inline rules override globals */
ray_t* rules_clause = NULL;
if (n == 4) {
ray_t* fourth = args[3];
if (!is_list(fourth) || ray_len(fourth) < 1) {
ray_release(db);
return ray_error("type", "query: fourth argument must be (rules ...)");
}
ray_t** re4 = (ray_t**)ray_data(fourth);
if (re4[0]->type != -RAY_SYM) {
ray_release(db);
return ray_error("type", "query: fourth argument must be (rules ...)");
}
ray_t* rname = ray_sym_str(re4[0]->i64);
if (!rname || strcmp(ray_str_ptr(rname), "rules") != 0) {
ray_release(db);
return ray_error("type", "query: fourth argument must be (rules ...)");
}
rules_clause = fourth;
}

/* Build variable map for the query */
dl_var_map_t vars;
memset(&vars, 0, sizeof(vars));
Expand Down Expand Up @@ -2495,9 +2652,27 @@ ray_t* ray_query_fn(ray_t** args, int64_t n) {
ray_release(eav_tbl);
}

/* Copy all global rules into the program */
for (int i = 0; i < g_dl_n_rules; i++)
dl_add_rule(prog, &g_dl_rules[i]);
if (rules_clause) {
ray_t** re = (ray_t**)ray_data(rules_clause);
int64_t rlen = ray_len(rules_clause);
for (int64_t i = 1; i < rlen; i++) {
dl_rule_t irule;
ray_t* rerr = dl_parse_inline_rule(&irule, re[i]);
if (rerr) {
dl_program_free(prog);
ray_release(db);
return rerr;
}
if (dl_add_rule(prog, &irule) < 0) {
dl_program_free(prog);
ray_release(db);
return ray_error("domain", "query: too many rules");
}
}
} else {
for (int i = 0; i < g_dl_n_rules; i++)
dl_add_rule(prog, &g_dl_rules[i]);
}

/* Add the synthetic query rule */
dl_add_rule(prog, &qrule);
Expand Down
Loading