Skip to main content

pg/
transaction.rs

1use std::{sync::Arc, time::Instant};
2
3use crate::{
4    config::{IdleConn, PoolInner},
5    connection::Connection,
6    encode::ToSql,
7    error::Result,
8    queryer::Queryer,
9    row::Row,
10};
11
12pub struct Transaction {
13    conn: Option<Connection>,
14    done: bool,
15    pool: Option<PoolBacking>,
16}
17
18struct PoolBacking {
19    inner: Arc<PoolInner>,
20    _permit: tokio::sync::OwnedSemaphorePermit,
21}
22
23impl Transaction {
24    pub async fn begin(conn: Connection) -> Result<Self> {
25        conn.execute_raw("BEGIN", &[]).await?;
26        Ok(Transaction {
27            conn: Some(conn),
28            done: false,
29            pool: None,
30        })
31    }
32
33    pub(crate) async fn begin_pooled(
34        conn: Connection,
35        pool: Arc<PoolInner>,
36        permit: tokio::sync::OwnedSemaphorePermit,
37    ) -> Result<Self> {
38        match conn.execute_raw("BEGIN", &[]).await {
39            Ok(_) => Ok(Transaction {
40                conn: Some(conn),
41                done: false,
42                pool: Some(PoolBacking {
43                    inner: pool,
44                    _permit: permit,
45                }),
46            }),
47            Err(e) => {
48                pool.idle.lock().await.push(IdleConn {
49                    conn,
50                    since: Instant::now(),
51                    created: Instant::now(),
52                    _permit: permit,
53                });
54                Err(e)
55            }
56        }
57    }
58
59    pub async fn commit(mut self) -> Result<()> {
60        let conn = self.conn.as_ref().expect("Transaction already completed");
61        let result = conn.execute_raw("COMMIT", &[]).await.map(|_| ());
62        self.done = true;
63        result
64    }
65
66    pub async fn rollback(mut self) -> Result<()> {
67        let conn = self.conn.as_ref().expect("Transaction already completed");
68        let result = conn.execute_raw("ROLLBACK", &[]).await.map(|_| ());
69        self.done = true;
70        result
71    }
72
73    pub async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
74        self.conn().query_raw(sql, params).await
75    }
76
77    pub async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
78        self.conn().execute_raw(sql, params).await
79    }
80
81    fn conn(&self) -> &Connection {
82        self.conn.as_ref().expect("Transaction already completed")
83    }
84}
85
86impl Drop for Transaction {
87    fn drop(&mut self) {
88        let done = self.done;
89
90        if let (Some(conn), Some(pool)) = (self.conn.take(), self.pool.take()) {
91            tokio::spawn(async move {
92                if !done {
93                    let _ = conn.execute_raw("ROLLBACK", &[]).await;
94                }
95                pool.inner.idle.lock().await.push(IdleConn {
96                    conn,
97                    since: Instant::now(),
98                    created: Instant::now(),
99                    _permit: pool._permit,
100                });
101            });
102        } else if let Some(conn) = self.conn.take() {
103            if !done {
104                tokio::spawn(async move {
105                    let _ = conn.execute_raw("ROLLBACK", &[]).await;
106                });
107            }
108        }
109    }
110}
111
112impl Queryer for Transaction {
113    async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
114        self.conn().query_raw(sql, params).await
115    }
116
117    async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
118        self.conn().execute_raw(sql, params).await
119    }
120}