1use std::path::PathBuf;
2
3use bytes::Bytes;
4use hyper::{
5 Method, StatusCode, Uri,
6 client::conn::http1::SendRequest,
7 header::{CONTENT_TYPE, HOST},
8};
9use hyper_utils::{
10 http_body_util::{BodyExt, Full},
11 rt::TokioIo,
12};
13use serde::{Serialize, de::DeserializeOwned};
14use tokio::{net::UnixStream, sync::Mutex};
15use tracing::{debug, error};
16
17use crate::error::Error;
18
19pub struct Client {
20 socket_path: PathBuf,
21 socket: Mutex<Option<SendRequest<Full<Bytes>>>>,
27}
28
29impl Client {
30 pub fn new(socket_path: Option<&str>) -> Client {
31 let socket_path = socket_path.unwrap_or("/var/run/docker.sock");
32 let socket_path = PathBuf::from(socket_path);
33
34 return Client {
35 socket_path: socket_path,
36 socket: Mutex::new(None),
37 };
38 }
39
40 pub async fn connect(&self) -> Result<(), Error> {
44 if self.socket.lock().await.is_some() {
45 return Ok(());
46 }
47
48 let unix_stream = UnixStream::connect(&self.socket_path)
49 .await
50 .map_err(|err| Error::Connecting(err.into()))?;
51 let stream = TokioIo::new(unix_stream);
52
53 let (sender, conn) = hyper::client::conn::http1::handshake(stream)
54 .await
55 .map_err(|err| Error::Connecting(err.into()))?;
56 debug!("connection established");
57
58 tokio::task::spawn(async move {
60 if let Err(err) = conn.await {
61 error!("connection error: {:?}", err);
62 }
63 });
64
65 self.socket.lock().await.replace(sender);
66
67 return Ok(());
68 }
69
70 pub(crate) async fn send_request<R: DeserializeOwned, S: Serialize>(
71 &self,
72 path: &str,
73 query: Option<S>,
74 body: Option<S>,
75 ) -> Result<R, Error> {
76 if self.socket.lock().await.is_none() {
77 self.connect().await?;
78 }
79
80 let path_and_query = match query {
82 Some(query_params) => {
83 let query_string = serde_urlencoded::to_string(query_params)
84 .map_err(|err| Error::Unspecified(format!("encoding request's query parameters: {err}")))?;
85 format!("{path}?{query_string}")
86 }
87 None => path.to_string(),
88 };
89
90 let hyper_uri = Uri::builder()
91 .scheme("unix")
92 .authority("docker")
93 .path_and_query(path_and_query)
94 .build()
95 .map_err(|err| Error::Unspecified(format!("building request's URL: {err}")))?;
96
97 let body = body
98 .map(|body_data| serde_json::to_vec(&body_data))
99 .unwrap_or(Ok(Vec::new()))
100 .map_err(|err| Error::Unspecified(format!("encoding body to JSON: {err}")))?;
101 let body_bytes = Bytes::from(body);
102
103 let hyper_request = hyper::Request::builder()
104 .method(Method::GET)
105 .uri(hyper_uri)
106 .header(HOST, "docker")
107 .header(CONTENT_TYPE, "application/json")
108 .body(Full::new(body_bytes))
109 .map_err(|err| Error::Unspecified(format!("building request: {err}")))?;
110
111 let response = {
112 let mut socket = self.socket.lock().await;
113 socket
116 .as_mut()
117 .unwrap()
118 .send_request(hyper_request)
119 .await
120 .map_err(|err| Error::Unspecified(format!("sending request: {err}")))?
121 };
122
123 if response.status() != StatusCode::OK {
124 return Err(Error::Unspecified(format!(
125 "received not OK status code: {}",
126 response.status()
127 )));
128 }
129
130 let response_body = response
139 .collect()
140 .await
141 .map_err(|err| Error::Unspecified(format!("reading response: {err}")))?
142 .to_bytes();
143 let res = serde_json::from_slice(&response_body)
144 .map_err(|err| Error::Unspecified(format!("parsing response: {err}")))?;
145
146 return Ok(res);
147 }
148}