diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml index 1e79913108..eccdaeaf73 100644 --- a/.github/workflows/examples.yml +++ b/.github/workflows/examples.yml @@ -181,20 +181,6 @@ jobs: command: run args: -p sqlx-example-postgres-json - - name: Listen (Setup) - working-directory: examples/postgres/listen - env: - DATABASE_URL: postgres://postgres:password@localhost:5432/listen - run: sqlx db create - - - name: Listen (Run) - uses: actions-rs/cargo@v1 - env: - DATABASE_URL: postgres://postgres:password@localhost:5432/listen - with: - command: run - args: -p sqlx-example-postgres-listen - - name: Mockable TODOs (Setup) working-directory: examples/postgres/mockable-todos env: diff --git a/Cargo.lock b/Cargo.lock index ae48df55a1..de6788b3dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,6 +574,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + [[package]] name = "cast" version = "0.3.0" @@ -916,6 +922,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossterm" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" +dependencies = [ + "bitflags 1.3.2", + "crossterm_winapi", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crypto-common" version = "0.1.5" @@ -2908,6 +2939,27 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732768f1176d21d09e076c23a93123d40bba92d50c4058da34d45c8de8e682b9" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -3137,35 +3189,38 @@ dependencies = [ ] [[package]] -name = "sqlx-example-postgres-files" +name = "sqlx-example-postgres-chat" version = "0.1.0" dependencies = [ - "anyhow", - "dotenvy", + "crossterm", + "futures", "sqlx", "tokio", + "tui", + "unicode-width", ] [[package]] -name = "sqlx-example-postgres-json" +name = "sqlx-example-postgres-files" version = "0.1.0" dependencies = [ "anyhow", "dotenvy", - "futures", - "serde", - "serde_json", "sqlx", - "structopt", "tokio", ] [[package]] -name = "sqlx-example-postgres-listen" +name = "sqlx-example-postgres-json" version = "0.1.0" dependencies = [ + "anyhow", + "dotenvy", "futures", + "serde", + "serde_json", "sqlx", + "structopt", "tokio", ] @@ -3754,6 +3809,19 @@ dependencies = [ "toml", ] +[[package]] +name = "tui" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccdd26cbd674007e649a272da4475fb666d3aa0ad0531da7136db6fab0e5bad1" +dependencies = [ + "bitflags 1.3.2", + "cassowary", + "crossterm", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "typenum" version = "1.15.0" diff --git a/Cargo.toml b/Cargo.toml index cc09b4b161..3787a31060 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,9 @@ members = [ "sqlx-sqlite", "examples/mysql/todos", "examples/postgres/axum-social-with-tests", + "examples/postgres/chat", "examples/postgres/files", "examples/postgres/json", - "examples/postgres/listen", "examples/postgres/todos", "examples/postgres/mockable-todos", "examples/postgres/transaction", diff --git a/examples/postgres/chat/Cargo.toml b/examples/postgres/chat/Cargo.toml new file mode 100644 index 0000000000..0514e29eaf --- /dev/null +++ b/examples/postgres/chat/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "sqlx-example-postgres-chat" +version = "0.1.0" +edition = "2021" +workspace = "../../../" + +[dependencies] +sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio-native-tls" ] } +futures = "0.3.1" +tokio = { version = "1.20.0", features = [ "rt-multi-thread", "macros" ] } +tui = "0.19.0" +crossterm = "0.25" +unicode-width = "0.1" diff --git a/examples/postgres/chat/README.md b/examples/postgres/chat/README.md new file mode 100644 index 0000000000..55dc0cd92c --- /dev/null +++ b/examples/postgres/chat/README.md @@ -0,0 +1,21 @@ +# Chat Example + +## Description + +This example demonstrates how to use PostgreSQL channels to create a very simple chat application. + +## Setup + +1. Declare the database URL + + ``` + export DATABASE_URL="postgres://postgres:password@localhost/files" + ``` + +## Usage + +Run the project + +``` +cargo run -p sqlx-examples-postgres-chat +``` diff --git a/examples/postgres/chat/src/main.rs b/examples/postgres/chat/src/main.rs new file mode 100644 index 0000000000..6a63b0e091 --- /dev/null +++ b/examples/postgres/chat/src/main.rs @@ -0,0 +1,178 @@ +use crossterm::{ + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use std::sync::Arc; +use std::{error::Error, io}; +use tokio::{sync::Mutex, time::Duration}; +use tui::{ + backend::{Backend, CrosstermBackend}, + layout::{Constraint, Direction, Layout}, + style::{Color, Modifier, Style}, + text::{Span, Spans, Text}, + widgets::{Block, Borders, List, ListItem, Paragraph}, + Frame, Terminal, +}; +use unicode_width::UnicodeWidthStr; + +struct ChatApp { + input: String, + messages: Arc>>, + pool: PgPool, +} + +impl ChatApp { + fn new(pool: PgPool) -> Self { + ChatApp { + input: String::new(), + messages: Arc::new(Mutex::new(Vec::new())), + pool, + } + } + + async fn run( + mut self, + terminal: &mut Terminal, + mut listener: PgListener, + ) -> Result<(), Box> { + // setup listener task + let messages = self.messages.clone(); + tokio::spawn(async move { + while let Ok(msg) = listener.recv().await { + messages.lock().await.push(msg.payload().to_string()); + } + }); + + loop { + let messages: Vec = self + .messages + .lock() + .await + .iter() + .map(|m| { + let content = vec![Spans::from(Span::raw(m.to_owned()))]; + ListItem::new(content) + }) + .collect(); + + terminal.draw(|f| self.ui(f, messages))?; + + if !event::poll(Duration::from_millis(20))? { + continue; + } + + if let Event::Key(key) = event::read()? { + match key.code { + KeyCode::Enter => { + notify(&self.pool, &self.input).await?; + self.input.clear(); + } + KeyCode::Char(c) => { + self.input.push(c); + } + KeyCode::Backspace => { + self.input.pop(); + } + KeyCode::Esc => { + return Ok(()); + } + _ => {} + } + } + } + } + + fn ui(&mut self, frame: &mut Frame, messages: Vec) { + let chunks = Layout::default() + .direction(Direction::Vertical) + .margin(2) + .constraints( + [ + Constraint::Length(1), + Constraint::Length(3), + Constraint::Min(1), + ] + .as_ref(), + ) + .split(frame.size()); + + let text = Text::from(Spans::from(vec![ + Span::raw("Press "), + Span::styled("Enter", Style::default().add_modifier(Modifier::BOLD)), + Span::raw(" to send the message, "), + Span::styled("Esc", Style::default().add_modifier(Modifier::BOLD)), + Span::raw(" to quit"), + ])); + let help_message = Paragraph::new(text); + frame.render_widget(help_message, chunks[0]); + + let input = Paragraph::new(self.input.as_ref()) + .style(Style::default().fg(Color::Yellow)) + .block(Block::default().borders(Borders::ALL).title("Input")); + frame.render_widget(input, chunks[1]); + frame.set_cursor( + // Put cursor past the end of the input text + chunks[1].x + self.input.width() as u16 + 1, + // Move one line down, from the border to the input line + chunks[1].y + 1, + ); + + let messages = + List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages")); + frame.render_widget(messages, chunks[2]); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // setup postgres + let conn_url = + std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example."); + let pool = sqlx::PgPool::connect(&conn_url).await?; + + let mut listener = PgListener::connect(&conn_url).await?; + listener.listen("chan0").await?; + + // setup terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + // create app and run it + let app = ChatApp::new(pool); + let res = app.run(&mut terminal, listener).await; + + // restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture, + )?; + terminal.show_cursor()?; + + if let Err(err) = res { + println!("{:?}", err) + } + + Ok(()) +} + +async fn notify(pool: &PgPool, s: &str) -> Result<(), sqlx::Error> { + sqlx::query( + r#" +SELECT pg_notify(chan, payload) +FROM (VALUES ('chan0', $1)) v(chan, payload) +"#, + ) + .bind(s) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/examples/postgres/listen/Cargo.toml b/examples/postgres/listen/Cargo.toml deleted file mode 100644 index ce1d9153aa..0000000000 --- a/examples/postgres/listen/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "sqlx-example-postgres-listen" -version = "0.1.0" -edition = "2021" -workspace = "../../../" - -[dependencies] -sqlx = { path = "../../../", features = [ "runtime-tokio", "postgres" ] } -futures = "0.3.1" -tokio = { version = "1.20.0", features = ["rt-multi-thread", "macros", "time"]} diff --git a/examples/postgres/listen/README.md b/examples/postgres/listen/README.md deleted file mode 100644 index 7a0c39a76b..0000000000 --- a/examples/postgres/listen/README.md +++ /dev/null @@ -1,18 +0,0 @@ -Postgres LISTEN/NOTIFY -====================== - -## Usage - -Declare the database URL. This example does not include any reading or writing of data. - -``` -export DATABASE_URL="postgres://postgres@localhost/postgres" -``` - -Run. - -``` -cargo run -``` - -The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval. diff --git a/examples/postgres/listen/src/main.rs b/examples/postgres/listen/src/main.rs deleted file mode 100644 index 4d3e1db044..0000000000 --- a/examples/postgres/listen/src/main.rs +++ /dev/null @@ -1,110 +0,0 @@ -use futures::StreamExt; -use futures::TryStreamExt; -use sqlx::postgres::PgListener; -use sqlx::{Executor, PgPool}; -use std::pin; -use std::pin::pin; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::time::Duration; - -/// How long to sit in the listen loop before exiting. -/// -/// This ensures the example eventually exits, which is required for automated testing. -const LISTEN_DURATION: Duration = Duration::from_secs(5); - -#[tokio::main] -async fn main() -> Result<(), Box> { - println!("Building PG pool."); - let conn_str = - std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example."); - let pool = sqlx::PgPool::connect(&conn_str).await?; - - let mut listener = PgListener::connect_with(&pool).await?; - - let notify_pool = pool.clone(); - let _t = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(2)); - - while !notify_pool.is_closed() { - interval.tick().await; - notify(¬ify_pool).await; - } - }); - - println!("Starting LISTEN loop."); - - listener.listen_all(vec!["chan0", "chan1", "chan2"]).await?; - - let mut counter = 0usize; - loop { - let notification = listener.recv().await?; - println!("[from recv]: {:?}", notification); - - counter += 1; - if counter >= 3 { - break; - } - } - - // Prove that we are buffering messages by waiting for 6 seconds - listener.execute("SELECT pg_sleep(6)").await?; - - let mut stream = listener.into_stream(); - - // `Sleep` must be pinned - let mut timeout = pin!(tokio::time::sleep(LISTEN_DURATION)); - - loop { - tokio::select! { - res = stream.try_next() => { - if let Some(notification) = res? { - println!("[from stream]: {:?}", notification); - } else { - break; - } - }, - _ = timeout.as_mut() => { - // Don't run forever - break; - } - } - } - - pool.close().await; - - Ok(()) -} - -async fn notify(pool: &PgPool) { - static COUNTER: AtomicI64 = AtomicI64::new(0); - - // There's two ways you can invoke `NOTIFY`: - // - // 1: `NOTIFY , ''` which cannot take bind parameters and - // is an identifier which is lowercased unless double-quoted - // - // 2: `SELECT pg_notify('', '')` which can take bind parameters - // and preserves its case - // - // We recommend #2 for consistency and usability. - - // language=PostgreSQL - let res = sqlx::query( - r#" --- this emits '{ "payload": N }' as the actual payload -select pg_notify(chan, json_build_object('payload', payload)::text) -from ( - values ('chan0', $1), - ('chan1', $2), - ('chan2', $3) - ) notifies(chan, payload) - "#, - ) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .bind(&COUNTER.fetch_add(1, Ordering::SeqCst)) - .execute(pool) - .await; - - println!("[from notify]: {:?}", res); -}