点击(此处)折叠或打开
- /* Abstract I/O stream. The function pointers select the backend, and the
- * 'io' union holds that backend's private state. */
- struct _rio {
- /* Backend functions.
- * Since these functions do not tolerate short writes or reads, the return
- * value is simplified to: zero on error, non-zero on complete success. */
- size_t (*read)(struct _rio *, void *buf, size_t len);
- size_t (*write)(struct _rio *, const void *buf, size_t len);
- off_t (*tell)(struct _rio *);
- int (*flush)(struct _rio *);
- /* The update_cksum method, if not NULL, is used to compute the checksum of
- * all the data that was read or written so far. The method should be
- * designed so that it can be called with the current checksum, and the buf
- * and len fields pointing to the new block of data to add to the checksum
- * computation. */
- void (*update_cksum)(struct _rio *, const void *buf, size_t len);
- /* The current checksum. */
- uint64_t cksum;
- /* Number of bytes read or written. */
- size_t processed_bytes;
- /* Maximum single read or write chunk size; 0 means no limit. */
- size_t max_processing_chunk;
- /* Backend-specific vars. */
- union {
- /* In-memory buffer target. */
- struct {
- sds ptr; /* The buffer itself. */
- off_t pos; /* Current read/write offset into 'ptr'. */
- } buffer;
- /* Stdio file pointer target. */
- struct {
- FILE *fp;
- off_t buffered; /* Bytes written since last fsync. */
- off_t autosync; /* fsync after 'autosync' bytes written. */
- } file;
- /* Multiple FDs target (used to write to N sockets). */
- struct {
- int *fds; /* File descriptors. */
- int *state; /* Error state of each fd. 0 (if ok) or errno. */
- int numfds; /* Number of entries in 'fds' and 'state'. */
- off_t pos;
- sds buf; /* Data accumulated by writes until it is sent to the fds. */
- } fdset;
- } io;
- };
- /* Short alias for struct _rio. */
- typedef struct _rio rio;
从上面的io联合体可以看出,rio支持三种后端:
1. 内存buffer,内部由sds表示
2. 文件指针,内部由FILE*表示
3. 文件描述符数组,内部由int *表示
rio结构体内部的函数指针有read、write、tell、flush和update_cksum;而rio模块真正留给外部调用的接口,是对这些函数指针的进一步封装:
点击(此处)折叠或打开
- /* Write 'len' bytes from 'buf' through the backend of 'r'.
-  *
-  * The data is handed to the backend in pieces of at most
-  * r->max_processing_chunk bytes (a value of 0 means a single piece). For
-  * every piece the checksum callback, when set, runs before the backend
-  * write. Returns 1 once everything was written, 0 as soon as the backend
-  * reports a failure. */
- static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
-     const char *cursor = buf;
-     size_t remaining = len;
-
-     while (remaining) {
-         size_t chunk = remaining;
-         if (r->max_processing_chunk && r->max_processing_chunk < remaining)
-             chunk = r->max_processing_chunk;
-
-         if (r->update_cksum)
-             r->update_cksum(r,cursor,chunk);
-         /* The backend returns 0 on failure, non-zero on success. */
-         if (r->write(r,cursor,chunk) == 0)
-             return 0;
-
-         cursor += chunk;
-         remaining -= chunk;
-         r->processed_bytes += chunk;
-     }
-     return 1;
- }
- /* Read exactly 'len' bytes into 'buf' through the backend of 'r'.
-  *
-  * The request is split into pieces of at most r->max_processing_chunk
-  * bytes (a value of 0 means a single piece). The checksum callback, when
-  * set, is fed each piece only after the backend read succeeded. Returns 1
-  * once everything was read, 0 as soon as the backend reports a failure. */
- static inline size_t rioRead(rio *r, void *buf, size_t len) {
-     char *cursor = buf;
-     size_t remaining = len;
-
-     while (remaining) {
-         size_t chunk = remaining;
-         if (r->max_processing_chunk && r->max_processing_chunk < remaining)
-             chunk = r->max_processing_chunk;
-
-         /* The backend returns 0 on failure, non-zero on success. */
-         if (r->read(r,cursor,chunk) == 0)
-             return 0;
-         if (r->update_cksum)
-             r->update_cksum(r,cursor,chunk);
-
-         cursor += chunk;
-         remaining -= chunk;
-         r->processed_bytes += chunk;
-     }
-     return 1;
- }
- /* Return the current position as reported by the backend's tell method. */
- static inline off_t rioTell(rio *r) {
-     off_t position = r->tell(r);
-     return position;
- }
- /* Ask the backend to flush, and return whatever its flush method returns. */
- static inline int rioFlush(rio *r) {
-     int status = r->flush(r);
-     return status;
- }
接下来聊聊内部实现,仍然按照前面的顺序(内存buffer -> 文件指针 -> 文件描述符数组)进行。总体思路是:为每种后端自定义对应的io函数,然后实例化一个rio结构体,并将这些io函数填入结构体。
1. 内存buffer主要是针对sds的封装
点击(此处)折叠或打开
- /* Buffer backend write: append 'len' bytes from 'buf' to the sds string
-  * and advance the position by 'len'. Always returns 1. */
- static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
-     sds grown = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
-
-     r->io.buffer.ptr = grown;
-     r->io.buffer.pos += len;
-     return 1;
- }
- /* Buffer backend read: copy 'len' bytes starting at the current position
-  * into 'buf' and advance the position.
-  *
-  * Returns 1 on success, or 0 without copying anything when fewer than
-  * 'len' bytes remain after the current position. */
- static size_t rioBufferRead(rio *r, void *buf, size_t len) {
-     if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos >= len) {
-         const char *src = r->io.buffer.ptr + r->io.buffer.pos;
-
-         memcpy(buf,src,len);
-         r->io.buffer.pos += len;
-         return 1;
-     }
-     return 0; /* Not enough data left to return 'len' bytes. */
- }
- /* Buffer backend tell: report the current read/write offset. */
- static off_t rioBufferTell(rio *r) {
-     off_t offset = r->io.buffer.pos;
-     return offset;
- }
- /* Buffer backend flush: a no-op that always reports success (1), because
-  * writes simply append to the in-memory buffer and nothing is pending. */
- static int rioBufferFlush(rio *r) {
-     UNUSED(r); /* The argument exists only to match the flush signature. */
-     return 1;
- }
- /* Template for the in-memory buffer backend: method table plus zeroed
-  * counters. rioInitWithBuffer() copies it and then fills in the buffer. */
- static const rio rioBufferIO = {
-     .read = rioBufferRead,
-     .write = rioBufferWrite,
-     .tell = rioBufferTell,
-     .flush = rioBufferFlush,
-     .update_cksum = NULL,       /* no checksum callback */
-     .cksum = 0,                 /* current checksum */
-     .processed_bytes = 0,       /* bytes read or written */
-     .max_processing_chunk = 0,  /* read/write chunk size */
-     .io = { .buffer = { .ptr = NULL, .pos = 0 } }
- };
- /* Initialize 'r' as a buffer-backed rio that operates on the sds string
-  * 's', starting at offset 0. */
- void rioInitWithBuffer(rio *r, sds s) {
-     *r = rioBufferIO;       /* Start from the buffer backend template. */
-     r->io.buffer.pos = 0;
-     r->io.buffer.ptr = s;
- }
2. 文件指针操作的封装:
点击(此处)折叠或打开
- /* File backend write: write 'len' bytes from 'buf' to the stdio stream.
-  *
-  * When io.file.autosync is non-zero, the stream is flushed and synced to
-  * disk every time at least 'autosync' bytes have been written since the
-  * previous sync.
-  *
-  * Returns 1 on success and 0 on failure. A failed fwrite(), fflush() or
-  * sync is reported immediately: no further calls are made that could
-  * overwrite errno, and the unsynced byte counter is not reset for data
-  * that was never synced. */
- static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
-     /* One item of 'len' bytes is requested, so any result other than 1
-      * means the write did not complete. */
-     if (fwrite(buf,len,1,r->io.file.fp) != 1) return 0;
-
-     r->io.file.buffered += len;
-     if (r->io.file.autosync &&
-         r->io.file.buffered >= r->io.file.autosync)
-     {
-         if (fflush(r->io.file.fp) != 0) return 0;
-         /* NOTE(review): assumes aof_fsync() follows the fsync()
-          * convention of returning -1 on error -- confirm. */
-         if (aof_fsync(fileno(r->io.file.fp)) == -1) return 0;
-         r->io.file.buffered = 0;
-     }
-     return 1;
- }
- /* File backend read: read 'len' bytes from the stdio stream into 'buf' as
-  * a single item, and return the item count reported by fread(). */
- static size_t rioFileRead(rio *r, void *buf, size_t len) {
-     size_t items = fread(buf,len,1,r->io.file.fp);
-     return items;
- }
- /* File backend tell: report the stream position obtained from ftello(). */
- static off_t rioFileTell(rio *r) {
-     off_t offset = ftello(r->io.file.fp);
-     return offset;
- }
- /* File backend flush: push the stdio buffer out with fflush().
-  * Returns 1 when fflush() succeeds and 0 when it fails. */
- static int rioFileFlush(rio *r) {
-     if (fflush(r->io.file.fp) != 0)
-         return 0;
-     return 1;
- }
- /* Template for the stdio file backend: method table plus zeroed counters.
-  * rioInitWithFile() copies it and then sets the file-specific fields. */
- static const rio rioFileIO = {
-     .read = rioFileRead,
-     .write = rioFileWrite,
-     .tell = rioFileTell,
-     .flush = rioFileFlush,
-     .update_cksum = NULL,       /* no checksum callback */
-     .cksum = 0,                 /* current checksum */
-     .processed_bytes = 0,       /* bytes read or written */
-     .max_processing_chunk = 0,  /* read/write chunk size */
-     .io = { { NULL, 0 } }       /* union initialized via its first member */
- };
- /* Initialize 'r' as a file-backed rio that operates on the stdio stream
-  * 'fp', with the unsynced byte counter cleared and autosync disabled. */
- void rioInitWithFile(rio *r, FILE *fp) {
-     *r = rioFileIO;         /* Start from the file backend template. */
-     r->io.file.autosync = 0;
-     r->io.file.buffered = 0;
-     r->io.file.fp = fp;
- }
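3. 文件描述符数组,这里要注意的是写操作:要写入的数据总是先追加到fdset自身的buffer中;只有当该buffer的长度超过PROTO_IOBUF_LEN,或者以buf为NULL且len为0调用(即执行flush)时,才会真正把buffer中的数据写到各个文件描述符。实际写入时按最多1024个字节一块,依次写给每个文件描述符,这样在内核后台向TCP socket发送数据的同时,可以继续向其他描述符写入,达到并行的效果;只要还有一个文件描述符写入成功,函数就返回成功。代码如下: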
点击(此处)折叠或打开
- /* Returns 1 or 0 for success/failure.
- * The function returns success as long as we are able to correctly write
- * to at least one file descriptor.
- *
- * When buf is NULL and len is 0, the function performs a flush operation
- * if there is some pending buffer, so this function is also used in order
- * to implement rioFdsetFlush(). */
- static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
- ssize_t retval;
- int j;
- unsigned char *p = (unsigned char*) buf;
- int doflush = (buf == NULL && len == 0);
- /* To start we always append to our buffer. If it gets larger than
- * a given size, we actually write to the sockets. */
- if (len) {
- r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
- len = 0; /* Prevent entering the while below if we don't flush. */
- if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
- }
- if (doflush) {
- p = (unsigned char*) r->io.fdset.buf;
- len = sdslen(r->io.fdset.buf);
- }
- /* Write in little chunchs so that when there are big writes we
- * parallelize while the kernel is sending data in background to
- * the TCP socket. */
- while(len) {
- size_t count = len < 1024 ? len : 1024;
- int broken = 0;
- for (j = 0; j < r->io.fdset.numfds; j++) {
- if (r->io.fdset.state[j] != 0) {
- /* Skip FDs alraedy in error. */
- broken++;
- continue;
- }
- /* Make sure to write 'count' bytes to the socket regardless
- * of short writes. */
- size_t nwritten = 0;
- while(nwritten != count) {
- retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
- if (retval <= 0) {
- /* With blocking sockets, which is the sole user of this
- * rio target, EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option, so we translate the error
- * into one more recognizable by the user. */
- if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
- break;
- }
- nwritten += retval;
- }
- if (nwritten != count) {
- /* Mark this FD as broken. */
- r->io.fdset.state[j] = errno;
- if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
- }
- }
- if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */
- p += count;