actix

初识Actix

让我们从官网的quick start中认识actix

use std::io::Result;
 
use actix_web::{
    get, post,
    web::{self},
    App, HttpResponse, HttpServer, Responder,
};
 
#[get("/")]
async fn hello() -> impl Responder {
    HttpResponse::Ok().body("Hello world!")
}
 
#[post("/echo")]
async fn echo(req_body: String) -> impl Responder {
    HttpResponse::Ok().body(req_body)
}
 
async fn manual_hello() -> impl Responder {
    HttpResponse::Ok().body("Hey there!")
}
 
#[actix_web::main]
async fn main() -> Result<()> {
    println!("Server running at http://127.0.0.1:8080");
    HttpServer::new(|| {
        App::new()
            .service(hello)
            .service(echo)
            .route("/hey", web::get().to(manual_hello))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

这里有几个需要注意的地方:

1、每一个request handler都是一个async fn(这点和axum保持一致的,果然优秀的框架优秀的共性总是一样的),他们接受零个或者多个参数,这些参数能够从request中被提取出来(FromRequest特征),并且能够被转换成HttpResponseResponder特征)。

我们会在后面详细介绍这两个trait的作用,现在先在下面给出他们的trait签名:

  • FromRequest
#[doc(alias = "extract", alias = "extractor")]
pub trait FromRequest: Sized {
    /// The associated error which can be returned.
    type Error: Into<Error>;
 
    /// Future that resolves to a `Self`.
    ///
    /// To use an async function or block, the futures must be boxed. The following snippet will be
    /// common when creating async/await extractors (that do not consume the body).
    ///
    /// ```ignore
    /// type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
    /// // or
    /// type Future = futures_util::future::LocalBoxFuture<'static, Result<Self, Self::Error>>;
    ///
    /// fn from_request(req: HttpRequest, ...) -> Self::Future {
    ///     let req = req.clone();
    ///     Box::pin(async move {
    ///         ...
    ///     })
    /// }
    /// ```
    type Future: Future<Output = Result<Self, Self::Error>>;
 
    /// Create a `Self` from request parts asynchronously.
    fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future;
 
    /// Create a `Self` from request head asynchronously.
    ///
    /// This method is short for `T::from_request(req, &mut Payload::None)`.
    fn extract(req: &HttpRequest) -> Self::Future {
        Self::from_request(req, &mut Payload::None)
    }
}
  • Responder
pub trait Responder {
    type Body: MessageBody + 'static;
 
    /// Convert self to `HttpResponse`.
    fn respond_to(self, req: &HttpRequest) -> HttpResponse<Self::Body>;
 
    /// Wraps responder to allow alteration of its response.
    ///
    /// See [`CustomizeResponder`] docs for more details on its capabilities.
    ///
    /// # Examples
    /// ```
    /// use actix_web::{Responder, http::StatusCode, test::TestRequest};
    ///
    /// let responder = "Hello world!"
    ///     .customize()
    ///     .with_status(StatusCode::BAD_REQUEST)
    ///     .insert_header(("x-hello", "world"));
    ///
    /// let request = TestRequest::default().to_http_request();
    /// let response = responder.respond_to(&request);
    /// assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    /// assert_eq!(response.headers().get("x-hello").unwrap(), "world");
    /// ```
    #[inline]
    fn customize(self) -> CustomizeResponder<Self>
    where
        Self: Sized,
    {
        CustomizeResponder::new(self)
    }
 
    #[doc(hidden)]
    #[deprecated(since = "4.0.0", note = "Prefer `.customize().with_status(header)`.")]
    fn with_status(self, status: StatusCode) -> CustomizeResponder<Self>
    where
        Self: Sized,
    {
        self.customize().with_status(status)
    }
 
    #[doc(hidden)]
    #[deprecated(since = "4.0.0", note = "Prefer `.customize().insert_header(header)`.")]
    fn with_header(self, header: impl TryIntoHeaderPair) -> CustomizeResponder<Self>
    where
        Self: Sized,
    {
        self.customize().insert_header(header)
    }
}
 

2、对于request handler,actix提供了两种不同的方式来定义他们:

  • 声明式的写法:

    #[post("/echo")]
    async fn req_handler(){
      //do something...
    }
  • 函数式的写法:

     App::new().route("/hey", web::get().to(manual_hello))

3、main函数的返回值:

一开始看到官网给的demo的时候,我很好奇为什么main函数的返回值会是std::io::Result<()>这个类型,于是我从HttpServer.run()函数一路点下去:

  • run函数的定义:
pub fn run(self) -> Server {
        self.builder.run()
    }

可以看到这个函数的返回值是一个Server结构体,于是继续查看Server的定义:

#[must_use = "Server does nothing unless you `.await` or poll it"]
pub struct Server {
    handle: ServerHandle,
    fut: BoxFuture<'static, io::Result<()>>,
}

于是当我们await 的时候,会拿到这个Future中的值,这个值的类型就是io::Result<()>

应用

actix-web提供了很多工具供我们去写web应用程序,例如:middleware,请求的预处理,相应的预处理等等。

所有的actix-web服务都会围绕着App实例进行构建(如quick start中的代码,route,service都是挂载到app对象上的)。除了route和middleware,我们还可以使用app实例

来在相同的作用域中跨handler进行状态共享。

一个应用程序的scope指的是所有路由的公共名字空间:例如:

For an application with scope /app, any request with the paths /app, /app/, or /app/test would match; however, the path /application would not match.

如下面的代码所示:

use actix_web::{web, App, HttpServer, Responder};
 
async fn index() -> impl Responder {
    "Hello world!"
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().service(
            // prefixes all resources and routes attached to it...
            web::scope("/app")
                // ...so this handles requests for `GET /app/index.html`
                .route("/index.html", web::get().to(index)),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

我们可以使用上面的方式来对路由进行分组定义。

共享状态

我们将在同一个scope中进行不同handler之间的状态共享:

状态能够通过web::Data这个extractor去获取,T是状态的类型,状态也能够在middleware中共享:

下面的程序会在状态中存储应用的名称并进行传递:

use std::io::Result;
 
use actix_web::{
    get,
    web::{self},
    App, HttpServer,
};
 
#[get("/")]
async fn index(data: web::Data<AppState>) -> String {
    let app_name = &data.app_name;
    format!("Hello {}!", app_name)
}
 
struct AppState {
    app_name: String,
}
 
#[actix_web::main]
async fn main() -> Result<()> {
    println!("Server running at http://127.0.0.1:8080");
    HttpServer::new(|| {
        App::new()
            .app_data(web::Data::new(AppState {
                app_name: String::from("Actix Web"),
            }))
            .service(index)
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

共享可变状态

一个HttpServer为每一个线程构造了一个应用实例,因此,application data必须构造多次,因此如果你想在不同的线程中共享数据的活,你需要使用一个可共享的对象,例如

必须要实现Send + Sync这两个trait.

由于web::Data在内部使用Arc,因此我们应该在创建我们的数据之前把它放到web::Data 中去。

#[get("/add")]
async fn add(data: web::Data<AppStateWithCounter>) -> String {
    let mut counter = data.counter.lock().unwrap();
    *counter += 1;
    format!("Counter: {}", counter)
}
 
 
#[actix_web::main]
async fn main() -> Result<()> {
    println!("Server running at http://127.0.0.1:8080");
    let counter = web::Data::new(AppStateWithCounter {
        counter: Mutex::new(0),
    });
    HttpServer::new(move || {
        App::new()
            .app_data(counter.clone())
            .service(add)
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

要点:

  • 在 Actix-Web 中,HttpServer::new 方法接受一个闭包作为参数,这个闭包用于配置和注册应用程序的路由、中间件和其他设置。然而,如果你在这个闭包内部初始化了应用程序状态(例如使用 Data::new 或其他状态管理方式),那么这个状态将是线程本地的,也就是说它只存在于该工作线程中。

    这可能会导致一个问题:如果这个状态在不同的工作线程中被修改,它们之间就会发生状态不同步的情况。因为每个工作线程都有自己的状态副本,当一个线程修改了状态后,其他线程并不知道状态已经被修改,从而导致不一致的行为。

    为了避免这个问题,Actix-Web 建议在应用程序启动时初始化状态,而不是在 HttpServer::new 闭包内部初始化。这样可以确保状态是全局共享的,所有工作线程都可以访问并正确修改同一份状态。

应用守卫和虚拟主机

守卫是一个简单的函数,它接受request的参数,并且返回true或者false。正规来讲,guard 是一个实现了Guard 特征的对象。actix提供了多种guard。

一种提供的guard是Host,他可以基于请求头的信息进行过滤:(针对主机名进行过滤)

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .service(
                web::scope("/")
                    .guard(guard::Host("www.rust-lang.org"))
                    .route("", web::to(|| async { HttpResponse::Ok().body("www") })),
            )
            .service(
                web::scope("/")
                    .guard(guard::Host("users.rust-lang.org"))
                    .route("", web::to(|| async { HttpResponse::Ok().body("user") })),
            )
            .route("/", web::to(HttpResponse::Ok))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

配置

为了简化和易用性,Appweb::Scope提供了config这个方法,这个方法对于移动部分配置到不同的模块甚至lib上非常有用,例如,一些资源配置能够被移动到另外的模块:

use actix_web::{web, App, HttpResponse, HttpServer};
 
// this function could be located in a different module
fn scoped_config(cfg: &mut web::ServiceConfig) {
    cfg.service(
        web::resource("/test")
            .route(web::get().to(|| async { HttpResponse::Ok().body("test") }))
            .route(web::head().to(HttpResponse::MethodNotAllowed)),
    );
}
 
// this function could be located in a different module
fn config(cfg: &mut web::ServiceConfig) {
    cfg.service(
        web::resource("/app")
            .route(web::get().to(|| async { HttpResponse::Ok().body("app") }))
            .route(web::head().to(HttpResponse::MethodNotAllowed)),
    );
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .configure(config)
            .service(web::scope("/api").configure(scoped_config))
            .route(
                "/",
                web::get().to(|| async { HttpResponse::Ok().body("/") }),
            )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

我们可以将一类资源抽象成配置,我们很容易在不同的地方去复用这些配置。

上面的代码的结果为:

/         -> "/"
/app      -> "app"
/api/test -> "test"

Server

HttpServer类型是用来处理Http请求的。

HttpServer接受一个application factory作为参数,并且这个application factory参数必须实现Send + Sync特征。

为了启动这个服务器,他必须被绑定到一个网络套接字上,使用HttpServer.bind方法进行绑定。如果端口被占用,这个方法会失败。

bind函数绑定成功,使用HttpServer.run()方法返回一个Server实例,这个Server的示例必须await或者spawn以便能够开始处理请求,服务会一直运行直到进程收到了shutdown signal.

多线程

HttpServer自动启动了大量的http worker,默认数量等于系统的cpu的数量。这个数量可以通过HttpServer.workers()方法进行修改。

use actix_web::{web, App, HttpResponse, HttpServer};
 
#[actix_web::main]
async fn main() {
    HttpServer::new(|| App::new().route("/", web::get().to(HttpResponse::Ok))).workers(4);
    // <- Start 4 workers
}

值得注意的是,应用的状态并不在这些线程中共享,并且handler可以自由的操控本地的拷贝而来的状态,不用担心并发问题。

因为每个worker线程分开单独处理自己的请求,那些会阻塞当前线程的handler会导致当前的worker停止处理新的请求:

fn my_handler() -> impl Responder {
    std::thread::sleep(Duration::from_secs(5)); // <-- Bad practice! Will cause the current worker thread to hang!
    "response"
}

出于这个原因,一个长时间的,不消耗cpu资源的操作(io,database)必须是一个future或者是一个异步的函数。这不会造成线程的阻塞:

async fn my_handler() -> impl Responder {
    tokio::time::sleep(Duration::from_secs(5)).await; // <-- Ok. Worker thread will handle other requests here
    "response"
}

tls / https

actix-web天然支持两种类型的tls:rustls和openssl

这节内容不是很重要,参考:https://actix.rs/docs/server#tls—https

keep-alive

参考:https://actix.rs/docs/server#keep-alive

Extractor

Actix-web提供了一种类型安全的请求信息工具,它被称为extractor。这个功能基于FromRequest特征实现。

具体来说,extractors 允许你定义一个类型,该类型实现了 FromRequest trait。该 trait 要求实现一个 from_request 方法,该方法从传入的 HttpRequest 和应用程序状态中提取所需的信息,并返回一个 Result,其中包含提取的数据或一个错误。

让我们来看一看actix提供的一系列built-in的extractor吧!

Path

类似于Java的PathVariable,用于提取url上的路径参数:

http://xxx.com/1/2

use std::io::Result;
 
use actix_web::{get, web::Path, App, HttpServer};
 
#[get("/")]
async fn index() -> &'static str {
    "Hello, world!"
}
 
#[get("/hello/{name}/{age}")]
async fn get_path_info(info: Path<(String, u32)>) -> Result<String> {
    let (name, age) = info.into_inner();
    Ok(format!("Hello, {}! You are {} years old.", name, age))
}
 
#[actix_web::main]
async fn main() -> Result<()> {
    println!("Server running on http://127.0.0.1:8888");
    HttpServer::new(|| App::new().service(index).service(get_path_info))
        .bind(("127.0.0.1", 8888))?
        .run()
        .await
}
 

我们也可以把这个元祖中的参数封装到一个结构体中:这可以方便我们看清参数中有哪些字段

use std::io::Result;
 
use actix_web::{get, web::Path, App, HttpServer};
use serde::Deserialize;
 
#[get("/")]
async fn index() -> &'static str {
    "Hello, world!"
}
 
#[derive(Deserialize)]
struct PathInfo {
    name: String,
    age: u8,
}
 
#[get("/hello/{name}/{age}")]
async fn get_path_info(info: Path<PathInfo>) -> Result<String> {
    Ok(format!(
        "Hello, {}! You are {} years old.",
        info.name, info.age
    ))
}
 
#[actix_web::main]
async fn main() -> Result<()> {
    println!("Server running on http://127.0.0.1:8888");
    HttpServer::new(|| App::new().service(index).service(get_path_info))
        .bind(("127.0.0.1", 8888))?
        .run()
        .await
}

我们也可以在原生的HttpRequest上去获取,但是这样没有类型安全保障:

#[get("/users/{user_id}/{friend}")] // <- define path parameters
async fn index(req: HttpRequest) -> Result<String> {
    let name: String = req.match_info().get("friend").unwrap().parse().unwrap();
    let userid: i32 = req.match_info().query("user_id").parse().unwrap();
 
    Ok(format!("Welcome {}, user_id {}!", name, userid))
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    use actix_web::{App, HttpServer};
 
    HttpServer::new(|| App::new().service(index))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}

Query

这个extractor用于从query parameter中获取参数:

http://xxx.com?a=1&b=2

use actix_web::{get, web, App, HttpServer};
use serde::Deserialize;
 
#[derive(Deserialize)]
struct Info {
    username: String,
}
 
// this handler gets called if the query deserializes into `Info` successfully
// otherwise a 400 Bad Request error response is returned
#[get("/")]
async fn index(info: web::Query<Info>) -> String {
    format!("Welcome {}!", info.username)
}

JSON

JSON允许将一个request body反序列化成一个结构体。

use actix_web::{post, web, App, HttpServer, Result};
use serde::Deserialize;
 
#[derive(Deserialize)]
struct Info {
    username: String,
}
 
/// deserialize `Info` from request's body
#[post("/submit")]
async fn submit(info: web::Json<Info>) -> Result<String> {
    Ok(format!("Welcome {}!", info.username))
}

某些extractor提供了提供了配置处理提取过程的方式,为了配置对这个extractor进行配置,我们将这个配置对象传递给app_data,你能够配置payload的最大的大小以及进行自定义的错误处理,示例代码如下:

use actix_web::{error, web, App, HttpResponse, HttpServer, Responder};
use serde::Deserialize;
 
#[derive(Deserialize)]
struct Info {
    username: String,
}
 
/// deserialize `Info` from request's body, max payload size is 4kb
async fn index(info: web::Json<Info>) -> impl Responder {
    format!("Welcome {}!", info.username)
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        let json_config = web::JsonConfig::default()
            .limit(4096)
            .error_handler(|err, _req| {
                // create custom error response
                error::InternalError::from_response(err, HttpResponse::Conflict().finish())
                    .into()
            });
 
        App::new().service(
            web::resource("/")
                // change json extractor configuration
                .app_data(json_config)
                .route(web::post().to(index)),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

URL-encoded form

和JSON一样,URL-encoded form请求体也能被提取到一个对象上:

FormConfig能够对这个extractor的处理进行配置。

use actix_web::{post, web, App, HttpServer, Result};
use serde::Deserialize;
 
#[derive(Deserialize)]
struct FormData {
    username: String,
}
 
/// extract form data using serde
/// this handler gets called only if the content type is *x-www-form-urlencoded*
/// and the content of the request could be deserialized to a `FormData` struct
#[post("/")]
async fn index(form: web::Form<FormData>) -> Result<String> {
    Ok(format!("Welcome {}!", form.username))
}

Other

参考:https://actix.rs/docs/extractors#other

Application State extractor

我们可以使用web::Data这个extractor来提取Application State。然而state是一个只读的引用,如果你需要改变状态:

use actix_web::{web, App, HttpServer, Responder};
use std::cell::Cell;
 
#[derive(Clone)]
struct AppState {
    count: Cell<usize>,
}
 
async fn show_count(data: web::Data<AppState>) -> impl Responder {
    format!("count: {}", data.count.get())
}
 
async fn add_one(data: web::Data<AppState>) -> impl Responder {
    let count = data.count.get();
    data.count.set(count + 1);
 
    format!("count: {}", data.count.get())
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let data = AppState {
        count: Cell::new(0),
    };
 
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(data.clone()))
            .route("/", web::to(show_count))
            .route("/add", web::to(add_one))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

上面的代码是能工作的,data.count仅仅记录了每一个worker thread处理请求的数量,为了记录所有thread处理请求的数量,我们必须使用Arc和atomics:

use actix_web::{get, web, App, HttpServer, Responder};
use std::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
    sync::Arc,
};
 
#[derive(Clone)]
struct AppState {
    local_count: Cell<usize>,
    global_count: Arc<AtomicUsize>,
}
 
#[get("/")]
async fn show_count(data: web::Data<AppState>) -> impl Responder {
    format!(
        "global_count: {}\nlocal_count: {}",
        data.global_count.load(Ordering::Relaxed),
        data.local_count.get()
    )
}
 
#[get("/add")]
async fn add_one(data: web::Data<AppState>) -> impl Responder {
    data.global_count.fetch_add(1, Ordering::Relaxed);
 
    let local_count = data.local_count.get();
    data.local_count.set(local_count + 1);
 
    format!(
        "global_count: {}\nlocal_count: {}",
        data.global_count.load(Ordering::Relaxed),
        data.local_count.get()
    )
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let data = AppState {
        local_count: Cell::new(0),
        global_count: Arc::new(AtomicUsize::new(0)),
    };
 
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(data.clone()))
            .service(show_count)
            .service(add_one)
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

Handler

handle是一个async的函数,每个handler接受0-n个参数,他们能够从request中进行提取,并且返回一个可以被转换成HttpResponse的类型。

请求处理被分为两个阶段:

  • 首先,handler function被调用,返回一个实现了Responder特征的返回值
  • 然后,repond_to()这个函数被调用,将返回的结果转换成HttpResponse或者一个Error

Actix默认提供了对rust中的一些标准类型对Responder的实现。(&str,String等等)。

async fn index(_req: HttpRequest) -> &'static str {
    "Hello world!"
}
 
async fn index(_req: HttpRequest) -> impl Responder {
    web::Bytes::from_static(b"Hello world!")
}

响应自定义类型

为了使我们的自定义类型可以作为handler的返回值,我们需要为其实现Responder trait。

use actix_web::{
    body::BoxBody, get, http::header::ContentType, App, HttpResponse, HttpServer, Responder,
};
use serde::{Deserialize, Serialize};
 
#[derive(Deserialize, Serialize)]
struct MyObj {
    name: &'static str,
}
 
impl Responder for MyObj {
    type Body = BoxBody;
    fn respond_to(self, _req: &actix_web::HttpRequest) -> actix_web::HttpResponse<Self::Body> {
        let body = serde_json::to_string(&self).unwrap();
        HttpResponse::Ok()
            .content_type(ContentType::json())
            .body(body)
    }
}
#[get("/")]
async fn index() -> impl Responder {
    MyObj { name: "user" }
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(index))
        .bind(("127.0.0.1", 8888))?
        .run()
        .await
}

流式响应体

响应体可以异步生成。在这种情况下,响应体必须实现流特征 Stream<Item = Result<Bytes, Error>>

#[get("/stream")]
async fn stream() -> HttpResponse {
    let body = once(ok::<_, Error>(Bytes::from_static(b"bytes")));
    HttpResponse::Ok()
        .content_type("application/json")
        .streaming(body)
}
 
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(stream))
        .bind(("127.0.0.1", 8888))?
        .run()
        .await
}

返回不同的类型(Either)

use actix_web::{Either, Error, HttpResponse};
 
type RegisterResult = Either<HttpResponse, Result<&'static str, Error>>;
 
async fn index() -> RegisterResult {
    if is_a_variant() {
        // choose Left variant
        Either::Left(HttpResponse::BadRequest().body("Bad data"))
    } else {
        // choose Right variant
        Either::Right(Ok("Hello!"))
    }
}