Add SQLx operation tracing (#5223)

* wip: vendor sqlx-tracing

* (compiles) standardize pg types used

* more standardization

* general log message improvements

* wip: improve sqlx-tracing architecture

* unify sqlx::Executor type

* wip: try fix sqlx tracing

* wip: sqlx-tracing compiles

* so close

* it compiles

* fix ci
This commit is contained in:
aecsocket
2026-01-28 13:38:57 +00:00
committed by GitHub
parent 7cb7e881fa
commit e57c15b3ce
146 changed files with 7320 additions and 801 deletions

View File

@@ -0,0 +1,103 @@
use crate::{AnyConnection, Database};
impl<'c, 's, DB> sqlx::Executor<'s> for &'s mut AnyConnection<'c, DB>
where
DB: Database,
// I attempted to have `DB::ConnectionRef<'c>` unify to `&'c mut DB::Connection`.
// This *can* be unified apparently, but we can't actually use the fact that
// `DB::ConnectionRef<'c>: sqlx::Executor` if we do this.
// So, we need a casting function in `crate::Database`.
// Maybe this can be revisited sometime to not require the casting fn.
//
// for<'a> DB: Database<ConnectionRef<'a> = &'a mut <DB as sqlx::Database>::Connection>,
{
type Database = DB;
fn fetch_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<
sqlx::Either<
<Self::Database as sqlx::Database>::QueryResult,
<Self::Database as sqlx::Database>::Row,
>,
sqlx::Error,
>,
>
where
's: 'e,
E: 'q + sqlx::Execute<'q, Self::Database>,
{
match self {
AnyConnection::Pool(pool) => {
DB::cast_connection(&mut pool.inner).fetch_many(query)
}
AnyConnection::Raw(conn) => {
DB::cast_connection(conn.inner).fetch_many(query)
}
}
}
fn fetch_optional<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
's: 'e,
E: 'q + sqlx::Execute<'q, Self::Database>,
{
match self {
AnyConnection::Pool(pool) => {
DB::cast_connection(&mut pool.inner).fetch_optional(query)
}
AnyConnection::Raw(conn) => {
DB::cast_connection(conn.inner).fetch_optional(query)
}
}
}
fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
's: 'e,
{
match self {
AnyConnection::Pool(pool) => DB::cast_connection(&mut pool.inner)
.prepare_with(sql, parameters),
AnyConnection::Raw(conn) => {
DB::cast_connection(conn.inner).prepare_with(sql, parameters)
}
}
}
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<sqlx::Describe<Self::Database>, sqlx::Error>,
>
where
's: 'e,
{
match self {
AnyConnection::Pool(pool) => {
DB::cast_connection(&mut pool.inner).describe(sql)
}
AnyConnection::Raw(conn) => {
DB::cast_connection(conn.inner).describe(sql)
}
}
}
}

View File

@@ -0,0 +1,512 @@
use futures::{StreamExt, TryStreamExt};
use tracing::Instrument;
impl<DB> AsMut<<DB as sqlx::Database>::Connection> for crate::PoolConnection<DB>
where
DB: crate::Database,
{
fn as_mut(&mut self) -> &mut <DB as sqlx::Database>::Connection {
self.inner.as_mut()
}
}
impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::PoolConnection<DB>
where
DB: crate::Database,
// impl<'a> Executor<'a> for PgConnection
for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
type Database = DB;
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<sqlx::Describe<Self::Database>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.describe", attrs, sql);
let fut = self.inner.as_mut().describe(sql);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute", attrs, sql);
let fut = self.inner.execute(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute_many", attrs, sql);
let stream = self.inner.execute_many(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch", attrs, sql);
let stream = self.inner.fetch(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_all<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let fut = self.inner.fetch_all(query);
Box::pin(
async move {
fut.await
.inspect(|res| {
let span = tracing::Span::current();
span.record("db.response.returned_rows", res.len());
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<
sqlx::Either<
<Self::Database as sqlx::Database>::QueryResult,
<Self::Database as sqlx::Database>::Row,
>,
sqlx::Error,
>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let stream = self.inner.fetch_many(query);
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_one<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_one", attrs, sql);
let fut = self.inner.fetch_one(query);
Box::pin(
async move {
fut.await
.inspect(|_| {
tracing::Span::current()
.record("db.response.returned_rows", 1);
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_optional<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_optional", attrs, sql);
let fut = self.inner.fetch_optional(query);
Box::pin(
async move {
fut.await
.inspect(|res| {
tracing::Span::current().record(
"db.response.returned_rows",
if res.is_some() { 1 } else { 0 },
);
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn prepare<'e, 'q: 'e>(
self,
query: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare", attrs, query);
let fut = self.inner.prepare(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare_with", attrs, sql);
let fut = self.inner.prepare_with(sql, parameters);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
}
impl<'c, DB> sqlx::Executor<'c> for &'c mut crate::Connection<'c, DB>
where
DB: crate::Database,
for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
type Database = DB;
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<sqlx::Describe<Self::Database>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.describe", attrs, sql);
let fut = self.inner.describe(sql);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute", attrs, sql);
let fut = self.inner.execute(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute_many", attrs, sql);
let stream = self.inner.execute_many(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch", attrs, sql);
let stream = self.inner.fetch(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_all<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let fut = self.inner.fetch_all(query);
Box::pin(
async move {
fut.await
.inspect(|res| {
let span = tracing::Span::current();
span.record("db.response.returned_rows", res.len());
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<
sqlx::Either<
<Self::Database as sqlx::Database>::QueryResult,
<Self::Database as sqlx::Database>::Row,
>,
sqlx::Error,
>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let stream = self.inner.fetch_many(query);
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_one<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_one", attrs, sql);
let fut = self.inner.fetch_one(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_one)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_optional<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
'c: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_optional", attrs, sql);
let fut = self.inner.fetch_optional(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_optional)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn prepare<'e, 'q: 'e>(
self,
query: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare", attrs, query);
let fut = self.inner.prepare(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
'c: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare_with", attrs, sql);
let fut = self.inner.prepare_with(sql, parameters);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
}

View File

@@ -0,0 +1,219 @@
#![doc = include_str!("../README.md")]
use std::sync::Arc;
use derive_more::{Deref, DerefMut};
use futures::future::BoxFuture;
mod any_connection;
mod connection;
mod pool;
pub(crate) mod span;
mod transaction;
pub use sqlx::Executor;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "sqlite")]
pub mod sqlite;
/// Attributes describing the database connection and context.
/// Used for span enrichment and attribute propagation.
#[derive(Debug, Default)]
struct Attributes {
name: Option<String>,
host: Option<String>,
port: Option<u16>,
database: Option<String>,
}
pub trait Database: sqlx::Database {
const SYSTEM: &'static str;
/// Defines the type of reference to a database connection, equivalent to
/// `&'c mut <Self as sqlx::Database>::Connection`.
///
/// But we can't actually use the `sqlx::Database` named connection type,
/// since we can't statically prove that it implements `sqlx::Executor`.
/// Even if we unify the two types (see `any_connection.rs`), we can't use
/// connection refs as an executor. So we need this intermediate associated
/// type.
type ConnectionRef<'c>: sqlx::Executor<'c, Database = Self>;
/// Casts a `&'c mut Self::Connection` to a `Self::ConnectionRef<'c>`.
///
/// This should just return `conn`.
fn cast_connection<'c>(
conn: &'c mut <Self as sqlx::Database>::Connection,
) -> Self::ConnectionRef<'c>;
}
/// Builder for constructing a [`Pool`] with custom attributes.
///
/// Allows setting database name, host, port, and other identifying information
/// for tracing purposes.
#[derive(Debug)]
pub struct PoolBuilder<DB: Database> {
pool: sqlx::Pool<DB>,
attributes: Attributes,
}
// this is required because `pool.connect_options().to_url_lossy()` panics with sqlite
#[cfg(feature = "postgres")]
impl From<sqlx::Pool<sqlx::Postgres>> for PoolBuilder<sqlx::Postgres> {
/// Create a new builder from an existing SQLx pool.
fn from(pool: sqlx::Pool<sqlx::Postgres>) -> Self {
use sqlx::ConnectOptions;
let url = pool.connect_options().to_url_lossy();
let attributes = Attributes {
name: None,
host: url.host_str().map(String::from),
port: url.port(),
database: url
.path_segments()
.and_then(|mut segments| segments.next().map(String::from)),
};
Self { pool, attributes }
}
}
// this is required because `pool.connect_options().to_url_lossy()` panics with sqlite
#[cfg(feature = "sqlite")]
impl From<sqlx::Pool<sqlx::Sqlite>> for PoolBuilder<sqlx::Sqlite> {
/// Create a new builder from an existing SQLx pool.
fn from(pool: sqlx::Pool<sqlx::Sqlite>) -> Self {
let attributes = Attributes {
name: None,
host: pool
.connect_options()
.get_filename()
.to_str()
.map(String::from),
port: None,
database: None,
};
Self { pool, attributes }
}
}
impl<DB: Database> PoolBuilder<DB> {
/// Set a custom name for the pool (for peer.service attribute).
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.attributes.name = Some(name.into());
self
}
/// Set the database name attribute.
pub fn with_database(mut self, database: impl Into<String>) -> Self {
self.attributes.database = Some(database.into());
self
}
/// Set the host attribute.
pub fn with_host(mut self, host: impl Into<String>) -> Self {
self.attributes.host = Some(host.into());
self
}
/// Set the port attribute.
pub fn with_port(mut self, port: u16) -> Self {
self.attributes.port = Some(port);
self
}
/// Build the [`Pool`] with the configured attributes.
pub fn build(self) -> Pool<DB> {
Pool {
inner: self.pool,
attributes: Arc::new(self.attributes),
}
}
}
/// An asynchronous pool of SQLx database connections with tracing instrumentation.
///
/// Wraps a SQLx [`Pool`] and propagates tracing attributes to all acquired connections.
#[derive(Debug, Deref, DerefMut)]
pub struct Pool<DB: Database> {
#[deref]
#[deref_mut]
inner: sqlx::Pool<DB>,
attributes: Arc<Attributes>,
}
// manually impl `Clone` because `DB` may not be `Clone`
impl<DB: Database> Clone for Pool<DB> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
attributes: self.attributes.clone(),
}
}
}
impl<DB> From<sqlx::Pool<DB>> for Pool<DB>
where
DB: Database,
PoolBuilder<DB>: From<sqlx::Pool<DB>>,
{
/// Convert a SQLx [`Pool`] into a tracing-instrumented [`Pool`].
fn from(inner: sqlx::Pool<DB>) -> Self {
PoolBuilder::from(inner).build()
}
}
/// Wrapper for a mutable SQLx connection reference with tracing attributes.
///
/// Used internally for transaction and pool connection executors.
pub struct Connection<'c, DB: Database> {
inner: &'c mut DB::Connection,
attributes: Arc<Attributes>,
}
impl<'c, DB: Database> std::fmt::Debug for Connection<'c, DB> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Connection").finish_non_exhaustive()
}
}
/// A pooled SQLx connection instrumented for tracing.
///
/// Implements [`sqlx::Executor`] and propagates tracing attributes.
#[derive(Debug)]
pub struct PoolConnection<DB: Database> {
inner: sqlx::pool::PoolConnection<DB>,
attributes: Arc<Attributes>,
}
/// An in-progress database transaction or savepoint, instrumented for tracing.
///
/// Wraps a SQLx [`Transaction`] and propagates tracing attributes.
#[derive(Debug)]
pub struct Transaction<'c, DB: Database> {
inner: sqlx::Transaction<'c, DB>,
attributes: Arc<Attributes>,
}
/// Acquire connections or transactions from a database in a generic way.
///
/// Equivalent of [`sqlx::Acquire`] with tracing.
pub trait Acquire<'c> {
type Database: Database;
fn acquire(
self,
) -> BoxFuture<'c, Result<AnyConnection<'c, Self::Database>, sqlx::Error>>;
fn begin(
self,
) -> BoxFuture<'c, Result<Transaction<'c, Self::Database>, sqlx::Error>>;
}
#[derive(Debug)]
pub enum AnyConnection<'c, DB: Database> {
Pool(PoolConnection<DB>),
Raw(Connection<'c, DB>),
}

View File

@@ -0,0 +1,310 @@
use futures::{StreamExt, TryStreamExt, future::BoxFuture};
use tracing::Instrument;
use crate::AnyConnection;
impl<'c, DB> crate::Acquire<'c> for &'c crate::Pool<DB>
where
DB: crate::Database,
{
type Database = DB;
fn acquire(
self,
) -> BoxFuture<'c, Result<AnyConnection<'c, DB>, sqlx::Error>> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.acquire", attrs);
let fut = self.inner.acquire();
let fut = async move {
let conn = fut.await.inspect_err(crate::span::record_error)?;
let conn = crate::PoolConnection {
inner: conn,
attributes: self.attributes.clone(),
};
let conn = AnyConnection::Pool(conn);
Ok::<_, sqlx::Error>(conn)
};
Box::pin(fut.instrument(span))
}
fn begin(
self,
) -> BoxFuture<
'c,
Result<crate::Transaction<'c, Self::Database>, sqlx::Error>,
> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.begin", attrs);
let fut = self.inner.begin();
Box::pin(
async move {
let txn = fut.await.inspect_err(crate::span::record_error)?;
let txn = crate::Transaction {
inner: txn,
attributes: self.attributes.clone(),
};
Ok::<_, sqlx::Error>(txn)
}
.instrument(span),
)
}
}
impl<DB: crate::Database> crate::Pool<DB> {
/// Retrieves a connection and immediately begins a new transaction.
///
/// The returned [`Transaction`] is instrumented for tracing.
///
/// [`Transaction`]: crate::Transaction
pub async fn begin(
&self,
) -> Result<crate::Transaction<'static, DB>, sqlx::Error> {
self.inner.begin().await.map(|inner| crate::Transaction {
inner,
attributes: self.attributes.clone(),
})
}
/// Acquires a pooled connection, instrumented for tracing.
pub async fn acquire(
&self,
) -> Result<crate::PoolConnection<DB>, sqlx::Error> {
self.inner
.acquire()
.await
.map(|inner| crate::PoolConnection {
attributes: self.attributes.clone(),
inner,
})
}
}
impl<'p, DB> sqlx::Executor<'p> for &crate::Pool<DB>
where
DB: crate::Database,
for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
{
type Database = DB;
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<sqlx::Describe<Self::Database>, sqlx::Error>,
> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.describe", attrs, sql);
let fut = self.inner.describe(sql);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute", attrs, sql);
let fut = self.inner.execute(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute_many", attrs, sql);
let stream = self.inner.execute_many(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch", attrs, sql);
let stream = self.inner.fetch(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_all<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let fut = self.inner.fetch_all(query);
Box::pin(
async move {
fut.await
.inspect(|res| {
let span = tracing::Span::current();
span.record("db.response.returned_rows", res.len());
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<
sqlx::Either<
<Self::Database as sqlx::Database>::QueryResult,
<Self::Database as sqlx::Database>::Row,
>,
sqlx::Error,
>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let stream = self.inner.fetch_many(query);
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_one<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_one", attrs, sql);
let fut = self.inner.fetch_one(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_one)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_optional<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_optional", attrs, sql);
let fut = self.inner.fetch_optional(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_optional)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn prepare<'e, 'q: 'e>(
self,
query: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare", attrs, query);
let fut = self.inner.prepare(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare_with", attrs, sql);
let fut = self.inner.prepare_with(sql, parameters);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
}

View File

@@ -0,0 +1,23 @@
impl crate::Database for sqlx::Postgres {
const SYSTEM: &'static str = "postgresql";
type ConnectionRef<'a> = &'a mut sqlx::PgConnection;
fn cast_connection<'c>(
conn: &'c mut <Self as sqlx::Database>::Connection,
) -> Self::ConnectionRef<'c> {
conn
}
// fn cast_pool_connection<'c>(
// conn: &'c mut PoolConnection<Self>,
// ) -> Self::PoolConnection<'c> {
// &mut conn.inner
// }
// fn cast_raw_connection<'c>(
// conn: &'c mut <Self as sqlx::Database>::Connection,
// ) -> Self::RawConnection<'c> {
// conn
// }
}

View File

@@ -0,0 +1,88 @@
/// Macro to create a tracing span for a SQLx operation with OpenTelemetry-compatible fields.
///
/// - `$name`: The operation name (e.g., "sqlx.execute").
/// - `$statement`: The SQL statement being executed.
/// - `$attributes`: Connection or pool attributes for peer and db context.
///
/// This macro is used internally by the crate to instrument all major SQLx operations.
#[macro_export]
macro_rules! instrument {
($name:expr, $attributes:expr $(, $statement:expr)? ) => {
tracing::info_span!(
$name,
// Database name (if available)
"db.name" = $attributes.database,
// Operation type (filled by SQLx or left empty)
"db.operation" = ::tracing::field::Empty,
// The SQL query text
$( "db.query.text" = $statement, )?
// Number of affected rows (to be filled after execution)
"db.response.affected_rows" = ::tracing::field::Empty,
// Number of returned rows (to be filled after execution)
"db.response.returned_rows" = ::tracing::field::Empty,
// Status code of the response (to be filled after execution)
"db.response.status_code" = ::tracing::field::Empty,
// Table name (optional, left empty)
"db.sql.table" = ::tracing::field::Empty,
// Database system (e.g., "postgresql", "sqlite")
"db.system.name" = DB::SYSTEM,
// Error type, message, and stacktrace (to be filled on error)
"error.type" = ::tracing::field::Empty,
"error.message" = ::tracing::field::Empty,
"error.stacktrace" = ::tracing::field::Empty,
// Peer (server) host and port
"net.peer.name" = $attributes.host,
"net.peer.port" = $attributes.port,
// OpenTelemetry semantic fields
"otel.kind" = "client",
"otel.status_code" = ::tracing::field::Empty,
"otel.status_description" = ::tracing::field::Empty,
// Peer service name (if set)
"peer.service" = $attributes.name,
)
};
}
/// Records that a single row was returned in the current tracing span.
/// Used for fetch_one operations.
pub fn record_one<T>(_value: &T) {
let span = tracing::Span::current();
span.record("db.response.returned_rows", 1);
}
/// Records whether an optional row was returned in the current tracing span.
/// Used for fetch_optional operations.
pub fn record_optional<T>(value: &Option<T>) {
let span = tracing::Span::current();
span.record(
"db.response.returned_rows",
if value.is_some() { 1 } else { 0 },
);
}
/// Records error details in the current tracing span for a SQLx error.
/// Sets OpenTelemetry status and error fields for observability backends.
pub fn record_error(err: &sqlx::Error) {
let span = tracing::Span::current();
// Mark the span as an error for OpenTelemetry
span.record("otel.status_code", "error");
span.record("otel.status_description", err.to_string());
// Classify error type as client or server
match err {
sqlx::Error::ColumnIndexOutOfBounds { .. }
| sqlx::Error::ColumnDecode { .. }
| sqlx::Error::ColumnNotFound(_)
| sqlx::Error::Decode { .. }
| sqlx::Error::Encode { .. }
| sqlx::Error::RowNotFound
| sqlx::Error::TypeNotFound { .. } => {
span.record("error.type", "client");
}
_ => {
span.record("error.type", "server");
}
}
// Attach error message and stacktrace for debugging
span.record("error.message", err.to_string());
span.record("error.stacktrace", format!("{err:?}"));
}

View File

@@ -0,0 +1,11 @@
impl crate::Database for sqlx::Sqlite {
const SYSTEM: &'static str = "sqlite";
type ConnectionRef<'a> = &'a mut sqlx::SqliteConnection;
fn cast_connection<'c>(
conn: &'c mut <Self as sqlx::Database>::Connection,
) -> Self::ConnectionRef<'c> {
conn
}
}

View File

@@ -0,0 +1,334 @@
use futures::{StreamExt, TryStreamExt, future::BoxFuture};
use sqlx::{Acquire, Error};
use tracing::Instrument;
use crate::AnyConnection;
impl<'c, DB> crate::Transaction<'c, DB>
where
DB: crate::Database,
{
/// Returns a tracing-instrumented executor for this transaction.
///
/// This allows running queries with full span context and attributes.
pub fn executor(&mut self) -> crate::Connection<'_, DB> {
crate::Connection {
inner: &mut *self.inner,
attributes: self.attributes.clone(),
}
}
/// Commits this transaction or savepoint.
pub async fn commit(self) -> Result<(), Error> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.commit", attrs);
let fut = self.inner.commit();
fut.instrument(span).await
}
/// Aborts this transaction or savepoint.
pub async fn rollback(self) -> Result<(), Error> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.rollback", attrs);
let fut = self.inner.rollback();
fut.instrument(span).await
}
}
impl<'c, 't, DB> crate::Acquire<'t> for &'t mut crate::Transaction<'c, DB>
where
DB: crate::Database,
{
type Database = DB;
#[inline]
fn acquire(
self,
) -> BoxFuture<'t, Result<AnyConnection<'t, DB>, sqlx::Error>> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.acquire", attrs);
let fut = self.inner.acquire();
let fut = async move {
let conn = fut.await.inspect_err(crate::span::record_error)?;
let conn = crate::Connection {
inner: conn,
attributes: attrs.clone(),
};
let conn = AnyConnection::Raw(conn);
Ok(conn)
};
Box::pin(fut.instrument(span))
}
fn begin(
self,
) -> BoxFuture<
't,
Result<crate::Transaction<'t, Self::Database>, sqlx::Error>,
> {
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.begin", attrs);
let fut = self.inner.begin();
let fut = async move {
let txn = fut.await.inspect_err(crate::span::record_error)?;
let txn = crate::Transaction {
inner: txn,
attributes: attrs.clone(),
};
Ok(txn)
};
Box::pin(fut.instrument(span))
}
}
/// Implements `sqlx::Executor` for a mutable reference to a tracing-instrumented transaction.
///
/// Each method creates a tracing span for the SQL operation, attaches relevant attributes,
/// and records errors or row counts as appropriate for observability.
impl<'c, 't, DB> sqlx::Executor<'t> for &'t mut crate::Transaction<'c, DB>
where
DB: crate::Database,
for<'a> &'a mut DB::Connection: sqlx::Executor<'a, Database = DB>,
{
type Database = DB;
#[doc(hidden)]
fn describe<'e, 'q: 'e>(
self,
sql: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<sqlx::Describe<Self::Database>, sqlx::Error>,
>
where
't: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.describe", attrs, sql);
Box::pin(
async move {
let fut = (&mut self.inner).describe(sql);
fut.await.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn execute<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute", attrs, sql);
let fut = (&mut self.inner).execute(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn execute_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::QueryResult, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.execute_many", attrs, sql);
let stream = (&mut self.inner).execute_many(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch", attrs, sql);
let stream = (&mut self.inner).fetch(query);
use futures::StreamExt;
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_all<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Vec<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let fut = (&mut self.inner).fetch_all(query);
Box::pin(
async move {
fut.await
.inspect(|res| {
let span = tracing::Span::current();
span.record("db.response.returned_rows", res.len());
})
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_many<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::stream::BoxStream<
'e,
Result<
sqlx::Either<
<Self::Database as sqlx::Database>::QueryResult,
<Self::Database as sqlx::Database>::Row,
>,
sqlx::Error,
>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_all", attrs, sql);
let stream = (&mut self.inner).fetch_many(query);
Box::pin(
stream
.inspect(move |_| {
let _enter = span.enter();
})
.inspect_err(crate::span::record_error),
)
}
fn fetch_one<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Row, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_one", attrs, sql);
let fut = (&mut self.inner).fetch_one(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_one)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn fetch_optional<'e, 'q: 'e, E>(
self,
query: E,
) -> futures::future::BoxFuture<
'e,
Result<Option<<Self::Database as sqlx::Database>::Row>, sqlx::Error>,
>
where
E: 'q + sqlx::Execute<'q, Self::Database>,
't: 'e,
{
let sql = query.sql();
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.fetch_optional", attrs, sql);
let fut = (&mut self.inner).fetch_optional(query);
Box::pin(
async move {
fut.await
.inspect(crate::span::record_optional)
.inspect_err(crate::span::record_error)
}
.instrument(span),
)
}
fn prepare<'e, 'q: 'e>(
self,
query: &'q str,
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
't: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare", attrs, query);
let fut = (&mut self.inner).prepare(query);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
fn prepare_with<'e, 'q: 'e>(
self,
sql: &'q str,
parameters: &'e [<Self::Database as sqlx::Database>::TypeInfo],
) -> futures::future::BoxFuture<
'e,
Result<<Self::Database as sqlx::Database>::Statement<'q>, sqlx::Error>,
>
where
't: 'e,
{
let attrs = &self.attributes;
let span = crate::instrument!("sqlx.prepare_with", attrs, sql);
let fut = (&mut self.inner).prepare_with(sql, parameters);
Box::pin(
async move { fut.await.inspect_err(crate::span::record_error) }
.instrument(span),
)
}
}