Redis命令处理过程实例源码分析
本文基于社区版Redis 4.0.8
1、命令解析Redis服务器接收到的命令请求首先存储在客户端对象的querybuf输入缓冲区,然后解析命令请求的各个参数,并存储在客户端对象的argv和argc字段。
客户端解析命令请求的入口函数为readQueryFromClient,会读取socket数据存储到客户端对象的输入缓冲区,并调用函数processInputBuffer解析命令请求。
注:内联命令:使用telnet会话输入命令的方式
void processInputBuffer(client *c) {......
//循环遍历输入缓冲区,获取命令参数,调用processMultibulkBuffer解析命令参数和长度
while(sdslen(c->
querybuf)) {
if (c->
reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
//处理telnet方式的内联命令
} else if (c->
reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
//解析命令参数和长度暂存到客户端结构体中
} else {
serverPanic("
Unknown request type"
);
}
}
}
//解析命令参数和长度暂存到客户端结构体中
int processMultibulkBuffer(client *c) {
//定位到行尾
newline = strchr(c->
querybuf,'
\r'
);
//解析命令请求参数数目,并存储在客户端对象的c->
multibulklen字段
serverAssertWithInfo(c,NULL,c->
querybuf[0] == '
*'
);
ok = string2ll(c->
querybuf+1,newline-(c->
querybuf+1),&
ll);
c->
multibulklen = ll;
pos = (newline-c->
querybuf)+2;
//记录已解析命令的请求长度resp的长度
/* Setup argv array on client structure */
//分配请求参数存储空间
c->
argv = zmalloc(sizeof(robj*)*c->
multibulklen);
// 开始循环解析每个请求参数
while(c->
multibulklen) {
......
newline = strchr(c->
querybuf+pos,'
\r'
);
if (c->
querybuf[pos] != '
$'
) {
return C_ERR;
ok = string2ll(c->
querybuf+pos+1,newline-(c->
querybuf+pos+1),&
ll);
pos += newline-(c->
querybuf+pos)+2;
c->
bulklen = ll;
//字符串参数长度暂存在客户端对象的bulklen字段
//读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen
c->
argv[c->
argc++] =createStringObject(c->
querybuf+pos,c->
bulklen);
pos += c->
bulklen+2;
c->
multibulklen--;
} 2、命令调用
当multibulklen的值更新为0时,表示参数解析完成,开始调用processCommand来处理命令,处理命令前有很多校验逻辑,如下:
void processInputBuffer(client *c) {......
//调用processCommand来处理命令
if (processCommand(c) == C_OK) {
......
}
}
//处理命令函数
int processCommand(client *c) {
//校验是否是quit命令
if (!strcasecmp(c->
argv[0]->
ptr,"
quit"
)) {
addReply(c,shared.ok);
c->
flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
//调用lookupCommand,查看该命令是否存在
c->
cmd = c->
lastcmd = lookupCommand(c->
argv[0]->
ptr);
if (!c->
cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"
unknown command '
%s'
"
,
(char*)c->
argv[0]->
ptr);
return C_OK;
//检查用户权限
if (server.requirepass &
&
!c->
authenticated &
&
c->
cmd->
proc != authCommand)
{
addReply(c,shared.noautherr);
//还有很多检查,不一一列举,比如集群/持久化/复制等
/* 真正执行命令 */
if (c->
flags &
CLIENT_MULTI &
&
c->
cmd->
proc != execCommand &
&
c->
cmd->
proc != discardCommand &
&
c->
cmd->
proc != multiCommand &
&
c->
cmd->
proc != watchCommand)
queueMultiCommand(c);
//将结果写入outbuffer
addReply(c,shared.queued);
}
// 调用execCommand执行命令
void execCommand(client *c) {
call(c,CMD_CALL_FULL);
//调用call执行命令
//调用execCommand调用call执行命令
void call(client *c, int flags) {
start = ustime();
c->
cmd->
proc(c);
//执行命令
duration = ustime()-start;
//如果是慢查询,记录慢查询
if (flags &
CMD_CALL_SLOWLOG &
&
c->
cmd->
proc != execCommand) {
char *latency_event = (c->
cmd->
flags &
CMD_FAST) ?
"
fast-command"
: "
command"
;
latencyAddSampleIfNeeded(latency_event,duration/1000);
//记录到慢日志中
slowlogPushEntryIfNeeded(c,c->
argv,c->
argc,duration);
//更新统计信息:当前命令执行时间和调用次数
if (flags &
CMD_CALL_STATS) {
c->
lastcmd->
microseconds += duration;
c->
lastcmd->
calls++;
3、返回结果
Redis返回结果并不是直接返回给客户端,而是先写入到输出缓冲区(buf字段)或者输出链表(reply字段)
int processCommand(client *c) {......
//将结果写入outbuffer
addReply(c,shared.queued);
......
}
//将结果写入outbuffer
void addReply(client *c, robj *obj) {
//调用listAddNodeHead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送
if (prepareClientToWrite(c) != C_OK) return;
//然后添加字符串到输出缓冲区
if (_addReplyToBuffer(c,obj->
ptr,sdslen(obj->
ptr)) != C_OK)
//如果添加失败,则添加到输出链表中
_addReplyObjectToList(c,obj);
}
addReply函数只是将待发送给客户端的数据暂存在输出链表或者输出缓冲区,那么什么时候将这些数据发送给客户端呢?答案是开启事件循环时,调用的beforesleep函数,该函数专门执行一些不是很费时的操作,如过期键删除,向客户端返回命令回复等
void beforeSleep(struct aeEventLoop *eventLoop) {......
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
}
//回复客户端命令函数
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&
li);
while((ln = listNext(&
li))) {
client *c = listNodeValue(ln);
c->
flags &
= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* 发送客户端数据 */
if (writeToClient(c->
fd,c,0) == C_ERR) continue;
/* If there is nothing left, do nothing. Otherwise install
* the write handler. */
//如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可
if (clientHasPendingReplies(c) &
&
aeCreateFileEvent(server.el, c->
fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
Redis是一个开源的基于内存的数据结构存储系统,它支持多种数据结构,如字符串、哈希表、列表、集合等,另外还提供了一些特殊的功能,如发布/订阅、事务等。作为一个高效的存储系统,Redis在数据处理过程中采用了一些优化策略,本文将分析Redis命令处理过程的源码实现。
一、命令解析
在Redis中,命令是由客户端发送给服务器的,如何解析命令是Redis命令处理的第一步。Redis的命令解析器是按照协议规定的格式来进行解析的,一个命令由多个参数组成,每个参数以\\r\
结束。Redis使用了sds来保存命令参数,这是一种基于C语言字符串的扩展,它提供了O(1)的复杂度的字符串常量级别的长度计算、二进制安全、防止缓冲区溢出等特性。在命令解析过程中,通过读取输入缓冲区的内容,将参数解析成sds对象,并将这些sds对象保存到一个数组中,之后便可以使用这些sds对象来执行后续的操作。
二、命令执行
命令执行是Redis命令处理的核心部分,它决定了Redis系统的性能。在Redis中,命令执行器是由多个不同的函数组成的,每个函数都用于执行不同的命令。例如,SET命令的执行函数是setCommand,而GET命令的执行函数是getCommand。命令执行器使用命令名来查找对应的执行函数,并将sds数组作为参数传递给该函数。执行函数会根据具体命令实现相应的操作。通常情况下,命令操作会涉及到对内存数据的读写,这里需要调用Redis底层的数据结构实现来完成。命令执行完成后,执行结果将放入到回复缓冲区,该缓冲区中保存了所有待发送回复的内容,包括状态回复、错误回复、整数回复、字符串回复等。
三、命令回复
命令回复是Redis处理完客户端请求后所做的最后一步。在Redis中,回复的格式与请求的格式一样,协议规定的格式是以一个字节的命令符号开始,后面跟着一个或多个回复参数,每个参数以\\r\
结束。回复内容是由命令执行器产生的,将执行结果写入到回复缓冲区中,当需要回复客户端时,Redis会从回复缓冲区中读取数据并发送给客户端。在回复过程中,需要注意的是,由于多个客户端可能同时来访问Redis,如果在回复过程中发生了阻塞操作,可能会造成后续的访问延迟,因此需要尽可能减少阻塞操作。
综上所述,Redis命令处理过程的源码实现是一个非常复杂的过程,它涉及到命令解析、命令执行和命令回复三个核心环节。虽然Redis内部的实现细节非常复杂,但是由于Redis提供了非常完善的命令和API,用户可以通过这些接口来方便地实现存储和数据处理等功能,这使得Redis成为现代Web应用中必不可少的高性能存储系统。