Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = [ "crates/core", "crates/taurus", "crates/tests" ]
members = [ "crates/core", "crates/manual", "crates/taurus", "crates/tests" , "crates/tests-core"]
resolver = "3"

[workspace.package]
Expand All @@ -25,3 +25,6 @@ serde = "1.0.228"

[workspace.dependencies.taurus-core]
path = "./crates/core"

[workspace.dependencies.tests-core]
path = "./crates/tests-core"
11 changes: 9 additions & 2 deletions crates/core/src/context/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use crate::runtime::remote::RemoteRuntime;

use futures_lite::future::block_on;
use std::collections::HashMap;
use tucana::aquila::execution_result::Result as ExecutionOutcome;
use tucana::aquila::{ActionRuntimeError, ExecutionRequest, ExecutionResult};
use tucana::aquila::ExecutionRequest;
use tucana::shared::reference_value::Target;
use tucana::shared::value::Kind;
use tucana::shared::{NodeFunction, Struct, Value};
Expand All @@ -56,6 +55,14 @@ pub struct Executor<'a> {
/// The current policy treats any node whose `definition_source` is not `"taurus"`
/// as a remote node.
fn is_remote(node: &NodeFunction) -> bool {
if node.definition_source == "" {
log::warn!(
"Found empty definition source, taking runtime as origin for node id: {}",
node.database_id
);
return false;
}

node.definition_source != "taurus"
}

Expand Down
18 changes: 18 additions & 0 deletions crates/manual/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "manual"
version.workspace = true
edition.workspace = true

[dependencies]
tests-core = { workspace = true }
tucana = { workspace = true }
taurus-core = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
prost = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
async-nats = { workspace = true }
clap ={ version = "4.6.0", features= ["derive"] }
160 changes: 160 additions & 0 deletions crates/manual/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use std::collections::HashMap;

use async_nats::Client;
use clap::{Parser, arg, command};
use prost::Message;
use taurus_core::context::{context::Context, executor::Executor, registry::FunctionStore};
use taurus_core::runtime::{error::RuntimeError, remote::RemoteRuntime};
use tests_core::Case;
use tonic::async_trait;
use tucana::shared::helper::value::to_json_value;
use tucana::shared::{NodeFunction, helper::value::from_json_value};
use tucana::{
aquila::{ExecutionRequest, ExecutionResult},
shared::Value,
};

pub struct RemoteNatsClient {
client: Client,
}

impl RemoteNatsClient {
pub fn new(client: Client) -> Self {
RemoteNatsClient { client }
}
}

#[async_trait]
impl RemoteRuntime for RemoteNatsClient {
async fn execute_remote(
&self,
remote_name: String,
request: ExecutionRequest,
) -> Result<Value, RuntimeError> {
let topic = format!("action.{}.{}", remote_name, request.execution_identifier);
let payload = request.encode_to_vec();
let res = self.client.request(topic, payload.into()).await;
let message = match res {
Ok(r) => r,
Err(_) => {
return Err(RuntimeError::simple_str(
"RemoteRuntimeExeption",
"Failed to handle NATS message",
));
}
};

let decode_result = ExecutionResult::decode(message.payload);
let execution_result = match decode_result {
Ok(r) => r,
Err(_) => {
return Err(RuntimeError::simple_str(
"RemoteRuntimeExeption",
"Failed to decode NATS message",
));
}
};

match execution_result.result {
Some(result) => match result {
tucana::aquila::execution_result::Result::Success(value) => Ok(value),
tucana::aquila::execution_result::Result::Error(err) => {
let name = err.code.to_string();
let description = match err.description {
Some(string) => string,
None => "Unknown Error".to_string(),
};
let error = RuntimeError::new(name, description, None);
Err(error)
}
},
None => Err(RuntimeError::simple_str(
"RemoteRuntimeExeption",
"Result of Remote Response was empty.",
)),
}
}
}

#[derive(clap::Parser, Debug)]
#[command(author, version, about)]
struct Args {
/// Index value
#[arg(short, long, default_value_t = 0)]
index: i32,

/// NATS server URL
#[arg(short, long, default_value_t = String::from("nats://127.0.0.1:4222"))]
nats_url: String,

/// Path value
#[arg(short, long)]
path: String,
}

#[tokio::main]
async fn main() {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();

let args = Args::parse();
let index = args.index;
let nats_url = args.nats_url;
let path = args.path;
let case = Case::from_path(&path);

let store = FunctionStore::default();

let node_functions: HashMap<i64, NodeFunction> = case
.clone()
.flow
.node_functions
.into_iter()
.map(|node| (node.database_id, node))
.collect();

let mut context = match case.inputs.get(index as usize) {
Some(inp) => match inp.input.clone() {
Some(json_input) => Context::new(from_json_value(json_input)),
None => Context::default(),
},
None => Context::default(),
};

let client = match async_nats::connect(nats_url).await {
Ok(client) => {
log::info!("Connected to nats server");
client
}
Err(err) => {
panic!("Failed to connect to NATS server: {}", err);
}
};
let remote = RemoteNatsClient::new(client);
let result = Executor::new(&store, node_functions.clone())
.with_remote_runtime(&remote)
.execute(case.flow.starting_node_id, &mut context, true);

match result {
taurus_core::context::signal::Signal::Success(value) => {
let json = to_json_value(value);
let pretty = serde_json::to_string_pretty(&json).unwrap();
println!("{}", pretty);
}
taurus_core::context::signal::Signal::Return(value) => {
let json = to_json_value(value);
let pretty = serde_json::to_string_pretty(&json).unwrap();
println!("{}", pretty);
}
taurus_core::context::signal::Signal::Respond(value) => {
let json = to_json_value(value);
let pretty = serde_json::to_string_pretty(&json).unwrap();
println!("{}", pretty);
}
taurus_core::context::signal::Signal::Stop => println!("Received Stop signal"),
taurus_core::context::signal::Signal::Failure(runtime_error) => {
println!("RuntimeError: {:?}", runtime_error);
}
}
}
13 changes: 13 additions & 0 deletions crates/tests-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "tests-core"
version.workspace = true
edition.workspace = true

[dependencies]
tucana = { workspace = true }
taurus-core = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }

Loading