Skip to main content

pg/
pool.rs

1use std::{
2    sync::{Arc, atomic::Ordering},
3    time::Instant,
4};
5
6use crate::{
7    config::{ConnectParams, IdleConn, PoolConfig, PoolInner},
8    connection::Connection,
9    encode::ToSql,
10    error::{PgError, Result},
11    queryer::Queryer,
12    row::Row,
13    transaction::Transaction,
14};
15
16#[derive(Clone)]
17pub struct Pool {
18    inner: Arc<PoolInner>,
19}
20
21impl Pool {
22    pub async fn connect(params: ConnectParams) -> Result<Self> {
23        Pool::connect_with_config(params, PoolConfig::default()).await
24    }
25
26    pub async fn connect_with_config(params: ConnectParams, config: PoolConfig) -> Result<Self> {
27        let inner = PoolInner::new(params, config);
28        let pool = Pool {
29            inner,
30        };
31
32        for _ in 0..pool.inner.config.min_connections {
33            let permit = match pool.inner.semaphore.clone().acquire_owned().await {
34                Ok(p) => p,
35                Err(_) => break,
36            };
37            match Connection::connect(&pool.inner.params).await {
38                Ok(conn) => {
39                    pool.inner.idle.lock().await.push(IdleConn {
40                        conn,
41                        since: Instant::now(),
42                        created: Instant::now(),
43                        _permit: permit,
44                    });
45                }
46                Err(_) => {
47                    permit.forget();
48                    break;
49                }
50            }
51        }
52
53        let pool_clone = pool.clone();
54        tokio::spawn(async move {
55            pool_clone.reaper_loop().await;
56        });
57
58        Ok(pool)
59    }
60
61    pub async fn get(&self) -> Result<PooledConnection> {
62        if self.is_closed() {
63            return Err(PgError::PoolClosed);
64        }
65
66        {
67            let mut idle = self.inner.idle.lock().await;
68            if let Some(idle_conn) = idle.pop() {
69                return Ok(PooledConnection {
70                    conn: Some(idle_conn.conn),
71                    pool: self.inner.clone(),
72                    _permit: Some(idle_conn._permit),
73                });
74            }
75        }
76
77        let permit =
78            tokio::time::timeout(self.inner.config.connect_timeout, self.inner.semaphore.clone().acquire_owned())
79                .await
80                .map_err(|_| PgError::PoolTimeout)?
81                .map_err(|_| PgError::PoolClosed)?;
82
83        match Connection::connect(&self.inner.params).await {
84            Ok(conn) => Ok(PooledConnection {
85                conn: Some(conn),
86                pool: self.inner.clone(),
87                _permit: Some(permit),
88            }),
89            Err(e) => {
90                permit.forget();
91                Err(e)
92            }
93        }
94    }
95
96    pub async fn begin(&self) -> Result<Transaction> {
97        let mut pooled = self.get().await?;
98        let conn = pooled.conn.take().expect("connection already taken");
99        let permit = pooled._permit.take().expect("permit already taken");
100        Transaction::begin_pooled(conn, self.inner.clone(), permit).await
101    }
102
103    pub fn is_closed(&self) -> bool {
104        self.inner.closed.load(Ordering::Acquire)
105    }
106
107    pub async fn close(&self) {
108        self.inner.closed.store(true, Ordering::Release);
109        self.inner.semaphore.close();
110        self.inner.closed_notify.notify_waiters();
111        let mut idle = self.inner.idle.lock().await;
112        idle.clear();
113    }
114
115    async fn reaper_loop(&self) {
116        loop {
117            tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
118            if self.is_closed() {
119                return;
120            }
121
122            let mut idle = self.inner.idle.lock().await;
123            let now = Instant::now();
124
125            let mut keep = Vec::new();
126            for conn in idle.drain(..) {
127                if now - conn.since < self.inner.config.idle_timeout
128                    && now - conn.created < self.inner.config.max_lifetime
129                {
130                    keep.push(conn);
131                }
132            }
133
134            while keep.len() > self.inner.config.min_connections as usize {
135                keep.pop();
136            }
137
138            *idle = keep;
139        }
140    }
141}
142
143impl Queryer for Pool {
144    async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
145        self.get().await?.query_raw(sql, params).await
146    }
147
148    async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
149        self.get().await?.execute_raw(sql, params).await
150    }
151}
152
153pub struct PooledConnection {
154    pub(crate) conn: Option<Connection>,
155    pub(crate) pool: Arc<PoolInner>,
156    pub(crate) _permit: Option<tokio::sync::OwnedSemaphorePermit>,
157}
158
159impl PooledConnection {
160    pub async fn ping(&self) -> Result<()> {
161        if let Some(ref conn) = self.conn {
162            conn.ping().await
163        } else {
164            Err(PgError::PoolClosed)
165        }
166    }
167
168    pub async fn begin(mut self) -> Result<Transaction> {
169        let conn = self.conn.take().expect("connection already taken");
170        let permit = self._permit.take().expect("permit already taken");
171        let pool = self.pool.clone();
172        Transaction::begin_pooled(conn, pool, permit).await
173    }
174}
175
176impl std::ops::Deref for PooledConnection {
177    type Target = Connection;
178
179    fn deref(&self) -> &Connection {
180        self.conn.as_ref().expect("PooledConnection connection already taken")
181    }
182}
183
184impl std::ops::DerefMut for PooledConnection {
185    fn deref_mut(&mut self) -> &mut Connection {
186        self.conn.as_mut().expect("PooledConnection connection already taken")
187    }
188}
189
190impl Drop for PooledConnection {
191    fn drop(&mut self) {
192        if let (Some(conn), Some(permit)) = (self.conn.take(), self._permit.take()) {
193            let pool = self.pool.clone();
194            tokio::spawn(async move {
195                pool.idle.lock().await.push(IdleConn {
196                    conn,
197                    since: Instant::now(),
198                    created: Instant::now(),
199                    _permit: permit,
200                });
201            });
202        }
203    }
204}