This repository has been archived by the owner on Dec 16, 2024. It is now read-only.

Sentinel #39

Open: wants to merge 7 commits into base: master
55 changes: 52 additions & 3 deletions README.md
@@ -9,6 +9,7 @@ Supported Redis features:
* Pipelining
* Authentication & multiple dbs
* Pubsub
* Sentinel failover

## Example

@@ -53,14 +54,14 @@ Pubsub:
received {message,<<"foo">>,<<"bar">>,<0.34.0>}

Pattern Subscribe:
1> eredis_sub:psub_example().

1> eredis_sub:psub_example().
received {subscribed,<<"foo*">>,<0.33.0>}
{<0.33.0>,<0.36.0>}
2> eredis_sub:ppub_example().
received {pmessage,<<"foo*">>,<<"foo123">>,<<"bar">>,<0.33.0>}
ok
3>
3>

EUnit tests:

@@ -110,6 +111,54 @@ stampede of clients just waiting for a failed connection attempt or
Note: If Eredis is starting up and cannot connect, it will fail
immediately with `{connection_error, Reason}`.

## Redis sentinel support

### Overview

Starting with versions 2.4.16 and 2.6.0-rc6, Redis ships with a
standard monitoring and automatic failover tool called Sentinel.
It runs as a separate process that monitors Redis instances and automatically
switches to a new master if the current one fails. Sentinel then automatically
reconfigures all slaves to replicate from the new master.
More information is available at http://redis.io/topics/sentinel
When working with a cluster that uses Sentinel, clients should ask the Sentinel processes
for the current master instance.

### Working with sentinels

To enable Sentinel support in the eredis app:

Start the main `eredis_sentinel` process under a supervisor, passing the list of all sentinels as its argument:

eredis_sentinel:start_link([{"host1.lan", 20367}, {"host2.lan", 20367}]).
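
A minimal sketch of what "under a supervisor" could look like. The module name `my_app_sup`, the restart strategy, and the shutdown timeout are assumptions chosen for illustration; only `eredis_sentinel:start_link/1` and the shape of its argument come from the text above.

```erlang
-module(my_app_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

%% Hypothetical application supervisor that owns the sentinel process.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% List of {Host, Port} pairs, one per sentinel (values from the README).
    Sentinels = [{"host1.lan", 20367}, {"host2.lan", 20367}],
    %% Old-style child spec tuple:
    %% {Id, {M, F, A}, Restart, Shutdown, Type, Modules}
    SentinelSpec = {eredis_sentinel,
                    {eredis_sentinel, start_link, [Sentinels]},
                    permanent, 5000, worker, [eredis_sentinel]},
    %% Assumed restart intensity: at most 5 restarts in 10 seconds.
    {ok, {{one_for_one, 5, 10}, [SentinelSpec]}}.
```

Any eredis clients that use a `sentinel:` host would presumably need to be started after this child, since they ask `eredis_sentinel` for the master when they connect.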


When starting eredis clients, use the string `sentinel:master_name` instead of the host:

eredis:start_link("sentinel:mymaster", 0).

The port is ignored in this case, but it is still required because `eredis:start_link/1` is a special form used in the poolboy integration.
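
The following sketch shows how the `sentinel:` prefix can be told apart from a plain host name. It mirrors the pattern match added to `eredis_client:init/1` in this change, but the module `host_spec` and the function `parse/1` are hypothetical names and not part of eredis.

```erlang
-module(host_spec).

-export([parse/1]).

%% Hypothetical helper: classify the Host argument passed to eredis.
%% "sentinel:NAME" means "look up master NAME via sentinel";
%% any other string is treated as a host to connect to directly.
parse("sentinel:" ++ MasterStr) ->
    %% The change under review also converts the name with list_to_atom/1.
    {sentinel, list_to_atom(MasterStr)};
parse(Host) when is_list(Host) ->
    {direct, Host}.
```

For example, `host_spec:parse("sentinel:mymaster")` yields `{sentinel, mymaster}`, while `host_spec:parse("127.0.0.1")` yields `{direct, "127.0.0.1"}`.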

The `eredis_client` process asks `eredis_sentinel` for the current master of the `mymaster` cluster and
connects to it. `eredis_sentinel` also tracks all clients, and when the master changes
it sends a notification to every interested client.

`eredis_sentinel` implements the algorithm described in [Guidelines for Redis clients with
support for Redis Sentinel](http://redis.io/topics/sentinel-clients).
If it is unable to discover the master for a cluster, it returns an error code describing the source of the problem:

1. `sentinel_unreachable` - could not connect to any of the sentinels
2. `master_unknown` - the sentinels do not know this cluster name
3. `master_unreachable` - there is currently no valid master for this cluster
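
As an illustration of how calling code might react to the three error codes listed above, here is a small sketch. The atoms come from the list and from `include/eredis_sentinel.hrl`; the module `sentinel_errors` and the function `describe/1` are hypothetical, and the exact tuple in which the error reaches a caller is not assumed here.

```erlang
-module(sentinel_errors).

-export([describe/1]).

%% Hypothetical helper: turn a sentinel error atom into a log-friendly string.
describe(sentinel_unreachable) ->
    "could not connect to any of the sentinels";
describe(master_unknown) ->
    "the sentinels do not know this cluster name";
describe(master_unreachable) ->
    "there is currently no valid master for this cluster";
describe(Other) ->
    %% Fall through for anything outside the documented set.
    lists:flatten(io_lib:format("unexpected sentinel error: ~p", [Other])).
```

Matching on the atoms directly keeps the handling explicit, and the last clause avoids a `function_clause` crash if a new error code is introduced later.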

### Testing sentinel support

`eredis_sentinel` has a test suite which uses a real Redis cluster with Sentinel monitoring.
To run these tests you must be able to run the `redis-server` and `redis-sentinel` executables.
The test suite is integrated into the common eredis EUnit test suite.
Before starting, it checks that `redis-server` and `redis-sentinel` are installed and prints a warning if not.
Every test case starts with a fresh cluster using the config files from `priv/redis_*.conf`; at the end of each case the cluster is shut down.
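
The executable check described above could be sketched with the standard `os:find_executable/1`, which returns `false` when a program is not found on the `PATH`. The module name `sentinel_test_precheck` and its functions are hypothetical; this is not the test suite's actual code.

```erlang
-module(sentinel_test_precheck).

-export([missing_executables/0, missing_executables/1]).

%% Hypothetical pre-check: which of the required programs are not installed?
missing_executables() ->
    missing_executables(["redis-server", "redis-sentinel"]).

%% Returns the subset of Names that cannot be found on the PATH.
missing_executables(Names) ->
    [Name || Name <- Names, os:find_executable(Name) =:= false].
```

A test setup could call `missing_executables/0` and print a warning instead of running the sentinel cases when the result is not the empty list.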

## Pubsub

Thanks to Dave Peticolas (jdavisp3), eredis supports
12 changes: 12 additions & 0 deletions include/eredis_sentinel.hrl
@@ -0,0 +1,12 @@
%% Types
-type master_name() :: atom().
-type master_host() :: string().
-type master_port() :: integer().

%% Sentinel constants
-define(SENTINEL_PORT, 26379).

% Sentinel errors
-define(SENTINEL_UNREACHABLE, sentinel_unreachable).
-define(MASTER_UNKNOWN, master_unknown).
-define(MASTER_UNREACHABLE, master_unreachable).
7 changes: 7 additions & 0 deletions priv/redis_cache1.conf
@@ -0,0 +1,7 @@
bind 127.0.0.1
port 6382
pidfile ./redis_cache1.pid
daemonize yes
timeout 0
logfile stdout
#slaveof 127.0.0.1 6381
7 changes: 7 additions & 0 deletions priv/redis_cache2.conf
@@ -0,0 +1,7 @@
bind 127.0.0.1
port 6383
pidfile ./redis_cache2.pid
daemonize yes
timeout 0
logfile stdout
slaveof 127.0.0.1 6382
21 changes: 21 additions & 0 deletions priv/redis_sentinel1.conf
@@ -0,0 +1,21 @@
port 26380
daemonize yes
pidfile ./redis_sentinel1.pid

sentinel monitor session 127.0.0.1 6380 1
sentinel down-after-milliseconds session 1000
sentinel failover-timeout session 10000
sentinel can-failover session yes
sentinel parallel-syncs session 1

sentinel monitor cache 127.0.0.1 6382 1
sentinel down-after-milliseconds cache 1000
sentinel failover-timeout cache 10000
sentinel can-failover cache yes
sentinel parallel-syncs cache 1

sentinel monitor badmaster localhost 6385 1
sentinel down-after-milliseconds badmaster 1000
sentinel failover-timeout badmaster 10000
sentinel can-failover badmaster yes
sentinel parallel-syncs badmaster 1
21 changes: 21 additions & 0 deletions priv/redis_sentinel2.conf
@@ -0,0 +1,21 @@
port 26381
daemonize yes
pidfile ./redis_sentinel2.pid

sentinel monitor session 127.0.0.1 6380 1
sentinel down-after-milliseconds session 1000
sentinel failover-timeout session 10000
sentinel can-failover session yes
sentinel parallel-syncs session 1

sentinel monitor cache 127.0.0.1 6382 1
sentinel down-after-milliseconds cache 1000
sentinel failover-timeout cache 10000
sentinel can-failover cache yes
sentinel parallel-syncs cache 1

sentinel monitor badmaster localhost 6385 1
sentinel down-after-milliseconds badmaster 1000
sentinel failover-timeout badmaster 10000
sentinel can-failover badmaster yes
sentinel parallel-syncs badmaster 1
6 changes: 6 additions & 0 deletions priv/redis_session1.conf
@@ -0,0 +1,6 @@
bind 127.0.0.1
port 6380
pidfile ./redis_session1.pid
daemonize yes
timeout 0
logfile stdout
7 changes: 7 additions & 0 deletions priv/redis_session2.conf
@@ -0,0 +1,7 @@
bind 127.0.0.1
port 6381
pidfile ./redis_session2.pid
daemonize yes
timeout 0
logfile stdout
slaveof 127.0.0.1 6380
80 changes: 67 additions & 13 deletions src/eredis_client.erl
@@ -39,6 +39,9 @@
port :: integer() | undefined,
password :: binary() | undefined,
database :: binary() | undefined,

sentinel :: undefined | atom(),

reconnect_sleep :: reconnect_sleep() | undefined,

socket :: port() | undefined,
@@ -68,14 +71,24 @@ stop(Pid) ->
%%====================================================================

init([Host, Port, Database, Password, ReconnectSleep]) ->
Sentinel =
case Host of
"sentinel:"++MasterStr ->
list_to_atom(MasterStr);
_ ->
undefined
end,

State = #state{host = Host,
port = Port,
database = read_database(Database),
password = list_to_binary(Password),
reconnect_sleep = ReconnectSleep,

parser_state = eredis_parser:init(),
queue = queue:new()},
queue = queue:new(),
sentinel = Sentinel
},

case connect(State) of
{ok, NewState} ->
@@ -126,18 +139,18 @@ handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect} = Stat
{stop, normal, State#state{socket = undefined}};

handle_info({tcp_closed, _Socket}, State) ->
Self = self(),
spawn(fun() -> reconnect_loop(Self, State) end),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
{noreply, State#state{socket = undefined, queue = queue:new()}};
{ok, StateNew} = start_reconnect(State#state{socket = undefined}),
{noreply, StateNew};

%% Redis is ready to accept requests, the given Socket is a socket
%% already connected and authenticated.
handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
{noreply, State#state{socket = Socket}};
%% Also keep master Host/Port in case it changed during reconnection
handle_info({connection_ready, Socket, Host, Port}, #state{socket = undefined} = State) ->
{noreply, State#state{socket = Socket, host=Host, port=Port}};

%% Notification from eredis_sentinel about new master
handle_info({sentinel, {reconnect, _MasterName, Host, Port}}, State) ->
do_sentinel_reconnect(Host, Port, State);

%% eredis can be used in Poolboy, but it requires to support a simple API
%% that Poolboy uses to manage the connections.
@@ -250,7 +263,17 @@ safe_reply(From, Value) ->
%% the correct database. These commands are synchronous and if Redis
%% returns something we don't expect, we crash. Returns {ok, State} or
%% {SomeError, Reason}.
connect(State) ->
connect(#state{sentinel = undefined} = State) ->
connect1(State);
connect(#state{sentinel = Master} = State) ->
case eredis_sentinel:get_master(Master, true) of
{ok, {Host, Port}} ->
connect1(State#state{host=Host, port=Port});
{error, Error} ->
{error, {sentinel_error, Error}}
end.

connect1(State) ->
case gen_tcp:connect(State#state.host, State#state.port, ?SOCKET_OPTS) of
{ok, Socket} ->
case authenticate(Socket, State#state.password) of
@@ -301,9 +324,9 @@ do_sync_command(Socket, Command) ->
%% connection, give the socket to the redis client.
reconnect_loop(Client, #state{reconnect_sleep = ReconnectSleep} = State) ->
case catch(connect(State)) of
{ok, #state{socket = Socket}} ->
{ok, #state{socket = Socket, host=Host, port=Port}} ->
gen_tcp:controlling_process(Socket, Client),
Client ! {connection_ready, Socket};
Client ! {connection_ready, Socket, Host, Port};
{error, _Reason} ->
timer:sleep(ReconnectSleep),
reconnect_loop(Client, State);
@@ -319,3 +342,34 @@ read_database(undefined) ->
undefined;
read_database(Database) when is_integer(Database) ->
list_to_binary(integer_to_list(Database)).


%% Handle sentinel "reconnect to new master" message
%% 1. we are already connected to the new master - ignore
do_sentinel_reconnect(Host, Port, #state{host=Host,port=Port}=State) ->
{noreply, State};
%% 2. we are already waiting to reconnect - ignore
do_sentinel_reconnect(_Host, _Port, #state{socket=undefined}=State) ->
{noreply, State};
%% 3. we are not supposed to reconnect - stop processing
do_sentinel_reconnect(_Host, _Port, #state{reconnect_sleep=no_reconnect}=State) ->
{stop, sentinel_reconnect, State};
%% 4. we are connected to wrong master - reconnect
do_sentinel_reconnect(Host, Port, State) ->
{ok, StateNew} = start_reconnect(State#state{host=Host, port=Port}),
{noreply, StateNew}.

%% @doc Start reconnecting loop, close old connection if present.
-spec start_reconnect(#state{}) -> {ok, #state{}}.
start_reconnect(#state{socket=undefined} = State) ->
Self = self(),
spawn(fun() -> reconnect_loop(Self, State) end),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
%% TODO: shouldn't we send an error reply to waiting clients?
{ok, State#state{queue = queue:new()}};
start_reconnect(#state{socket=Socket} = State) ->
gen_tcp:close(Socket),
start_reconnect(State#state{socket=undefined}).