Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "prosa-hyper"
version = "0.3.0"
version = "0.3.1"
authors = ["Jérémy HERGAULT <jeremy.hergault@worldline.com>", "Anthony THOMAS <anthony.thomas@worldline.com>", "Julien TERUEL <julien.teruel@worldline.com>", "Rene-Louis EYMARD <rene-louis.eymard@worldline.com>"]
description = "ProSA Hyper processor for HTTP client/server"
homepage = "https://worldline.com/"
Expand Down Expand Up @@ -31,6 +31,9 @@ proc = "client::proc::HyperClientProc"
settings = "client::proc::HyperClientSettings"
adaptor = []

[lints.clippy]
unwrap_used = "deny"

[dependencies]
bytes = ">=1.11.1, < 2"
thiserror = "2"
Expand Down
19 changes: 11 additions & 8 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.get_matches();

// load the configuration
let config = Config::builder()
.add_source(config::File::with_name(
matches.get_one::<String>("config").unwrap().as_str(),
))
let mut config_builder = Config::builder();
if let Some(config_file) = matches
.get_one::<String>("config")
.map(|c| config::File::with_name(c.as_str()))
{
config_builder = config_builder.add_source(config_file);
}
let config = config_builder
.add_source(
config::Environment::with_prefix("PROSA")
.try_parsing(true)
.separator("_")
.list_separator(" "),
)
.build()
.unwrap();
.build()?;

let prosa_hyper_settings = config.try_deserialize::<MainHyperSettings>()?;
let service_name = prosa_hyper_settings.hyper_client.service_name.clone();
Expand All @@ -174,7 +177,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.clone(),
prosa_hyper_settings.hyper_client,
);
Proc::<HyperDemoAdaptor>::run(http_proc);
Proc::<HyperDemoAdaptor>::run(http_proc)?;

if matches.contains_id("inj") && matches.get_flag("inj") {
debug!("Start a Inj processor");
Expand All @@ -185,7 +188,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.clone(),
inj_settings,
);
Proc::<HyperDemoAdaptor>::run(inj_proc);
Proc::<HyperDemoAdaptor>::run(inj_proc)?;
}

// Wait on main task
Expand Down
12 changes: 3 additions & 9 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,18 @@ observability:
level: DEBUG
metrics:
#otlp: # Opentelemetry protocol
# endpoint: "http://localhost:4137"
# timeout_sec: 3
# protocol: Grpc
# endpoint: "grpc://localhost:4137"
prometheus:
endpoint: "0.0.0.0:9090"
stdout:
level: debug
traces:
#otlp: # Opentelemetry protocol
# endpoint: "http://localhost:4137"
# timeout_sec: 3
# protocol: Grpc
# endpoint: "grpc://localhost:4137"
stdout:
level: debug
logs:
#otlp: # Opentelemetry protocol
# endpoint: "http://localhost:4137"
# timeout_sec: 3
# protocol: Grpc
# endpoint: "grpc://localhost:4137"
stdout:
level: debug
19 changes: 11 additions & 8 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.get_matches();

// load the configuration
let config = Config::builder()
.add_source(config::File::with_name(
matches.get_one::<String>("config").unwrap().as_str(),
))
let mut config_builder = Config::builder();
if let Some(config_file) = matches
.get_one::<String>("config")
.map(|c| config::File::with_name(c.as_str()))
{
config_builder = config_builder.add_source(config_file);
}
let config = config_builder
.add_source(
config::Environment::with_prefix("PROSA")
.try_parsing(true)
.separator("_")
.list_separator(" "),
)
.build()
.unwrap();
.build()?;

let prosa_hyper_settings = config.try_deserialize::<MainHyperSettings>()?;

Expand All @@ -157,7 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.clone(),
prosa_hyper_settings.hyper_server,
);
Proc::<HyperDemoAdaptor>::run(http_proc);
Proc::<HyperDemoAdaptor>::run(http_proc)?;

if matches.contains_id("stub") && matches.get_flag("stub") {
debug!("Start a Stub processor");
Expand All @@ -168,7 +171,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bus.clone(),
stub_settings,
);
Proc::<StubParotAdaptor>::run(stub_proc);
Proc::<StubParotAdaptor>::run(stub_proc)?;
}

// Wait on main task
Expand Down
126 changes: 89 additions & 37 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod tests {
use std::{
env,
fs::{self, File},
io::Read as _,
io::{self, Read as _},
time::Duration,
};
use tokio::time;
Expand Down Expand Up @@ -80,7 +80,11 @@ mod tests {
}
}

async fn run_test(settings: HttpTestSettings, certificate: Option<Certificate>, http2: bool) {
async fn run_test(
settings: HttpTestSettings,
certificate: Option<Certificate>,
http2: bool,
) -> io::Result<()> {
let url = settings.server.listener.url.clone();

// Create bus and main processor
Expand All @@ -96,7 +100,7 @@ mod tests {
bus.clone(),
settings.server,
);
Proc::<ServerTestAdaptor>::run(http_server_proc);
Proc::<ServerTestAdaptor>::run(http_server_proc)?;

// Wait for processor to start
std::thread::sleep(Duration::from_secs(1));
Expand All @@ -106,42 +110,51 @@ mod tests {
.timeout(Duration::from_secs(WAIT_TIME.as_secs()))
.use_rustls_tls();
if let Some(cert) = certificate {
client_builder = client_builder.add_root_certificate(cert);
client_builder = client_builder.tls_certs_only(vec![cert]);
}
if http2 {
client_builder = client_builder.http2_prior_knowledge();
}
let client = client_builder.build().unwrap();
let client = client_builder
.build()
.expect("reqwest client should be valid");
for _i in 0..20 {
let resp = client
.get(url.clone())
.send()
.await
.expect("Failed to send request");
assert_eq!(resp.status(), StatusCode::OK);
let server_header = resp.headers().get(hyper::header::SERVER).unwrap();
assert!(server_header.to_str().unwrap().starts_with(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
)));
assert!(resp.headers().get(hyper::header::SERVER).is_some_and(|h| {
h.to_str().is_ok_and(|s| {
s.starts_with(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
})
}));
}

bus.stop("ProSA HTTP client server unit test end".into())
.await
.unwrap();
.map_err(io::Error::other)?;

// Wait on main task to end
main_task.await;
Ok(())
}

#[tokio::test]
async fn http_client_server() {
let test_settings =
HttpTestSettings::new(Url::parse("http://localhost:48180").unwrap(), None, None);
let test_settings = HttpTestSettings::new(
Url::parse("http://localhost:48180").expect("HTTP client/server URL should be valid"),
None,
None,
);

// Run a ProSA to test
run_test(test_settings, None, false).await;
assert!(run_test(test_settings, None, false).await.is_ok());
}

#[tokio::test]
Expand All @@ -150,37 +163,57 @@ mod tests {
let prosa_temp_dir = env::temp_dir().join(PROSA_HTTPS_TEST_DIR_NAME);

let _ = fs::remove_dir_all(&prosa_temp_dir);
fs::create_dir_all(&prosa_temp_dir).unwrap();
fs::create_dir_all(&prosa_temp_dir)
.expect("Can't create ProSA temporary directory for HTTPS");

let key_path = prosa_temp_dir.join("prosa_server_https.key");
let cert_path = prosa_temp_dir.join("prosa_server_https.pem");
let cert_path_str = cert_path
.as_os_str()
.to_str()
.expect("Cert path should be a valid String");
let server_ssl_config = HttpTestSettings::create_server_cert(
key_path.as_os_str().to_str().unwrap().into(),
cert_path.as_os_str().to_str().unwrap().into(),
key_path
.as_os_str()
.to_str()
.expect("Key path should be a valid String")
.into(),
cert_path_str.into(),
)
.unwrap();
.expect("Server certificate should be created");

let mut buf = Vec::new();
File::open(cert_path.as_os_str().to_str().unwrap())
.unwrap()
File::open(cert_path_str)
.expect("Cert file should exist")
.read_to_end(&mut buf)
.unwrap();
let client_cert = reqwest::Certificate::from_pem(&buf).unwrap();
.expect("Cert file should be read");
let client_cert =
reqwest::Certificate::from_pem(&buf).expect("Certificate should be valid for reqwest");

let client_ssl_store = Store::File {
path: format!("{}/", prosa_temp_dir.as_os_str().to_str().unwrap()),
path: format!(
"{}/",
prosa_temp_dir
.as_os_str()
.to_str()
.expect("ProSA temp dir should be a valid String")
),
};
let mut client_ssl_config = SslConfig::default();
client_ssl_config.set_store(client_ssl_store);

let test_settings = HttpTestSettings::new(
Url::parse("https://localhost:48543").unwrap(),
Url::parse("https://localhost:48543").expect("HTTPS client/server URL should be valid"),
Some(server_ssl_config),
Some(client_ssl_config),
);

// Run a ProSA to test
run_test(test_settings, Some(client_cert), false).await;
assert!(
run_test(test_settings, Some(client_cert), false)
.await
.is_ok()
);
}

#[tokio::test]
Expand All @@ -189,39 +222,58 @@ mod tests {
let prosa_temp_dir = env::temp_dir().join(PROSA_H2_TEST_DIR_NAME);

let _ = fs::remove_dir_all(&prosa_temp_dir);
fs::create_dir_all(&prosa_temp_dir).unwrap();
fs::create_dir_all(&prosa_temp_dir).expect("Can't create ProSA temporary directory for H2");

let key_path = prosa_temp_dir.join("prosa_server_h2.key");
let cert_path = prosa_temp_dir.join("prosa_server_h2.pem");
let cert_path_str = cert_path
.as_os_str()
.to_str()
.expect("Cert path should be a valid String");
let mut server_ssl_config = HttpTestSettings::create_server_cert(
key_path.as_os_str().to_str().unwrap().into(),
cert_path.as_os_str().to_str().unwrap().into(),
key_path
.as_os_str()
.to_str()
.expect("Key path should be a valid String")
.into(),
cert_path_str.into(),
)
.unwrap();
.expect("Server certificate should be created");
// Need to set the ALPN for server because of inline configuration @see TargetSetting::new
server_ssl_config.set_alpn(vec!["h2".into()]);

let mut buf = Vec::new();
File::open(cert_path.as_os_str().to_str().unwrap())
.unwrap()
File::open(cert_path_str)
.expect("Cert file should exist")
.read_to_end(&mut buf)
.unwrap();
let client_cert = reqwest::Certificate::from_pem(&buf).unwrap();
.expect("Cert file should be read");
let client_cert =
reqwest::Certificate::from_pem(&buf).expect("Certificate should be valid for reqwest");

let client_ssl_store = Store::File {
path: format!("{}/", prosa_temp_dir.as_os_str().to_str().unwrap()),
path: format!(
"{}/",
prosa_temp_dir
.as_os_str()
.to_str()
.expect("ProSA temp dir should be a valid String")
),
};
let mut client_ssl_config = SslConfig::default();
client_ssl_config.set_store(client_ssl_store);
client_ssl_config.set_alpn(vec!["h2".into()]);

let test_settings = HttpTestSettings::new(
Url::parse("https://localhost:49543").unwrap(),
Url::parse("https://localhost:49543").expect("HTTP2 client/server URL should be valid"),
Some(server_ssl_config),
Some(client_ssl_config),
);

// Run a ProSA to test
run_test(test_settings, Some(client_cert), true).await;
assert!(
run_test(test_settings, Some(client_cert), true)
.await
.is_ok()
);
}
}
6 changes: 4 additions & 2 deletions src/server/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ pub struct HyperServerSettings {

impl HyperServerSettings {
fn default_listener() -> ListenerSetting {
let mut url = Url::parse("http://0.0.0.0:8080").unwrap();
let mut url =
Url::parse("http://0.0.0.0:8080").expect("Default Hyper server URL should be valid");
if let Ok(Ok(port)) = env::var("PORT").map(|p| p.parse::<u16>()) {
url.set_port(Some(port)).unwrap();
url.set_port(Some(port))
.expect("Default Hyper server URL should be base");
}

ListenerSetting::new(url, None)
Expand Down
Loading
Loading