1use std::collections::BTreeMap;
2
3use crypto::{Hasher, sha2::Sha256};
4use quick_xml::{de::from_str, se::to_string};
5use serde::{Deserialize, Serialize};
6
7use crate::client::{
8 ByteStream, Client, Error, HttpClient, HttpMethod, bytes_to_string, canonical_bucket_uri, canonical_object_uri,
9 canonical_query_string, collect_body, consume_empty, header_to_string, header_to_u64,
10};
11
/// One uploaded part, as referenced when completing a multipart upload.
///
/// Passed in a slice to `complete_multipart_upload`, which serializes each entry
/// into the request body.
#[derive(Debug, Clone)]
pub struct CompletedPart {
    /// Number identifying the part within the upload.
    pub part_number: u32,
    /// Entity tag of the part (for example the value returned by `upload_part`).
    pub e_tag: String,
}
17
/// Result of `upload_part`.
#[derive(Debug, Clone)]
pub struct UploadPartOutput {
    /// Value of the response's `etag` header, if one was present.
    pub e_tag: Option<String>,
}
22
/// Result of `complete_multipart_upload`.
#[derive(Debug, Clone)]
pub struct CompleteMultipartUploadOutput {
    /// `ETag` element of the XML response body, if one was present.
    pub e_tag: Option<String>,
}
27
/// An object reported as deleted in a `delete_objects` response.
#[derive(Debug, Clone)]
pub struct DeletedObject {
    /// Key of the deleted object.
    pub key: String,
}
32
/// A per-object failure reported in a `delete_objects` response.
///
/// Every field is optional because each is copied from an XML element that may be absent.
#[derive(Debug, Clone)]
pub struct DeleteObjectsError {
    /// Key the error refers to.
    pub key: Option<String>,
    /// Error code reported by the service.
    pub code: Option<String>,
    /// Error message reported by the service.
    pub message: Option<String>,
}
39
40#[derive(Debug, Clone)]
41pub struct DeleteObjectsOutput {
42 pub deleted: Vec<DeletedObject>,
43 pub errors: Vec<DeleteObjectsError>,
44}
45
/// One in-progress multipart upload, as listed by `list_multipart_uploads`.
#[derive(Debug, Clone)]
pub struct MultipartUpload {
    /// Object key the upload targets.
    pub key: String,
    /// Identifier of the upload.
    pub upload_id: String,
    /// Raw text of the `Initiated` element, if present (not parsed into a timestamp here).
    pub initiated: Option<String>,
}
52
53#[derive(Debug, Clone)]
54pub struct ListMultipartUploadsOutput {
55 pub uploads: Vec<MultipartUpload>,
56}
57
/// One part of a multipart upload, as listed by `list_parts`.
#[derive(Debug, Clone)]
pub struct UploadedPart {
    /// Number identifying the part within the upload.
    pub part_number: u32,
    /// Entity tag of the part, if reported.
    pub e_tag: Option<String>,
    /// Value of the `Size` element, if reported.
    pub size: Option<u64>,
    /// Raw text of the `LastModified` element, if reported (not parsed into a timestamp here).
    pub last_modified: Option<String>,
}
65
66#[derive(Debug, Clone)]
67pub struct ListPartsOutput {
68 pub parts: Vec<UploadedPart>,
69}
70
/// A key/value tag attached to an object.
///
/// Used as input to `put_object_tagging` and returned by `get_object_tagging`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Tag {
    /// Tag name.
    pub key: String,
    /// Tag value.
    pub value: String,
}
76
77#[derive(Debug, Clone)]
78pub struct GetObjectTaggingOutput {
79 pub tags: Vec<Tag>,
80}
81
82pub struct GetObjectOutput {
83 pub body: ByteStream,
84 pub e_tag: Option<String>,
85 pub content_type: Option<String>,
86 pub content_length: Option<u64>,
87}
88
/// Result of `head_object`: selected response headers, without a body.
#[derive(Debug, Clone)]
pub struct HeadObjectOutput {
    /// Value of the `etag` header, if present.
    pub e_tag: Option<String>,
    /// Value of the `content-type` header, if present.
    pub content_type: Option<String>,
    /// Value of the `content-length` header as read by `header_to_u64`, if available.
    pub content_length: Option<u64>,
}
95
96impl<H: HttpClient> Client<H> {
97 pub async fn put_object(&self, bucket: &str, key: &str, body: &[u8]) -> Result<(), crate::client::Error> {
98 let canonical_uri = canonical_object_uri(bucket, key);
99 let response = self.execute(HttpMethod::Put, &canonical_uri, "", body, bucket).await?;
100 consume_empty(response)
101 }
102
103 pub async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectOutput, crate::client::Error> {
104 let canonical_uri = canonical_object_uri(bucket, key);
105 let response = self.execute(HttpMethod::Get, &canonical_uri, "", b"", bucket).await?;
106 let e_tag = header_to_string(&response, "etag");
107 let content_type = header_to_string(&response, "content-type");
108 let content_length = header_to_u64(&response, "content-length");
109
110 Ok(GetObjectOutput {
111 body: response.body,
112 e_tag,
113 content_type,
114 content_length,
115 })
116 }
117
118 pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadObjectOutput, crate::client::Error> {
119 let canonical_uri = canonical_object_uri(bucket, key);
120 let response = self.execute(HttpMethod::Head, &canonical_uri, "", b"", bucket).await?;
121
122 Ok(HeadObjectOutput {
123 e_tag: header_to_string(&response, "etag"),
124 content_type: header_to_string(&response, "content-type"),
125 content_length: header_to_u64(&response, "content-length"),
126 })
127 }
128
129 pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), crate::client::Error> {
130 let canonical_uri = canonical_object_uri(bucket, key);
131 let response = self
132 .execute(HttpMethod::Delete, &canonical_uri, "", b"", bucket)
133 .await?;
134 consume_empty(response)
135 }
136
137 pub async fn delete_objects(
138 &self,
139 bucket: &str,
140 keys: &[&str],
141 ) -> Result<DeleteObjectsOutput, crate::client::Error> {
142 let canonical_uri = canonical_bucket_uri(bucket);
143 let body = to_string(&DeleteBodyXml {
144 object: keys
145 .iter()
146 .map(|k| DeleteObjectEntryXml {
147 key: k.to_string(),
148 })
149 .collect(),
150 })
151 .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
152 .into_bytes();
153 let checksum_headers = delete_objects_checksum_headers(&body);
154 let response = self
155 .execute_with_headers(HttpMethod::Post, &canonical_uri, "delete=", &body, &checksum_headers, bucket)
156 .await?;
157 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
158 let xml: DeleteResultXml = from_str(&xml_text)?;
159 Ok(DeleteObjectsOutput {
160 deleted: xml
161 .deleted
162 .into_iter()
163 .map(|entry| DeletedObject {
164 key: entry.key,
165 })
166 .collect(),
167 errors: xml
168 .errors
169 .into_iter()
170 .map(|entry| DeleteObjectsError {
171 key: entry.key,
172 code: entry.code,
173 message: entry.message,
174 })
175 .collect(),
176 })
177 }
178
179 pub async fn create_multipart_upload(&self, bucket: &str, key: &str) -> Result<String, crate::client::Error> {
180 let canonical_uri = canonical_object_uri(bucket, key);
181 let response = self
182 .execute(HttpMethod::Post, &canonical_uri, "uploads=", b"", bucket)
183 .await?;
184 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
185 let xml: InitiateMultipartUploadResultXml = from_str(&xml_text)?;
186 Ok(xml.upload_id)
187 }
188
189 pub async fn upload_part(
190 &self,
191 bucket: &str,
192 key: &str,
193 upload_id: &str,
194 part_number: u32,
195 body: &[u8],
196 ) -> Result<UploadPartOutput, crate::client::Error> {
197 let canonical_uri = canonical_object_uri(bucket, key);
198 let mut params = BTreeMap::new();
199 params.insert("partNumber".to_string(), part_number.to_string());
200 params.insert("uploadId".to_string(), upload_id.to_string());
201 let canonical_query = canonical_query_string(¶ms);
202 let response = self
203 .execute(HttpMethod::Put, &canonical_uri, &canonical_query, body, bucket)
204 .await?;
205 let e_tag = header_to_string(&response, "etag");
206 Ok(UploadPartOutput {
207 e_tag,
208 })
209 }
210
211 pub async fn complete_multipart_upload(
212 &self,
213 bucket: &str,
214 key: &str,
215 upload_id: &str,
216 parts: &[CompletedPart],
217 ) -> Result<CompleteMultipartUploadOutput, crate::client::Error> {
218 let canonical_uri = canonical_object_uri(bucket, key);
219 let mut params = BTreeMap::new();
220 params.insert("uploadId".to_string(), upload_id.to_string());
221 let canonical_query = canonical_query_string(¶ms);
222 let xml_body = to_string(&CompleteMultipartBodyXml {
223 part: parts
224 .iter()
225 .map(|p| CompletePartEntryXml {
226 part_number: p.part_number,
227 e_tag: p.e_tag.clone(),
228 })
229 .collect(),
230 })
231 .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
232 .into_bytes();
233 let response = self
234 .execute(HttpMethod::Post, &canonical_uri, &canonical_query, &xml_body, bucket)
235 .await?;
236 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
237 let xml: CompleteMultipartUploadResultXml = from_str(&xml_text)?;
238 Ok(CompleteMultipartUploadOutput {
239 e_tag: xml.e_tag,
240 })
241 }
242
243 pub async fn abort_multipart_upload(
244 &self,
245 bucket: &str,
246 key: &str,
247 upload_id: &str,
248 ) -> Result<(), crate::client::Error> {
249 let canonical_uri = canonical_object_uri(bucket, key);
250 let mut params = BTreeMap::new();
251 params.insert("uploadId".to_string(), upload_id.to_string());
252 let canonical_query = canonical_query_string(¶ms);
253 let response = self
254 .execute(HttpMethod::Delete, &canonical_uri, &canonical_query, b"", bucket)
255 .await?;
256 consume_empty(response)
257 }
258
259 pub async fn list_multipart_uploads(
260 &self,
261 bucket: &str,
262 ) -> Result<ListMultipartUploadsOutput, crate::client::Error> {
263 let canonical_uri = canonical_bucket_uri(bucket);
264 let response = self
265 .execute(HttpMethod::Get, &canonical_uri, "uploads=", b"", bucket)
266 .await?;
267 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
268 let xml: ListMultipartUploadsResultXml = from_str(&xml_text)?;
269 Ok(ListMultipartUploadsOutput {
270 uploads: xml
271 .uploads
272 .into_iter()
273 .map(|entry| MultipartUpload {
274 key: entry.key,
275 upload_id: entry.upload_id,
276 initiated: entry.initiated,
277 })
278 .collect(),
279 })
280 }
281
282 pub async fn list_parts(
283 &self,
284 bucket: &str,
285 key: &str,
286 upload_id: &str,
287 ) -> Result<ListPartsOutput, crate::client::Error> {
288 let canonical_uri = canonical_object_uri(bucket, key);
289 let mut params = BTreeMap::new();
290 params.insert("uploadId".to_string(), upload_id.to_string());
291 let canonical_query = canonical_query_string(¶ms);
292 let response = self
293 .execute(HttpMethod::Get, &canonical_uri, &canonical_query, b"", bucket)
294 .await?;
295 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
296 let xml: ListPartsResultXml = from_str(&xml_text)?;
297 Ok(ListPartsOutput {
298 parts: xml
299 .parts
300 .into_iter()
301 .map(|entry| UploadedPart {
302 part_number: entry.part_number,
303 e_tag: entry.e_tag,
304 size: entry.size,
305 last_modified: entry.last_modified,
306 })
307 .collect(),
308 })
309 }
310
311 pub async fn put_object_tagging(&self, bucket: &str, key: &str, tags: &[Tag]) -> Result<(), crate::client::Error> {
312 let canonical_uri = canonical_object_uri(bucket, key);
313 let body = to_string(&TaggingBodyXml {
314 tag_set: TagSetBodyXml {
315 tag: tags
316 .iter()
317 .map(|t| TagEntryXml {
318 key: t.key.clone(),
319 value: t.value.clone(),
320 })
321 .collect(),
322 },
323 })
324 .map_err(|e| Error::Xml(quick_xml::DeError::Custom(e.to_string())))?
325 .into_bytes();
326 let response = self
327 .execute(HttpMethod::Put, &canonical_uri, "tagging=", &body, bucket)
328 .await?;
329 consume_empty(response)
330 }
331
332 pub async fn get_object_tagging(
333 &self,
334 bucket: &str,
335 key: &str,
336 ) -> Result<GetObjectTaggingOutput, crate::client::Error> {
337 let canonical_uri = canonical_object_uri(bucket, key);
338 let response = self
339 .execute(HttpMethod::Get, &canonical_uri, "tagging=", b"", bucket)
340 .await?;
341 let xml_text = bytes_to_string(collect_body(response.body).await?)?;
342 let xml: TaggingXml = from_str(&xml_text)?;
343 Ok(GetObjectTaggingOutput {
344 tags: xml
345 .tag_set
346 .tag
347 .into_iter()
348 .map(|entry| Tag {
349 key: entry.key,
350 value: entry.value,
351 })
352 .collect(),
353 })
354 }
355
356 pub async fn delete_object_tagging(&self, bucket: &str, key: &str) -> Result<(), crate::client::Error> {
357 let canonical_uri = canonical_object_uri(bucket, key);
358 let response = self
359 .execute(HttpMethod::Delete, &canonical_uri, "tagging=", b"", bucket)
360 .await?;
361 consume_empty(response)
362 }
363}
364
365fn delete_objects_checksum_headers(body: &[u8]) -> [(String, String); 2] {
366 [
367 ("x-amz-sdk-checksum-algorithm".to_string(), "SHA256".to_string()),
368 ("x-amz-checksum-sha256".to_string(), checksum_sha256(body)),
369 ]
370}
371
372fn checksum_sha256(body: &[u8]) -> String {
373 base64::encode(Sha256::hash(body).as_ref(), base64::Alphabet::Standard)
374}
375
376#[derive(Debug, Serialize)]
377#[serde(rename = "Delete")]
378struct DeleteBodyXml {
379 #[serde(rename = "Object")]
380 object: Vec<DeleteObjectEntryXml>,
381}
382
383#[derive(Debug, Serialize)]
384struct DeleteObjectEntryXml {
385 #[serde(rename = "Key")]
386 key: String,
387}
388
389#[derive(Debug, Serialize)]
390#[serde(rename = "CompleteMultipartUpload")]
391struct CompleteMultipartBodyXml {
392 #[serde(rename = "Part")]
393 part: Vec<CompletePartEntryXml>,
394}
395
396#[derive(Debug, Serialize)]
397struct CompletePartEntryXml {
398 #[serde(rename = "PartNumber")]
399 part_number: u32,
400 #[serde(rename = "ETag")]
401 e_tag: String,
402}
403
404#[derive(Debug, Serialize)]
405#[serde(rename = "Tagging")]
406struct TaggingBodyXml {
407 #[serde(rename = "TagSet")]
408 tag_set: TagSetBodyXml,
409}
410
411#[derive(Debug, Serialize)]
412struct TagSetBodyXml {
413 #[serde(rename = "Tag")]
414 tag: Vec<TagEntryXml>,
415}
416
417#[derive(Debug, Serialize)]
418struct TagEntryXml {
419 #[serde(rename = "Key")]
420 key: String,
421 #[serde(rename = "Value")]
422 value: String,
423}
424
425#[derive(Debug, Deserialize)]
426#[serde(rename = "InitiateMultipartUploadResult")]
427struct InitiateMultipartUploadResultXml {
428 #[serde(rename = "UploadId")]
429 upload_id: String,
430}
431
432#[derive(Debug, Deserialize)]
433#[serde(rename = "CompleteMultipartUploadResult")]
434struct CompleteMultipartUploadResultXml {
435 #[serde(rename = "ETag")]
436 e_tag: Option<String>,
437}
438
439#[derive(Debug, Deserialize)]
440#[serde(rename = "DeleteResult")]
441struct DeleteResultXml {
442 #[serde(rename = "Deleted", default)]
443 deleted: Vec<DeletedXml>,
444 #[serde(rename = "Error", default)]
445 errors: Vec<DeleteErrorXml>,
446}
447
448#[derive(Debug, Deserialize)]
449struct DeletedXml {
450 #[serde(rename = "Key")]
451 key: String,
452}
453
454#[derive(Debug, Deserialize)]
455struct DeleteErrorXml {
456 #[serde(rename = "Key")]
457 key: Option<String>,
458 #[serde(rename = "Code")]
459 code: Option<String>,
460 #[serde(rename = "Message")]
461 message: Option<String>,
462}
463
464#[derive(Debug, Deserialize)]
465#[serde(rename = "ListMultipartUploadsResult")]
466struct ListMultipartUploadsResultXml {
467 #[serde(rename = "Upload", default)]
468 uploads: Vec<MultipartUploadXml>,
469}
470
471#[derive(Debug, Deserialize)]
472struct MultipartUploadXml {
473 #[serde(rename = "Key")]
474 key: String,
475 #[serde(rename = "UploadId")]
476 upload_id: String,
477 #[serde(rename = "Initiated")]
478 initiated: Option<String>,
479}
480
481#[derive(Debug, Deserialize)]
482#[serde(rename = "ListPartsResult")]
483struct ListPartsResultXml {
484 #[serde(rename = "Part", default)]
485 parts: Vec<PartXml>,
486}
487
488#[derive(Debug, Deserialize)]
489struct PartXml {
490 #[serde(rename = "PartNumber")]
491 part_number: u32,
492 #[serde(rename = "ETag")]
493 e_tag: Option<String>,
494 #[serde(rename = "Size")]
495 size: Option<u64>,
496 #[serde(rename = "LastModified")]
497 last_modified: Option<String>,
498}
499
500#[derive(Debug, Deserialize)]
501#[serde(rename = "Tagging")]
502struct TaggingXml {
503 #[serde(rename = "TagSet")]
504 tag_set: TagSetXml,
505}
506
507#[derive(Debug, Deserialize)]
508struct TagSetXml {
509 #[serde(rename = "Tag", default)]
510 tag: Vec<TagXml>,
511}
512
513#[derive(Debug, Deserialize)]
514struct TagXml {
515 #[serde(rename = "Key")]
516 key: String,
517 #[serde(rename = "Value")]
518 value: String,
519}
520
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use quick_xml::de::from_str;
    use tokio::runtime::Runtime;

    use super::*;
    use crate::client::{HttpRequest, HttpResponseData, StaticCredentials};

    /// Builds the two-key `Delete` payload (`a`, `b/c`) shared by the body and checksum tests.
    fn sample_delete_body() -> DeleteBodyXml {
        DeleteBodyXml {
            object: ["a", "b/c"]
                .iter()
                .map(|key| DeleteObjectEntryXml {
                    key: key.to_string(),
                })
                .collect(),
        }
    }

    /// Every key of the payload must show up as a `<Key>` element in the serialized body.
    #[test]
    fn builds_delete_objects_body() {
        let serialized = to_string(&sample_delete_body()).unwrap();
        for expected in ["<Key>a</Key>", "<Key>b/c</Key>"] {
            assert!(serialized.contains(expected));
        }
    }

    /// A response with one `Deleted` and one `Error` element is split into the two lists.
    #[test]
    fn parses_delete_objects_response() {
        let response = r#"
<DeleteResult>
  <Deleted><Key>a.txt</Key></Deleted>
  <Error><Key>b.txt</Key><Code>AccessDenied</Code><Message>Denied</Message></Error>
</DeleteResult>
"#;
        let result: DeleteResultXml = from_str(response).unwrap();
        assert_eq!(result.deleted.len(), 1);
        assert_eq!(result.deleted[0].key, "a.txt");
        assert_eq!(result.errors.len(), 1);
        assert_eq!(result.errors[0].code.as_deref(), Some("AccessDenied"));
    }

    /// `delete_objects` must send the SHA-256 checksum headers computed over its request body.
    #[test]
    fn delete_objects_sets_sha256_checksum_headers() {
        /// HTTP client double: stores the request it receives and answers with an empty result.
        #[derive(Clone)]
        struct CapturingHttpClient {
            request: Arc<Mutex<Option<HttpRequest>>>,
        }

        impl crate::client::HttpClient for CapturingHttpClient {
            async fn send(&self, request: HttpRequest) -> Result<HttpResponseData, crate::client::HttpError> {
                *self.request.lock().unwrap() = Some(request);
                Ok(HttpResponseData {
                    status_code: 200,
                    headers: Vec::new(),
                    body: Box::pin(futures_util::stream::once(async {
                        Ok(bytes::Bytes::from_static(b"<DeleteResult />"))
                    })),
                })
            }
        }

        let recorded = Arc::new(Mutex::new(None));
        let cfg = crate::client::ClientConfig {
            endpoint: "http://127.0.0.1:9000",
            credentials: StaticCredentials {
                access_key_id: "minioadmin",
                secret_access_key: "minioadmin",
                session_token: "",
            },
            region: "auto",
            virtual_hosted: false,
        };
        let http = CapturingHttpClient {
            request: Arc::clone(&recorded),
        };
        let client = Client::with_http_client(&cfg, http).unwrap();

        Runtime::new().unwrap().block_on(async {
            client.delete_objects("bucket", &["a", "b/c"]).await.unwrap();
        });

        let request = recorded.lock().unwrap().clone().unwrap();
        // Case-insensitive lookup of a header value on the captured request.
        let header = |wanted: &str| {
            request
                .headers
                .iter()
                .find(|(name, _)| name.eq_ignore_ascii_case(wanted))
                .map(|(_, value)| value.as_str())
        };

        assert_eq!(header("x-amz-sdk-checksum-algorithm"), Some("SHA256"));

        // The expected checksum is computed over the same payload the client serializes.
        let expected_body = to_string(&sample_delete_body()).unwrap();
        let expected_checksum = super::checksum_sha256(expected_body.as_bytes());
        assert_eq!(header("x-amz-checksum-sha256"), Some(expected_checksum.as_str()));
    }

    /// Tagging payloads serialize key/value pairs adjacently, and a tagging response parses back.
    #[test]
    fn builds_and_parses_tagging_xml() {
        let payload = TaggingBodyXml {
            tag_set: TagSetBodyXml {
                tag: vec![
                    TagEntryXml {
                        key: "env".to_string(),
                        value: "dev".to_string(),
                    },
                    TagEntryXml {
                        key: "team".to_string(),
                        value: "infra".to_string(),
                    },
                ],
            },
        };
        let serialized = to_string(&payload).unwrap();
        assert!(serialized.contains("<Key>env</Key><Value>dev</Value>"));
        assert!(serialized.contains("<Key>team</Key><Value>infra</Value>"));

        let tagging: TaggingXml =
            from_str("<Tagging><TagSet><Tag><Key>a</Key><Value>b</Value></Tag></TagSet></Tagging>").unwrap();
        let tags = &tagging.tag_set.tag;
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0].key, "a");
        assert_eq!(tags[0].value, "b");
    }

    /// A `ListPartsResult` with one `Part` yields exactly that part.
    #[test]
    fn parses_list_parts_response() {
        let response = r#"
<ListPartsResult>
  <Part>
    <PartNumber>1</PartNumber>
    <ETag>"etag-1"</ETag>
    <Size>5</Size>
    <LastModified>2026-01-01T00:00:00.000Z</LastModified>
  </Part>
</ListPartsResult>
"#;
        let listing: ListPartsResultXml = from_str(response).unwrap();
        assert_eq!(listing.parts.len(), 1);
        assert_eq!(listing.parts[0].part_number, 1);
    }

    /// A `ListMultipartUploadsResult` with one `Upload` yields its key and upload id.
    #[test]
    fn parses_list_multipart_uploads_response() {
        let response = r#"
<ListMultipartUploadsResult>
  <Upload>
    <Key>big.bin</Key>
    <UploadId>upload-1</UploadId>
    <Initiated>2026-01-01T00:00:00.000Z</Initiated>
  </Upload>
</ListMultipartUploadsResult>
"#;
        let listing: ListMultipartUploadsResultXml = from_str(response).unwrap();
        assert_eq!(listing.uploads.len(), 1);
        assert_eq!(listing.uploads[0].key, "big.bin");
        assert_eq!(listing.uploads[0].upload_id, "upload-1");
    }
}