This guide walks you through creating custom plugins for Rusty Beam, from basic concepts to advanced techniques.
Prerequisites: Rust 1.70+, Cargo, and familiarity with async Rust
cargo new --lib my-plugin
cd my-plugin
[package]
name = "my-plugin"
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"]
name = "rusty_beam_my_plugin"
[dependencies]
async-trait = "0.1"
hyper = { version = "1.0", features = ["full"] }
tokio = { version = "1", features = ["full"] }
rusty-beam = { path = "../.." } # Adjust path as needed
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use rusty_beam::{Plugin, PluginResponse};
use std::sync::Arc;
pub struct MyPlugin;
#[async_trait]
impl Plugin for MyPlugin {
fn name(&self) -> &str {
"my-plugin"
}
async fn handle_request(
&self,
req: Request<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
println!("MyPlugin: Processing request to {}", req.uri());
Ok(PluginResponse::Continue(req))
}
}
#[no_mangle]
pub extern "C" fn create_plugin() -> *mut dyn Plugin {
Box::into_raw(Box::new(MyPlugin))
}
cargo build --release
cp target/release/librusty_beam_my_plugin.so ../plugins/
All plugins must implement the Plugin
trait:
#[async_trait]
pub trait Plugin: Send + Sync {
/// Returns the plugin name for logging and identification
fn name(&self) -> &str;
/// Handles incoming requests
async fn handle_request(
&self,
req: Request<Body>,
app_state: Arc<AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>>;
/// Optionally modifies responses from later plugins
async fn handle_response(
&self,
response: Response<Body>,
app_state: Arc<AppState>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
Ok(response)
}
}
pub enum PluginResponse {
/// Continue to next plugin with modified request
Continue(Request<Body>),
/// Stop pipeline and return response
Done(Response<Body>),
}
my-plugin/
├── Cargo.toml
├── src/
│ ├── lib.rs # Plugin implementation
│ ├── config.rs # Configuration parsing
│ └── handlers.rs # Request handlers
├── tests/
│ └── integration.rs # Plugin tests
└── README.md
[lib]
# Must be cdylib for dynamic loading
crate-type = ["cdylib"]
# Name must start with "rusty_beam_"
name = "rusty_beam_my_plugin"
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use rusty_beam::{Plugin, PluginResponse};
use std::sync::Arc;
use std::time::Instant;
pub struct LoggingPlugin;
#[async_trait]
impl Plugin for LoggingPlugin {
fn name(&self) -> &str {
"logging"
}
async fn handle_request(
&self,
mut req: Request<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
// Add timing header
req.headers_mut().insert(
"X-Request-Start",
start.elapsed().as_micros().to_string().parse()?,
);
println!("[{}] {} {}",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
req.method(),
req.uri()
);
Ok(PluginResponse::Continue(req))
}
async fn handle_response(
&self,
response: Response<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
println!(" └─ Response: {}", response.status());
Ok(response)
}
}
async fn handle_request(&self, req: Request<Body>, _: Arc<AppState>)
-> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Get HTTP method
let method = req.method();
// Get URI and path
let uri = req.uri();
let path = uri.path();
// Get headers
let headers = req.headers();
let content_type = headers.get("content-type")
.and_then(|v| v.to_str().ok());
// Get query parameters
let query = uri.query().unwrap_or("");
// Read body (consumes the request)
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body).await?;
let body_str = String::from_utf8_lossy(&body_bytes);
// Reconstruct request
let req = Request::from_parts(parts, Body::from(body_bytes));
Ok(PluginResponse::Continue(req))
}
async fn handle_request(&self, mut req: Request<Body>, _: Arc<AppState>)
-> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Add custom header
req.headers_mut().insert("X-Plugin", "my-plugin".parse()?);
// Modify URI
if req.uri().path() == "/old-path" {
*req.uri_mut() = "/new-path".parse()?;
}
// Change method
if some_condition {
*req.method_mut() = hyper::Method::GET;
}
Ok(PluginResponse::Continue(req))
}
async fn handle_request(&self, req: Request<Body>, _: Arc<AppState>)
-> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Check if we should handle this request
if req.uri().path() == "/my-endpoint" {
let response = Response::builder()
.status(200)
.header("Content-Type", "text/html")
.body(Body::from("<h1>Hello from my plugin!</h1>"))?;
return Ok(PluginResponse::Done(response));
}
// Otherwise, continue to next plugin
Ok(PluginResponse::Continue(req))
}
async fn handle_response(
&self,
mut response: Response<Body>,
_app_state: Arc<AppState>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
// Add security headers
let headers = response.headers_mut();
headers.insert("X-Frame-Options", "DENY".parse()?);
headers.insert("X-Content-Type-Options", "nosniff".parse()?);
// Modify response body
if response.headers().get("content-type")
.and_then(|v| v.to_str().ok())
.map(|ct| ct.contains("text/html"))
.unwrap_or(false)
{
let (parts, body) = response.into_parts();
let body_bytes = hyper::body::to_bytes(body).await?;
let mut body_str = String::from_utf8_lossy(&body_bytes).to_string();
// Add tracking script
body_str = body_str.replace(
"</body>",
"<script>console.log('Plugin active');</script></body>"
);
response = Response::from_parts(parts, Body::from(body_str));
}
Ok(response)
}
use std::sync::RwLock;
use std::collections::HashMap;
pub struct StatefulPlugin {
state: RwLock<HashMap<String, String>>,
}
impl StatefulPlugin {
pub fn new() -> Self {
Self {
state: RwLock::new(HashMap::new()),
}
}
}
#[async_trait]
impl Plugin for StatefulPlugin {
fn name(&self) -> &str {
"stateful"
}
async fn handle_request(
&self,
req: Request<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Read state
let state = self.state.read().unwrap();
if let Some(value) = state.get("key") {
println!("Found value: {}", value);
}
drop(state);
// Write state
let mut state = self.state.write().unwrap();
state.insert("key".to_string(), "value".to_string());
Ok(PluginResponse::Continue(req))
}
}
async fn handle_request(
&self,
req: Request<Body>,
app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Access shared plugin state
let plugin_state = app_state.plugin_state.read().await;
if let Some(my_state) = plugin_state.get("my-plugin") {
// Downcast to your state type
if let Some(state) = my_state.downcast_ref::<MyPluginState>() {
// Use state...
}
}
Ok(PluginResponse::Continue(req))
}
#[cfg(test)]
mod tests {
use super::*;
use hyper::{Body, Request};
#[tokio::test]
async fn test_plugin_name() {
let plugin = MyPlugin;
assert_eq!(plugin.name(), "my-plugin");
}
#[tokio::test]
async fn test_request_handling() {
let plugin = MyPlugin;
let app_state = Arc::new(rusty_beam::AppState::default());
let req = Request::builder()
.uri("/test")
.body(Body::empty())
.unwrap();
let result = plugin.handle_request(req, app_state).await.unwrap();
match result {
PluginResponse::Continue(_) => {
// Expected behavior
}
PluginResponse::Done(_) => {
panic!("Expected Continue, got Done");
}
}
}
}
// tests/integration.rs
use std::process::Command;
use std::thread;
use std::time::Duration;
#[test]
fn test_plugin_integration() {
// Build the plugin
Command::new("cargo")
.args(&["build", "--release"])
.status()
.expect("Failed to build plugin");
// Copy to test location
std::fs::copy(
"target/release/librusty_beam_my_plugin.so",
"../test-plugins/librusty_beam_my_plugin.so"
).expect("Failed to copy plugin");
// Start test server with plugin
let mut server = Command::new("../target/release/rusty-beam")
.arg("test-config.html")
.spawn()
.expect("Failed to start server");
// Wait for server to start
thread::sleep(Duration::from_secs(1));
// Test plugin functionality
let response = reqwest::blocking::get("http://localhost:3000/test")
.expect("Failed to make request");
assert_eq!(response.status(), 200);
// Cleanup
server.kill().expect("Failed to kill server");
}
Important: Always use proper error handling. Panics in plugins can crash the entire server!
async fn handle_request(
&self,
req: Request<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
// Don't panic!
let header_value = match req.headers().get("x-custom") {
Some(value) => value,
None => return Ok(PluginResponse::Continue(req)),
};
// Use ? operator for error propagation
let parsed: i32 = header_value.to_str()?.parse()?;
// Handle errors gracefully
if let Err(e) = some_operation() {
eprintln!("Plugin error: {}", e);
// Return error response instead of propagating
let response = Response::builder()
.status(500)
.body(Body::from("Internal plugin error"))?;
return Ok(PluginResponse::Done(response));
}
Ok(PluginResponse::Continue(req))
}
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::{Duration, Instant};
pub struct RateLimitPlugin {
limits: RwLock<HashMap<String, (u32, Instant)>>,
max_requests: u32,
window: Duration,
}
impl RateLimitPlugin {
pub fn new(max_requests: u32, window_seconds: u64) -> Self {
Self {
limits: RwLock::new(HashMap::new()),
max_requests,
window: Duration::from_secs(window_seconds),
}
}
}
#[async_trait]
impl Plugin for RateLimitPlugin {
fn name(&self) -> &str {
"rate-limit"
}
async fn handle_request(
&self,
req: Request<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<PluginResponse, Box<dyn std::error::Error + Send + Sync>> {
let client_ip = req.headers()
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown");
let mut limits = self.limits.write().unwrap();
let now = Instant::now();
let (count, window_start) = limits.entry(client_ip.to_string())
.or_insert((0, now));
// Reset window if expired
if now.duration_since(*window_start) > self.window {
*count = 0;
*window_start = now;
}
// Check limit
if *count >= self.max_requests {
let response = Response::builder()
.status(429)
.header("X-RateLimit-Limit", self.max_requests.to_string())
.header("X-RateLimit-Remaining", "0")
.body(Body::from("Rate limit exceeded"))?;
return Ok(PluginResponse::Done(response));
}
// Increment counter
*count += 1;
Ok(PluginResponse::Continue(req))
}
}
pub struct HeaderInjectorPlugin {
headers: Vec<(String, String)>,
}
impl HeaderInjectorPlugin {
pub fn new(headers: Vec<(String, String)>) -> Self {
Self { headers }
}
}
#[async_trait]
impl Plugin for HeaderInjectorPlugin {
fn name(&self) -> &str {
"header-injector"
}
async fn handle_response(
&self,
mut response: Response<Body>,
_app_state: Arc<rusty_beam::AppState>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
let headers = response.headers_mut();
for (name, value) in &self.headers {
headers.insert(
hyper::header::HeaderName::from_bytes(name.as_bytes())?,
value.parse()?,
);
}
Ok(response)
}
}
You now have all the knowledge needed to create powerful Rusty Beam plugins. Start with the examples above and refer to existing plugins in the plugins/
directory for more advanced patterns.