1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use tokio_stream::Stream;
7
8use crate::{connection::Connection, decode::FromSql, encode::ToSql, error::Result, row::Row};
9
10pub trait Queryer: Send + Sync {
14 async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>>;
15
16 async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64>;
17
18 async fn query<T: FromSql>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Vec<T>>> {
19 let rows = self.query_raw(sql, params).await?;
20 let mut result: Vec<Vec<T>> = Vec::new();
21 for row in &rows {
22 let val: T = row.try_get_by_index(0)?;
23 match result.last_mut() {
24 Some(last) => last.push(val),
25 None => {
26 result.push(vec![val]);
27 }
28 }
29 }
30 Ok(result)
31 }
32
33 async fn query_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<T>> {
34 let rows = self.query_raw(sql, params).await?;
35 let mut result = Vec::with_capacity(rows.len());
36 for row in &rows {
37 result.push(T::from_row(row)?);
38 }
39 Ok(result)
40 }
41
42 async fn query_one_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<T> {
43 let rows = self.query_raw(sql, params).await?;
44 match rows.first() {
45 Some(row) => T::from_row(row),
46 None => Err(crate::error::PgError::RowNotFound),
47 }
48 }
49
50 async fn query_first_as<T: FromRow>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Option<T>> {
51 let rows = self.query_raw(sql, params).await?;
52 match rows.into_iter().next() {
53 Some(row) => T::from_row(&row).map(Some),
54 None => Ok(None),
55 }
56 }
57
58 async fn query_first<T: FromSql>(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Option<T>> {
59 let mut rows = self.query_raw(sql, params).await?;
60 match rows.first_mut() {
61 Some(row) => row.try_get_by_index(0).map(Some),
62 None => Ok(None),
63 }
64 }
65}
66
67impl Queryer for Connection {
68 async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
69 Connection::query_raw(self, sql, params).await
70 }
71
72 async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
73 Connection::execute_raw(self, sql, params).await
74 }
75}
76
77impl<T: Queryer + Send + Sync + ?Sized> Queryer for &T {
78 async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
79 (**self).query_raw(sql, params).await
80 }
81
82 async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
83 (**self).execute_raw(sql, params).await
84 }
85}
86
87pub trait FromRow: Sized {
91 fn from_row(row: &Row) -> Result<Self>;
92}
93
94pub struct RowStream {
95 inner: tokio::sync::mpsc::Receiver<Result<Row>>,
96}
97
98impl Stream for RowStream {
99 type Item = Result<Row>;
100
101 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102 self.inner.poll_recv(cx)
103 }
104}