加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

NSQ 源码阅读 (四) diskqueue 文件读写

发布时间:2020-12-16 18:05:09 所属栏目:大数据 来源:网络整理
导读:引言 我们在这个系列第一篇文章中提到过,如果需要消息落地而对存储子系统的选择上,从速度上来说 文件系统分布式KV(持久化)分布式文件系统数据库 。而NSQ选择了文件系统作为存储子系统。这篇文章将重点介绍nsq 对于文件的操作。 何时写入文件? 在内存的m

引言

我们在这个系列第一篇文章中提到过,如果需要消息落地而对存储子系统的选择上,从速度上来说 文件系统>分布式KV(持久化)>分布式文件系统>数据库。而NSQ选择了文件系统作为存储子系统。这篇文章将重点介绍nsq 对于文件的操作。

何时写入文件?

在内存的msg chan buffer 已满的时候,会将msg 写入文件,代码如下:

func (c *Channel) put(m *Message) error {
    select {
    case c.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b,m,c.backend)
        bufferPoolPut(b)
        c.ctx.nsqd.SetHealth(err)
        if err != nil {
            c.ctx.nsqd.logf(LOG_ERROR,"CHANNEL(%s): failed to write message to backend - %s",c.name,err)
            return err
        }
    }
    return nil
}

写一条message

func (d *diskQueue) writeOne(data []byte) error {

diskQueue维护了写文件和写文件的offset

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
    ...
    writePos     int64
    ...
    writeFile *os.File
    ...
}

利用Seek 函数将写文件的偏移量设置为writePos:

if d.writePos > 0 {
            _,err = d.writeFile.Seek(d.writePos,0)

然后以二进制的方式写入data的size:

dataLen := int32(len(data))
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf,binary.BigEndian,dataLen)

此处的巧妙在于binary.Write会根据写入数据的类型写入一段固定大小的数据。此处dataLen 是int32,所以会写入一段4个byte的数据来表示data的size。读取的时候先读一段4个byte的数据就知道了data的size。
之后写入data:

_,err = d.writeBuf.Write(data)
_,err = d.writeFile.Write(d.writeBuf.Bytes())

读一条message

readOne函数以byte 数组的形式读一条message 出来

func (d *diskQueue) readOne() ([]byte,error) {

diskQueue 维护了当前读取的文件和文件的offset

// diskQueue implements a filesystem backed FIFO queue
type diskQueue struct {
readPos      int64
...
readFile  *os.File

利用Seek 函数将当前文件的偏移量设置为readPos:

if d.readPos > 0 {
            _,err = d.readFile.Seek(d.readPos,0)
            if err != nil {
                d.readFile.Close()
                d.readFile = nil
                return nil,err
            }
        }

先把一个message的大小读出来:

err = binary.Read(d.reader,&msgSize)

msgSize 和写文件时候的dataLen都是int32类型
有了msgSize,定义一段msgSize大小的buffer,从文件里读一段数据来填满这个buffer,buffer里面的数据就是一条message

readBuf := make([]byte,msgSize)
    _,err = io.ReadFull(d.reader,readBuf)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读