1use std::{
2 sync::{Arc, atomic::Ordering},
3 time::Instant,
4};
5
6use crate::{
7 config::{ConnectParams, IdleConn, PoolConfig, PoolInner},
8 connection::Connection,
9 encode::ToSql,
10 error::{PgError, Result},
11 queryer::Queryer,
12 row::Row,
13 transaction::Transaction,
14};
15
16#[derive(Clone)]
17pub struct Pool {
18 inner: Arc<PoolInner>,
19}
20
21impl Pool {
22 pub async fn connect(params: ConnectParams) -> Result<Self> {
23 Pool::connect_with_config(params, PoolConfig::default()).await
24 }
25
26 pub async fn connect_with_config(params: ConnectParams, config: PoolConfig) -> Result<Self> {
27 let inner = PoolInner::new(params, config);
28 let pool = Pool {
29 inner,
30 };
31
32 for _ in 0..pool.inner.config.min_connections {
33 let permit = match pool.inner.semaphore.clone().acquire_owned().await {
34 Ok(p) => p,
35 Err(_) => break,
36 };
37 match Connection::connect(&pool.inner.params).await {
38 Ok(conn) => {
39 pool.inner.idle.lock().await.push(IdleConn {
40 conn,
41 since: Instant::now(),
42 created: Instant::now(),
43 _permit: permit,
44 });
45 }
46 Err(_) => {
47 permit.forget();
48 break;
49 }
50 }
51 }
52
53 let pool_clone = pool.clone();
54 tokio::spawn(async move {
55 pool_clone.reaper_loop().await;
56 });
57
58 Ok(pool)
59 }
60
61 pub async fn get(&self) -> Result<PooledConnection> {
62 if self.is_closed() {
63 return Err(PgError::PoolClosed);
64 }
65
66 {
67 let mut idle = self.inner.idle.lock().await;
68 if let Some(idle_conn) = idle.pop() {
69 return Ok(PooledConnection {
70 conn: Some(idle_conn.conn),
71 pool: self.inner.clone(),
72 _permit: Some(idle_conn._permit),
73 });
74 }
75 }
76
77 let permit =
78 tokio::time::timeout(self.inner.config.connect_timeout, self.inner.semaphore.clone().acquire_owned())
79 .await
80 .map_err(|_| PgError::PoolTimeout)?
81 .map_err(|_| PgError::PoolClosed)?;
82
83 match Connection::connect(&self.inner.params).await {
84 Ok(conn) => Ok(PooledConnection {
85 conn: Some(conn),
86 pool: self.inner.clone(),
87 _permit: Some(permit),
88 }),
89 Err(e) => {
90 permit.forget();
91 Err(e)
92 }
93 }
94 }
95
96 pub async fn begin(&self) -> Result<Transaction> {
97 let mut pooled = self.get().await?;
98 let conn = pooled.conn.take().expect("connection already taken");
99 let permit = pooled._permit.take().expect("permit already taken");
100 Transaction::begin_pooled(conn, self.inner.clone(), permit).await
101 }
102
103 pub fn is_closed(&self) -> bool {
104 self.inner.closed.load(Ordering::Acquire)
105 }
106
107 pub async fn close(&self) {
108 self.inner.closed.store(true, Ordering::Release);
109 self.inner.semaphore.close();
110 self.inner.closed_notify.notify_waiters();
111 let mut idle = self.inner.idle.lock().await;
112 idle.clear();
113 }
114
115 async fn reaper_loop(&self) {
116 loop {
117 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
118 if self.is_closed() {
119 return;
120 }
121
122 let mut idle = self.inner.idle.lock().await;
123 let now = Instant::now();
124
125 let mut keep = Vec::new();
126 for conn in idle.drain(..) {
127 if now - conn.since < self.inner.config.idle_timeout
128 && now - conn.created < self.inner.config.max_lifetime
129 {
130 keep.push(conn);
131 }
132 }
133
134 while keep.len() > self.inner.config.min_connections as usize {
135 keep.pop();
136 }
137
138 *idle = keep;
139 }
140 }
141}
142
143impl Queryer for Pool {
144 async fn query_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<Vec<Row>> {
145 self.get().await?.query_raw(sql, params).await
146 }
147
148 async fn execute_raw(&self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
149 self.get().await?.execute_raw(sql, params).await
150 }
151}
152
153pub struct PooledConnection {
154 pub(crate) conn: Option<Connection>,
155 pub(crate) pool: Arc<PoolInner>,
156 pub(crate) _permit: Option<tokio::sync::OwnedSemaphorePermit>,
157}
158
159impl PooledConnection {
160 pub async fn ping(&self) -> Result<()> {
161 if let Some(ref conn) = self.conn {
162 conn.ping().await
163 } else {
164 Err(PgError::PoolClosed)
165 }
166 }
167
168 pub async fn begin(mut self) -> Result<Transaction> {
169 let conn = self.conn.take().expect("connection already taken");
170 let permit = self._permit.take().expect("permit already taken");
171 let pool = self.pool.clone();
172 Transaction::begin_pooled(conn, pool, permit).await
173 }
174}
175
176impl std::ops::Deref for PooledConnection {
177 type Target = Connection;
178
179 fn deref(&self) -> &Connection {
180 self.conn.as_ref().expect("PooledConnection connection already taken")
181 }
182}
183
184impl std::ops::DerefMut for PooledConnection {
185 fn deref_mut(&mut self) -> &mut Connection {
186 self.conn.as_mut().expect("PooledConnection connection already taken")
187 }
188}
189
190impl Drop for PooledConnection {
191 fn drop(&mut self) {
192 if let (Some(conn), Some(permit)) = (self.conn.take(), self._permit.take()) {
193 let pool = self.pool.clone();
194 tokio::spawn(async move {
195 pool.idle.lock().await.push(IdleConn {
196 conn,
197 since: Instant::now(),
198 created: Instant::now(),
199 _permit: permit,
200 });
201 });
202 }
203 }
204}