怎么用python快速搭建redis集群

  redis通信协议

  列出主要的点,便于对于下面程序的理解。

  Redis在TCP端口6379(默认端口,在配置可以修改)上监听到来的连接,在客户端与服务器端之间传输的每个Redis命令或者数据都以rn结尾。

  回复(服务端可客户端恢复的协议)

  Redis用不同的回复类型回复命令。它可能从服务器发送的第一个字节开始校验回复类型:

  * 用单行回复(状态回复),回复的第一个字节将是&
ldquo;
+&
rdquo;

  * 错误消息,回复的第一个字节将是&
ldquo;
-&
rdquo;

  * 整型数字,回复的第一个字节将是&

用Python轻松搭建Redis集群,快速提升应用的性能!


ldquo;
:&
rdquo;

  * 批量回复,回复的第一个字节将是&
ldquo;
$&
rdquo;

  * 多个批量回复,回复的第一个字节将是&
ldquo;
*&
rdquo;

  Bulk Strings(批量回复)

  批量回复被服务器用于返回一个单二进制安全字符串。

  C: GET mykey

  S: $6rnfoobarrn

  服务器发送第一行回复,该行以&
ldquo;
$&
rdquo;
开始后面跟随实际要发送的字节数,随后是CRLF,然后发送实际数据,随后是2个字节的额外数据用于最后的CRLF。服务器发送的准确序列如下:

  ”$6rnfoobarrn”

  如果请求的值不存在,批量回复将使用特殊的值-1来作为数据长度,例如:

  C: GET nonexistingkey

  S: $-1

  当请求的对象不存在时,客户端库API不会返回空字符串,而会返回空对象。例如:Ruby库返回&
lsquo;
nil&
rsquo;
,而C库返回NULL(或者在回复的对象里设置指定的标志)等等。

  二进制

  简单说下二进制,就是会包含,所以C语言在处理的时候,就不能用str函数,像strlen、strcpy等,因为它们都是以来判断字符串结尾的。

  redis集群    超简单搭建redis集群

  官网也介绍了怎么搭建redis集群,试过比较麻烦,因为用的centos6.5,如果用较新的centos,可能会好点。

  Redis 集群的数据分片

  Redis 集群没有使用一致性hash, 而是引入了 哈希槽的概念.

  Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽,举个例子,比如当前集群有3个节点,那么:

  * 节点 A 包含 0 到 5500号哈希槽.

  * 节点 B 包含5501 到 11000 号哈希槽.

  * 节点 C 包含11001 到 16384号哈希槽.

  这种结构很容易添加或者删除节点. 比如如果我想新添加个节点D, 我需要从节点 A, B, C中得部分槽到D上. 如果我想移除节点A,需要将A中的槽移到B和C节点上,然后将没有任何槽的A节点从集群中移除即可.   由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态.

  Redis 集群协议中的客户端和服务器端

  在 Redis 集群中,节点负责存储数据、记录集群的状态(包括键值到正确节点的映射)。集群节点同样能自动发现其他节点,检测出没正常工作的节点, 并且在需要的时候在从节点中推选出主节点。

  为了执行这些任务,所有的集群节点都通过TCP连接(TCP bus?)和一个二进制协议(集群连接,cluster bus)建立通信。 每一个节点都通过集群连接(cluster bus)与集群上的其余每个节点连接起来。  节点们使用一个 gossip 协议来传播集群的信息,这样可以:发现新的节点、 发送ping包(用来确保所有节点都在正常工作中)、在特定情况发生时发送集群消息。集群连接也用于在集群中发布或订阅消息。

  由于集群节点不能代理(proxy)请求,所以客户端在接收到重定向错误(redirections errors) -MOVED 和 -ASK 的时候, 将命令重定向到其他节点。理论上来说,客户端是可以自由地向集群中的所有节点发送请求,在需要的时候把请求重定向到其他节点,所以客户端是不需要保存集群状态。 不过客户端可以缓存键值和节点之间的映射关系,这样能明显提高命令执行的效率。

  -MOVED

  简单说下返回-MOVED的情况,就是客户端连节点A请求处理key,但其实key其实在节点B,就返回-MOVED,协议如下:-MOVED 3999 127.0.0.1:6381

不用考虑-ASK的情况。

  C语言实现redis客户端

  代码如下:

#include <
string.h>
#include <
sys/socket.h>
#include <
arpa/inet.h>
#include <
errno.h>
#include <
fcntl.h>
#include <
netdb.h>
#include <
sys/poll.h>
#include <
unistd.h>
#include <
sys/types.h>
#include <
stdlib.h>
#include <
stdio.h>
ssize_t sock_write_loop( int fd, const void *vptr, size_t n )
{
size_t nleft = 0;

ssize_t nwritten = 0;
const char *ptr;


ptr = (char *) vptr;

nleft = n;
while( nleft >
0 )
{if( (nwritten = write(fd, ptr, nleft) ) <
= 0 )
{if( errno == EINTR )
{
nwritten = 0;
//再次调用write }else{return -5;

}
}
nleft = nleft - nwritten;

ptr = ptr + nwritten;

}return(n);

}int sock_read_wait( int fd, int timeout )
{struct pollfd pfd;


pfd.fd = fd;

pfd.events = POLLIN;

pfd.revents = 0;


timeout *= 1000;
for (;
;
)
{switch( poll(&
pfd, 1, timeout) )
{case -1:if( errno != EINTR )
{return (-2);

}continue;
case 0:
errno = ETIMEDOUT;
return (-1);
default:if( pfd.revents &
POLLIN )return (0);
elsereturn (-3);

}
}

}

ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout )
{ if( timeout >
0 &
&
sock_read_wait(fd, timeout) <
0 )return (-1);
elsereturn (read(fd, vptr, len));


}int sock_connect_nore(const char *IPaddr , int port , int timeout)
{ // char temp[4096];
int sock_fd = 0, n = 0, errcode = 0;
struct sockaddr_in servaddr;
if( IPaddr == NULL )
{return -1;

}if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) <
0 )
{return -1;

}

memset(&
servaddr, 0, sizeof(servaddr));

servaddr.sin_family = AF_INET;

servaddr.sin_port = htons(port);
//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &
servaddr.sin_addr) ) <
= 0 )
{//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;
char sBuf[2048], sHostIp[17];
int h_errnop = 0;


memset(&
host, 0, sizeof(host));

memset(sBuf, 0, sizeof(sBuf));

memset(sHostIp, 0 , sizeof(sHostIp));

pHost = &
host;


#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &
h_errnop) == NULL) ||
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &
pHost, &
h_errnop) != 0) ||
#endif(pHost == NULL) )
{
close(sock_fd);
return -1;

}if( pHost->
h_addrtype != AF_INET &
&
pHost->
h_addrtype != AF_INET6 )
{
close(sock_fd);
return -1;

}//目前仅取第一个IP地址if( (inet_ntop(pHost->
h_addrtype, *(pHost->
h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
{
close(sock_fd);
return -1;

} if( (errcode = inet_pton(AF_INET, sHostIp, &
servaddr.sin_addr) ) <
= 0 )
{
close(sock_fd);
return -1;

}//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&
servaddr, sizeof(servaddr), timeout) ) <
0 )
{
close(sock_fd);
return -1;

}return sock_fd;

}int sock_connect(const char *IPaddr , int port , int timeout)
{char temp[4096];
int sock_fd = 0, n = 0, errcode = 0;
struct sockaddr_in servaddr;
if( IPaddr == NULL )
{return -1;

}if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) <
0 )
{return -1;

}

memset(&
servaddr, 0, sizeof(servaddr));

servaddr.sin_family = AF_INET;

servaddr.sin_port = htons(port);
//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &
servaddr.sin_addr) ) <
= 0 )
{//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;
char sBuf[2048], sHostIp[17];
int h_errnop = 0;


memset(&
host, 0, sizeof(host));

memset(sBuf, 0, sizeof(sBuf));

memset(sHostIp, 0 , sizeof(sHostIp));

pHost = &
host;


#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &
h_errnop) == NULL) ||
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &
pHost, &
h_errnop) != 0) ||
#endif(pHost == NULL) )
{
close(sock_fd);
return -1;

}if( pHost->
h_addrtype != AF_INET &
&
pHost->
h_addrtype != AF_INET6 )
{
close(sock_fd);
return -1;

}//目前仅取第一个IP地址if( (inet_ntop(pHost->
h_addrtype, *(pHost->
h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
{
close(sock_fd);
return -1;

} if( (errcode = inet_pton(AF_INET, sHostIp, &
servaddr.sin_addr) ) <
= 0 )
{
close(sock_fd);
return -1;

}//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&
servaddr, sizeof(servaddr), timeout) ) <
0 )
{
close(sock_fd);
return -1;

}

n = sock_read_tmo(sock_fd, temp, 4096, timeout);
//一般错误if( n <
= 0 )
{
close(sock_fd);


sock_fd = -1;

}return sock_fd;

}int sock_non_blocking(int fd, int on)
{int flags;
if ((flags = fcntl(fd, F_GETFL, 0)) <
0){return -10;

}if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags &
~O_NONBLOCK) <
0){return -10;

}return 0;

}int sock_write_wait(int fd, int timeout)
{struct pollfd pfd;


pfd.fd = fd;

pfd.events = POLLOUT;

pfd.revents = 0;


timeout *= 1000;
for (;
;
)
{switch( poll(&
pfd, 1, timeout) )
{case -1:if( errno != EINTR )
{return (-2);

}continue;
case 0:
errno = ETIMEDOUT;
return (-1);
default:if( pfd.revents &
POLLOUT )return (0);
elsereturn (-3);

}
}

}int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout)
{int error = 0;

socklen_t error_len;


sock_non_blocking(sock, 1);
if( connect(sock, sa, len) == 0 )
{
sock_non_blocking(sock, 0);
return (0);

}if( errno != EINPROGRESS )
{
sock_non_blocking(sock, 0);
return (-1);

}/* * A connection is in progress. Wait for a limited amount of time for
* something to happen. If nothing happens, report an error. */if( sock_write_wait(sock, timeout) != 0)
{
sock_non_blocking(sock, 0);
return (-2);

}/* * Something happened. Some Solaris 2 versions have getsockopt() itself
* return the error, instead of returning it via the parameter list. */error = 0;

error_len = sizeof(error);
if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &
error, &
error_len) != 0 )
{
sock_non_blocking(sock, 0);
return (-3);

}if( error )
{
errno = error;

sock_non_blocking(sock, 0);
return (-4);

}

sock_non_blocking(sock, 0);
/* * No problems. */return (0);


}static int check_ip_in_list(const char *ip, char *iplist)
{ char *token = NULL;
char *saveptr = NULL;

token = strtok_r(iplist, "
,"
, &
saveptr);
while(token != NULL)
{ char *ptmp = NULL;
char *ip_mask = strtok_r(token, "
/"
, &
ptmp);
if(!ip_mask) return -1;
char *ip_bit = strtok_r(NULL, "
/"
, &
ptmp);
if(ip_bit)
{int mask_bit = atoi(ip_bit);
if(mask_bit <
0 || mask_bit >
32)continue;


unsigned long addr[4] = { 0 };

sscanf( ip_mask, "
%lu.%lu.%lu.%lu"
, addr, addr + 1, addr + 2, addr + 3 );

unsigned long vl1 = addr[0] <
<
24 | addr[1] <
<
16 | addr[2] <
<
8 | addr[3];


sscanf( ip, "
%lu.%lu.%lu.%lu"
, addr, addr + 1, addr + 2, addr + 3 );

unsigned long vl2 = addr[0] <
<
24 | addr[1] <
<
16 | addr[2] <
<
8 | addr[3];


vl1 = ( vl1 >
>
( 32 - mask_bit ) );

vl2 = ( vl2 >
>
( 32 - mask_bit ) );
if( vl1 == vl2 ) return 1;

}else{if(strcmp(ip,ip_mask) == 0) return 1;

}

token = strtok_r(NULL, "
,"
, &
saveptr);

} return 0;

}static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro)
{char buf[128];
int loops = 0;


strcpy(buf, redis_host);
do{
loops ++;
char *ptmp = NULL;
char *host = strtok_r(buf, "
:"
, &
ptmp);
if(!host) return -1;
char *s_port = strtok_r(NULL, "
:"
, &
ptmp);
if(!s_port) return -1;
int port = atoi(s_port);
char respone[40] = {0};
int sock_fd = -1;
if((sock_fd = sock_connect_nore(host, port, 5))<
0)return -1;
if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))
{
close(sock_fd);
return -1;

}if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<
=0)
{
close(sock_fd);
return -1;

} if(strncmp("
:0"
, respone, 2) == 0)
{
close(sock_fd);
return 0;

} else if(strncmp("
:1"
, respone, 2) == 0)
{
close(sock_fd);
return 1;

} else if(strncmp("
$"
, respone, 1) == 0)
{ int data_size = 0;
int ret = 0;
char *data_line = strstr(respone,"
rn"
);
if(!data_line)
{
close(sock_fd);
return -1;

}
data_line = data_line+2;


data_size = atoi(respone+1);
if(data_size == -1)
{
close(sock_fd);
return 0;

}if(strlen(data_line) == data_size+2)
{
printf("
line = %d, data_line = %sn"
,__LINE__,data_line);

ret=check_ip_in_list(ip, data_line);

close(sock_fd);
return ret;

}char *data = calloc(data_size+3,1);
if(!data)
{
close(sock_fd);
return -1;

}
strcpy(data,data_line);
int read_size = strlen(data);
int left_size = data_size + 2 - read_size;
while(left_size >
0)
{int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);
if(nread<
=0)
{free(data);

close(sock_fd);
return -1;

}
read_size += nread;

left_size -= nread;

}
close(sock_fd);

printf("
line = %d, data = %sn"
,__LINE__,data);

ret=check_ip_in_list(ip, data);
free(data);
return ret;

} else if(strncmp("
-MOVED"
, respone, 6) == 0)
{
close(sock_fd);
char *p = strchr(respone, '
'
);
if(p == NULL)return -1;


p = strchr(p+1, '
'
);
if(p == NULL)return -1;


strcpy(buf, p+1);

}else{
close(sock_fd);
return -1;

}

}while(loops <
2);
return -1;

}int main(int argc,char *argv[])
{if(argc != 2)
{
printf("
please input ipn"
);
return -1;

} const char *redis_ip = "
127.0.0.1:7002"
;
const char *domain = "
test.com"
;
char exist_pro[128] = {0};
char get_pro[128] = {0};

snprintf(exist_pro,sizeof(exist_pro),"
EXISTS test|%s|%srn"
,domain,"
127.0.0.1"
);

snprintf(get_pro,sizeof(get_pro),"
GET test_%srn"
,domain);
int loops = 0;
int ret = 0;
do{
loops ++;

ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);
if(ret == 0)
ret = check_ip_in_redis(redis_ip, argv[1],get_pro);

}while(loops <
3 &
&
ret <
0);


printf("
line = %d, ret = %dn"
,__LINE__,ret);
return ret;

}

c_redis_cli.c

  主要看这个check_ip_in_redis函数就行了,其它都是一些socket的封装。

  python实现redis客户端#!/usr/bin/pythonimport sys
import socketdef main(argv):if(len(argv) != 3):print "
please input domain ip!"
returnhost = "
192.168.188.47"

port = 7002while 1:
s = socket.socket()
s.connect((host, port))

cmd = '
set %s_white_ip %srn'
% (argv[1],argv[2])
s.send(cmd)
res = s.recv(32)
s.close()
if res[0] == "
+"
:print "
set domain white ip suc!"
return elif res[0:6] == "
-MOVED"
:
list = res.split("
"
)
ip_list = list[2].split("
:"
)
host = ip_list[0]
port = int(ip_list[1]) else:print "
set domain white ip error!"
return if __name__ == "
__main__"
:
main(sys.argv)

Redis是当今流行的NoSQL数据库之一,其高性能、可扩展性以及灵活性广受认可,被越来越多的应用程序选用。为了满足大量应用对Redis的需求,搭建Redis集群已经成为一个必需的步骤。本文将介绍如何用Python快速搭建Redis集群,并探讨如何应对常见的问题。
1.搭建Redis集群的基本步骤
搭建Redis集群基于Redis Cluster,它将数据分片存储,提高了Redis数据库的可扩展性和容错性。 搭建Redis集群的步骤如下:
-安装Python Redis模块
-设置Redis集群配置文件
-启动Redis集群
-验证Redis集群是否正常工作
2.应对搭建Redis集群中的常见问题
搭建Redis集群可遇到多个问题,如节点迁移、数据丢失和故障恢复等。为了避免出现这些问题,建议在搭建Redis集群前做好准备工作,并按以下步骤操作:
-制定集群规划
-确保网络通畅
-设置节点间互信
-合理分配资源
-备份数据和文件
3.如何评估Redis集群的性能
性能评估是机器学习过程中一项关键任务,对于Redis集群同样如此。对Redis集群性能进行评估的步骤如下:
-确定评估对象和集群规模
-选择测试工具
-进行单节点测试和多节点测试
-记录评估结果
本文提供的不仅是纯技术方面的指导,还包括了一些操作实践技巧,不仅适用于Redis集群的搭建,也适用于其他集群的搭建。追随本文所讲的步骤,相信你一定能够实现快速搭建合理效率的Redis集群,为应用程序的高效稳定提供有力支撑。