1use std::{sync::Arc, time::Instant};
2
3use crate::{
4 config::{IdleConn, PoolInner},
5 connection::Connection,
6 encode::ToSql,
7 error::Result,
8 queryer::Queryer,
9 row::Row,
10};
11
12pub struct Transaction {
13 conn: Option<Connection>,
14 done: bool,
15 pool: Option<PoolBacking>,
16}
17
18struct PoolBacking {
19 inner: Arc<PoolInner>,
20 _permit: tokio::sync::OwnedSemaphorePermit,
21}
22
23impl Transaction {
24 pub async fn begin(conn: Connection) -> Result<Self> {
25 conn.execute_raw("BEGIN", &[]).await?;
26 Ok(Transaction {
27 conn: Some(conn),
28 done: false,
29 pool: None,
30 })
31 }
32
33 pub(crate) async fn begin_pooled(
34 conn: Connection,
35 pool: Arc<PoolInner>,
36 permit: tokio::sync::OwnedSemaphorePermit,
37 ) -> Result<Self> {
38 match conn.execute_raw("BEGIN", &[]).await {
39 Ok(_) => Ok(Transaction {
40 conn: Some(conn),
41 done: false,
42 pool: Some(PoolBacking {
43 inner: pool,
44 _permit: permit,
45 }),
46 }),
47 Err(e) => {
48 pool.idle.lock().await.push(IdleConn {
49 conn,
50 since: Instant::now(),
51 created: Instant::now(),
52 _permit: permit,
53 });
54 Err(e)
55 }
56 }
57 }
58
59 pub async fn commit(mut self) -> Result<()> {
60 let conn = self.conn.as_ref().expect("Transaction already completed");
61 let result = conn.execute_raw("COMMIT", &[]).await.map(|_| ());
62 self.done = true;
63 result
64 }
65
66 pub async fn rollback(mut self) -> Result<()> {
67 let conn = self.conn.as_ref().expect("Transaction already completed");
68 let result = conn.execute_raw("ROLLBACK", &[]).await.map(|_| ());
69 self.done = true;
70 result
71 }
72
73 pub async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
74 self.conn().query_raw(sql, params).await
75 }
76
77 pub async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
78 self.conn().execute_raw(sql, params).await
79 }
80
81 fn conn(&self) -> &Connection {
82 self.conn.as_ref().expect("Transaction already completed")
83 }
84}
85
86impl Drop for Transaction {
87 fn drop(&mut self) {
88 let done = self.done;
89
90 if let (Some(conn), Some(pool)) = (self.conn.take(), self.pool.take()) {
91 tokio::spawn(async move {
92 if !done {
93 let _ = conn.execute_raw("ROLLBACK", &[]).await;
94 }
95 pool.inner.idle.lock().await.push(IdleConn {
96 conn,
97 since: Instant::now(),
98 created: Instant::now(),
99 _permit: pool._permit,
100 });
101 });
102 } else if let Some(conn) = self.conn.take() {
103 if !done {
104 tokio::spawn(async move {
105 let _ = conn.execute_raw("ROLLBACK", &[]).await;
106 });
107 }
108 }
109 }
110}
111
112impl Queryer for Transaction {
113 async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
114 self.conn().query_raw(sql, params).await
115 }
116
117 async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
118 self.conn().execute_raw(sql, params).await
119 }
120}