
JiT Worker in CLI and RPC infrastructure #2109

Open
rafal-hawrylak wants to merge 2 commits into main from cli_jit_workers

Conversation

@rafal-hawrylak
Collaborator

This change establishes the foundation for executing JiT compilation in an isolated environment. It introduces the worker process management, the RPC bridge for database access during JiT, and the necessary Bazel targets.

@rafal-hawrylak rafal-hawrylak marked this pull request as ready for review March 9, 2026 19:59
@rafal-hawrylak rafal-hawrylak requested a review from a team as a code owner March 9, 2026 19:59
@rafal-hawrylak rafal-hawrylak enabled auto-merge (squash) March 9, 2026 19:59
@rafal-hawrylak rafal-hawrylak self-assigned this Mar 9, 2026
@@ -0,0 +1,101 @@
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
Contributor

To clarify: is this protobuf bridge protocol the same for GCP and CLI implementations?

Collaborator Author

Both the CLI and GCP implementations follow the dataform.DbAdapter service defined in jit.proto.

Collaborator

See my comment below on the issue with "Execute" compatibility

Collaborator Author

PTAL on the approach with executeRaw

}

constructor() {
super("../../vm/jit_loader");
Collaborator

I wonder if it's possible to avoid hard-coded paths like this. I can imagine that if we later create a jit directory for JiT-related actions, this will break.
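A minimal sketch of one way to avoid the hard-coded relative string, assuming the loader path can be derived from a single base directory; the names (`resolveLoader`, `JIT_LOADER`) are illustrative, not from the PR:

```typescript
import * as path from "path";

// Single constant for the loader name, so a future move of JiT-related
// files only requires changing one place.
const JIT_LOADER = "jit_loader";

// Resolve the loader from an explicit base directory instead of a
// brittle "../../vm/jit_loader" relative string.
function resolveLoader(baseDir: string): string {
  return path.resolve(baseDir, "vm", JIT_LOADER);
}
```

The same idea can be applied with `require.resolve` against a workspace-rooted module id, which would also survive bundling.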

return new Uint8Array();
}

function mapRowToProto(row: { [key: string]: any }): google.protobuf.IStruct {
Collaborator

jit.proto declares that the struct contains:

// Rows. For BigQuery, see
// https://docs.cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults.

In other words, right now we expect the raw API result as an "f"/"v" JSON struct, not a bespoke conversion.

The BigQuery client for Node is strange in this respect: it forcefully decodes those rows and removes them from the original response. We could either:
a) Use the googleapis package client instead for JiT
b) Come up with another protocol for encoding rows and implement it both here and in GCP. Let me know on chat if you need code pointers for the GCP part.
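For reference, the raw REST "f"/"v" row encoding the comment refers to pairs a positional list of cells with the schema's field list; a generic decoder is only a few lines. This sketch is illustrative (the `RawRow`/`decodeRawRow` names are not from the PR):

```typescript
// Shapes matching the BigQuery jobs.getQueryResults REST response:
// rows come back as { f: [{ v: ... }, ...] }, aligned positionally
// with schema.fields.
interface RawCell { v: any; }
interface RawRow { f: RawCell[]; }
interface FieldSchema { name: string; }

// Decode one raw row into a { columnName: value } object.
function decodeRawRow(row: RawRow, fields: FieldSchema[]): { [key: string]: any } {
  const out: { [key: string]: any } = {};
  fields.forEach((field, i) => {
    out[field.name] = row.f[i]?.v;
  });
  return out;
}
```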

Collaborator Author

PTAL on the approach with executeRaw

Collaborator

@ikholopov-omni ikholopov-omni Mar 12, 2026

It has exactly the same problem: query() inside rawExecute already removes rows from the response and only returns the decoded ones as the first component of the tuple [rows, _, response].

- Enhance error coercion in common/errors/errors.ts
- Sync protos/execution.proto and protos/jit.proto with new fields
@rafal-hawrylak rafal-hawrylak force-pushed the cli_jit_workers branch 8 times, most recently from 6eb0cd1 to a05cf58 on March 12, 2026 14:39
* feat: add worker process management for JiT compilation

- Introduce base worker and JiT-specific child process logic

- Implement RPC bridge for database access during JiT

- Add VM scripts and loader for isolated execution

- Add unit tests for the RPC mechanism
Comment on lines +37 to +45
bigquery: {
labels: {
...(options?.labels || {}),
...(requestOptions?.labels || {})
},
location: requestOptions?.location || options?.location,
jobPrefix: [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined,
dryRun: !!(options?.dryRun || requestOptions?.dryRun)
}
Contributor

I think there should be a consistent order of precedence here.

Also, wouldn't we need to update this logic every time a new option is added to IBigQueryExecutionOptions? I'd suggest using the full value of options by default (and overwriting with requestOptions where it makes sense).
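The suggestion could be sketched roughly like this, assuming illustrative field names on the options interface (not the PR's exact types):

```typescript
// Illustrative subset of a BigQuery execution-options shape.
interface IBigQueryOptions {
  labels?: { [key: string]: string };
  location?: string;
  jobPrefix?: string;
  dryRun?: boolean;
}

// Start from the full options object and overlay requestOptions, so any
// field added to the interface later is forwarded automatically and
// requestOptions consistently takes precedence.
function mergeBigQueryOptions(
  options: IBigQueryOptions = {},
  requestOptions: IBigQueryOptions = {}
): IBigQueryOptions {
  return {
    ...options,
    ...requestOptions,
    // Labels are merged rather than replaced; requestOptions wins on conflict.
    labels: { ...(options.labels || {}), ...(requestOptions.labels || {}) }
  };
}
```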

const targets = await dbadapter.tables();
const tablesMetadata = await Promise.all(
targets
.filter(target => !listTablesRequest.schema || target.schema === listTablesRequest.schema)
Contributor

The interface in the CLI has only a parameterless method:
tables(): Promise<dataform.ITarget[]>;

Will it work correctly then? A JiT ListTablesRequest can carry different values of database, which may not match whatever project dbadapter.tables() lists tables in.
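A sketch of what database-aware filtering might look like, with illustrative types standing in for the actual CLI interfaces:

```typescript
// Illustrative stand-ins for dataform.ITarget and ListTablesRequest.
interface ITarget { database?: string; schema?: string; name?: string; }
interface IListTablesRequest { database?: string; schema?: string; }

// Filter by database as well as schema, so a request scoped to a
// different project does not silently match tables from the adapter's
// default project.
function filterTargets(targets: ITarget[], request: IListTablesRequest): ITarget[] {
  return targets.filter(
    target =>
      (!request.database || target.database === request.database) &&
      (!request.schema || target.schema === request.schema)
  );
}
```

This only helps if dbadapter.tables() actually surfaces targets from the requested project in the first place, which is the reviewer's underlying concern.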

const getTableRequest = dataform.GetTableRequest.decode(request);
const tableMetadata = await dbadapter.table(getTableRequest.target);
if (!tableMetadata) {
return dataform.TableMetadata.encode(dataform.TableMetadata.create({})).finish();
Contributor

I'd rather rethrow NOT_FOUND back to the compiler in such cases
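One possible shape for surfacing NOT_FOUND to the compiler instead of returning empty metadata; `RpcError` and `requireTableMetadata` are hypothetical names, not the PR's actual error type:

```typescript
// Hypothetical error carrying a status code across the RPC bridge.
class RpcError extends Error {
  constructor(public readonly code: string, message: string) {
    super(message);
  }
}

// Rethrow NOT_FOUND rather than encoding an empty TableMetadata, so the
// compiler can distinguish "table missing" from "table has no metadata".
function requireTableMetadata<T>(metadata: T | null | undefined, tableName: string): T {
  if (!metadata) {
    throw new RpcError("NOT_FOUND", `Table not found: ${tableName}`);
  }
  return metadata;
}
```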

Comment on lines +110 to +115
location: options.bigquery?.location,
maxResults: options.rowLimit,
useLegacySql: false,
labels: options.bigquery?.labels,
jobPrefix: options.bigquery?.jobPrefix,
dryRun: options.bigquery?.dryRun
Contributor

Could we have common logic for how we set options for BQ job calls?

Comment on lines +430 to +433
const [, , apiResponse] = (await job[0].getQueryResults({
maxResults: rowLimit,
location
})) as any;
Contributor

Not sure it's correct to generate one more API request here.


const requestMessage = dataform.JitCompilationRequest.fromObject(request);
const requestBytes = dataform.JitCompilationRequest.encode(requestMessage).finish();
const requestBase64 = Buffer.from(requestBytes).toString("base64");
Contributor

Do we need to do base64 encoding here?
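For context, the base64 round trip under discussion looks like this; it is only needed if the transport between parent and worker carries strings (e.g. JSON-serialized messages), since binary-capable channels could pass the Uint8Array directly:

```typescript
// Encode protobuf bytes for a string-only transport.
function encodeForStringTransport(bytes: Uint8Array): string {
  return Buffer.from(bytes).toString("base64");
}

// Decode on the other side of the transport.
function decodeFromStringTransport(base64: string): Uint8Array {
  return new Uint8Array(Buffer.from(base64, "base64"));
}
```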

},
root: projectDir,
mock: hasProjectLocalCore ? {} : {
"@dataform/core": require("@dataform/core")
Contributor

I don't think we have such a fallback for regular compilation?

const requestBytes = new Uint8Array(Buffer.from(requestBase64, "base64"));

const internalRpcCallback = (method, reqBytes, callback) => {
const reqBase64 = Buffer.from(reqBytes).toString("base64");
Contributor

Same question: do we need base64 encoding here?
