Skip to main content

pg/
queryer.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use tokio_stream::Stream;
7
8use crate::{connection::Connection, decode::FromSql, encode::ToSql, error::Result, row::Row};
9
10/// Trait for anything that can execute SQL queries.
11/// Implemented for Connection, Pool, Transaction.
12/// All methods take &self thanks to interior mutability.
13pub trait Queryer: Send + Sync {
14    async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>>;
15
16    async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64>;
17
18    async fn query<T: FromSql>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Vec<T>>> {
19        let rows = self.query_raw(sql, params).await?;
20        let mut result: Vec<Vec<T>> = Vec::new();
21        for row in &rows {
22            let val: T = row.try_get_by_index(0)?;
23            match result.last_mut() {
24                Some(last) => last.push(val),
25                None => {
26                    result.push(vec![val]);
27                }
28            }
29        }
30        Ok(result)
31    }
32
33    async fn query_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<T>> {
34        let rows = self.query_raw(sql, params).await?;
35        let mut result = Vec::with_capacity(rows.len());
36        for row in &rows {
37            result.push(T::from_row(row)?);
38        }
39        Ok(result)
40    }
41
42    async fn query_one_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<T> {
43        let rows = self.query_raw(sql, params).await?;
44        match rows.first() {
45            Some(row) => T::from_row(row),
46            None => Err(crate::error::PgError::RowNotFound),
47        }
48    }
49
50    async fn query_first_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Option<T>> {
51        let rows = self.query_raw(sql, params).await?;
52        match rows.into_iter().next() {
53            Some(row) => T::from_row(&row).map(Some),
54            None => Ok(None),
55        }
56    }
57
58    async fn query_first<T: FromSql>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Option<T>> {
59        let mut rows = self.query_raw(sql, params).await?;
60        match rows.first_mut() {
61            Some(row) => row.try_get_by_index(0).map(Some),
62            None => Ok(None),
63        }
64    }
65}
66
/// `Queryer` for a single [`Connection`]: both required methods delegate to the
/// path-qualified `Connection::query_raw` / `Connection::execute_raw`.
///
/// NOTE(review): this presumably resolves to inherent methods of the same names on
/// `Connection` (inherent associated functions take precedence over trait methods in
/// path resolution). Those inherent methods are not visible here — confirm they exist,
/// otherwise these calls would resolve back to this impl and recurse.
impl Queryer for Connection {
    /// Delegates to `Connection::query_raw` with the same arguments.
    async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
        Connection::query_raw(self, sql, params).await
    }

    /// Delegates to `Connection::execute_raw` with the same arguments.
    async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
        Connection::execute_raw(self, sql, params).await
    }
}
76
77impl<T: Queryer + Send + Sync + ?Sized> Queryer for &T {
78    async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
79        (**self).query_raw(sql, params).await
80    }
81
82    async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
83        (**self).execute_raw(sql, params).await
84    }
85}
86
/// Trait for types that can be constructed from a database [`Row`].
///
/// Used by `query_as`, `query_one_as`, and `query_first_as`.
/// Typically derived with `#[derive(FromRow)]`.
pub trait FromRow: Sized {
    /// Builds `Self` from a borrowed row.
    ///
    /// # Errors
    ///
    /// Returns an error when the row cannot be converted into `Self`; the exact failure
    /// conditions are defined by each implementation.
    fn from_row(row: &Row) -> Result<Self>;
}
93
/// Asynchronous stream of query rows, fed through a Tokio mpsc channel.
///
/// Each item is a `Result<Row>`, so a failure can be delivered in-band as a stream item.
/// NOTE(review): the sending half and the constructor are not visible in this part of
/// the file — confirm who produces the rows and when the channel is closed.
pub struct RowStream {
    // Receiving half of the channel the rows arrive on.
    inner: tokio::sync::mpsc::Receiver<Result<Row>>,
}
97
98impl Stream for RowStream {
99    type Item = Result<Row>;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        self.inner.poll_recv(cx)
103    }
104}