Skip to content

Commit

Permalink
Merge pull request #107 from nihohit/pull-from-upstream
Browse files Browse the repository at this point in the history
Pull from upstream
  • Loading branch information
shachlanAmazon authored Jan 28, 2024
2 parents 9e51226 + f04ea10 commit 2e31b36
Show file tree
Hide file tree
Showing 25 changed files with 1,084 additions and 364 deletions.
22 changes: 11 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ test:
@echo "===================================================================="
@echo "Testing Connection Type TCP without features"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --no-default-features -- --nocapture --test-threads=1
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --no-default-features -- --nocapture --test-threads=1

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and RESP2"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and RESP3"
Expand All @@ -21,52 +21,52 @@ test:
@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and Rustls support"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp+tls RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp+tls RUST_BACKTRACE=1 cargo test -p redis --all-features -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type TCP with all features and native-TLS support"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp+tls RUST_BACKTRACE=1 cargo test -p redis --features=json,tokio-native-tls-comp,connection-manager,cluster-async -- --nocapture --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp+tls RUST_BACKTRACE=1 cargo test -p redis --features=json,tokio-native-tls-comp,connection-manager,cluster-async -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type UNIX"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test -p redis --test parser --test test_basic --test test_types --all-features -- --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test -p redis --test parser --test test_basic --test test_types --all-features -- --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing Connection Type UNIX SOCKETS"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test -p redis --all-features -- --test-threads=1 --skip test_cluster --skip test_async_cluster --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test -p redis --all-features -- --test-threads=1 --skip test_cluster --skip test_async_cluster --skip test_module

@echo "===================================================================="
@echo "Testing async-std with Rustls"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-rustls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing async-std with native-TLS"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test -p redis --features=async-std-native-tls-comp,cluster-async -- --nocapture --test-threads=1 --skip test_module

@echo "===================================================================="
@echo "Testing redis-test"
@echo "===================================================================="
@RUST_BACKTRACE=1 cargo test -p redis-test
@RUSTFLAGS="-D warnings" RUST_BACKTRACE=1 cargo test -p redis-test


test-module:
@echo "===================================================================="
@echo "Testing with module support enabled (currently only RedisJSON)"
@echo "===================================================================="
@REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test --all-features test_module -- --test-threads=1
@RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=tcp RUST_BACKTRACE=1 cargo test --all-features test_module -- --test-threads=1

test-single: test

bench:
cargo bench --all-features

docs:
@RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features --no-deps
@RUSTFLAGS="-D warnings" RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features --no-deps

upload-docs: docs
@./upload-docs.sh
Expand Down
17 changes: 16 additions & 1 deletion redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,22 @@ rustls-pki-types = { version = "1", optional = true }
serde = { version = "1.0.82", optional = true }
serde_json = { version = "1.0.82", optional = true }

# Only needed for bignum Support
rust_decimal = { version = "1.33.1", optional = true }
bigdecimal = { version = "0.4.2", optional = true }

# Optional aHash support
ahash = { version = "0.8.6", optional = true }
num-bigint = "0.4.3"

num-bigint = "0.4.4"
tracing = "0.1"
futures-time = { version = "3.0.0", optional = true }
arcstr = "1.1.5"
ordered-float = "4.1.1"

# Optional uuid support
uuid = { version = "1.6.1", optional = true }

[features]
default = ["acl", "streams", "geospatial", "script", "keep-alive"]
acl = []
Expand All @@ -119,6 +126,11 @@ cluster-async = ["cluster", "futures", "futures-util"]
keep-alive = ["socket2"]
sentinel = ["rand"]
tcp_nodelay = []
rust_decimal = ["dep:rust_decimal"]
bigdecimal = ["dep:bigdecimal"]
num-bigint = []
uuid = ["dep:uuid"]
disable-client-setinfo = []

# Deprecated features
tls = ["tls-native-tls"] # use "tls-native-tls" instead
Expand Down Expand Up @@ -166,6 +178,9 @@ required-features = ["cluster-async"]
name = "test_async_cluster_connections_logic"
required-features = ["cluster-async"]

[[test]]
name = "test_bignum"

[[bench]]
name = "bench_basic"
harness = false
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/async-await.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use redis::AsyncCommands;
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_async_connection().await?;
let mut con = client.get_multiplexed_async_connection().await?;

con.set("key1", b"foo").await?;

Expand Down
24 changes: 9 additions & 15 deletions redis/examples/async-connection-loss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use redis::RedisResult;
use tokio::time::interval;

enum Mode {
Deprecated,
Default,
Multiplexed,
Reconnect,
}

Expand Down Expand Up @@ -63,14 +63,14 @@ async fn main() -> RedisResult<()> {
println!("Using default connection mode\n");
Mode::Default
}
Some("multiplexed") => {
println!("Using multiplexed connection mode\n");
Mode::Multiplexed
}
Some("reconnect") => {
println!("Using reconnect manager mode\n");
Mode::Reconnect
}
Some("deprecated") => {
println!("Using deprecated connection mode\n");
Mode::Deprecated
}
Some(_) | None => {
println!("Usage: reconnect-manager (default|multiplexed|reconnect)");
process::exit(1);
Expand All @@ -79,16 +79,10 @@ async fn main() -> RedisResult<()> {

let client = redis::Client::open("redis://127.0.0.1/").unwrap();
match mode {
Mode::Default => run_single(client.get_async_connection().await?).await?,
Mode::Multiplexed => run_multi(client.get_multiplexed_tokio_connection().await?).await?,
Mode::Reconnect => {
run_multi(
client
.get_connection_manager_with_backoff(2, 100, 6)
.await?,
)
.await?
}
Mode::Default => run_multi(client.get_multiplexed_tokio_connection().await?).await?,
Mode::Reconnect => run_multi(client.get_connection_manager().await?).await?,
#[allow(deprecated)]
Mode::Deprecated => run_single(client.get_async_connection().await?).await?,
};
Ok(())
}
4 changes: 2 additions & 2 deletions redis/examples/async-pub-sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use redis::AsyncCommands;
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut publish_conn = client.get_async_connection().await?;
let mut pubsub_conn = client.get_async_connection().await?.into_pubsub();
let mut publish_conn = client.get_multiplexed_async_connection().await?;
let mut pubsub_conn = client.get_async_pubsub().await?;

pubsub_conn.subscribe("wavephone").await?;
let mut pubsub_stream = pubsub_conn.on_message();
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/async-scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use redis::{AsyncCommands, AsyncIter};
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_async_connection().await?;
let mut con = client.get_multiplexed_async_connection().await?;

con.set("async-key1", b"foo").await?;
con.set("async-key2", b"foo").await?;
Expand Down
17 changes: 12 additions & 5 deletions redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(deprecated)]

#[cfg(feature = "async-std-comp")]
use super::async_std;
use super::ConnectionLike;
Expand All @@ -10,7 +12,7 @@ use crate::connection::{
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
use crate::parser::ValueCodec;
use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value};
use crate::{from_redis_value, ProtocolVersion, ToRedisArgs};
use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use ::async_std::net::ToSocketAddrs;
use ::tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
Expand All @@ -28,6 +30,7 @@ use std::pin::Pin;
use tokio_util::codec::Decoder;

/// Represents a stateful redis TCP connection.
#[deprecated(note = "aio::Connection is deprecated. Use aio::MultiplexedConnection instead.")]
pub struct Connection<C = Pin<Box<dyn AsyncStream + Send + Sync>>> {
con: C,
buf: Vec<u8>,
Expand Down Expand Up @@ -153,7 +156,9 @@ where
let mut received_unsub = false;
let mut received_punsub = false;
if self.protocol == ProtocolVersion::RESP3 {
while let Value::Push { kind, data } = from_redis_value(&self.read_response().await?)? {
while let Value::Push { kind, data } =
from_owned_redis_value(self.read_response().await?)?
{
if data.len() >= 2 {
if let Value::Int(num) = data[1] {
if resp3_is_pub_sub_state_cleared(
Expand All @@ -169,7 +174,8 @@ where
}
} else {
loop {
let res: (Vec<u8>, (), isize) = from_redis_value(&self.read_response().await?)?;
let res: (Vec<u8>, (), isize) =
from_owned_redis_value(self.read_response().await?)?;
if resp2_is_pub_sub_state_cleared(
&mut received_unsub,
&mut received_punsub,
Expand Down Expand Up @@ -380,6 +386,7 @@ where
}

/// Exits from `PubSub` mode and converts [`PubSub`] into [`Connection`].
#[deprecated(note = "aio::Connection is deprecated")]
pub async fn into_connection(mut self) -> Connection<C> {
self.0.exit_pubsub().await.ok();

Expand All @@ -406,7 +413,7 @@ where
ValueCodec::default()
.framed(&mut self.0.con)
.filter_map(|value| {
Box::pin(async move { T::from_redis_value(&value.ok()?.ok()?).ok() })
Box::pin(async move { T::from_owned_redis_value(value.ok()?.ok()?).ok() })
})
}

Expand All @@ -415,7 +422,7 @@ where
ValueCodec::default()
.framed(self.0.con)
.filter_map(|value| {
Box::pin(async move { T::from_redis_value(&value.ok()?.ok()?).ok() })
Box::pin(async move { T::from_owned_redis_value(value.ok()?.ok()?).ok() })
})
}
}
Expand Down
6 changes: 6 additions & 0 deletions redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ pub trait ConnectionLike {
/// Sends multiple already encoded (packed) command into the TCP socket
/// and reads `count` responses from it. This is used to implement
/// pipelining.
/// Important - this function is meant for internal usage, since it's
/// easy to pass incorrect `offset` & `count` parameters, which might
/// cause the connection to enter an erroneous state. Users shouldn't
/// call it, instead using the Pipeline::query_async function.
#[doc(hidden)]
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a crate::Pipeline,
Expand Down Expand Up @@ -158,6 +163,7 @@ where

// result is ignored, as per the command's instructions.
// https://redis.io/commands/client-setinfo/
#[cfg(not(feature = "disable-client-setinfo"))]
let _: RedisResult<()> = crate::connection::client_set_info_pipeline()
.query_async(con)
.await;
Expand Down
5 changes: 4 additions & 1 deletion redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,10 @@ impl MultiplexedConnection {
driver
}
futures_util::future::Either::Right(((), _)) => {
unreachable!("Multiplexed connection driver unexpectedly terminated")
return Err(RedisError::from((
crate::ErrorKind::IoError,
"Multiplexed connection driver unexpectedly terminated",
)));
}
}
};
Expand Down
32 changes: 32 additions & 0 deletions redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Client {
impl Client {
/// Returns an async connection from the client.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
#[deprecated(
note = "aio::Connection is deprecated. Use client::get_multiplexed_async_connection instead."
)]
#[allow(deprecated)]
pub async fn get_async_connection(&self) -> RedisResult<crate::aio::Connection> {
let (con, _ip) = match Runtime::locate() {
#[cfg(feature = "tokio-comp")]
Expand All @@ -99,6 +103,10 @@ impl Client {
/// Returns an async connection from the client.
#[cfg(feature = "tokio-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))]
#[deprecated(
note = "aio::Connection is deprecated. Use client::get_multiplexed_tokio_connection instead."
)]
#[allow(deprecated)]
pub async fn get_tokio_connection(&self) -> RedisResult<crate::aio::Connection> {
use crate::aio::RedisRuntime;
Ok(
Expand All @@ -111,6 +119,10 @@ impl Client {
/// Returns an async connection from the client.
#[cfg(feature = "async-std-comp")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-std-comp")))]
#[deprecated(
note = "aio::Connection is deprecated. Use client::get_multiplexed_async_std_connection instead."
)]
#[allow(deprecated)]
pub async fn get_async_std_connection(&self) -> RedisResult<crate::aio::Connection> {
use crate::aio::RedisRuntime;
Ok(
Expand Down Expand Up @@ -655,6 +667,26 @@ impl Client {

inner_build_with_tls(connection_info, tls_certs)
}

/// Returns an async receiver for pub-sub messages.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
// TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
#[allow(deprecated)]
self.get_async_connection()
.await
.map(|connection| connection.into_pubsub())
}

/// Returns an async receiver for monitor messages.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
// TODO - do we want to type-erase monitor using a trait, to allow us to replace it with a different implementation later?
pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
#[allow(deprecated)]
self.get_async_connection()
.await
.map(|connection| connection.into_monitor())
}
}

#[cfg(feature = "aio")]
Expand Down
14 changes: 6 additions & 8 deletions redis/src/cluster_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cluster::ClusterConnection;
use crate::cmd::{cmd, Cmd};
use crate::types::{
from_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
from_owned_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};

pub(crate) const UNROUTABLE_ERROR: (ErrorKind, &str) = (
Expand Down Expand Up @@ -118,13 +118,11 @@ impl ClusterPipeline {
}
}

from_redis_value(
&(if self.commands.is_empty() {
Value::Array(vec![])
} else {
self.make_pipeline_results(con.execute_pipeline(self)?)
}),
)
from_owned_redis_value(if self.commands.is_empty() {
Value::Array(vec![])
} else {
self.make_pipeline_results(con.execute_pipeline(self)?)
})
}

/// This is a shortcut to `query()` that does not return a value and
Expand Down
Loading

0 comments on commit 2e31b36

Please sign in to comment.