Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 131 additions & 150 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::ptr::NonNull;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -456,19 +455,8 @@ impl PySessionContext {
) -> PyDataFusionResult<()> {
let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
.with_file_extension(file_extension)
.with_table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
)
.with_file_sort_order(
file_sort_order
.unwrap_or_default()
.into_iter()
.map(|e| e.into_iter().map(|f| f.into()).collect())
.collect(),
);
.with_table_partition_cols(convert_partition_cols(table_partition_cols))
.with_file_sort_order(convert_file_sort_order(file_sort_order));
let table_path = ListingTableUrl::parse(path)?;
let resolved_schema: SchemaRef = match schema {
Some(s) => Arc::new(s.0),
Expand Down Expand Up @@ -831,25 +819,15 @@ impl PySessionContext {
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyDataFusionResult<()> {
let mut options = ParquetReadOptions::default()
.table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
)
.parquet_pruning(parquet_pruning)
.skip_metadata(skip_metadata);
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);
options.file_sort_order = file_sort_order
.unwrap_or_default()
.into_iter()
.map(|e| e.into_iter().map(|f| f.into()).collect())
.collect();

let result = self.ctx.register_parquet(name, path, options);
wait_for_future(py, result)??;
let options = build_parquet_options(
table_partition_cols,
parquet_pruning,
file_extension,
skip_metadata,
&schema,
file_sort_order,
);
wait_for_future(py, self.ctx.register_parquet(name, path, options))??;
Ok(())
}

Expand All @@ -863,19 +841,17 @@ impl PySessionContext {
options: Option<&PyCsvReadOptions>,
py: Python,
) -> PyDataFusionResult<()> {
let options = options
.map(|opts| opts.try_into())
.transpose()?
.unwrap_or_default();
let options = convert_csv_options(options)?;

if path.is_instance_of::<PyList>() {
let paths = path.extract::<Vec<String>>()?;
let result = self.register_csv_from_multiple_paths(name, paths, options);
wait_for_future(py, result)??;
wait_for_future(
py,
self.register_csv_from_multiple_paths(name, paths, options),
)??;
} else {
let path = path.extract::<String>()?;
let result = self.ctx.register_csv(name, &path, options);
wait_for_future(py, result)??;
wait_for_future(py, self.ctx.register_csv(name, &path, options))??;
}

Ok(())
Expand All @@ -892,33 +868,22 @@ impl PySessionContext {
pub fn register_json(
&self,
name: &str,
path: PathBuf,
path: &str,
schema: Option<PyArrowType<Schema>>,
schema_infer_max_records: usize,
file_extension: &str,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
file_compression_type: Option<String>,
py: Python,
) -> PyDataFusionResult<()> {
let path = path
.to_str()
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;

let mut options = JsonReadOptions::default()
.file_compression_type(parse_file_compression_type(file_compression_type)?)
.table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
);
options.schema_infer_max_records = schema_infer_max_records;
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_json(name, path, options);
wait_for_future(py, result)??;

let options = build_json_options(
table_partition_cols,
file_compression_type,
schema_infer_max_records,
file_extension,
&schema,
)?;
wait_for_future(py, self.ctx.register_json(name, path, options))??;
Ok(())
}

Expand All @@ -931,28 +896,14 @@ impl PySessionContext {
pub fn register_avro(
&self,
name: &str,
path: PathBuf,
path: &str,
schema: Option<PyArrowType<Schema>>,
file_extension: &str,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
py: Python,
) -> PyDataFusionResult<()> {
let path = path
.to_str()
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;

let mut options = AvroReadOptions::default().table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
);
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_avro(name, path, options);
wait_for_future(py, result)??;

let options = build_avro_options(table_partition_cols, file_extension, &schema);
wait_for_future(py, self.ctx.register_avro(name, path, options))??;
Ok(())
}

Expand Down Expand Up @@ -1054,35 +1005,22 @@ impl PySessionContext {
#[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))]
pub fn read_json(
&self,
path: PathBuf,
path: &str,
schema: Option<PyArrowType<Schema>>,
schema_infer_max_records: usize,
file_extension: &str,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
file_compression_type: Option<String>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
let path = path
.to_str()
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
let mut options = JsonReadOptions::default()
.table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
)
.file_compression_type(parse_file_compression_type(file_compression_type)?);
options.schema_infer_max_records = schema_infer_max_records;
options.file_extension = file_extension;
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let result = self.ctx.read_json(path, options);
wait_for_future(py, result)??
} else {
let result = self.ctx.read_json(path, options);
wait_for_future(py, result)??
};
let options = build_json_options(
table_partition_cols,
file_compression_type,
schema_infer_max_records,
file_extension,
&schema,
)?;
let df = wait_for_future(py, self.ctx.read_json(path, options))??;
Ok(PyDataFrame::new(df))
}

Expand All @@ -1095,23 +1033,15 @@ impl PySessionContext {
options: Option<&PyCsvReadOptions>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
let options = options
.map(|opts| opts.try_into())
.transpose()?
.unwrap_or_default();
let options = convert_csv_options(options)?;

if path.is_instance_of::<PyList>() {
let paths = path.extract::<Vec<String>>()?;
let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>();
let result = self.ctx.read_csv(paths, options);
let df = PyDataFrame::new(wait_for_future(py, result)??);
Ok(df)
let paths: Vec<String> = if path.is_instance_of::<PyList>() {
path.extract()?
} else {
let path = path.extract::<String>()?;
let result = self.ctx.read_csv(path, options);
let df = PyDataFrame::new(wait_for_future(py, result)??);
Ok(df)
}
vec![path.extract()?]
};
let df = wait_for_future(py, self.ctx.read_csv(paths, options))??;
Ok(PyDataFrame::new(df))
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -1134,25 +1064,15 @@ impl PySessionContext {
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
let mut options = ParquetReadOptions::default()
.table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
)
.parquet_pruning(parquet_pruning)
.skip_metadata(skip_metadata);
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);
options.file_sort_order = file_sort_order
.unwrap_or_default()
.into_iter()
.map(|e| e.into_iter().map(|f| f.into()).collect())
.collect();

let result = self.ctx.read_parquet(path, options);
let df = PyDataFrame::new(wait_for_future(py, result)??);
let options = build_parquet_options(
table_partition_cols,
parquet_pruning,
file_extension,
skip_metadata,
&schema,
file_sort_order,
);
let df = PyDataFrame::new(wait_for_future(py, self.ctx.read_parquet(path, options))??);
Ok(df)
}

Expand All @@ -1166,21 +1086,8 @@ impl PySessionContext {
file_extension: &str,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
let mut options = AvroReadOptions::default().table_partition_cols(
table_partition_cols
.into_iter()
.map(|(name, ty)| (name, ty.0))
.collect::<Vec<(String, DataType)>>(),
);
options.file_extension = file_extension;
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let read_future = self.ctx.read_avro(path, options);
wait_for_future(py, read_future)??
} else {
let read_future = self.ctx.read_avro(path, options);
wait_for_future(py, read_future)??
};
let options = build_avro_options(table_partition_cols, file_extension, &schema);
let df = wait_for_future(py, self.ctx.read_avro(path, options))??;
Ok(PyDataFrame::new(df))
}

Expand Down Expand Up @@ -1280,7 +1187,7 @@ impl PySessionContext {
// check if the file extension matches the expected extension
for path in &table_paths {
let file_path = path.as_str();
if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
if !file_path.ends_with(option_extension.as_str()) && !path.is_collection() {
return exec_err!(
"File path '{file_path}' does not match the expected extension '{option_extension}'"
);
Expand Down Expand Up @@ -1321,6 +1228,80 @@ pub fn parse_file_compression_type(
})
}

fn convert_csv_options(
options: Option<&PyCsvReadOptions>,
) -> PyDataFusionResult<CsvReadOptions<'_>> {
Ok(options
.map(|opts| opts.try_into())
.transpose()?
.unwrap_or_default())
}

/// Strip the PyO3 `PyArrowType` wrapper from each partition column,
/// yielding the plain `(name, DataType)` pairs DataFusion expects.
fn convert_partition_cols(
    table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
) -> Vec<(String, DataType)> {
    let mut cols = Vec::with_capacity(table_partition_cols.len());
    for (name, wrapped_type) in table_partition_cols {
        cols.push((name, wrapped_type.0));
    }
    cols
}

/// Convert an optional nested list of Python sort expressions into the
/// nested `SortExpr` vectors DataFusion uses for `file_sort_order`.
/// A missing value becomes an empty (i.e. unsorted) ordering.
fn convert_file_sort_order(
    file_sort_order: Option<Vec<Vec<PySortExpr>>>,
) -> Vec<Vec<datafusion::logical_expr::SortExpr>> {
    let Some(sort_order) = file_sort_order else {
        return Vec::new();
    };
    sort_order
        .into_iter()
        .map(|exprs| exprs.into_iter().map(Into::into).collect())
        .collect()
}

/// Assemble a `ParquetReadOptions` from the values passed in from Python.
///
/// Builder setters cover partition columns, pruning, and metadata skipping;
/// the remaining fields (`file_extension`, `schema`, `file_sort_order`) have
/// no builder methods and are assigned directly.
fn build_parquet_options<'a>(
    table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
    parquet_pruning: bool,
    file_extension: &'a str,
    skip_metadata: bool,
    schema: &'a Option<PyArrowType<Schema>>,
    file_sort_order: Option<Vec<Vec<PySortExpr>>>,
) -> ParquetReadOptions<'a> {
    let mut options = ParquetReadOptions::default()
        .parquet_pruning(parquet_pruning)
        .skip_metadata(skip_metadata)
        .table_partition_cols(convert_partition_cols(table_partition_cols));
    options.schema = schema.as_ref().map(|wrapped| &wrapped.0);
    options.file_extension = file_extension;
    options.file_sort_order = convert_file_sort_order(file_sort_order);
    options
}

/// Assemble a `JsonReadOptions` from the values passed in from Python.
///
/// Returns an error when the compression-type string cannot be parsed.
/// Fields without builder methods (`schema_infer_max_records`,
/// `file_extension`, `schema`) are assigned directly.
fn build_json_options<'a>(
    table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
    file_compression_type: Option<String>,
    schema_infer_max_records: usize,
    file_extension: &'a str,
    schema: &'a Option<PyArrowType<Schema>>,
) -> Result<JsonReadOptions<'a>, PyErr> {
    // Parse the compression type first so a bad value fails fast.
    let compression = parse_file_compression_type(file_compression_type)?;
    let mut options = JsonReadOptions::default()
        .file_compression_type(compression)
        .table_partition_cols(convert_partition_cols(table_partition_cols));
    options.schema = schema.as_ref().map(|wrapped| &wrapped.0);
    options.file_extension = file_extension;
    options.schema_infer_max_records = schema_infer_max_records;
    Ok(options)
}

/// Assemble an `AvroReadOptions` from the values passed in from Python.
/// Partition columns go through the builder; `file_extension` and `schema`
/// have no builder methods and are assigned directly.
fn build_avro_options<'a>(
    table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
    file_extension: &'a str,
    schema: &'a Option<PyArrowType<Schema>>,
) -> AvroReadOptions<'a> {
    let partition_cols = convert_partition_cols(table_partition_cols);
    let mut options = AvroReadOptions::default().table_partition_cols(partition_cols);
    options.schema = schema.as_ref().map(|wrapped| &wrapped.0);
    options.file_extension = file_extension;
    options
}

impl From<PySessionContext> for SessionContext {
fn from(ctx: PySessionContext) -> SessionContext {
ctx.ctx.as_ref().clone()
Expand Down
Loading