Redis RDB

有一个常见的问题,如果Redis挂了,在启动Redis的时候怎么快速恢复数据?

如果Redis挂了,在启动的时候可以通过AOF或RDB恢复数据。

(1) 什么是RDB

RDB是Redis DataBase 的缩写,中文名为快照/内存快照。
RDB持久化是把当前进程数据生成快照保存到磁盘上的过程,可以抽象为全量备份

MySQL的全量备份通过mysqldump命令触发,Redis的RDB通过什么命令触发?
RDB可以通过savebgsave主从复制触发。


(2) 为什么要用RDB

Redis是个基于内存的数据库。那服务一旦宕机,内存中的数据将全部丢失。为了保存数据,需要有一个持久化方式,RDB是全量备份的方法。

主从复制是分布式数据系统保证可靠性的一个重要机制。RDB为Redis主从提供了数据同步机制。

(2.1) RDB优缺点

优点
RDB文件是某个时间节点的快照,默认使用LZF算法进行压缩,压缩后的文件体积远远小于内存大小,适用于备份、全量复制等场景;
Redis加载RDB文件恢复数据要远远快于AOF方式;

缺点
RDB方式实时性不够,无法做到秒级的持久化;
每次调用bgsave都需要fork子进程,fork子进程属于重量级操作,频繁执行成本较高;
RDB文件是二进制的,没有可读性,AOF文件在了解其结构的情况下可以手动修改或者补全;
版本兼容RDB文件问题;


(3) RDB的原理

Redis RDB数据格式


(4) 源码解读

(4.1) RDB创建的入口函数和触发时机

RDB 文件创建的三个时机,分别是 save命令执行bgsave命令执行 以及 主从复制
对应源码rdb.c文件中的3个函数

函数 作用 备注
rdbSave Redis Server在本地磁盘创建RDB文件。 对应save命令
rdbSaveBackground Redis Server 使用后台子进程方式,在本地磁盘创建 RDB文件 对应bgsaveCommand
rdbSaveToSlavesSockets Redis Server 在采用不落盘方式传输RDB文件进行主从复制时,创建RDB文件 对应了Redis Server 执行主从复制命令,以及周期性检测主从复制状态时触发RDB生成

(4.1.1) 同步阻塞保存-rdbSave

/*
 * 把数据库的数据保存到磁盘上
 *
 * 失败时返回 C_ERR
 * 成功时返回 C_OK 
 *
 * @param *filename 文件名
 * @param *rsi 
 */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp = NULL;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    // 打开文件
    fp = fopen(tmpfile,"w");
    // 文件不存在,为空
    if (!fp) {
        //  
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        // 提示 打开文件失败
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    // rio初始化
    rioInitWithFile(&rdb,fp);

    // 标记开始保存
    startSaving(RDBFLAGS_NONE);

    // fsync 在 rdb 保存时递增 
    if (server.rdb_save_incremental_fsync)
        // todo 自动同步?
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

    // 真正把Redis数据库里的数据保存到文件 
    if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    // 确保数据不会保留在操作系统的输出缓冲区中

    // fflush 把缓冲区的数据写到文件里
    if (fflush(fp)) goto werr;

    // fsync 确保一直到写磁盘操作结束才会返回
    if (fsync(fileno(fp))) goto werr;

    // 关闭文件
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;
    

    // 使用 文件重命名 确保仅当生成的数据库文件正常时才自动更改数据库文件。 

    // 把缓存文件重命名正式文件时,可能没权限,会重命名失败  
    if (rename(tmpfile,filename) == -1) {  // 重命名失败
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        // 打印日志 
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }

    // 打印日志  数据库数据已经保存到磁盘 
    serverLog(LL_NOTICE,"DB saved on disk");
    // 更新
    server.dirty = 0;
    // 更新上次保存时间
    server.lastsave = time(NULL);
    // 更新上次保存状态
    server.lastbgsave_status = C_OK;
    // 停止保存 发布事件
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    // 关闭文件
    if (fp) fclose(fp);
    // 删除缓存文件
    unlink(tmpfile);
    // 停止保存 发布事件
    stopSaving(0);
    return C_ERR;
}

Redis保存RDB文件的代码确实不错,在使用资源前校验资源的合法性,使用后确保数据都保存到磁盘,还通过重命名确认保存成功,并且在完后后货失败后释放对应资源。

虽然很细节很简单,但是很健壮。


(4.1.2) 异步保存-rdbSaveBackground

/*
 * 
 * @param *filename 文件名
 * @param *rsi 
 */
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;

    // 有活跃的子进程 
    if (hasActiveChildProcess()) return C_ERR;

    //  
    server.dirty_before_bgsave = server.dirty;
    //
    server.lastbgsave_try = time(NULL);
    // 打开子
    openChildInfoPipe();

    // 复制进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {  // 
        int retval;

        // 子进程设置调用名称
        redisSetProcTitle("redis-rdb-bgsave");
        // 设置 bgsave子进程的cpu关联列表
        redisSetCpuAffinity(server.bgsave_cpulist);
        // 在子进程里调用 rdbSave() 同步保存 
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            // 发送子进程写时复制(Copy On Write)信息 
            sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
        }
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        if (childpid == -1) {
            // 关闭 
            closeChildInfoPipe();
            // 后台保存状态错误
            server.lastbgsave_status = C_ERR;
            // 不能后台保存  子进程复制出错
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }

        // 
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}

(4.1.3) 主从同步生成RDB-rdbSaveToSlavesSockets

/* 
 * 生成一个 RDB 子进程,它将 RDB 写入当前处于 SLAVE_STATE_WAIT_BGSAVE_START 状态的从属 sockets。
 */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {

    listNode *ln;
    listIter li;
    pid_t childpid;
    int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;

    // 有活跃子进程 
    if (hasActiveChildProcess()) return C_ERR;

    // 即使之前的fork子进程退出了,在我们排干管道之前不要开一个新的。
    if (server.rdb_pipe_conns) return C_ERR;

    /* Before to fork, create a pipe that is used to transfer the rdb bytes to
     * the parent, we can't let it write directly to the sockets, since in case
     * of TLS we must let the parent handle a continuous TLS state when the
     * child terminates and parent takes over. */
    if (pipe(pipefds) == -1) return C_ERR;
    // 
    server.rdb_pipe_read = pipefds[0]; /* read end */
    // 
    rdb_pipe_write = pipefds[1]; /* write end */
    // 
    anetNonBlock(NULL, server.rdb_pipe_read);

    // 创建另一个管道,父进程使用该管道向子进程发出可以退出的信号。 
    if (pipe(pipefds) == -1) {
        // 关闭rdb管道写操作 
        close(rdb_pipe_write);
        // 关闭rdb管道读
        close(server.rdb_pipe_read);
        return C_ERR;
    }
    safe_to_exit_pipe = pipefds[0]; /* read end */
    server.rdb_child_exit_pipe = pipefds[1]; /* write end */

    // 收集我们要将 RDB 传输到的 从节点 的连接,这些从节点处于 WAIT_BGSAVE_START 状态。 
    // 创建 connection数组  长度=从节点个数
    server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;
    // 链表迭代器 
    listRewind(server.slaves,&li);
    // 遍历链表
    while((ln = listNext(&li))) {
        // 从节点 
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { // 
            // 连接赋值
            server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
            // 发送 FULLRESYNC 回复
            replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
        }
    }

    // 创建父子进程通信管道
    openChildInfoPipe();
    // 创建子进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        /* Child */
        int retval, dummy;
        rio rdb;

        rioInitWithFd(&rdb,rdb_pipe_write);

        // 设置进程名称  
        redisSetProcTitle("redis-rdb-to-slaves");
        // 设置 子进程的cpu关联列表
        redisSetCpuAffinity(server.bgsave_cpulist);

        // 写RDB数据
        retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
        if (retval == C_OK && rioFlush(&rdb) == 0)
            retval = C_ERR;

        if (retval == C_OK) {
            // 发送子进程写时复制信息 
            sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
        }

        //   
        rioFreeFd(&rdb);
        // 关闭rdb写管道    // 唤醒读取着,告诉他们我们已经完成 
        close(rdb_pipe_write);
        // 关闭rdb子进程退出管道    // 关闭写结束,以便我们可以检测到父级的关闭。
        close(server.rdb_child_exit_pipe); 
        // 等待退出直到父进程告诉我们它是安全的。 我们不期望读取任何内容,只是在管道关闭时收到错误。 
        dummy = read(safe_to_exit_pipe, pipefds, 1);
        // 
        UNUSED(dummy);
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        close(safe_to_exit_pipe);
        if (childpid == -1) {
            // 打日志  不能后台保存 fork错误 
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));

            /* Undo the state change. The caller will perform cleanup on
             * all the slaves in BGSAVE_START state, but an early call to
             * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
            //  
            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                client *slave = ln->value;
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
                }
            }
            // 关闭 rdb写管道
            close(rdb_pipe_write);
            // 
            close(server.rdb_pipe_read);
            // 释放内存
            zfree(server.rdb_pipe_conns);
            server.rdb_pipe_conns = NULL;
            server.rdb_pipe_numconns = 0;
            server.rdb_pipe_numconns_writing = 0;
            // 
            closeChildInfoPipe();
        } else {
            serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
                childpid);
            server.rdb_save_time_start = time(NULL);
            server.rdb_child_pid = childpid;
            server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
            updateDictResizePolicy();
            // 在父进程关闭rdb写管道,以便它可以检测到孩子的关闭。
            close(rdb_pipe_write);
            // 创建文件事件  设置回调函数 rdbPipeReadHandler
            if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
                serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
            }
        }
        return (childpid == -1) ? C_ERR : C_OK;
    }
    return C_OK; /* Unreached. */
}

(4.2) RDB文件如何生成

生成RDB文件的主要逻辑在rdbSaveRio函数

Redis RDB数据格式

主要步骤如下:

  1. 保存元数据信息,比如魔数属性信息 (类似RPC序列化里的)
  2. 保存所有数据库字典里的键值对(包括内存淘汰策略、过期时间)
  3. 保存结束符校验和 等。
/* 
 * 生成 RDB 格式的数据库转储,将其发送到指定的 Redis I/O 通道。 
 * 成功时返回 C_OK,否则返回 C_ERR,并且由于I/O错误可能会丢失部分输出或全部输出。
 *
 * 当函数返回 C_ERR 并且如果 'error' 不为 NULL 时,
 * 'error' 指向的整数将设置为紧接在 I/O 错误之后的 errno 的值。
 *
 * @param *rdb
 * @param *error
 * @param rdbflags
 * @param *rsi
 */
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;

    // 生成魔数magic    
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    // 将magic写入RDB文件
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;

    // 写入属性信息
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;

    // 写入 信息
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    //  
    for (j = 0; j < server.dbnum; j++) {
        // 获取redisDb 
        redisDb *db = server.db+j;
        // 字典
        dict *d = db->dict;
        // 字典为空 跳过
        if (dictSize(d) == 0) continue;
        // 迭代器
        di = dictGetSafeIterator(d);


        // 写 SELECT DB 操作符  // 解决数据一致性问题  避免从节点写到其它库里
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        // 保存编码长度 
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. */
        uint64_t db_size, expires_size;
        // 数据个数
        db_size = dictSize(db->dict);
        // 过期数据个数
        expires_size = dictSize(db->expires);
        // 
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        // 
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        // 
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        // 遍历DB里的每个entry 
        while((de = dictNext(di)) != NULL) {
            // 获取key  二进制安全的key
            sds keystr = dictGetKey(de);
            // 
            robj key, *o = dictGetVal(de);
            long long expire;

            // 初始化key的状态
            initStaticStringObject(key,keystr);
            // 获取key的过期时间
            expire = getExpire(db,&key);
            // 保存键值对
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;


            // 当此RDB作为AOF重写的一部分生成时,在重写时将累积的差异从父级移动到子级,以便最终写入更小。 

            if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                //  
                processed = rdb->processed_bytes;
                // 
                aofReadDiffFromParent();
            }
        }
        // 
        dictReleaseIterator(di);
        di = NULL;  // 这样我们就不会因为错误而再次发布它。 
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {
        // 
        di = dictGetIterator(server.lua_scripts);
        // 遍历
        while((de = dictNext(di)) != NULL) {
            //  
            robj *body = dictGetVal(de);
            // 
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        // 
        dictReleaseIterator(di);
        di = NULL;  // 这样我们就不会因为错误而再次发布它。 
    }

    //  
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    // 保存RDB文件结束符
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    // CRC64 校验和。 如果禁用校验和计算,它将为零,在这种情况下加载代码会跳过检查。
    cksum = rdb->cksum;
    // 小端编码
    memrev64ifbe(&cksum);
    // 写入校验和
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
// file: src/rdb.h

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_MODULE_AUX 247   /* Module auxiliary data. */
#define RDB_OPCODE_IDLE       248   /* LRU idle time. */
#define RDB_OPCODE_FREQ       249   /* LFU frequency. */
#define RDB_OPCODE_AUX        250   /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB   251   /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252    /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253       /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB   254   /* DB number of the following keys. */
#define RDB_OPCODE_EOF        255   /* End of the RDB file. */
/* 
 * 保存一些默认的 AUX 字段,其中包含有关生成的 RDB 的信息。
 */
int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
    //  
    int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
    // 
    int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;

    // 在创建 RDB 时添加一些关于状态的字段。

    // 保存 Redis版本信息
    if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
    // 保存 Redis运行平台的架构信息 (32位还是64位) 
    if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
    // 保存 RDB文件的创建时间
    if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
    // 保存 Redis Server已使用的内存空间大小
    if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

    /* Handle saving options that generate aux fields. */
    if (rsi) {
        // 保存  
        if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
            == -1) return -1;

        // 保存 复制id = Redis Server 的 server.replid
        if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
            == -1) return -1;

        // 保存 复制偏移量 = 主节点的复制偏移量  
        if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
            == -1) return -1;
    }

    // 保存 aof-preamble
    if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
    return 1;
}
// file: src/rdb.c

/* 
 * 保存键值对
 * 
 * 保存键值对,带过期时间、类型、键、值 
 * 错误时返回-1
 * 如果key实际保存成功,则返回1,否则返回0(key已过期)。
 * 
 * @param *rdb 
 * @param *key 
 * @param *val
 * @param expiretime
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    // 内存淘汰策略
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    // 过期时间
    if (expiretime != -1) {  // key是会过期的
        // 保存过期时间类型(ms)
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        // 保存过期时间(ms级)
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    // 保存LRU信息
    if (savelru) {  // 如果使用lru
        // 对象空闲时间  ms
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; // 使用秒就足够了,而且需要的空间更少。 
        // 保存类型为LRU空闲时间
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        // 保存对象空间时间 (秒级)
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    // 保存LFU信息
    if (savelfu) {
        uint8_t buf[1];
        // 
        buf[0] = LFUDecrAndReturn(val);

        // 我们可以用两个字节对其进行编码:操作码和一个8位计数器,因为频率是 0-255 范围内的对数。
        // 请注意,我们不存储减半时间,因为在加载时将其重置一次不会对频率产生太大影响。  

        // 保存类型为LFU
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        // 保存LFU信息
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }


    // 保存类型
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 保存key
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 保存value
    if (rdbSaveObject(rdb,val,key) == -1) return -1;

    // 如果需要延迟返回(用于测试)
    if (server.rdb_key_save_delay)
        usleep(server.rdb_key_save_delay);

    return 1;
}

(4.2.1) 写入数据-rdbWriteRaw

static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}
// file: src/io.h

/*
 * 
 * @param *r 
 * @param *buf 要写入的数据
 * @param len 数据长度
 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    //  
    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
    // 长度大于-0
    while (len) {
        // 要写入的字节数 
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        // 更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        // 写入数据
        if (r->write(r,buf,bytes_to_write) == 0) {
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }
        //
        buf = (char*)buf + bytes_to_write;
        // 更新要写入的长度
        len -= bytes_to_write;
        // 
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

参考资料

[1] Redis 源码剖析与实战 - 18 | 如何生成和解读RDB文件?
[2] redis-rdb.c - github