-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode.ts
More file actions
270 lines (240 loc) · 9.9 KB
/
node.ts
File metadata and controls
270 lines (240 loc) · 9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import { DuckDBArrayValue, DuckDBBitValue, DuckDBBlobValue, DuckDBDateValue, DuckDBDecimalValue, DuckDBIntervalValue, DuckDBListValue, DuckDBMapValue, DuckDBResult, DuckDBStructValue, DuckDBTimestampMillisecondsValue, DuckDBTimestampNanosecondsValue, DuckDBTimestampSecondsValue, DuckDBTimestampTZValue, DuckDBTimestampValue, DuckDBTimeTZValue, DuckDBTimeValue, DuckDBTypeId, DuckDBUnionValue, DuckDBUUIDValue, type DuckDBConnection, type DuckDBInstance, type DuckDBValue } from '@duckdb/node-api'
import { delta_scan, parquet_scan, read_csv, read_json, read_json_objects, read_parquet, read_text, read_xlsx } from './io'
export { delta_scan, parquet_scan, read_csv, read_json, read_json_objects, read_parquet, read_text, read_xlsx }
import type { DSettings } from './.buck/types'
import { builder } from './src/build'
import { formatJSON, generateInterface, serializeDescribe } from './src/interface-generator'
import { readFileSync, writeFileSync } from 'node:fs'
import { BuckDBBase } from './core'
import { isBucket, isReaderFunction, type Dict } from './src/utils'
import { DuckDBTypeIdMap } from './src/typedef'
import { omit, pick, zipObject } from 'es-toolkit'
import { DuckDBResultReader } from '@duckdb/node-api/lib/DuckDBResultReader'
import Path from 'node:path'
// Absolute paths to the generated model registry (JSON) and the TypeScript
// interface file derived from it, resolved relative to this module.
const ModelsJsonPath = Path.join(__dirname, './.buck/models.json')
const ModelsTSPath = Path.join(__dirname, './.buck/models.ts')
// Settings that must be supplied to DuckDBInstance.create() at creation time;
// all other settings are applied afterwards via queued commands (see _initDB).
const InitConfigKeys = ['s3_access_key_id', 's3_secret_access_key', 's3_region', 's3_session_token']
/**
 * Translate a DuckDB column type descriptor into the schema shape stored in
 * models.json: a type-name string for scalars, a `{ __type: 'array', ... }`
 * object for LIST/ARRAY, and a `{ __type: 'struct', ... }` object for STRUCT.
 * Unknown type ids fall back to 'DAny'.
 */
function mapDuckDBTypeToSchema(typeInfo: any): string | Record<string, any> {
  const fallback = DuckDBTypeIdMap[typeInfo.typeId] || 'DAny'
  const { typeId } = typeInfo
  // LIST / ARRAY: recurse into the element type when one is available.
  if (typeId === DuckDBTypeId.LIST || typeId === DuckDBTypeId.ARRAY) {
    return typeInfo.valueType
      ? { __type: 'array', elementType: mapDuckDBTypeToSchema(typeInfo.valueType) }
      : fallback
  }
  // STRUCT: map each named entry type recursively.
  if (typeId === DuckDBTypeId.STRUCT) {
    const struct: Record<string, any> = { __type: 'struct' }
    const { entryNames, entryTypes } = typeInfo
    if (entryNames && entryTypes) {
      for (let i = 0; i < entryNames.length; i++) {
        struct[entryNames[i]] = mapDuckDBTypeToSchema(entryTypes[i])
      }
    }
    return struct
  }
  return fallback
}
/**
 * In-memory view of .buck/models.json with persistence helpers: records one
 * schema per (resource, uri) pair and regenerates the derived TypeScript
 * interface file on every write.
 */
class JsonModelTable {
  constructor(
    private jsonContent: Dict = JSON.parse(readFileSync(ModelsJsonPath, 'utf-8'))
  ) {
  }
  /** True when a schema has already been recorded for this resource/uri. */
  hasSchema(ressource: string, uri: string) {
    return Boolean(this.jsonContent[ressource]?.[uri])
  }
  /** Persist the registry JSON and regenerate the .ts interfaces from it. */
  writeJsonContent() {
    const generated = generateInterface(this.jsonContent)
    writeFileSync(ModelsJsonPath, formatJSON(this.jsonContent))
    writeFileSync(ModelsTSPath, generated)
  }
  /** Store a schema under resource/uri, creating the resource bucket if needed. */
  writeSchema(ressource: string, uri: string, schemaJson: Record<string, any>) {
    const bucket = this.jsonContent[ressource] ?? (this.jsonContent[ressource] = {})
    bucket[uri] = schemaJson
    this.writeJsonContent()
  }
  /** Derive a schema from a query result's column names and types. */
  writeResultSchema(ressource: string, uri: string, result: DuckDBResult) {
    const names = result.columnNames()
    const types = result.columnTypes().map(mapDuckDBTypeToSchema)
    return this.writeSchema(ressource, uri, zipObject(names, types))
  }
  /** Derive a schema from a DESCRIBE response. */
  writeDescribedSchema(ressource: string, uri: string, described: Record<string, any>) {
    this.writeSchema(ressource, uri, serializeDescribe(described as any))
  }
}
// Shared singleton over .buck/models.json used by all BuckDBNode instances.
export const jsonModelTable = new JsonModelTable()
// When this module is executed directly (Bun / recent Node `import.meta.main`),
// rewrite models.json and models.ts so the generated interfaces stay in sync.
if (import.meta.main) {
  jsonModelTable.writeJsonContent()
}
/**
 * Recursively convert a DuckDB wrapper value into a plain JS value:
 * lists/arrays -> arrays, structs -> plain objects, maps -> Map,
 * decimals -> number, UUID/date/interval -> string, timestamps/times -> Date,
 * bigint -> number, blobs/bits -> raw bytes. Anything unrecognized (string,
 * number, boolean, null, ...) passes through unchanged.
 */
const mapValueRec = (value: DuckDBValue) => {
  if (value instanceof DuckDBListValue) {
    return value.items.map(mapValueRec)
  } else if (value instanceof DuckDBDecimalValue) {
    return value.toDouble()
  }
  else if (value instanceof DuckDBDateValue) {
    return value.toString()
  }
  else if (value instanceof DuckDBMapValue) {
    return new Map(value.entries.map(x => [x.key, x.value]))
  } else if (value instanceof DuckDBStructValue) {
    const rtn = {}
    for (const [key, val] of Object.entries(value.entries)) {
      rtn[key] = mapValueRec(val)
    }
    return rtn
  } else if (value instanceof DuckDBUUIDValue) {
    return value.toString()
  } else if (typeof value === 'bigint') {
    // NOTE: may lose precision for |value| > Number.MAX_SAFE_INTEGER.
    return Number(value)
  } else if (value instanceof DuckDBTimestampValue || value instanceof DuckDBTimeValue || value instanceof DuckDBTimeTZValue || value instanceof DuckDBTimestampTZValue) {
    // Date() takes epoch *milliseconds*: micros / 1e3 == millis.
    // (was `micros * 1e-6`, which produced epoch seconds — dates 1000x too small)
    return new Date(Number(value.micros) / 1e3)
  } else if (value instanceof DuckDBTimestampSecondsValue) {
    // seconds -> millis (was passing raw seconds, off by 1000x)
    return new Date(Number(value.seconds) * 1e3)
  } else if (value instanceof DuckDBTimestampMillisecondsValue) {
    // already millis (was `millis * 1e-3`, converting ms -> s, off by 1000x)
    return new Date(Number(value.millis))
  } else if (value instanceof DuckDBTimestampNanosecondsValue) {
    // nanos -> millis (was `nanos / 1e-9`, i.e. *multiplying* by 1e9)
    return new Date(Number(value.nanos) / 1e6)
  } else if (value instanceof DuckDBBlobValue) {
    return value.bytes
  } else if (value instanceof DuckDBBitValue) {
    return value.data
  } else if (value instanceof DuckDBIntervalValue) {
    return value.toString()
  } else if (value instanceof DuckDBArrayValue) {
    return value.items.map(mapValueRec)
  } else if (value instanceof DuckDBUnionValue) {
    // Unwrap the active union member and convert it recursively.
    return mapValueRec(value.value)
  } else {
    return value
  }
}
/** Convert one raw row record into plain JS values via mapValueRec. */
function mapRowToObject(rowData: Record<string, any>): Dict {
  const converted: Dict = {}
  Object.entries(rowData).forEach(([key, value]) => {
    converted[key] = mapValueRec(value as DuckDBValue)
  })
  return converted
}
/**
 * Materialize every row of a fully-read result reader as a plain object
 * keyed by column name, with DuckDB wrapper values converted to JS values.
 */
function buildResult(reader: DuckDBResultReader) {
  // @ts-ignore — `result` is not exposed by the public reader typings.
  const columnNames = reader.result.columnNames()
  const allRows = reader.getRows()
  return allRows.map(rawRow => {
    // Zip the positional row values with their column names, then convert.
    const named: Record<string, any> = {}
    columnNames.forEach((name, i) => {
      named[name] = rawRow[i]
    })
    return mapRowToObject(named)
  })
}
/**
 * DuckDB-backed implementation of BuckDBBase for the Node/Bun runtime.
 * The instance and connection are created lazily on first use (_initDB),
 * and queued settings commands are flushed before each query/stream.
 */
class BuckDBNode extends BuckDBBase {
  readonly type = 'node' as const
  private _instance: DuckDBInstance
  private _connection: DuckDBConnection
  // Caches the in-flight init so concurrent callers share one DB creation.
  private _initPromise: Promise<void> | null = null
  public isBucket: boolean = false
  constructor(
    handle?: string,
    settings?: Partial<DSettings>,
  ) {
    super(handle, settings)
    this.isBucket = !!isBucket(handle)
    // Populated by _initDB(); typed non-null to avoid optional chaining at
    // every use site.
    this._instance = null as unknown as DuckDBInstance
    this._connection = null as unknown as DuckDBConnection
  }
  // Lazily create the DuckDB instance + connection, exactly once.
  // NOTE(review): a rejected _initPromise stays cached, so a failed init can
  // never be retried on this instance — confirm that is intentional.
  private _initDB(): Promise<void> {
    if (this._initPromise) {
      return this._initPromise
    }
    if (this._instance && this._connection) {
      return Promise.resolve()
    }
    this._initPromise = (async () => {
      const { DuckDBInstance } = await import('@duckdb/node-api')
      let h = this.handle || ':memory:'
      if (isBucket(this.handle)) {
        // Bucket handles are not database files: open an in-memory DB and
        // expose the bucket via file_search_path instead.
        this.lazySettings({ file_search_path: this.handle })
        h = ':memory:'
      }
      // S3 credentials go into DuckDBInstance.create(); every other setting
      // is deferred and applied later as queued commands.
      const configSettings = pick(this.settings, InitConfigKeys)
      this.lazySettings(omit(this.settings, InitConfigKeys))
      this._instance = await DuckDBInstance.create(h, configSettings as any)
      this._connection = await this._instance.connect()
    })()
    return this._initPromise
  }
  // Intentional no-op: schema persistence moved to jsonModelTable; the old
  // Bun-based implementation is kept below for reference.
  async upsertSchema(_model: string, _schema: Record<string, string>) {
    // await this._initDB();
    // const tableFile = Bun.file(`./.buck/models.json`);
    // const tableContent = await tableFile.json();
    // if (!tableContent[this.handle || '']) {
    //     tableContent[this.handle || ''] = {};
    // }
    // tableContent[this.handle || ''][model] = schema;
    // await tableFile.write(JSON.stringify(tableContent, null, 2));
    // const tsfile = generateInterface(tableContent);
    // await Bun.file('./.buck/models.ts').write(tsfile);
  }
  // Record a schema for `_uri` in models.json if one is not already present,
  // deriving it from a DESCRIBE of the source.
  async ensureSchema(_uri: string) {
    const isInBrowser = typeof window !== 'undefined';
    const isReader = !!isReaderFunction(_uri);
    // Reader-function uris are stored verbatim; everything else is normalized.
    const uri = isInBrowser || !isReader ? this.getSchemaUri(_uri) : _uri;
    const h = this.handle || ''
    if (jsonModelTable.hasSchema(h, uri)) {
      return
    }
    const describeResp = await this.describe(_uri)
    jsonModelTable.writeDescribedSchema(h, uri, describeResp)
  }
  /**
   * Run `sql` after flushing queued settings commands. Returns rows as plain
   * objects (via buildResult), or raw JSON rows when `opts.rows` is truthy.
   */
  async query(sql: string, opts: Record<string, any> = {}) {
    await this._initDB()
    const cmds = this.queue.flush()
    for (const cmd of cmds) {
      await this._connection.run(cmd)
    }
    const run = await this._connection.run(sql)
    const reader = new DuckDBResultReader(run as any);
    await reader.readAll()
    if (opts?.rows) {
      return reader.getRowsJson()
    }
    return buildResult(reader)
  }
  // Stream query results row by row as plain objects, without materializing
  // the full result set.
  async *stream(sql: string) {
    await this._initDB()
    const cmds = this.queue.flush()
    for (const cmd of cmds) {
      await this._connection.run(cmd)
    }
    const result = await this._connection.run(sql)
    // Use the built-in yieldRowObjects method from PR #303
    // This yields Record<string, DuckDBValue> for each row
    for await (const rowObjects of result.yieldRowObjects()) {
      for (const rowObj of rowObjects) {
        yield mapRowToObject(rowObj)
      }
    }
  }
  // Execute `sql` and return the raw DuckDB result (no value conversion).
  async run(sql: string) {
    await this._initDB()
    return this._connection.run(sql)
  }
  // NOTE(review): `update` is neither imported nor defined in this file —
  // calling this method will throw ReferenceError unless `update` exists at a
  // wider scope; verify the missing import.
  update(tableName: string) {
    return update(this, tableName)
  }
  // Wrap a raw SQL string in the minimal executable/printable interface.
  raw(sql: string) {
    return {
      execute: () => this.execute(sql),
      toSql: () => sql,
    }
  }
}
// Factory for BuckDBNode instances, curried through builder().
export const Buck = builder(BuckDBNode)
// Default in-memory database plus a convenience `from` export.
// NOTE(review): if `.from` is an unbound prototype method, exporting it
// detached would lose `this` — confirm it is pre-bound on the instance.
export const MemoryDB = Buck('')
export const from = MemoryDB.from