diff --git a/.gitignore b/.gitignore index f7dff0f..41bbd59 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ target .idea - +./definitions +.definitions +definitions .DS_Store ./out out diff --git a/Cargo.lock b/Cargo.lock index abaeb2b..7c2c917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + [[package]] name = "bytes" version = "1.11.1" @@ -207,9 +213,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.57" +version = "1.2.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" dependencies = [ "find-msvc-tools", "shlex", @@ -241,6 +247,46 @@ dependencies = [ "num-traits", ] +[[package]] +name = "clap" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + [[package]] name = "code0-flow" version = "0.0.29" @@ -428,9 +474,9 @@ checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" [[package]] name = "env_filter" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" dependencies = [ "log", "regex", @@ -467,9 +513,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "a043dc74da1e37d6afe657061213aa6f425f855399a11d3463c6ecccc4dfda1f" [[package]] name = "fiat-crypto" @@ -691,9 +737,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" dependencies = [ "atomic-waker", "bytes", @@ -706,7 +752,6 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "pin-utils", "smallvec", "tokio", "want", @@ -747,12 +792,13 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" dependencies = [ "displaydoc", "potential_utf", + "utf8_iter", "yoke", "zerofrom", "zerovec", @@ -760,9 +806,9 @@ dependencies = [ [[package]] name = "icu_locale_core" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" dependencies = [ "displaydoc", "litemap", @@ -773,9 +819,9 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" dependencies = [ "icu_collections", "icu_normalizer_data", @@ -787,15 +833,15 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" [[package]] name = "icu_properties" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" dependencies = [ "icu_collections", "icu_locale_core", @@ -807,15 +853,15 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" [[package]] name = "icu_provider" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" dependencies = [ "displaydoc", "icu_locale_core", @@ -855,9 +901,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -882,9 +928,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "jiff" @@ -910,6 +956,16 @@ dependencies = [ "syn", ] +[[package]] +name = "js-sys" +version = "0.3.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "leb128fmt" version = "0.1.0" @@ -918,9 +974,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "linux-raw-sys" @@ -930,9 +986,9 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" [[package]] name = "log" @@ -940,6 +996,25 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "manual" +version = "0.1.0" +dependencies = [ + "async-nats", + "async-trait", + "clap", + "env_logger", + "log", + "prost", + "serde", + "serde_json", + "taurus-core", + "tests-core", + "tokio", + "tonic", + "tucana 0.0.66", +] + [[package]] name = "matchit" version = "0.8.4" @@ -1001,9 +1076,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-traits" @@ -1127,12 +1202,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkcs8" version = "0.10.2" @@ -1160,9 +1229,9 @@ dependencies = [ [[package]] name = "potential_utf" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" dependencies = [ "zerovec", ] @@ -1256,9 +1325,9 @@ dependencies = [ [[package]] name = "pulldown-cmark" -version = "0.13.1" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83c41efbf8f90ac44de7f3a868f0867851d261b56291732d0cbf7cceaaeb55a6" +checksum = "7c3a14896dfa883796f1cb410461aef38810ea05f2b2c33c5aded3649095fdad" dependencies = [ "bitflags", "memchr", @@ -1447,6 +1516,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "same-file" version = "1.0.6" @@ -1490,9 +1565,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" [[package]] name = "serde" @@ -1644,6 +1719,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -1701,10 +1782,13 @@ dependencies = [ name = "taurus-core" version = "0.1.0" dependencies = [ + "async-trait", "base64", + "futures-lite", "log", "rand 0.10.0", "tucana 0.0.66", + "uuid", ] [[package]] @@ -1729,6 +1813,17 @@ dependencies = [ "serde", "serde_json", "taurus-core", + "tests-core", + "tucana 0.0.66", +] + +[[package]] +name = "tests-core" +version = "0.1.0" +dependencies = [ + "log", + "serde", + "serde_json", "tucana 0.0.66", ] @@ -1785,9 +1880,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" dependencies = [ "displaydoc", "zerovec", @@ -2129,6 +2224,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" +dependencies = [ + "getrandom 0.4.2", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "version_check" version = "0.9.5" @@ -2178,6 +2284,51 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b" +dependencies = [ + "unicode-ident", +] + [[package]] name = "wasm-encoder" version = "0.244.0" @@ -2417,15 +2568,15 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "yoke" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -2434,9 +2585,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", @@ -2446,18 +2597,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.42" +version = "0.8.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.42" +version = "0.8.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" dependencies = [ "proc-macro2", "quote", @@ -2466,18 +2617,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", @@ -2493,9 +2644,9 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zerotrie" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" dependencies = [ "displaydoc", "yoke", @@ -2504,9 +2655,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" dependencies = [ "yoke", "zerofrom", @@ -2515,9 +2666,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 342dcf0..ae724b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [ "crates/core", "crates/taurus", "crates/tests" ] +members = [ "crates/core", "crates/manual", "crates/taurus", "crates/tests" , "crates/tests-core"] resolver = "3" [workspace.package] @@ -7,6 +7,7 @@ version = "0.1.0" edition = "2024" [workspace.dependencies] +async-trait = "0.1.89" code0-flow = { version = "0.0.29" } tucana = { version = "0.0.66" } tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] } @@ -21,6 +22,9 @@ tonic-health = "0.14.1" tonic = "0.14.1" serde_json = "1.0.149" serde = "1.0.228" - +uuid = { version = "1.23.0", features = ["v4"] } [workspace.dependencies.taurus-core] path = "./crates/core" + +[workspace.dependencies.tests-core] +path = "./crates/tests-core" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 78da80b..fc6f7f9 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -8,3 +8,7 @@ tucana = { workspace = true } base64 = { workspace = true } rand = { workspace = true } log = { workspace = true } +futures-lite = { workspace = true } +async-trait = { workspace = true } +uuid = { workspace = true } + diff --git a/crates/core/src/context/argument.rs b/crates/core/src/context/argument.rs index b49fa68..c99647a 100644 --- a/crates/core/src/context/argument.rs +++ b/crates/core/src/context/argument.rs @@ -46,7 +46,7 @@ impl TryFromArgument for NumberValue { match a { Argument::Eval(Value { kind: Some(Kind::NumberValue(n)), - }) => Ok(n.clone()), + }) => Ok(*n), _ => Err(type_err("Expected number", a)), } } diff --git a/crates/core/src/context/context.rs b/crates/core/src/context/context.rs index 06ef142..c868bca 100644 --- a/crates/core/src/context/context.rs +++ b/crates/core/src/context/context.rs @@ -62,19 +62,15 @@ impl Context { for path in reference.paths { if let Some(index) = path.array_index { match curr.kind { - Some(ref kind) => { - match kind { - Kind::ListValue(list) => { - match list.values.get(index as usize) { - Some(x) => { - curr = x.clone(); - } - None => return ContextResult::NotFound, - } + Some(ref kind) => match kind { + Kind::ListValue(list) => match list.values.get(index as usize) { + Some(x) => { + curr = x.clone(); } - _ => return ContextResult::NotFound, - } - } + None => return ContextResult::NotFound, + }, + _ => return ContextResult::NotFound, + }, None => return ContextResult::NotFound, } } diff --git a/crates/core/src/context/executor.rs b/crates/core/src/context/executor.rs index fe618c4..60568cf 100644 --- a/crates/core/src/context/executor.rs +++ b/crates/core/src/context/executor.rs @@ -1,3 +1,26 @@ +//! Executor for flow node execution. +//! +//! Execution model overview: +//! - The executor walks a linear "next" chain starting from `starting_node_id`. +//! - Each node can call into other nodes through lazy arguments. +//! - A node marked as remote is executed via `RemoteRuntime`. +//! - The executor is synchronous; remote calls are awaited via `block_on`. +//! +//! Remote execution: +//! - A node is considered remote based on `is_remote(&node)`. +//! - Remote args are fully resolved to concrete `Value`s before sending. +//! - The request parameters are mapped by `runtime_parameter_id`. +//! - Remote responses are mapped into `Signal::Success` or `Signal::Failure`. +//! +//! Tracing: +//! - Each node execution produces a trace frame with arguments and outcome. +//! - Child executions are linked with `EdgeKind` to reflect eager or runtime calls. +//! +//! Error behavior: +//! - Missing nodes/functions yield `Signal::Failure`. +//! - Remote failures are mapped to `RuntimeError`. +//! - The executor commits all final outcomes into the `Context`. + use crate::context::argument::{Argument, ParameterNode}; use crate::context::context::{Context, ContextResult}; use crate::context::registry::{FunctionStore, HandlerFunctionEntry}; @@ -5,23 +28,72 @@ use crate::context::signal::Signal; use crate::debug::trace::{ArgKind, ArgTrace, EdgeKind, Outcome, ReferenceKind}; use crate::debug::tracer::{ExecutionTracer, Tracer}; use crate::runtime::error::RuntimeError; +use crate::runtime::remote::RemoteRuntime; +use futures_lite::future::block_on; use std::collections::HashMap; +use tucana::aquila::ExecutionRequest; use tucana::shared::reference_value::Target; use tucana::shared::value::Kind; -use tucana::shared::{NodeFunction, Value}; +use tucana::shared::{NodeFunction, Struct, Value}; +use uuid::Uuid; +/// Executes a flow graph by repeatedly evaluating nodes. +/// +/// The executor is intentionally stateless with respect to the runtime: +/// it borrows the function registry and graph, and mutates only the `Context`. pub struct Executor<'a> { + // Registered Runtime Functions functions: &'a FunctionStore, + // Nodes to execute nodes: HashMap, + // Connection for Remote Function Execution => Actions + remote: Option<&'a dyn RemoteRuntime>, +} + +/// Determines whether a node should be executed remotely. +/// +/// The current policy treats any node whose `definition_source` is not `"taurus"` +/// as a remote node. +fn is_remote(node: &NodeFunction) -> bool { + if node.definition_source.is_empty() { + log::warn!( + "Found empty definition source, taking runtime as origin for node id: {}", + node.database_id + ); + return false; + } + + node.definition_source != "taurus" } impl<'a> Executor<'a> { + /// Create a new executor for the given function store and node map. + /// + /// This does not attach a remote runtime. Remote nodes will error unless + /// a runtime is provided via `with_remote_runtime`. pub fn new(functions: &'a FunctionStore, nodes: HashMap) -> Self { - Self { functions, nodes } + Self { + functions, + nodes, + remote: None, + } + } + + /// Attach a remote runtime for executing nodes marked as remote. + /// + /// This is a builder-style method for ergonomic setup: + /// `Executor::new(...).with_remote_runtime(&runtime)`. + pub fn with_remote_runtime(mut self, remote: &'a dyn RemoteRuntime) -> Self { + self.remote = Some(remote); + self } /// This is now the ONLY execution entry point. + /// + /// - `start_node_id` is the first node in the flow. + /// - `ctx` is mutated in-place with results and errors. + /// - `with_trace` controls whether the trace is printed on completion. pub fn execute(&self, start_node_id: i64, ctx: &mut Context, with_trace: bool) -> Signal { let mut tracer = Tracer::new(); @@ -33,7 +105,10 @@ impl<'a> Executor<'a> { signal } - // Main execution loop + /// Main execution loop. + /// + /// Executes nodes one-by-one along the `next_node_id` chain until a + /// non-success signal is produced or the chain ends. fn execute_call( &self, start_node_id: i64, @@ -75,7 +150,13 @@ impl<'a> Executor<'a> { } } - // executes a single node + /// Execute a single node and return its signal and trace frame id. + /// + /// This handles: + /// - Node lookup + /// - Remote vs local dispatch + /// - Argument building and eager evaluation + /// - Handler invocation and result commit fn execute_single_node( &self, node_id: i64, @@ -92,6 +173,67 @@ impl<'a> Executor<'a> { } }; + if is_remote(&node) { + let remote = match self.remote { + Some(r) => r, + None => { + let err = RuntimeError::simple( + "RemoteRuntimeNotConfigured", + "Remote runtime not configured".to_string(), + ); + return (Signal::Failure(err), 0); + } + }; + + let frame_id = tracer.enter_node(node.database_id, node.runtime_function_id.as_str()); + + let mut args = match self.build_args(&node, ctx, tracer, frame_id) { + Ok(a) => a, + Err(e) => { + ctx.insert_error(node.database_id, e.clone()); + tracer.exit_node( + frame_id, + Outcome::Failure { + error_preview: format!("{:#?}", e), + }, + ); + return (Signal::Failure(e), frame_id); + } + }; + + let values = match self.resolve_remote_args(&mut args, ctx, tracer, frame_id) { + Ok(v) => v, + Err((sig, outcome)) => { + tracer.exit_node(frame_id, outcome); + return (sig, frame_id); + } + }; + + let request = match self.build_remote_request(&node, values) { + Ok(r) => r, + Err(e) => { + ctx.insert_error(node.database_id, e.clone()); + tracer.exit_node( + frame_id, + Outcome::Failure { + error_preview: format!("{:#?}", e), + }, + ); + return (Signal::Failure(e), frame_id); + } + }; + + let remote_result = + block_on(remote.execute_remote(node.definition_source.clone(), request)); + let signal = match remote_result { + Ok(value) => Signal::Success(value), + Err(err) => Signal::Failure(err), + }; + + let final_signal = self.commit_result(&node, signal, ctx, tracer, frame_id); + return (final_signal, frame_id); + } + let entry = match self.functions.get(node.runtime_function_id.as_str()) { Some(e) => e, None => { @@ -137,6 +279,10 @@ impl<'a> Executor<'a> { (final_signal, frame_id) } + /// Build arguments for a node from literals, references, or thunks. + /// + /// Arguments are recorded to the tracer for debugging and inspection. + /// Thunks are represented by the referenced node id. fn build_args( &self, node: &NodeFunction, @@ -236,6 +382,10 @@ impl<'a> Executor<'a> { Ok(args) } + /// Eagerly execute any argument marked as `Eager`. + /// + /// Lazy arguments are preserved as thunks until needed by a handler. + /// If an eager child fails, the failure bubbles up immediately. fn force_eager_args( &self, _node: &NodeFunction, @@ -283,6 +433,10 @@ impl<'a> Executor<'a> { Ok(()) } + /// Invoke a local handler with a closure for lazy execution. + /// + /// The closure will evaluate a thunk node and link its trace to the + /// current execution frame. fn invoke_handler( &self, entry: &HandlerFunctionEntry, @@ -302,6 +456,7 @@ impl<'a> Executor<'a> { (entry.handler)(args, ctx, &mut run) } + /// Persist the final signal into the context and close the trace frame. fn commit_result( &self, node: &NodeFunction, @@ -363,6 +518,82 @@ impl<'a> Executor<'a> { } } } + + /// Resolve all arguments for a remote call. + /// + /// Remote execution requires concrete values, so any thunks are executed + /// eagerly and replaced with their resulting `Value`. + fn resolve_remote_args( + &self, + args: &mut [Argument], + ctx: &mut Context, + tracer: &mut dyn ExecutionTracer, + parent_frame: u64, + ) -> Result, (Signal, Outcome)> { + let mut values = Vec::with_capacity(args.len()); + + for (i, arg) in args.iter_mut().enumerate() { + match arg { + Argument::Eval(v) => values.push(v.clone()), + Argument::Thunk(id) => { + tracer.mark_thunk(parent_frame, i, true, true); + let (child_sig, child_root) = self.execute_call(*id, ctx, tracer); + if child_root != 0 { + tracer.link_child( + parent_frame, + child_root, + EdgeKind::EagerCall { arg_index: i }, + ); + } + + match child_sig { + Signal::Success(v) => { + *arg = Argument::Eval(v.clone()); + values.push(v); + } + s => { + return Err(( + s, + Outcome::Failure { + error_preview: "Eager child failed".into(), + }, + )); + } + } + } + } + } + + Ok(values) + } + + /// Build a remote execution request from a node and its resolved values. + /// + /// Values are mapped to their parameter ids for the remote runtime. + fn build_remote_request( + &self, + node: &NodeFunction, + values: Vec, + ) -> Result { + if node.parameters.len() != values.len() { + return Err(RuntimeError::simple_str( + "RemoteParameterMismatch", + "Remote parameter count mismatch", + )); + } + + let mut fields = HashMap::new(); + for (param, value) in node.parameters.iter().zip(values.into_iter()) { + fields.insert(param.runtime_parameter_id.clone(), value); + } + let id = Uuid::new_v4(); + Ok(ExecutionRequest { + execution_identifier: id.to_string(), + function_identifier: node.runtime_function_id.clone(), + parameters: Some(Struct { fields }), + project_id: 0, + }) + } } fn preview_value(value: &Value) -> String { @@ -371,16 +602,12 @@ fn preview_value(value: &Value) -> String { fn format_value_json(value: &Value) -> String { match value.kind.as_ref() { - Some(Kind::NumberValue(v)) => { - match v.number { - Some(kind) => { - match kind { - tucana::shared::number_value::Number::Integer(i) => i.to_string(), - tucana::shared::number_value::Number::Float(f) => f.to_string(), - } - } - _ => "null".to_string(), - } + Some(Kind::NumberValue(v)) => match v.number { + Some(kind) => match kind { + tucana::shared::number_value::Number::Integer(i) => i.to_string(), + tucana::shared::number_value::Number::Float(f) => f.to_string(), + }, + _ => "null".to_string(), }, Some(Kind::BoolValue(v)) => v.to_string(), Some(Kind::StringValue(v)) => format!("{:?}", v), diff --git a/crates/core/src/runtime/functions/http.rs b/crates/core/src/runtime/functions/http.rs index 4de951e..db413f4 100644 --- a/crates/core/src/runtime/functions/http.rs +++ b/crates/core/src/runtime/functions/http.rs @@ -126,10 +126,7 @@ fn create_response( )); } }; - fields.insert( - "status_code".to_string(), - value_from_i64(code), - ); + fields.insert("status_code".to_string(), value_from_i64(code)); fields.insert( "headers".to_string(), diff --git a/crates/core/src/runtime/functions/number.rs b/crates/core/src/runtime/functions/number.rs index 577915b..d85a7e1 100644 --- a/crates/core/src/runtime/functions/number.rs +++ b/crates/core/src/runtime/functions/number.rs @@ -77,14 +77,14 @@ fn has_digits( match value.number { Some(number) => match number { - number_value::Number::Integer(_) => return Signal::Success(false.to_value()), - number_value::Number::Float(_) => return Signal::Success(true.to_value()), + number_value::Number::Integer(_) => Signal::Success(false.to_value()), + number_value::Number::Float(_) => Signal::Success(true.to_value()), }, None => { - return Signal::Failure(RuntimeError::simple_str( + Signal::Failure(RuntimeError::simple_str( "InvlaidArgumentExeption", "Had NumberValue but no inner number value (was null)", - )); + )) } } } @@ -96,12 +96,12 @@ fn remove_digits( ) -> Signal { args!(args => value: NumberValue); match number_to_i64_lossy(&value) { - Some(number) => return Signal::Success(value_from_i64(number)), + Some(number) => Signal::Success(value_from_i64(number)), None => { - return Signal::Failure(RuntimeError::simple_str( + Signal::Failure(RuntimeError::simple_str( "InvlaidArgumentExeption", "Had NumberValue but no inner number value (was null)", - )); + )) } } } @@ -112,13 +112,10 @@ fn add( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - if let Some(sum) = a.checked_add(b) { - return Signal::Success(value_from_i64(sum)); - } + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + if let Some(sum) = a.checked_add(b) { + return Signal::Success(value_from_i64(sum)); } - _ => {} } let lhs = match num_f64(&lhs) { Ok(v) => v, @@ -137,13 +134,10 @@ fn multiply( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - if let Some(prod) = a.checked_mul(b) { - return Signal::Success(value_from_i64(prod)); - } + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + if let Some(prod) = a.checked_mul(b) { + return Signal::Success(value_from_i64(prod)); } - _ => {} } let lhs = match num_f64(&lhs) { Ok(v) => v, @@ -162,13 +156,10 @@ fn substract( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - if let Some(diff) = a.checked_sub(b) { - return Signal::Success(value_from_i64(diff)); - } + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + if let Some(diff) = a.checked_sub(b) { + return Signal::Success(value_from_i64(diff)); } - _ => {} } let lhs = match num_f64(&lhs) { Ok(v) => v, @@ -200,13 +191,10 @@ fn divide( )); } - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - if b != 0 && a % b == 0 { - return Signal::Success(value_from_i64(a / b)); - } + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + if b != 0 && a % b == 0 { + return Signal::Success(value_from_i64(a / b)); } - _ => {} } let lhs_f = match num_f64(&lhs) { @@ -235,13 +223,10 @@ fn modulo( )); } - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - if b != 0 { - return Signal::Success(value_from_i64(a % b)); - } + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + if b != 0 { + return Signal::Success(value_from_i64(a % b)); } - _ => {} } let lhs_f = match num_f64(&lhs) { @@ -257,13 +242,10 @@ fn abs( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => value: NumberValue); - match value.number { - Some(number_value::Number::Integer(i)) => { - if let Some(abs) = i.checked_abs() { - return Signal::Success(value_from_i64(abs)); - } + if let Some(number_value::Number::Integer(i)) = value.number { + if let Some(abs) = i.checked_abs() { + return Signal::Success(value_from_i64(abs)); } - _ => {} } let value = match num_f64(&value) { Ok(v) => v, @@ -346,13 +328,10 @@ fn square( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => value: NumberValue); - match value.number { - Some(number_value::Number::Integer(i)) => { - if let Some(prod) = i.checked_mul(i) { - return Signal::Success(value_from_i64(prod)); - } + if let Some(number_value::Number::Integer(i)) = value.number { + if let Some(prod) = i.checked_mul(i) { + return Signal::Success(value_from_i64(prod)); } - _ => {} } let value = match num_f64(&value) { Ok(v) => v, @@ -371,11 +350,10 @@ fn exponential( (Some(number_value::Number::Integer(b)), Some(number_value::Number::Integer(e))) if e >= 0 => { - if let Ok(exp) = u32::try_from(e) { - if let Some(pow) = b.checked_pow(exp) { + if let Ok(exp) = u32::try_from(e) + && let Some(pow) = b.checked_pow(exp) { return Signal::Success(value_from_i64(pow)); } - } } _ => {} } @@ -589,11 +567,8 @@ fn min( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - return Signal::Success(value_from_i64(a.min(b))); - } - _ => {} + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + return Signal::Success(value_from_i64(a.min(b))); } let lhs = match num_f64(&lhs) { Ok(v) => v, @@ -612,11 +587,8 @@ fn max( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => lhs: NumberValue, rhs: NumberValue); - match (lhs.number, rhs.number) { - (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) => { - return Signal::Success(value_from_i64(a.max(b))); - } - _ => {} + if let (Some(number_value::Number::Integer(a)), Some(number_value::Number::Integer(b))) = (lhs.number, rhs.number) { + return Signal::Success(value_from_i64(a.max(b))); } let lhs = match num_f64(&lhs) { Ok(v) => v, @@ -635,13 +607,10 @@ fn negate( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => value: NumberValue); - match value.number { - Some(number_value::Number::Integer(i)) => { - if let Some(neg) = i.checked_neg() { - return Signal::Success(value_from_i64(neg)); - } + if let Some(number_value::Number::Integer(i)) = value.number { + if let Some(neg) = i.checked_neg() { + return Signal::Success(value_from_i64(neg)); } - _ => {} } let value = match num_f64(&value) { Ok(v) => v, @@ -788,15 +757,12 @@ fn clamp( _run: &mut dyn FnMut(i64, &mut Context) -> Signal, ) -> Signal { args!(args => value: NumberValue, min: NumberValue, max: NumberValue); - match (value.number, min.number, max.number) { - ( + if let ( Some(number_value::Number::Integer(v)), Some(number_value::Number::Integer(min)), Some(number_value::Number::Integer(max)), - ) => { - return Signal::Success(value_from_i64(v.clamp(min, max))); - } - _ => {} + ) = (value.number, min.number, max.number) { + return Signal::Success(value_from_i64(v.clamp(min, max))); } let value = match num_f64(&value) { Ok(v) => v, @@ -837,7 +803,7 @@ mod tests { use crate::context::argument::Argument; use crate::context::context::Context; use crate::value::{number_to_f64, value_from_f64, value_from_i64}; - use tucana::shared::{number_value, Value, value::Kind}; + use tucana::shared::{Value, number_value, value::Kind}; // ---- helpers: Arguments ---- fn a_num(n: f64) -> Argument { diff --git a/crates/core/src/runtime/functions/text.rs b/crates/core/src/runtime/functions/text.rs index 8176fb9..6ad1f98 100644 --- a/crates/core/src/runtime/functions/text.rs +++ b/crates/core/src/runtime/functions/text.rs @@ -521,7 +521,7 @@ fn from_ascii( Value { kind: Some(Kind::NumberValue(n)), } => match number_to_f64(n) { - Some(n) if n >= 0.0 && n <= 127.0 => Some(n as u8 as char), + Some(n) if (0.0..=127.0).contains(&n) => Some(n as u8 as char), _ => None, }, _ => None, diff --git a/crates/core/src/runtime/mod.rs b/crates/core/src/runtime/mod.rs index 81bfbbf..7715a61 100644 --- a/crates/core/src/runtime/mod.rs +++ b/crates/core/src/runtime/mod.rs @@ -1,2 +1,3 @@ pub mod error; pub mod functions; +pub mod remote; diff --git a/crates/core/src/runtime/remote/mod.rs b/crates/core/src/runtime/remote/mod.rs new file mode 100644 index 0000000..b098a06 --- /dev/null +++ b/crates/core/src/runtime/remote/mod.rs @@ -0,0 +1,13 @@ +use async_trait::async_trait; +use tucana::{aquila::ExecutionRequest, shared::Value}; + +use crate::runtime::error::RuntimeError; + +#[async_trait] +pub trait RemoteRuntime { + async fn execute_remote( + &self, + remote_name: String, + request: ExecutionRequest, + ) -> Result; +} diff --git a/crates/core/src/value.rs b/crates/core/src/value.rs index 033896d..81ddf26 100644 --- a/crates/core/src/value.rs +++ b/crates/core/src/value.rs @@ -1,4 +1,4 @@ -use tucana::shared::{number_value, value::Kind, NumberValue, Value}; +use tucana::shared::{NumberValue, Value, number_value, value::Kind}; pub fn number_value_from_f64(n: f64) -> NumberValue { NumberValue { diff --git a/crates/manual/Cargo.toml b/crates/manual/Cargo.toml new file mode 100644 index 0000000..f4d78f2 --- /dev/null +++ b/crates/manual/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "manual" +version.workspace = true +edition.workspace = true + +[dependencies] +tests-core = { workspace = true } +tucana = { workspace = true } +taurus-core = { workspace = true } +log = { workspace = true } +env_logger = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } +prost = { workspace = true } +tonic = { workspace = true } +tokio = { workspace = true } +async-nats = { workspace = true } +clap = { version = "4.6.0", features= ["derive"] } +async-trait = { workspace = true } diff --git a/crates/manual/src/main.rs b/crates/manual/src/main.rs new file mode 100644 index 0000000..a0b612f --- /dev/null +++ b/crates/manual/src/main.rs @@ -0,0 +1,161 @@ +use std::collections::HashMap; + +use async_nats::Client; +use async_trait::async_trait; +use clap::{Parser, arg, command}; +use prost::Message; +use taurus_core::context::{context::Context, executor::Executor, registry::FunctionStore}; +use taurus_core::runtime::{error::RuntimeError, remote::RemoteRuntime}; +use tests_core::Case; +use tucana::shared::helper::value::to_json_value; +use tucana::shared::{NodeFunction, helper::value::from_json_value}; +use tucana::{ + aquila::{ExecutionRequest, ExecutionResult}, + shared::Value, +}; + +pub struct RemoteNatsClient { + client: Client, +} + +impl RemoteNatsClient { + pub fn new(client: Client) -> Self { + RemoteNatsClient { client } + } +} + +#[async_trait] +impl RemoteRuntime for RemoteNatsClient { + async fn execute_remote( + &self, + remote_name: String, + request: ExecutionRequest, + ) -> Result { + let topic = format!("action.{}.{}", remote_name, request.execution_identifier); + let payload = request.encode_to_vec(); + let res = self.client.request(topic.clone(), payload.into()).await; + log::info!("Publishing to topic: {}", topic); + let message = match res { + Ok(r) => r, + Err(err) => { + return Err(RuntimeError::simple( + "RemoteRuntimeExeption", + format!("Failed to handle NATS message {:?}", err), + )); + } + }; + + let decode_result = ExecutionResult::decode(message.payload); + let execution_result = match decode_result { + Ok(r) => r, + Err(_) => { + return Err(RuntimeError::simple_str( + "RemoteRuntimeExeption", + "Failed to decode NATS message", + )); + } + }; + + match execution_result.result { + Some(result) => match result { + tucana::aquila::execution_result::Result::Success(value) => Ok(value), + tucana::aquila::execution_result::Result::Error(err) => { + let name = err.code.to_string(); + let description = match err.description { + Some(string) => string, + None => "Unknown Error".to_string(), + }; + let error = RuntimeError::new(name, description, None); + Err(error) + } + }, + None => Err(RuntimeError::simple_str( + "RemoteRuntimeExeption", + "Result of Remote Response was empty.", + )), + } + } +} + +#[derive(clap::Parser, Debug)] +#[command(author, version, about)] +struct Args { + /// Index value + #[arg(short, long, default_value_t = 0)] + index: i32, + + /// NATS server URL + #[arg(short, long, default_value_t = String::from("nats://127.0.0.1:4222"))] + nats_url: String, + + /// Path value + #[arg(short, long)] + path: String, +} + +#[tokio::main] +async fn main() { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Info) + .init(); + + let args = Args::parse(); + let index = args.index; + let nats_url = args.nats_url; + let path = args.path; + let case = Case::from_path(&path); + + let store = FunctionStore::default(); + + let node_functions: HashMap = case + .clone() + .flow + .node_functions + .into_iter() + .map(|node| (node.database_id, node)) + .collect(); + + let mut context = match case.inputs.get(index as usize) { + Some(inp) => match inp.input.clone() { + Some(json_input) => Context::new(from_json_value(json_input)), + None => Context::default(), + }, + None => Context::default(), + }; + + let client = match async_nats::connect(nats_url).await { + Ok(client) => { + log::info!("Connected to nats server"); + client + } + Err(err) => { + panic!("Failed to connect to NATS server: {}", err); + } + }; + let remote = RemoteNatsClient::new(client); + let result = Executor::new(&store, node_functions.clone()) + .with_remote_runtime(&remote) + .execute(case.flow.starting_node_id, &mut context, true); + + match result { + taurus_core::context::signal::Signal::Success(value) => { + let json = to_json_value(value); + let pretty = serde_json::to_string_pretty(&json).unwrap(); + println!("{}", pretty); + } + taurus_core::context::signal::Signal::Return(value) => { + let json = to_json_value(value); + let pretty = serde_json::to_string_pretty(&json).unwrap(); + println!("{}", pretty); + } + taurus_core::context::signal::Signal::Respond(value) => { + let json = to_json_value(value); + let pretty = serde_json::to_string_pretty(&json).unwrap(); + println!("{}", pretty); + } + taurus_core::context::signal::Signal::Stop => println!("Received Stop signal"), + taurus_core::context::signal::Signal::Failure(runtime_error) => { + println!("RuntimeError: {:?}", runtime_error); + } + } +} diff --git a/crates/taurus/src/client/runtime_usage.rs b/crates/taurus/src/client/runtime_usage.rs index a638c66..e358df3 100644 --- a/crates/taurus/src/client/runtime_usage.rs +++ b/crates/taurus/src/client/runtime_usage.rs @@ -36,4 +36,3 @@ impl TaurusRuntimeUsageService { } } } - diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 2d26e87..a27990f 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -1,9 +1,11 @@ mod client; mod config; +mod remote; use crate::client::runtime_status::TaurusRuntimeStatusService; use crate::client::runtime_usage::TaurusRuntimeUsageService; use crate::config::Config; +use crate::remote::RemoteNatsClient; use code0_flow::flow_service::FlowUpdateService; use code0_flow::flow_config::load_env_file; @@ -12,7 +14,7 @@ use futures_lite::StreamExt; use log::error; use prost::Message; use std::collections::HashMap; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use std::time::Instant; use taurus_core::context::context::Context; use taurus_core::context::executor::Executor; use taurus_core::context::registry::FunctionStore; @@ -24,7 +26,11 @@ use tucana::shared::{ ExecutionFlow, NodeFunction, RuntimeFeature, RuntimeUsage, Translation, Value, }; -fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) { +fn handle_message( + flow: ExecutionFlow, + store: &FunctionStore, + nats_remote: &RemoteNatsClient, +) -> (Signal, RuntimeUsage) { let start = Instant::now(); let mut context = Context::default(); @@ -34,8 +40,9 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, Runtim .map(|node| (node.database_id, node)) .collect(); - let signal = - Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true); + let signal = Executor::new(store, node_functions) + .with_remote_runtime(nats_remote) + .execute(flow.starting_node_id, &mut context, true); let duration_millis = start.elapsed().as_millis() as i64; ( @@ -133,6 +140,7 @@ async fn main() { runtime_status_service = Some(status_service); } + let nats_client = RemoteNatsClient::new(client.clone()); let mut worker_task = tokio::spawn(async move { let mut sub = match client .queue_subscribe(String::from("execution.*"), "taurus".into()) @@ -162,7 +170,7 @@ async fn main() { }; let flow_id = flow.flow_id; - let result = handle_message(flow, &store); + let result = handle_message(flow, &store, &nats_client); let value = match result.0 { Signal::Failure(error) => { log::error!( diff --git a/crates/taurus/src/remote/mod.rs b/crates/taurus/src/remote/mod.rs new file mode 100644 index 0000000..a932be2 --- /dev/null +++ b/crates/taurus/src/remote/mod.rs @@ -0,0 +1,70 @@ +use async_nats::Client; +use prost::Message; +use taurus_core::runtime::{error::RuntimeError, remote::RemoteRuntime}; +use tonic::async_trait; +use tucana::{ + aquila::{ExecutionRequest, ExecutionResult}, + shared::Value, +}; + +pub struct RemoteNatsClient { + client: Client, +} + +impl RemoteNatsClient { + pub fn new(client: Client) -> Self { + RemoteNatsClient { client } + } +} + +#[async_trait] +impl RemoteRuntime for RemoteNatsClient { + async fn execute_remote( + &self, + remote_name: String, + request: ExecutionRequest, + ) -> Result { + let topic = format!("action.{}.{}", remote_name, request.execution_identifier); + let payload = request.encode_to_vec(); + let res = self.client.request(topic, payload.into()).await; + let message = match res { + Ok(r) => r, + Err(_) => { + return Err(RuntimeError::simple_str( + "RemoteRuntimeExeption", + "Failed to handle NATS message", + )); + } + }; + + let decode_result = ExecutionResult::decode(message.payload); + let execution_result = match decode_result { + Ok(r) => r, + Err(_) => { + return Err(RuntimeError::simple_str( + "RemoteRuntimeExeption", + "Failed to decode NATS message", + )); + } + }; + + match execution_result.result { + Some(result) => match result { + tucana::aquila::execution_result::Result::Success(value) => Ok(value), + tucana::aquila::execution_result::Result::Error(err) => { + let name = err.code.to_string(); + let description = match err.description { + Some(string) => string, + None => "Unknown Error".to_string(), + }; + let error = RuntimeError::new(name, description, None); + Err(error) + } + }, + None => Err(RuntimeError::simple_str( + "RemoteRuntimeExeption", + "Result of Remote Response was empty.", + )), + } + } +} diff --git a/crates/tests-core/Cargo.toml b/crates/tests-core/Cargo.toml new file mode 100644 index 0000000..37fa8de --- /dev/null +++ b/crates/tests-core/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "tests-core" +version.workspace = true +edition.workspace = true + +[dependencies] +tucana = { workspace = true } +log = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } + diff --git a/crates/tests-core/src/lib.rs b/crates/tests-core/src/lib.rs new file mode 100644 index 0000000..c0109d4 --- /dev/null +++ b/crates/tests-core/src/lib.rs @@ -0,0 +1,107 @@ +use std::path::Path; + +use log::{error, info}; +use serde::Deserialize; +use tucana::shared::ValidationFlow; + +#[derive(Clone, Deserialize)] +pub struct Input { + pub input: Option, + pub expected_result: serde_json::Value, +} + +#[derive(Clone, Deserialize)] +pub struct Case { + pub name: String, + pub description: String, + pub inputs: Vec, + pub flow: ValidationFlow, +} + +pub enum CaseResult { + Success, + Failure(Input, serde_json::Value), +} + +pub trait Testable { + fn run(&self) -> CaseResult; +} + +#[derive(Clone, Deserialize)] +pub struct Cases { + pub cases: Vec, +} + +pub fn print_success(case: &Case) { + info!("test {} ... ok", case.name); +} + +pub fn print_failure(case: &Case, input: &Input, result: serde_json::Value) { + error!("test {} ... FAILED", case.name); + error!(" input: {:?}", input.input); + error!(" expected: {:?}", input.expected_result); + error!(" real_value: {:?}", result); + error!(" message: {}", case.description); +} + +fn get_test_case + std::fmt::Debug>(path: P) -> Option { + let content = match std::fs::read_to_string(&path) { + Ok(it) => it, + Err(err) => { + log::error!("Cannot read file ({:?}): {:?}", path, err); + return None; + } + }; + + match serde_json::from_str(&content) { + Ok(it) => it, + Err(err) => { + log::error!("Cannot read json ({:?}): {:?}", path, err); + None + } + } +} + +fn get_test_cases(path: &str) -> Cases { + let mut items = Vec::new(); + let dir = match std::fs::read_dir(path) { + Ok(d) => d, + Err(err) => { + panic!("Cannot open path: {:?}", err) + } + }; + + for entry in dir { + let entry = match entry { + Ok(it) => it, + Err(err) => { + log::error!("Cannot read entry: {:?}", err); + continue; + } + }; + let file_path = entry.path(); + items.push(match get_test_case(&file_path) { + Some(it) => it, + None => { + continue; + } + }); + } + + Cases { cases: items } +} + +impl Case { + pub fn from_path(path: &str) -> Self { + match get_test_case(path) { + Some(s) => s, + None => panic!("flow was not found"), + } + } +} + +impl Cases { + pub fn from_path(path: &str) -> Self { + get_test_cases(path) + } +} diff --git a/crates/tests/Cargo.toml b/crates/tests/Cargo.toml index 0e4f08a..8ac7443 100644 --- a/crates/tests/Cargo.toml +++ b/crates/tests/Cargo.toml @@ -9,4 +9,5 @@ taurus-core = { workspace = true } log = { workspace = true } env_logger = { workspace = true } serde_json = { workspace = true } +tests-core = { workspace = true } serde = { workspace = true } diff --git a/crates/tests/src/main.rs b/crates/tests/src/main.rs index 5eb1e37..29ae8c3 100644 --- a/crates/tests/src/main.rs +++ b/crates/tests/src/main.rs @@ -1,104 +1,27 @@ -use log::{error, info}; -use serde::Deserialize; use serde_json::json; use std::collections::HashMap; use taurus_core::context::{context::Context, executor::Executor, registry::FunctionStore}; +use tests_core::{Case, CaseResult, Cases, print_failure, print_success}; use tucana::shared::{ - NodeFunction, ValidationFlow, + NodeFunction, helper::value::{from_json_value, to_json_value}, }; -#[derive(Clone, Deserialize)] -struct Input { - input: Option, - expected_result: serde_json::Value, +pub trait Testable { + fn run(&self) -> CaseResult; } -#[derive(Clone, Deserialize)] -struct Case { - name: String, - description: String, - inputs: Vec, - flow: ValidationFlow, -} - -#[derive(Clone, Deserialize)] -struct TestCases { - cases: Vec, -} - -fn print_success(case: &Case) { - info!("test {} ... ok", case.name); -} - -fn print_failure(case: &Case, input: &Input, result: serde_json::Value) { - error!("test {} ... FAILED", case.name); - error!(" input: {:?}", input.input); - error!(" expected: {:?}", input.expected_result); - error!(" real_value: {:?}", result); - error!(" message: {}", case.description); -} - -fn get_test_cases(path: &str) -> TestCases { - let mut items = Vec::new(); - let dir = match std::fs::read_dir(path) { - Ok(d) => d, - Err(err) => { - panic!("Cannot open path: {:?}", err) +fn run_tests(cases: Cases) { + for case in &cases.cases { + match case.run() { + CaseResult::Success => print_success(case), + CaseResult::Failure(input, result) => print_failure(case, &input, result), } - }; - - for entry in dir { - let entry = match entry { - Ok(it) => it, - Err(err) => { - log::error!("Cannot read entry: {:?}", err); - continue; - } - }; - let path = entry.path(); - - let content = match std::fs::read_to_string(&path) { - Ok(it) => it, - Err(err) => { - log::error!("Cannot read file ({:?}): {:?}", path, err); - continue; - } - }; - items.push(match serde_json::from_str(&content) { - Ok(it) => it, - Err(err) => { - log::error!("Cannot read json ({:?}): {:?}", path, err); - continue; - } - }); - } - - TestCases { cases: items } -} - -impl TestCases { - pub fn from_path(path: &str) -> Self { - get_test_cases(path) } - - pub fn run_tests(&self) { - for case in self.cases.clone() { - match case.run() { - CaseResult::Success => print_success(&case), - CaseResult::Failure(input, result) => print_failure(&case, &input, result), - } - } - } -} - -enum CaseResult { - Success, - Failure(Input, serde_json::Value), } -impl Case { +impl Testable for Case { fn run(&self) -> CaseResult { let store = FunctionStore::default(); @@ -167,6 +90,6 @@ fn main() { .filter_level(log::LevelFilter::Info) .init(); - let cases = TestCases::from_path("./crates/tests/flows/"); - cases.run_tests(); + let cases = Cases::from_path("./flows/"); + run_tests(cases); } diff --git a/crates/tests/flows/01_return_object.json b/flows/01_return_object.json similarity index 95% rename from crates/tests/flows/01_return_object.json rename to flows/01_return_object.json index 34c28c8..cd5b3f3 100644 --- a/crates/tests/flows/01_return_object.json +++ b/flows/01_return_object.json @@ -17,6 +17,7 @@ "starting_node_id": "1", "node_functions": [ { + "definition_source": "taurus", "databaseId": "2", "runtimeFunctionId": "rest::control::respond", "parameters": [ @@ -33,6 +34,7 @@ }, { "databaseId": "1", + "definition_source": "taurus", "runtimeFunctionId": "http::response::create", "parameters": [ { diff --git a/crates/tests/flows/02_return_flow_input.json b/flows/02_return_flow_input.json similarity index 97% rename from crates/tests/flows/02_return_flow_input.json rename to flows/02_return_flow_input.json index ef8402d..9a44b8c 100644 --- a/crates/tests/flows/02_return_flow_input.json +++ b/flows/02_return_flow_input.json @@ -45,6 +45,7 @@ "startingNodeId": "3", "nodeFunctions": [ { + "definition_source": "taurus", "databaseId": "3", "runtimeFunctionId": "http::response::create", "parameters": [ diff --git a/crates/tests/flows/03_for_each.json b/flows/03_for_each.json similarity index 96% rename from crates/tests/flows/03_for_each.json rename to flows/03_for_each.json index 231d005..383688f 100644 --- a/crates/tests/flows/03_for_each.json +++ b/flows/03_for_each.json @@ -11,6 +11,7 @@ "startingNodeId": "5", "nodeFunctions": [ { + "definition_source": "taurus", "databaseId": "5", "runtimeFunctionId": "std::list::for_each", "parameters": [ @@ -67,6 +68,7 @@ }, { "databaseId": "6", + "definition_source": "taurus", "runtimeFunctionId": "std::number::add", "parameters": [ { diff --git a/flows/04_example_action.json b/flows/04_example_action.json new file mode 100644 index 0000000..c0b5c96 --- /dev/null +++ b/flows/04_example_action.json @@ -0,0 +1,39 @@ +{ + "name": "04_example_action", + "description": "This flow expects a simple http response object as the return value", + "inputs": [ + { + "input": null, + "expected_result": { + "status_code": 200, + "headers": { + "Header": "X" + }, + "payload": "Hello World" + } + } + ], + "flow": { + "starting_node_id": "1", + "node_functions": [ + { + "definition_source": "example", + "databaseId": "1", + "runtimeFunctionId": "fib", + "parameters": [ + { + "databaseId": "4", + "runtimeParameterId": "number", + "value": { + "literalValue": { + "numberValue": { + "integer": "10" + } + } + } + } + ] + } + ] + } +}