Skip to content

Commit a05cf58

Browse files
feat: implement JiT worker and RPC infrastructure
* feat: add worker process management for JiT compilation
  - Introduce base worker and JiT-specific child process logic
  - Implement RPC bridge for database access during JiT
  - Add VM scripts and loader for isolated execution
  - Add unit tests for the RPC mechanism
1 parent 5b62867 commit a05cf58

14 files changed

Lines changed: 933 additions & 46 deletions

File tree

cli/api/BUILD

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package(default_visibility = ["//visibility:public"])
22

33
load("//tools:ts_library.bzl", "ts_library")
4+
load("//testing:index.bzl", "ts_test_suite")
45

56
ts_library(
67
name = "api",
78
srcs = glob(
89
["**/*.ts"],
9-
exclude = ["utils/**/*.*"],
10+
exclude = [
11+
"utils/**/*.*",
12+
"**/*_test.ts",
13+
],
1014
),
1115
deps = [
1216
"//cli/api/utils",
@@ -42,3 +46,18 @@ ts_library(
4246
"@npm//tmp",
4347
],
4448
)
49+
50+
ts_test_suite(
51+
name = "tests",
52+
srcs = ["commands/jit/rpc_test.ts"],
53+
deps = [
54+
":api",
55+
"//protos:ts",
56+
"//testing",
57+
"@npm//@types/chai",
58+
"@npm//@types/long",
59+
"@npm//@types/node",
60+
"@npm//chai",
61+
"@npm//ts-mockito",
62+
],
63+
)

cli/api/commands/base_worker.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { ChildProcess, fork } from "child_process";
2+
3+
/**
 * Base class for work that is delegated to a forked Node child process and
 * completed by a single response delivered over the IPC channel.
 *
 * Subclasses supply the loader script path and drive the conversation with the
 * child through the `onBoot` / `onMessage` callbacks passed to `runWorker`.
 *
 * @typeParam TResponse Value the worker promise resolves with.
 * @typeParam TMessage Shape of the IPC messages handed to `onMessage`.
 */
export abstract class BaseWorker<TResponse, TMessage = any> {
  /**
   * @param loaderPath Module path of the worker entry script. It is resolved
   *   with `require.resolve`, falling back to `./worker_bundle.js`.
   */
  protected constructor(private readonly loaderPath: string) {}

  /**
   * Forks the worker script and waits for it to produce a response.
   *
   * The returned promise settles exactly once. Whichever of the following
   * happens first wins, and the child is killed at that point:
   *  - `onMessage` calls its `resolve` or `reject` callback,
   *  - `onBoot` or `onMessage` throws (or an async `onMessage` rejects),
   *  - the child emits `"error"`,
   *  - the child exits before a response was delivered,
   *  - `timeoutMillis` elapses.
   *
   * @param timeoutMillis Maximum time to wait, measured from the fork.
   * @param onBoot Called when the child sends a `{ type: "worker_booted" }` message.
   * @param onMessage Called for every other IPC message from the child.
   * @returns The value passed to `resolve` by `onMessage`.
   * @throws Error if the worker script cannot be resolved, the worker times
   *   out, fails, or exits without responding.
   */
  protected async runWorker(
    timeoutMillis: number,
    onBoot: (child: ChildProcess) => void,
    onMessage: (
      message: TMessage,
      child: ChildProcess,
      resolve: (res: TResponse) => void,
      reject: (err: Error) => void
    ) => void
  ): Promise<TResponse> {
    const forkScript = this.resolveScript();
    // fds 0-2 are shared with the parent, fd 3 carries the IPC channel and
    // fd 4 is an additional pipe exposed to the child.
    const child = fork(forkScript, [], {
      stdio: [0, 1, 2, "ipc", "pipe"]
    });

    return new Promise((resolve, reject) => {
      let completed = false;

      // Settles the promise at most once; later triggers are ignored.
      const terminate = (fn: () => void) => {
        if (completed) {
          return;
        }
        completed = true;
        clearTimeout(timeout);
        child.kill();
        fn();
      };

      const fail = (err: unknown) =>
        terminate(() => reject(err instanceof Error ? err : new Error(String(err))));

      // Runs a caller-supplied callback so that neither a synchronous throw nor
      // a rejected promise (callers may pass async functions) can escape the
      // event listener as an uncaught error; both reject the worker promise.
      const runGuarded = (callback: () => unknown) => {
        try {
          Promise.resolve(callback()).catch(fail);
        } catch (e) {
          fail(e);
        }
      };

      const timeout = setTimeout(() => {
        terminate(() =>
          reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`))
        );
      }, timeoutMillis);

      child.on("message", (message: any) => {
        if (message.type === "worker_booted") {
          runGuarded(() => onBoot(child));
          return;
        }
        runGuarded(() =>
          onMessage(
            message,
            child,
            res => terminate(() => resolve(res)),
            err => terminate(() => reject(err))
          )
        );
      });

      child.on("error", err => {
        terminate(() => reject(err));
      });

      child.on("exit", (code, signal) => {
        if (!completed) {
          // A signal-terminated child reports `code === null`; include the
          // signal in that case too so the failure cause is not lost.
          const abnormalExit = (code !== 0 && code !== null) || !!signal;
          const errorMsg = abnormalExit
            ? `Worker exited with code ${code} and signal ${signal}`
            : "Worker exited without sending a response message";
          terminate(() => reject(new Error(errorMsg)));
        }
      });
    });
  }

  /**
   * Returns the resolved filename of the first candidate script that
   * `require.resolve` can locate.
   *
   * @throws Error listing every attempted path when none can be resolved.
   */
  private resolveScript() {
    const pathsToTry = [this.loaderPath, "./worker_bundle.js"];
    for (const p of pathsToTry) {
      try {
        return require.resolve(p);
      } catch (e) {
        // Continue to next path.
      }
    }
    throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`);
  }
}

cli/api/commands/jit/compiler.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { ChildProcess } from "child_process";
2+
3+
import { BaseWorker } from "df/cli/api/commands/base_worker";
4+
import { handleDbRequest } from "df/cli/api/commands/jit/rpc";
5+
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
6+
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
7+
import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants";
8+
import { dataform } from "df/protos/ts";
9+
10+
/**
 * IPC message received from the JiT worker process.
 *
 * `type` selects which of the optional fields are read by the parent:
 *  - `"rpc_request"`: `method`, `request` and `correlationId`
 *  - `"jit_response"`: `response`
 *  - `"jit_error"`: `error`
 */
export interface IJitWorkerMessage {
  /** Message discriminator. */
  type:
    | "rpc_request"
    | "jit_response"
    | "jit_error";

  /** Identifier echoed back on the matching `rpc_response`. */
  correlationId?: string;
  /** Name of the database RPC method to invoke. */
  method?: string;
  /** Base64-encoded request payload for the RPC. */
  request?: string;

  /** Plain-object compilation response, converted with `fromObject`. */
  response?: any;

  /** Failure description; becomes the message of the rejecting `Error`. */
  error?: string;
}
18+
19+
/**
 * Runs a JiT compilation request in a forked worker process, serving the
 * worker's database RPC requests from the parent while it runs.
 */
export class JitCompileChildProcess extends BaseWorker<
  dataform.IJitCompilationResponse,
  IJitWorkerMessage
> {
  /**
   * Compiles `request` in a fresh worker process.
   *
   * @param request JiT compilation request forwarded to the worker.
   * @param projectDir Project directory forwarded to the worker.
   * @param dbadapter Adapter used to serve table-level RPCs.
   * @param dbclient Client used to serve `Execute` RPCs.
   * @param timeoutMillis Maximum time to wait for the worker.
   * @param options Execution options applied to the RPCs served for the worker.
   * @returns The compilation response sent back by the worker.
   */
  public static async compile(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await new JitCompileChildProcess().run(
      request,
      projectDir,
      dbadapter,
      dbclient,
      timeoutMillis,
      options
    );
  }

  constructor() {
    super("../../../vm/jit_loader");
  }

  /**
   * Drives one worker: sends the `jit_compile` message once the worker has
   * booted, then answers `rpc_request` messages until the worker reports
   * `jit_response` (resolve) or `jit_error` (reject).
   */
  private async run(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await this.runWorker(
      timeoutMillis,
      child => {
        child.send({
          type: "jit_compile",
          request,
          projectDir
        });
      },
      async (message, child, resolve, reject) => {
        // This handler is async, so anything thrown here (for example by
        // `fromObject` on a malformed payload) would otherwise surface as an
        // unhandled rejection and leave the compile promise unsettled.
        try {
          if (message.type === "rpc_request") {
            await this.handleRpcRequest(message, child, dbadapter, dbclient, options);
          } else if (message.type === "jit_response") {
            resolve(dataform.JitCompilationResponse.fromObject(message.response));
          } else if (message.type === "jit_error") {
            reject(new Error(message.error));
          }
        } catch (e) {
          reject(e instanceof Error ? e : new Error(String(e)));
        }
      }
    );
  }

  /**
   * Serves one database RPC for the worker and replies with an `rpc_response`
   * carrying the same `correlationId`. The reply holds either the
   * base64-encoded response bytes or an `error` string.
   */
  private async handleRpcRequest(
    message: IJitWorkerMessage,
    child: ChildProcess,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    options?: IBigQueryExecutionOptions
  ) {
    try {
      const response = await handleDbRequest(
        dbadapter,
        dbclient,
        message.method,
        Buffer.from(message.request, "base64"),
        options
      );
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        response: Buffer.from(response).toString("base64")
      });
    } catch (e) {
      // Always send a non-empty error string: an `undefined` error (non-Error
      // throwable) would be dropped during serialization and the reply would
      // carry neither a response nor an error.
      const errorText = e instanceof Error && e.message ? e.message : String(e);
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        error: errorText
      });
    }
  }
}

cli/api/commands/jit/rpc.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
2+
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
3+
import { Structs } from "df/common/protos/structs";
4+
import { dataform, google } from "df/protos/ts";
5+
6+
/**
 * Routes a serialized database RPC to the handler registered for `method`.
 *
 * @param dbadapter Adapter used by the table-level methods.
 * @param dbclient Client used by `Execute`.
 * @param method RPC name: `Execute`, `ListTables`, `GetTable` or `DeleteTable`.
 * @param request Protobuf-encoded request message for that method.
 * @param options Execution options; `dryRun` is also honored by `DeleteTable`.
 * @returns The encoded response bytes produced by the selected handler.
 * @throws Error if `method` is not one of the supported names.
 */
export async function handleDbRequest(
  dbadapter: IDbAdapter,
  dbclient: IDbClient,
  method: string,
  request: Uint8Array,
  options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
  if (method === "Execute") {
    return await handleExecute(dbclient, request, options);
  }
  if (method === "ListTables") {
    return await handleListTables(dbadapter, request);
  }
  if (method === "GetTable") {
    return await handleGetTable(dbadapter, request);
  }
  if (method === "DeleteTable") {
    return await handleDeleteTable(dbadapter, request, options?.dryRun);
  }
  throw new Error(`Unrecognized RPC method: ${method}`);
}
26+
27+
/**
 * Serves an `Execute` RPC: decodes the request, runs the statement through
 * `dbclient.executeRaw`, and returns the rows as an encoded `ExecuteResponse`.
 *
 * Caller-level `options` and the request's own BigQuery options are combined:
 * labels are merged with the request's entries taking precedence, the
 * request's location is preferred over the caller's, job prefixes are joined
 * with "-" (caller first), and a dry run is used if either side asks for one.
 */
async function handleExecute(
  dbclient: IDbClient,
  request: Uint8Array,
  options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
  const decoded = dataform.ExecuteRequest.decode(request);
  const perRequest = decoded.bigQueryOptions;

  const rowLimit = decoded.rowLimit ? decoded.rowLimit.toNumber() : undefined;
  const labels = {
    ...(options?.labels || {}),
    ...(perRequest?.labels || {})
  };
  const prefixParts = [options?.jobPrefix, perRequest?.jobPrefix].filter(Boolean);
  // An empty join result means neither side supplied a prefix.
  const jobPrefix = prefixParts.join("-") || undefined;

  const results = await dbclient.executeRaw(decoded.statement, {
    rowLimit,
    bigquery: {
      labels,
      location: perRequest?.location || options?.location,
      jobPrefix,
      dryRun: !!(options?.dryRun || perRequest?.dryRun)
    }
  });

  // Use the raw rows when the client returned them, otherwise the regular rows.
  const sourceRows = results.rawRows || results.rows;
  const encodedRows = sourceRows.map(row => Structs.fromObject(row));
  const reply = dataform.ExecuteResponse.create({ rows: encodedRows });
  return dataform.ExecuteResponse.encode(reply).finish();
}
53+
54+
/**
 * Serves a `ListTables` RPC: lists the adapter's tables, keeps those in the
 * requested schema (all of them when no schema is given), fetches metadata for
 * each, and returns an encoded `ListTablesResponse`.
 */
async function handleListTables(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
  const { schema } = dataform.ListTablesRequest.decode(request);
  const allTargets = await dbadapter.tables();

  const selected = allTargets.filter(candidate => !schema || candidate.schema === schema);
  // Metadata lookups are issued together and awaited as a group.
  const metadataLookups = selected.map(candidate => dbadapter.table(candidate));
  const tables = await Promise.all(metadataLookups);

  const reply = dataform.ListTablesResponse.create({ tables });
  return dataform.ListTablesResponse.encode(reply).finish();
}
67+
68+
/**
 * Serves a `GetTable` RPC: looks up metadata for the requested target and
 * returns it encoded as `TableMetadata`. When the adapter returns nothing, an
 * empty `TableMetadata` message is encoded instead.
 */
async function handleGetTable(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
  const { target } = dataform.GetTableRequest.decode(request);
  const found = await dbadapter.table(target);
  const payload = found || dataform.TableMetadata.create({});
  return dataform.TableMetadata.encode(payload).finish();
}
76+
77+
/**
 * Serves a `DeleteTable` RPC. The request is always decoded; the table is
 * deleted through the adapter unless `dryRun` is set.
 *
 * @returns An empty byte array in every case.
 */
async function handleDeleteTable(
  dbadapter: IDbAdapter,
  request: Uint8Array,
  dryRun?: boolean
): Promise<Uint8Array> {
  const { target } = dataform.DeleteTableRequest.decode(request);
  if (!dryRun) {
    await dbadapter.deleteTable(target);
  }
  return new Uint8Array();
}

0 commit comments

Comments
 (0)