JiT Worker in CLI and RPC infrastructure #2109
Conversation
Force-pushed from a08d4da to 01fa6ba
@@ -0,0 +1,101 @@
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
To clarify: is this protobuf bridge protocol the same for GCP and CLI implementations?
Both the CLI and GCP implementations follow the dataform.DbAdapter service defined in jit.proto.
- They both use the same binary "payloads" for the actual data (e.g., ExecuteRequest, ExecuteResponse).
- The CLI implements all four methods (Execute, ListTables, GetTable, and DeleteTable); GCP implements only Execute as of now (https://source.corp.google.com/piper///depot/google3/cloud/dataform/compilation/jitrunner/worker/engine.cc;rcl=881961593;l=163)
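The shared service contract described above can be sketched as follows. The handler names and types are assumptions mirroring this thread, not the actual jit.proto or worker code: both sides dispatch on a method name and exchange the same binary payloads, with unimplemented methods rejected.

```typescript
// Hypothetical sketch of the dataform.DbAdapter dispatch described above.
// Both CLI and GCP workers speak the same binary payloads; GCP currently
// implements only Execute, which the optional fields model here.
type RpcHandler = (requestBytes: Uint8Array) => Promise<Uint8Array>;

interface DbAdapterService {
  execute: RpcHandler;
  listTables?: RpcHandler; // CLI only, for now
  getTable?: RpcHandler; // CLI only, for now
  deleteTable?: RpcHandler; // CLI only, for now
}

async function dispatch(
  service: DbAdapterService,
  method: keyof DbAdapterService,
  requestBytes: Uint8Array
): Promise<Uint8Array> {
  const handler = service[method];
  if (!handler) {
    // A GCP worker would reject ListTables/GetTable/DeleteTable this way.
    throw new Error(`Method not implemented: ${String(method)}`);
  }
  return handler(requestBytes);
}
```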
See my comment below on the issue with "Execute" compatibility
PTAL on the approach with executeRaw
Force-pushed from 01fa6ba to 33b9f19
}

constructor() {
  super("../../vm/jit_loader");
I wonder if it is possible to avoid such hard-coded paths. I can imagine that if we create a jit directory for jit-related actions, this will break.
cli/api/commands/jit/rpc.ts (Outdated)
  return new Uint8Array();
}

function mapRowToProto(row: { [key: string]: any }): google.protobuf.IStruct {
jit.proto declares that the struct contains:
// Rows. For BigQuery, see
// https://docs.cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults.
In other words, right now we expect the raw API result as an "f"/"v" JSON struct, not a bespoke conversion.
The BigQuery client for Node is strange in this respect: it forcefully decodes those rows and removes them from the original response. We could either:
a) Use googleapis package client instead for JiT
b) Come up with another protocol for encoding rows and implement it both here and in GCP. Let me know on chat if you need code pointers for GCP part.
PTAL on the approach with executeRaw
It has exactly the same problem: query() inside rawExecute already removes rows from the response and only returns the decoded ones as the first component of the tuple [rows, _, response].
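To illustrate the mismatch discussed in this thread: the raw BigQuery REST response keeps rows in "f"/"v" form, while the Node client hands back decoded objects, and re-encoding those is lossy. The row shapes and the `toFv` helper below are illustrative assumptions, not the real client types.

```typescript
// Raw REST shape, as in jobs.getQueryResults ("f" = fields, "v" = value):
const rawApiRow = { f: [{ v: "42" }, { v: "alice" }] };

// What the Node client hands back after its forced decoding:
const decodedRow: { [key: string]: any } = { id: 42, name: "alice" };

// A best-effort re-encoding of the decoded row is lossy: original value
// types, column-order guarantees, and nested "f"/"v" structures are gone,
// which is why the thread suggests either the googleapis client or a
// separate row-encoding protocol shared with the GCP side.
function toFv(row: { [key: string]: any }): { f: Array<{ v: string }> } {
  // Stringifies every value; cannot reconstruct rawApiRow faithfully.
  return { f: Object.values(row).map(value => ({ v: String(value) })) };
}
```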
Force-pushed from 33b9f19 to d6583e7
- Enhance error coercion in common/errors/errors.ts
- Sync protos/execution.proto and protos/jit.proto with new fields
Force-pushed from 6eb0cd1 to a05cf58
* feat: add worker process management for JiT compilation
  - Introduce base worker and JiT-specific child process logic
  - Implement RPC bridge for database access during JiT
  - Add VM scripts and loader for isolated execution
  - Add unit tests for the RPC mechanism
Force-pushed from a05cf58 to be56edb
bigquery: {
  labels: {
    ...(options?.labels || {}),
    ...(requestOptions?.labels || {})
  },
  location: requestOptions?.location || options?.location,
  jobPrefix: [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined,
  dryRun: !!(options?.dryRun || requestOptions?.dryRun)
}
I think there should be a consistent order of precedence here.
Also, wouldn't we need to update this logic every time a new option is added to IBigQueryExecutionOptions? I'd suggest using the full value of options by default (and overwriting with requestOptions where it makes sense).
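A minimal sketch of that suggested merge order, assuming an options shape like the one in the diff (the interface name and fields are assumptions): spread `options` first so every new field inherits a default automatically, let `requestOptions` override, and keep explicit handling only for fields that combine rather than overwrite.

```typescript
// Assumed shape, mirroring the fields visible in the diff.
interface IBigQueryOptions {
  labels?: { [key: string]: string };
  location?: string;
  jobPrefix?: string;
  dryRun?: boolean;
}

function mergeBigQueryOptions(
  options?: IBigQueryOptions,
  requestOptions?: IBigQueryOptions
): IBigQueryOptions {
  return {
    // Defaults first, request-level values win; fields added to the
    // interface later are picked up without touching this function.
    ...options,
    ...requestOptions,
    // Fields that combine rather than overwrite stay explicit:
    labels: { ...(options?.labels || {}), ...(requestOptions?.labels || {}) },
    jobPrefix:
      [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") ||
      undefined,
    dryRun: !!(options?.dryRun || requestOptions?.dryRun)
  };
}
```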
const targets = await dbadapter.tables();
const tablesMetadata = await Promise.all(
  targets
    .filter(target => !listTablesRequest.schema || target.schema === listTablesRequest.schema)
The interface in the CLI has only a parameterless method:
tables(): Promise<dataform.ITarget[]>;
Will this work correctly? A JiT ListTablesRequest can carry different values of database, which may not match whatever project dbadapter.tables() uses to list tables.
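In sketch form, the mismatch being raised (types assumed from the thread, not the real CLI interfaces): the adapter surface offers no way to pass the requested database through, so post-hoc filtering can narrow by schema but cannot reach tables in a project the adapter never queried.

```typescript
// Assumed shapes, mirroring the names used in this thread.
interface ITarget {
  database?: string;
  schema?: string;
  name?: string;
}

// Current CLI surface: no parameter saying which database to list.
interface IDbAdapter {
  tables(): Promise<ITarget[]>;
}

// Post-hoc filtering narrows the already-fetched list; if the request's
// database differs from the adapter's configured project, the targets it
// wants were never fetched in the first place.
function filterTargets(targets: ITarget[], schema?: string): ITarget[] {
  return targets.filter(target => !schema || target.schema === schema);
}
```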
const getTableRequest = dataform.GetTableRequest.decode(request);
const tableMetadata = await dbadapter.table(getTableRequest.target);
if (!tableMetadata) {
  return dataform.TableMetadata.encode(dataform.TableMetadata.create({})).finish();
I'd rather rethrow NOT_FOUND back to the compiler in such cases
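A sketch of that suggestion, with an assumed error shape (the RpcError class and lookup signature are illustrative, not the PR's actual code): propagate a typed NOT_FOUND back to the compiler instead of encoding an empty TableMetadata.

```typescript
// Assumed error type carrying a status code across the RPC bridge.
class RpcError extends Error {
  constructor(public readonly code: string, message: string) {
    super(message);
  }
}

async function getTableOrThrow(
  lookup: (target: string) => Promise<object | undefined>,
  target: string
): Promise<object> {
  const metadata = await lookup(target);
  if (!metadata) {
    // Surface NOT_FOUND so the JiT compiler can react, rather than
    // silently treating a missing table as an empty one.
    throw new RpcError("NOT_FOUND", `Table not found: ${target}`);
  }
  return metadata;
}
```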
location: options.bigquery?.location,
maxResults: options.rowLimit,
useLegacySql: false,
labels: options.bigquery?.labels,
jobPrefix: options.bigquery?.jobPrefix,
dryRun: options.bigquery?.dryRun
Could we have common logic for how we set options on BQ job calls?
const [, , apiResponse] = (await job[0].getQueryResults({
  maxResults: rowLimit,
  location
})) as any;
I'm not sure it's correct to issue an additional API request here.
const requestMessage = dataform.JitCompilationRequest.fromObject(request);
const requestBytes = dataform.JitCompilationRequest.encode(requestMessage).finish();
const requestBase64 = Buffer.from(requestBytes).toString("base64");
Do we need to do base64 encoding here?
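For context on this question: whether base64 is needed depends on the channel. Node's default child-process IPC serialization is JSON-based, and a Uint8Array does not survive a JSON round trip intact, so base64 is one workaround; `{ serialization: "advanced" }` (structured clone) would let raw bytes cross directly. A small illustration:

```typescript
// Buffer is a Node.js global; no import required.
const bytes = new Uint8Array([0, 255, 128]);

// JSON mangles typed arrays into plain objects with numeric string keys,
// so bytes sent over a JSON-serialized channel arrive in the wrong shape:
const jsonRoundTrip = JSON.parse(JSON.stringify(bytes));

// The base64 detour preserves the payload over a string-only channel:
const base64 = Buffer.from(bytes).toString("base64");
const restored = new Uint8Array(Buffer.from(base64, "base64"));
```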
},
root: projectDir,
mock: hasProjectLocalCore ? {} : {
  "@dataform/core": require("@dataform/core")
I don't think we have such a fallback for regular compilation?
const requestBytes = new Uint8Array(Buffer.from(requestBase64, "base64"));

const internalRpcCallback = (method, reqBytes, callback) => {
  const reqBase64 = Buffer.from(reqBytes).toString("base64");
Same question: do we need base64 encoding here?
This change establishes the foundation for executing JiT compilation in an isolated environment. It introduces the worker process management, the RPC bridge for database access during JiT, and the necessary Bazel targets.