// sqlx-record/sqlx-record-ctl/src/main.rs
//
// Listing metadata from the original export: 231 lines, 9.0 KiB, Rust.

use clap::Parser;
use dotenv::dotenv;
use sqlx::Row;
use std::env;
use url::Url;
#[cfg(feature = "mysql")]
use sqlx::mysql::MySqlPool as Pool;
#[cfg(feature = "postgres")]
use sqlx::postgres::PgPool as Pool;
#[cfg(feature = "sqlite")]
use sqlx::sqlite::SqlitePool as Pool;
/// Command line arguments
#[derive(Parser, Debug)]
#[command(author, version, about = "CLI tool for managing sqlx-record audit tables", long_about = None)]
struct Args {
    /// The name of the schema to operate on
    #[arg(long, short)]
    schema_name: Option<String>,

    /// The database URL, optionally provided. Defaults to the DATABASE_URL environment variable.
    #[arg(long, short)]
    db_url: Option<String>,

    /// Path of an env file to load instead of the default `.env`
    #[arg(long, short)]
    env: Option<String>,

    /// Drop the entity_changes_* audit tables instead of creating them
    #[arg(long)]
    delete: bool,
}
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
// Parse command-line arguments
let args = Args::parse();
// Load environment variables from .env file
if let Some(env) = args.env {
dotenv::from_filename(env).ok();
} else {
dotenv().ok();
}
// Use provided --db-url or fallback to the DATABASE_URL from .env
let database_url = args
.db_url
.unwrap_or_else(|| env::var("DATABASE_URL").expect("DATABASE_URL must be set"));
// Connect to the database
let pool = Pool::connect(&database_url).await?;
// Use the schema name from command line arguments
let schema_name = args.schema_name.unwrap_or_else(|| {
let parsed_url = Url::parse(&database_url).expect("Failed to parse database URL");
parsed_url
.path_segments()
.and_then(|segments| segments.last())
.map(|db_name| db_name.to_string())
.expect("No schema (database) name found in the database URL")
});
// Find all tables marked as auditable in the metadata table
let tables: Vec<String> = sqlx::query(
"SELECT table_name FROM entity_changes_metadata WHERE is_auditable = TRUE"
)
.fetch_all(&pool)
.await?
.iter()
.map(|row| row.get::<String, _>("table_name"))
.collect();
// Iterate over each table and create/delete an entity_changes table
for table_name in tables {
println!("table_name: {}", table_name);
let entity_changes_table = format!("entity_changes_{}", table_name);
if args.delete {
println!("delete table: {}", entity_changes_table);
#[cfg(feature = "mysql")]
let drop_stmt = format!("DROP TABLE IF EXISTS {}.{}", schema_name, entity_changes_table);
#[cfg(feature = "postgres")]
let drop_stmt = format!("DROP TABLE IF EXISTS \"{}\".\"{}\"", schema_name, entity_changes_table);
#[cfg(feature = "sqlite")]
let drop_stmt = format!("DROP TABLE IF EXISTS \"{}\"", entity_changes_table);
sqlx::query(&drop_stmt).execute(&pool).await?;
} else {
println!("create table: {}", entity_changes_table);
// Create table with database-specific syntax
#[cfg(feature = "mysql")]
{
sqlx::query(&format!(
"CREATE TABLE IF NOT EXISTS {}.{} (
id BINARY(16) PRIMARY KEY,
entity_id BINARY(16) NOT NULL,
action ENUM('insert', 'update', 'delete', 'restore', 'hard-delete') NOT NULL,
changed_at BIGINT NOT NULL,
actor_id BINARY(16) NOT NULL,
session_id BINARY(16) NOT NULL,
change_set_id BINARY(16) NOT NULL,
new_value JSON
);",
schema_name, entity_changes_table,
)).execute(&pool).await?;
// Create indexes
sqlx::query(&format!(
"CREATE INDEX IF NOT EXISTS idx_{}_entity_id ON {}.{} (entity_id);",
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
"CREATE INDEX IF NOT EXISTS idx_{}_change_set_id ON {}.{} (change_set_id);",
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
"CREATE INDEX IF NOT EXISTS idx_{}_session_id ON {}.{} (session_id);",
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
"CREATE INDEX IF NOT EXISTS idx_{}_actor_id ON {}.{} (actor_id);",
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
"CREATE INDEX IF NOT EXISTS idx_{}_entity_id_actor_id ON {}.{} (entity_id, actor_id);",
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
}
#[cfg(feature = "postgres")]
{
sqlx::query(&format!(
r#"CREATE TABLE IF NOT EXISTS "{}"."{}" (
id UUID PRIMARY KEY,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL CHECK (action IN ('insert', 'update', 'delete', 'restore', 'hard-delete')),
changed_at BIGINT NOT NULL,
actor_id UUID NOT NULL,
session_id UUID NOT NULL,
change_set_id UUID NOT NULL,
new_value JSONB
);"#,
schema_name, entity_changes_table,
)).execute(&pool).await?;
// Create indexes
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_entity_id ON "{}"."{}" (entity_id);"#,
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_change_set_id ON "{}"."{}" (change_set_id);"#,
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_session_id ON "{}"."{}" (session_id);"#,
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_actor_id ON "{}"."{}" (actor_id);"#,
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_entity_id_actor_id ON "{}"."{}" (entity_id, actor_id);"#,
entity_changes_table, schema_name, entity_changes_table,
)).execute(&pool).await?;
}
#[cfg(feature = "sqlite")]
{
sqlx::query(&format!(
r#"CREATE TABLE IF NOT EXISTS "{}" (
id BLOB PRIMARY KEY,
entity_id BLOB NOT NULL,
action TEXT NOT NULL CHECK (action IN ('insert', 'update', 'delete', 'restore', 'hard-delete')),
changed_at INTEGER NOT NULL,
actor_id BLOB NOT NULL,
session_id BLOB NOT NULL,
change_set_id BLOB NOT NULL,
new_value TEXT
);"#,
entity_changes_table,
)).execute(&pool).await?;
// Create indexes
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_entity_id ON "{}" (entity_id);"#,
entity_changes_table, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_change_set_id ON "{}" (change_set_id);"#,
entity_changes_table, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_session_id ON "{}" (session_id);"#,
entity_changes_table, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_actor_id ON "{}" (actor_id);"#,
entity_changes_table, entity_changes_table,
)).execute(&pool).await?;
sqlx::query(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_{}_entity_id_actor_id ON "{}" (entity_id, actor_id);"#,
entity_changes_table, entity_changes_table,
)).execute(&pool).await?;
}
println!("Created indexes for {}", entity_changes_table);
}
}
Ok(())
}