NSQ 源码阅读 (四) diskqueue 文件读写
引言我们在这个系列第一篇文章中提到过,如果需要消息落地而对存储子系统的选择上,从速度上来说 何时写入文件?在内存的msg chan buffer 已满的时候,会将msg 写入文件,代码如下: func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b,m,c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR,"CHANNEL(%s): failed to write message to backend - %s",c.name,err) return err } } return nil } 写一条messagefunc (d *diskQueue) writeOne(data []byte) error {
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { ... writePos int64 ... writeFile *os.File ... } 利用Seek 函数将写文件的偏移量设置为 if d.writePos > 0 { _,err = d.writeFile.Seek(d.writePos,0) 然后以二进制的方式写入data的size: dataLen := int32(len(data)) d.writeBuf.Reset() err = binary.Write(&d.writeBuf,binary.BigEndian,dataLen) 此处的巧妙在于 _,err = d.writeBuf.Write(data) _,err = d.writeFile.Write(d.writeBuf.Bytes()) 读一条message
func (d *diskQueue) readOne() ([]byte,error) {
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { readPos int64 ... readFile *os.File 利用Seek 函数将当前文件的偏移量设置为readPos: if d.readPos > 0 { _,err = d.readFile.Seek(d.readPos,0) if err != nil { d.readFile.Close() d.readFile = nil return nil,err } } 先把一个message的大小读出来: err = binary.Read(d.reader,&msgSize)
readBuf := make([]byte,msgSize) _,err = io.ReadFull(d.reader,readBuf) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |