Skip to main content

docker/
client.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use hyper::{
5    Method, StatusCode, Uri,
6    client::conn::http1::SendRequest,
7    header::{CONTENT_TYPE, HOST},
8};
9use hyper_utils::{
10    http_body_util::{BodyExt, Full},
11    rt::TokioIo,
12};
13use serde::{Serialize, de::DeserializeOwned};
14use tokio::{net::UnixStream, sync::Mutex};
15use tracing::{debug, error};
16
17use crate::error::Error;
18
/// Client that talks to a Docker host over a Unix domain socket.
///
/// The HTTP connection is kept behind a [`Mutex`] so every method can take `&self`.
pub struct Client {
    /// Filesystem path of the Unix socket that `connect` dials.
    socket_path: PathBuf,
    // we use the interior mutability pattern to avoid users needing to make the client mut
    // each time they want to send a request.
    // See here to learn more about the Interior Mutability Pattern
    // https://doc.rust-lang.org/book/ch15-05-interior-mutability.html
    /// Request sender of the HTTP/1 connection; `None` until `connect` stores one.
    socket: Mutex<Option<SendRequest<Full<Bytes>>>>,
}
28
29impl Client {
30    pub fn new(socket_path: Option<&str>) -> Client {
31        let socket_path = socket_path.unwrap_or("/var/run/docker.sock");
32        let socket_path = PathBuf::from(socket_path);
33
34        return Client {
35            socket_path: socket_path,
36            socket: Mutex::new(None),
37        };
38    }
39
40    /// connect to the docker host.
41    /// Note that you don't necessarily need to call `connect`. The client automatically connects
42    /// to the Docker host on the first request if `connect` is not called before.
43    pub async fn connect(&self) -> Result<(), Error> {
44        if self.socket.lock().await.is_some() {
45            return Ok(());
46        }
47
48        let unix_stream = UnixStream::connect(&self.socket_path)
49            .await
50            .map_err(|err| Error::Connecting(err.into()))?;
51        let stream = TokioIo::new(unix_stream);
52
53        let (sender, conn) = hyper::client::conn::http1::handshake(stream)
54            .await
55            .map_err(|err| Error::Connecting(err.into()))?;
56        debug!("connection established");
57
58        // spawn a task to poll the connection and drive the HTTP state
59        tokio::task::spawn(async move {
60            if let Err(err) = conn.await {
61                error!("connection error: {:?}", err);
62            }
63        });
64
65        self.socket.lock().await.replace(sender);
66
67        return Ok(());
68    }
69
70    pub(crate) async fn send_request<R: DeserializeOwned, S: Serialize>(
71        &self,
72        path: &str,
73        query: Option<S>,
74        body: Option<S>,
75    ) -> Result<R, Error> {
76        if self.socket.lock().await.is_none() {
77            self.connect().await?;
78        }
79
80        // first we need to prepare the request for hyper
81        let path_and_query = match query {
82            Some(query_params) => {
83                let query_string = serde_urlencoded::to_string(query_params)
84                    .map_err(|err| Error::Unspecified(format!("encoding request's query parameters: {err}")))?;
85                format!("{path}?{query_string}")
86            }
87            None => path.to_string(),
88        };
89
90        let hyper_uri = Uri::builder()
91            .scheme("unix")
92            .authority("docker")
93            .path_and_query(path_and_query)
94            .build()
95            .map_err(|err| Error::Unspecified(format!("building request's URL: {err}")))?;
96
97        let body = body
98            .map(|body_data| serde_json::to_vec(&body_data))
99            .unwrap_or(Ok(Vec::new()))
100            .map_err(|err| Error::Unspecified(format!("encoding body to JSON: {err}")))?;
101        let body_bytes = Bytes::from(body);
102
103        let hyper_request = hyper::Request::builder()
104            .method(Method::GET)
105            .uri(hyper_uri)
106            .header(HOST, "docker")
107            .header(CONTENT_TYPE, "application/json")
108            .body(Full::new(body_bytes))
109            .map_err(|err| Error::Unspecified(format!("building request: {err}")))?;
110
111        let response = {
112            let mut socket = self.socket.lock().await;
113            // we can safely unwrap here as `connect` would have returned an error earlier if the connection
114            // failed
115            socket
116                .as_mut()
117                .unwrap()
118                .send_request(hyper_request)
119                .await
120                .map_err(|err| Error::Unspecified(format!("sending request: {err}")))?
121        };
122
123        if response.status() != StatusCode::OK {
124            return Err(Error::Unspecified(format!(
125                "received not OK status code: {}",
126                response.status()
127            )));
128        }
129
130        // let mut response_body = BytesMut::with_capacity(response.size_hint().upper().unwrap_or(500) as usize);
131        // while let Some(next) = response.frame().await {
132        //     let frame = next.map_err(|err| Error::Unspecified(format!("reading response: {err}")))?;
133        //     if let Some(chunk) = frame.data_ref() {
134        //         response_body.put(chunk.as_ref());
135        //     }
136        // }
137
138        let response_body = response
139            .collect()
140            .await
141            .map_err(|err| Error::Unspecified(format!("reading response: {err}")))?
142            .to_bytes();
143        let res = serde_json::from_slice(&response_body)
144            .map_err(|err| Error::Unspecified(format!("parsing response: {err}")))?;
145
146        return Ok(res);
147    }
148}