Skip to main content

s3/
objects.rs

1use std::collections::BTreeMap;
2
3use crypto::{Hasher, sha2::Sha256};
4use quick_xml::{de::from_str, se::to_string};
5use serde::{Deserialize, Serialize};
6
7use crate::client::{
8    ByteStream, Client, Error, HttpClient, HttpMethod, bytes_to_string, canonical_bucket_uri, canonical_object_uri,
9    canonical_query_string, collect_body, consume_empty, header_to_string, header_to_u64,
10};
11
/// One uploaded part referenced when completing a multipart upload.
///
/// `Client::complete_multipart_upload` serializes each value as a `<Part>`
/// element carrying `PartNumber` and `ETag`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompletedPart {
    /// Part number that was passed to `upload_part` for this part.
    pub part_number: u32,
    /// Entity tag identifying the uploaded part.
    pub e_tag: String,
}
17
/// Result of `Client::upload_part`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadPartOutput {
    /// Value of the response's `etag` header, when one could be read.
    pub e_tag: Option<String>,
}
22
/// Result of `Client::complete_multipart_upload`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompleteMultipartUploadOutput {
    /// `ETag` element of the completion response, when present.
    pub e_tag: Option<String>,
}
27
/// A key reported under a `<Deleted>` element of a batch-delete response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletedObject {
    /// Key of the deleted object.
    pub key: String,
}

/// A per-key failure reported under an `<Error>` element of a batch-delete response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteObjectsError {
    /// Key the failure refers to, when reported.
    pub key: Option<String>,
    /// Error code, when reported.
    pub code: Option<String>,
    /// Error message, when reported.
    pub message: Option<String>,
}

/// Result of `Client::delete_objects`: the deleted keys and the per-key failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteObjectsOutput {
    /// Entries parsed from the response's `<Deleted>` elements.
    pub deleted: Vec<DeletedObject>,
    /// Entries parsed from the response's `<Error>` elements.
    pub errors: Vec<DeleteObjectsError>,
}
45
/// One in-progress multipart upload parsed from an `<Upload>` element.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultipartUpload {
    /// Object key the upload targets.
    pub key: String,
    /// Identifier of the upload.
    pub upload_id: String,
    /// Raw text of the `Initiated` element, when present (not parsed into a date).
    pub initiated: Option<String>,
}

/// Result of `Client::list_multipart_uploads`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListMultipartUploadsOutput {
    /// Uploads in the order they were parsed from the response.
    pub uploads: Vec<MultipartUpload>,
}
57
/// One uploaded part parsed from a `<Part>` element of a list-parts response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadedPart {
    /// Value of the `PartNumber` element.
    pub part_number: u32,
    /// Value of the `ETag` element, when present.
    pub e_tag: Option<String>,
    /// Value of the `Size` element, when present.
    pub size: Option<u64>,
    /// Raw text of the `LastModified` element, when present (not parsed into a date).
    pub last_modified: Option<String>,
}

/// Result of `Client::list_parts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ListPartsOutput {
    /// Parts in the order they were parsed from the response.
    pub parts: Vec<UploadedPart>,
}
70
/// A key/value tag attached to an object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Tag {
    /// Tag key.
    pub key: String,
    /// Tag value.
    pub value: String,
}

/// Result of `Client::get_object_tagging`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GetObjectTaggingOutput {
    /// Tags in the order they were parsed from the response's tag set.
    pub tags: Vec<Tag>,
}
81
82pub struct GetObjectOutput {
83    pub body: ByteStream,
84    pub e_tag: Option<String>,
85    pub content_type: Option<String>,
86    pub content_length: Option<u64>,
87}
88
/// Result of `Client::head_object`: selected response headers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadObjectOutput {
    /// Value of the `etag` header, when one could be read.
    pub e_tag: Option<String>,
    /// Value of the `content-type` header, when one could be read.
    pub content_type: Option<String>,
    /// Value of the `content-length` header, when one could be read as a `u64`.
    pub content_length: Option<u64>,
}
95
96impl<H: HttpClient> Client<H> {
97    pub async fn put_object(&self, bucket: &str, key: &str, body: &[u8]) -> Result<(), crate::client::Error> {
98        let canonical_uri = canonical_object_uri(bucket, key);
99        let response = self.execute(HttpMethod::Put, &canonical_uri, "", body, bucket).await?;
100        consume_empty(response)
101    }
102
103    pub async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectOutput, crate::client::Error> {
104        let canonical_uri = canonical_object_uri(bucket, key);
105        let response = self.execute(HttpMethod::Get, &canonical_uri, "", b"", bucket).await?;
106        let e_tag = header_to_string(&response, "etag");
107        let content_type = header_to_string(&response, "content-type");
108        let content_length = header_to_u64(&response, "content-length");
109
110        Ok(GetObjectOutput {
111            body: response.body,
112            e_tag,
113            content_type,
114            content_length,
115        })
116    }
117
118    pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadObjectOutput, crate::client::Error> {
119        let canonical_uri = canonical_object_uri(bucket, key);
120        let response = self.execute(HttpMethod::Head, &canonical_uri, "", b"", bucket).await?;
121
122        Ok(HeadObjectOutput {
123            e_tag: header_to_string(&response, "etag"),
124            content_type: header_to_string(&response, "content-type"),
125            content_length: header_to_u64(&response, "content-length"),
126        })
127    }
128
129    pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), crate::client::Error> {
130        let canonical_uri = canonical_object_uri(bucket, key);
131        let response = self
132            .execute(HttpMethod::Delete, &canonical_uri, "", b"", bucket)
133            .await?;
134        consume_empty(response)
135    }
136
137    pub async fn delete_objects(
138        &self,
139        bucket: &str,
140        keys: &[&str],
141    ) -> Result<DeleteObjectsOutput, crate::client::Error> {
142        let canonical_uri = canonical_bucket_uri(bucket);
143        let body = to_string(&DeleteBodyXml {
144            object: keys
145                .iter()
146                .map(|k| DeleteObjectEntryXml {
147                    key: k.to_string(),
148                })
149                .collect(),
150        })
151        .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
152        .into_bytes();
153        let checksum_headers = delete_objects_checksum_headers(&body);
154        let response = self
155            .execute_with_headers(HttpMethod::Post, &canonical_uri, "delete=", &body, &checksum_headers, bucket)
156            .await?;
157        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
158        let xml: DeleteResultXml = from_str(&xml_text)?;
159        Ok(DeleteObjectsOutput {
160            deleted: xml
161                .deleted
162                .into_iter()
163                .map(|entry| DeletedObject {
164                    key: entry.key,
165                })
166                .collect(),
167            errors: xml
168                .errors
169                .into_iter()
170                .map(|entry| DeleteObjectsError {
171                    key: entry.key,
172                    code: entry.code,
173                    message: entry.message,
174                })
175                .collect(),
176        })
177    }
178
179    pub async fn create_multipart_upload(&self, bucket: &str, key: &str) -> Result<String, crate::client::Error> {
180        let canonical_uri = canonical_object_uri(bucket, key);
181        let response = self
182            .execute(HttpMethod::Post, &canonical_uri, "uploads=", b"", bucket)
183            .await?;
184        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
185        let xml: InitiateMultipartUploadResultXml = from_str(&xml_text)?;
186        Ok(xml.upload_id)
187    }
188
189    pub async fn upload_part(
190        &self,
191        bucket: &str,
192        key: &str,
193        upload_id: &str,
194        part_number: u32,
195        body: &[u8],
196    ) -> Result<UploadPartOutput, crate::client::Error> {
197        let canonical_uri = canonical_object_uri(bucket, key);
198        let mut params = BTreeMap::new();
199        params.insert("partNumber".to_string(), part_number.to_string());
200        params.insert("uploadId".to_string(), upload_id.to_string());
201        let canonical_query = canonical_query_string(&params);
202        let response = self
203            .execute(HttpMethod::Put, &canonical_uri, &canonical_query, body, bucket)
204            .await?;
205        let e_tag = header_to_string(&response, "etag");
206        Ok(UploadPartOutput {
207            e_tag,
208        })
209    }
210
211    pub async fn complete_multipart_upload(
212        &self,
213        bucket: &str,
214        key: &str,
215        upload_id: &str,
216        parts: &[CompletedPart],
217    ) -> Result<CompleteMultipartUploadOutput, crate::client::Error> {
218        let canonical_uri = canonical_object_uri(bucket, key);
219        let mut params = BTreeMap::new();
220        params.insert("uploadId".to_string(), upload_id.to_string());
221        let canonical_query = canonical_query_string(&params);
222        let xml_body = to_string(&CompleteMultipartBodyXml {
223            part: parts
224                .iter()
225                .map(|p| CompletePartEntryXml {
226                    part_number: p.part_number,
227                    e_tag: p.e_tag.clone(),
228                })
229                .collect(),
230        })
231        .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
232        .into_bytes();
233        let response = self
234            .execute(HttpMethod::Post, &canonical_uri, &canonical_query, &xml_body, bucket)
235            .await?;
236        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
237        let xml: CompleteMultipartUploadResultXml = from_str(&xml_text)?;
238        Ok(CompleteMultipartUploadOutput {
239            e_tag: xml.e_tag,
240        })
241    }
242
243    pub async fn abort_multipart_upload(
244        &self,
245        bucket: &str,
246        key: &str,
247        upload_id: &str,
248    ) -> Result<(), crate::client::Error> {
249        let canonical_uri = canonical_object_uri(bucket, key);
250        let mut params = BTreeMap::new();
251        params.insert("uploadId".to_string(), upload_id.to_string());
252        let canonical_query = canonical_query_string(&params);
253        let response = self
254            .execute(HttpMethod::Delete, &canonical_uri, &canonical_query, b"", bucket)
255            .await?;
256        consume_empty(response)
257    }
258
259    pub async fn list_multipart_uploads(
260        &self,
261        bucket: &str,
262    ) -> Result<ListMultipartUploadsOutput, crate::client::Error> {
263        let canonical_uri = canonical_bucket_uri(bucket);
264        let response = self
265            .execute(HttpMethod::Get, &canonical_uri, "uploads=", b"", bucket)
266            .await?;
267        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
268        let xml: ListMultipartUploadsResultXml = from_str(&xml_text)?;
269        Ok(ListMultipartUploadsOutput {
270            uploads: xml
271                .uploads
272                .into_iter()
273                .map(|entry| MultipartUpload {
274                    key: entry.key,
275                    upload_id: entry.upload_id,
276                    initiated: entry.initiated,
277                })
278                .collect(),
279        })
280    }
281
282    pub async fn list_parts(
283        &self,
284        bucket: &str,
285        key: &str,
286        upload_id: &str,
287    ) -> Result<ListPartsOutput, crate::client::Error> {
288        let canonical_uri = canonical_object_uri(bucket, key);
289        let mut params = BTreeMap::new();
290        params.insert("uploadId".to_string(), upload_id.to_string());
291        let canonical_query = canonical_query_string(&params);
292        let response = self
293            .execute(HttpMethod::Get, &canonical_uri, &canonical_query, b"", bucket)
294            .await?;
295        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
296        let xml: ListPartsResultXml = from_str(&xml_text)?;
297        Ok(ListPartsOutput {
298            parts: xml
299                .parts
300                .into_iter()
301                .map(|entry| UploadedPart {
302                    part_number: entry.part_number,
303                    e_tag: entry.e_tag,
304                    size: entry.size,
305                    last_modified: entry.last_modified,
306                })
307                .collect(),
308        })
309    }
310
311    pub async fn put_object_tagging(&self, bucket: &str, key: &str, tags: &[Tag]) -> Result<(), crate::client::Error> {
312        let canonical_uri = canonical_object_uri(bucket, key);
313        let body = to_string(&TaggingBodyXml {
314            tag_set: TagSetBodyXml {
315                tag: tags
316                    .iter()
317                    .map(|t| TagEntryXml {
318                        key: t.key.clone(),
319                        value: t.value.clone(),
320                    })
321                    .collect(),
322            },
323        })
324        .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
325        .into_bytes();
326        let response = self
327            .execute(HttpMethod::Put, &canonical_uri, "tagging=", &body, bucket)
328            .await?;
329        consume_empty(response)
330    }
331
332    pub async fn get_object_tagging(
333        &self,
334        bucket: &str,
335        key: &str,
336    ) -> Result<GetObjectTaggingOutput, crate::client::Error> {
337        let canonical_uri = canonical_object_uri(bucket, key);
338        let response = self
339            .execute(HttpMethod::Get, &canonical_uri, "tagging=", b"", bucket)
340            .await?;
341        let xml_text = bytes_to_string(collect_body(response.body).await?)?;
342        let xml: TaggingXml = from_str(&xml_text)?;
343        Ok(GetObjectTaggingOutput {
344            tags: xml
345                .tag_set
346                .tag
347                .into_iter()
348                .map(|entry| Tag {
349                    key: entry.key,
350                    value: entry.value,
351                })
352                .collect(),
353        })
354    }
355
356    pub async fn delete_object_tagging(&self, bucket: &str, key: &str) -> Result<(), crate::client::Error> {
357        let canonical_uri = canonical_object_uri(bucket, key);
358        let response = self
359            .execute(HttpMethod::Delete, &canonical_uri, "tagging=", b"", bucket)
360            .await?;
361        consume_empty(response)
362    }
363}
364
365fn delete_objects_checksum_headers(body: &[u8]) -> [(String, String); 2] {
366    [
367        ("x-amz-sdk-checksum-algorithm".to_string(), "SHA256".to_string()),
368        ("x-amz-checksum-sha256".to_string(), checksum_sha256(body)),
369    ]
370}
371
372fn checksum_sha256(body: &[u8]) -> String {
373    base64::encode(Sha256::hash(body).as_ref(), base64::Alphabet::Standard)
374}
375
376#[derive(Debug, Serialize)]
377#[serde(rename = "Delete")]
378struct DeleteBodyXml {
379    #[serde(rename = "Object")]
380    object: Vec<DeleteObjectEntryXml>,
381}
382
383#[derive(Debug, Serialize)]
384struct DeleteObjectEntryXml {
385    #[serde(rename = "Key")]
386    key: String,
387}
388
389#[derive(Debug, Serialize)]
390#[serde(rename = "CompleteMultipartUpload")]
391struct CompleteMultipartBodyXml {
392    #[serde(rename = "Part")]
393    part: Vec<CompletePartEntryXml>,
394}
395
396#[derive(Debug, Serialize)]
397struct CompletePartEntryXml {
398    #[serde(rename = "PartNumber")]
399    part_number: u32,
400    #[serde(rename = "ETag")]
401    e_tag: String,
402}
403
404#[derive(Debug, Serialize)]
405#[serde(rename = "Tagging")]
406struct TaggingBodyXml {
407    #[serde(rename = "TagSet")]
408    tag_set: TagSetBodyXml,
409}
410
411#[derive(Debug, Serialize)]
412struct TagSetBodyXml {
413    #[serde(rename = "Tag")]
414    tag: Vec<TagEntryXml>,
415}
416
417#[derive(Debug, Serialize)]
418struct TagEntryXml {
419    #[serde(rename = "Key")]
420    key: String,
421    #[serde(rename = "Value")]
422    value: String,
423}
424
425#[derive(Debug, Deserialize)]
426#[serde(rename = "InitiateMultipartUploadResult")]
427struct InitiateMultipartUploadResultXml {
428    #[serde(rename = "UploadId")]
429    upload_id: String,
430}
431
432#[derive(Debug, Deserialize)]
433#[serde(rename = "CompleteMultipartUploadResult")]
434struct CompleteMultipartUploadResultXml {
435    #[serde(rename = "ETag")]
436    e_tag: Option<String>,
437}
438
439#[derive(Debug, Deserialize)]
440#[serde(rename = "DeleteResult")]
441struct DeleteResultXml {
442    #[serde(rename = "Deleted", default)]
443    deleted: Vec<DeletedXml>,
444    #[serde(rename = "Error", default)]
445    errors: Vec<DeleteErrorXml>,
446}
447
448#[derive(Debug, Deserialize)]
449struct DeletedXml {
450    #[serde(rename = "Key")]
451    key: String,
452}
453
454#[derive(Debug, Deserialize)]
455struct DeleteErrorXml {
456    #[serde(rename = "Key")]
457    key: Option<String>,
458    #[serde(rename = "Code")]
459    code: Option<String>,
460    #[serde(rename = "Message")]
461    message: Option<String>,
462}
463
464#[derive(Debug, Deserialize)]
465#[serde(rename = "ListMultipartUploadsResult")]
466struct ListMultipartUploadsResultXml {
467    #[serde(rename = "Upload", default)]
468    uploads: Vec<MultipartUploadXml>,
469}
470
471#[derive(Debug, Deserialize)]
472struct MultipartUploadXml {
473    #[serde(rename = "Key")]
474    key: String,
475    #[serde(rename = "UploadId")]
476    upload_id: String,
477    #[serde(rename = "Initiated")]
478    initiated: Option<String>,
479}
480
481#[derive(Debug, Deserialize)]
482#[serde(rename = "ListPartsResult")]
483struct ListPartsResultXml {
484    #[serde(rename = "Part", default)]
485    parts: Vec<PartXml>,
486}
487
488#[derive(Debug, Deserialize)]
489struct PartXml {
490    #[serde(rename = "PartNumber")]
491    part_number: u32,
492    #[serde(rename = "ETag")]
493    e_tag: Option<String>,
494    #[serde(rename = "Size")]
495    size: Option<u64>,
496    #[serde(rename = "LastModified")]
497    last_modified: Option<String>,
498}
499
500#[derive(Debug, Deserialize)]
501#[serde(rename = "Tagging")]
502struct TaggingXml {
503    #[serde(rename = "TagSet")]
504    tag_set: TagSetXml,
505}
506
507#[derive(Debug, Deserialize)]
508struct TagSetXml {
509    #[serde(rename = "Tag", default)]
510    tag: Vec<TagXml>,
511}
512
513#[derive(Debug, Deserialize)]
514struct TagXml {
515    #[serde(rename = "Key")]
516    key: String,
517    #[serde(rename = "Value")]
518    value: String,
519}
520
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use quick_xml::de::from_str;
    use tokio::runtime::Runtime;

    use super::*;
    use crate::client::{HttpRequest, HttpResponseData, StaticCredentials};

    /// HTTP client double that records the request it receives and answers
    /// with an empty `DeleteResult` document.
    #[derive(Clone)]
    struct CapturingHttpClient {
        request: Arc<Mutex<Option<HttpRequest>>>,
    }

    impl crate::client::HttpClient for CapturingHttpClient {
        async fn send(&self, request: HttpRequest) -> Result<HttpResponseData, crate::client::HttpError> {
            *self.request.lock().unwrap() = Some(request);
            Ok(HttpResponseData {
                status_code: 200,
                headers: Vec::new(),
                body: Box::pin(futures_util::stream::once(async {
                    Ok(bytes::Bytes::from_static(b"<DeleteResult />"))
                })),
            })
        }
    }

    /// Builds the batch-delete request document for `keys`, in order.
    fn delete_body(keys: &[&str]) -> DeleteBodyXml {
        DeleteBodyXml {
            object: keys
                .iter()
                .map(|key| DeleteObjectEntryXml {
                    key: (*key).to_string(),
                })
                .collect(),
        }
    }

    /// Builds one tagging request entry.
    fn tag_entry(key: &str, value: &str) -> TagEntryXml {
        TagEntryXml {
            key: key.to_string(),
            value: value.to_string(),
        }
    }

    /// Looks up a request header by case-insensitive name.
    fn header_value<'a>(request: &'a HttpRequest, wanted: &str) -> Option<&'a str> {
        request
            .headers
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case(wanted))
            .map(|(_, value)| value.as_str())
    }

    #[test]
    fn builds_delete_objects_body() {
        let xml = to_string(&delete_body(&["a", "b/c"])).unwrap();

        for expected in ["<Key>a</Key>", "<Key>b/c</Key>"] {
            assert!(xml.contains(expected));
        }
    }

    #[test]
    fn parses_delete_objects_response() {
        let xml = r#"
<DeleteResult>
  <Deleted><Key>a.txt</Key></Deleted>
  <Error><Key>b.txt</Key><Code>AccessDenied</Code><Message>Denied</Message></Error>
</DeleteResult>
"#;
        let parsed: DeleteResultXml = from_str(xml).unwrap();

        assert_eq!(parsed.deleted.len(), 1);
        assert_eq!(parsed.deleted[0].key, "a.txt");
        assert_eq!(parsed.errors.len(), 1);
        assert_eq!(parsed.errors[0].code.as_deref(), Some("AccessDenied"));
    }

    #[test]
    fn delete_objects_sets_sha256_checksum_headers() {
        let captured = Arc::new(Mutex::new(None));
        let http = CapturingHttpClient {
            request: Arc::clone(&captured),
        };
        let cfg = crate::client::ClientConfig {
            endpoint: "http://127.0.0.1:9000",
            credentials: StaticCredentials {
                access_key_id: "minioadmin",
                secret_access_key: "minioadmin",
                session_token: "",
            },
            region: "auto",
            virtual_hosted: false,
        };
        let client = Client::with_http_client(&cfg, http).unwrap();

        Runtime::new().unwrap().block_on(async {
            client.delete_objects("bucket", &["a", "b/c"]).await.unwrap();
        });

        let request = captured.lock().unwrap().clone().unwrap();
        assert_eq!(header_value(&request, "x-amz-sdk-checksum-algorithm"), Some("SHA256"));

        // The checksum must cover exactly the document the client serialized.
        let expected_body = to_string(&delete_body(&["a", "b/c"])).unwrap();
        let expected_checksum = super::checksum_sha256(expected_body.as_bytes());
        assert_eq!(
            header_value(&request, "x-amz-checksum-sha256"),
            Some(expected_checksum.as_str())
        );
    }

    #[test]
    fn builds_and_parses_tagging_xml() {
        let body = to_string(&TaggingBodyXml {
            tag_set: TagSetBodyXml {
                tag: vec![tag_entry("env", "dev"), tag_entry("team", "infra")],
            },
        })
        .unwrap();
        assert!(body.contains("<Key>env</Key><Value>dev</Value>"));
        assert!(body.contains("<Key>team</Key><Value>infra</Value>"));

        let parsed: TaggingXml =
            from_str("<Tagging><TagSet><Tag><Key>a</Key><Value>b</Value></Tag></TagSet></Tagging>").unwrap();
        let tags = &parsed.tag_set.tag;
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0].key, "a");
        assert_eq!(tags[0].value, "b");
    }

    #[test]
    fn parses_list_parts_response() {
        let xml = r#"
<ListPartsResult>
  <Part>
    <PartNumber>1</PartNumber>
    <ETag>"etag-1"</ETag>
    <Size>5</Size>
    <LastModified>2026-01-01T00:00:00.000Z</LastModified>
  </Part>
</ListPartsResult>
"#;
        let parsed: ListPartsResultXml = from_str(xml).unwrap();

        assert_eq!(parsed.parts.len(), 1);
        assert_eq!(parsed.parts[0].part_number, 1);
    }

    #[test]
    fn parses_list_multipart_uploads_response() {
        let xml = r#"
<ListMultipartUploadsResult>
  <Upload>
    <Key>big.bin</Key>
    <UploadId>upload-1</UploadId>
    <Initiated>2026-01-01T00:00:00.000Z</Initiated>
  </Upload>
</ListMultipartUploadsResult>
"#;
        let parsed: ListMultipartUploadsResultXml = from_str(xml).unwrap();

        assert_eq!(parsed.uploads.len(), 1);
        let upload = &parsed.uploads[0];
        assert_eq!(upload.key, "big.bin");
        assert_eq!(upload.upload_id, "upload-1");
    }
}