加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang bolt库操作手册

发布时间:2020-12-16 09:42:28 所属栏目:大数据 来源:网络整理
导读:打开数据库 Transactions Read-write transactions Read-Only transactions Batch read-write transactions 手动控制transaction 使用 buckets 使用keyvalue bucket自动递增整数 迭代便利keys prefix scans 范围搜索 ForEach 嵌套 buckets 今天介绍的库bolt
  • 打开数据库
  • Transactions
    • Read-write transactions
    • Read-Only transactions
    • Batch read-write transactions
    • 手动控制transaction
    • 使用 buckets
    • 使用keyvalue
    • bucket自动递增整数
    • 迭代便利keys
    • prefix scans
    • 范围搜索
    • ForEach
    • 嵌套 buckets

今天介绍的库bolt是一个纯粹用go编写的key/value数据库,这个库的目的是为了提供一个简单,快速可靠的数据库同时无需单独安装一个例如Postgres或MySQL之类的负责的数据库服务。作者在介绍里面还提及了如何通过阅读代码来了解一个数据库的基本原理,感谢作者的无私奉献。

打开数据库

package main

import (
    "os"

    "github.com/boltdb/bolt"
    "github.com/go-kit/kit/log"
)

func main() {
    logger := log.NewLogfmtLogger(os.Stdout)
    db,err := bolt.Open("mydb.db", 0600,nil)
    if err != nil {
        logger.Log("open",err)
    }
    defer db.Close()
}

Bolt在打开数据库文件的时候会获取一个文件锁,所以多个进程不能同时打开一个数据库文件。
打开一个已经Open的Bolt数据库会导致当前进程挂起直到其他进程关闭该Bolt数据库。
为了避免无限等待你可以在打开数据库文件的时候制定一个超时时间。

func worker() {
    logger := log.NewLogfmtLogger(os.Stdout)
    db,err := bolt.Open("kes.db",0600,&bolt.Options{Timeout: 1 * time.Second})
    if err != nil {
        logger.Log("open",err)
        return
    }
    db.Close()
}

func main() {
    logger := log.NewLogfmtLogger(os.Stdout)
    db,err)
        return
    }
    defer db.Close()

    go worker()

    time.Sleep(10 * time.Second)
}

如上面代码所示,main函数中打开kes.go后另起一个routine打开同样的数据库文件,就会阻塞直到超时:

$ go run bolt.go
open=timeout

Transactions

Bolt数据库同时只支持一个read-write transaction或者多个read-only transactions。
每个独立的transaction以及在这个transaction中创建的所有对象(buckerts,keys等)都不是thread safe的。如果要在多个routine中处理数据,那么必须在每个routine中单独使用一个transaction或者显式的使用lock以确保在每个时刻只有一个routine访问这个transaction.

read-only的transaction和read-write的transaction不应该相互之间有依赖,一般来说在同一个goroutine中不要同时打开这两种transaction,因为read-write transaction需要周期性的re-map数据文件,但是由于read-only transaction打开导致read-write transaction的re-map操作无法进行造成死锁。

Read-write transactions

通过DB.UPdate()打开一个read-write transaction.

if err := db.Update(func(tx *bolt.Tx) error {
        if _,err := tx.CreateBucketIfNotExists([]byte("kes")); err != nil {
            logger.Log("create failed",err)
            return err
        }
        return nil
    }); err != nil {
        logger.Log("update",err)
    }

在closure闭包内部,获取一个数据库的连续view。在closure最后返回nil完成commit的操作,可以在任何地方通过返回error完成rolleback的操作。
在read-write transaction中允许所有的数据库操作

func (db *DB) Update(fn func(*Tx) error) error { t,err := db.Begin(true) if err != nil { return err } // Make sure the transaction rolls back in the event of a panic. defer func() { if t.db != nil { t.rollback() } }() // Mark as a managed tx so that the inner function cannot manually commit. t.managed = true // If an error is returned from the function then rollback and return error. err = fn(t) t.managed = false if err != nil { _ = t.Rollback() return err } return t.Commit() }

Read-Only transactions

通过 DB.View()函数打开一个read-only transaction。

if err := db.View(func(tx *bolt.Tx) error {
        // if _,err := tx.CreateBucket([]byte("kes")); err != nil {
        //  logger.Log("create failed",err)
        //  return err
        // }
        return nil
    }); err != nil {
        logger.Log("view",err)
    }

在read-only transaction中不允许更改操作。只能获取buckets,查询value,复制数据库。
上面的代码中,如果把注释掉的代码加上,就会报错

view="tx not writable"

View常常和Cursor一起使用。

Batch read-write transactions

如果多个routine执行了写入操作,可以使用Batch

// Iterate over multiple updates in separate goroutines.
    n := 2
    ch := make(chan error)
    for i := 0; i < n; i++ {
        go func(i int) {
            ch <- db.Batch(func(tx *bolt.Tx) error {
                return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)),[]byte{})
            })
        }(i)
    }

    // Check all responses to make sure there's no error.
    for i := 0; i < n; i++ {
        if err := <-ch; err != nil {
            t.Fatal(err)
        }
    }

但是程序报错

goroutine 5 [running]:
panic(0xea9e0,0xc42000a0b0)
        /usr/local/Cellar/go/1.7.5/libexec/src/runtime/panic.go:500 +0x1a1
main.main.func2.1(0xc4200aa1c0,0x117a48,0xc4200aa1c0)
        /Users/kes/Documents/program-learn/go/src/bolt.go:57 +0x1c4
github.com/boltdb/bolt.(*DB).Update(0xc420092000,0xc4200a0000,0x0,0x0)
        /Users/kes/goproject/src/github.com/boltdb/bolt/db.go:598 +0xb5
github.com/boltdb/bolt.(*DB).Batch(0xc420092000,0x0)
        /Users/kes/goproject/src/github.com/boltdb/bolt/db.go:680 +0x306
main.main.func2(0xc42001a180,0xc420092000,0x0)
        /Users/kes/Documents/program-learn/go/src/bolt.go:58 +0x5c
created by main.main
        /Users/kes/Documents/program-learn/go/src/bolt.go:59 +0x212

在于

func u64tob(v uint64) []byte { b := make([]byte,8) binary.BigEndian.PutUint64(b,v) return b }

初始化 b:=make([]byte,8) 将8写为了0.

手动控制transaction

DB.View()DB.UPdate()函数包括了DB.Begin(),这些函数会启动transaction,执行函数,然后在返回error的时候安全的关闭transaction.
DB.View()DB.UPdate()是Bolt推荐的使用方法。

有的时候你想手动启动和结束transactionn,就可以调用DB.Begin()函数来启动一个transaction然后调用CommmitRollback()来结束transaction.
可以参考源码中的测试用例:

func ExampleDB_Begin_ReadOnly() {
    // Open the database.
    db,err := bolt.Open(tempfile(), 0666,nil)
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(db.Path())

    // Create a bucket using a read-write transaction.
    if err := db.Update(func(tx *bolt.Tx) error {
        _,err := tx.CreateBucket([]byte("widgets"))
        return err
    }); err != nil {
        log.Fatal(err)
    }

    // Create several keys in a transaction.
    tx,err := db.Begin(true)
    if err != nil {
        log.Fatal(err)
    }
    b := tx.Bucket([]byte("widgets"))
    if err := b.Put([]byte("john"),[]byte("blue")); err != nil {
        log.Fatal(err)
    }
    if err := b.Put([]byte("abby"),[]byte("red")); err != nil {
        log.Fatal(err)
    }
    if err := b.Put([]byte("zephyr"),[]byte("purple")); err != nil {
        log.Fatal(err)
    }
    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }

    // Iterate over the values in sorted key order.
    tx,err = db.Begin(false)
    if err != nil {
        log.Fatal(err)
    }
    c := tx.Bucket([]byte("widgets")).Cursor()
    for k,v := c.First(); k != nil; k,v = c.Next() {
        fmt.Printf("%s likes %sn",k,v)
    }

    if err := tx.Rollback(); err != nil {
        log.Fatal(err)
    }

    if err := db.Close(); err != nil {
        log.Fatal(err)
    }

    // Output:
    // abby likes red
    // john likes blue
    // zephyr likes purple
}

使用 buckets

Buckets是bolt数据库中存放key/value对的地方。一个bucket 中的所有key必须是唯一的,可以通过DB.CreateBucket()CreateBucketIfNotExists来创建。

if err := db.Update(func(tx *bolt.Tx) error {
        if _,err)
    }

CreateBucketIfNotExists用来创建一个不存在的bucket,如果已经存在就不会创建。
删除一个buckets用函数Tx.DeleteBucket()
删除一个bucket,调用Tx.DeleteBucket()即可。

使用key/value

把一个key/value保存到bucket中,使用Bucket.Put()

if err := db.Update(func(tx *bolt.Tx) error {
        if _,err := tx.CreateBucketIfNotExists([]byte("kes")); err != nil {
            logger.Log("create failed",err)
            return err
        }

        b := tx.Bucket([]byte("kes"))
        err = b.Put([]byte("answer"),[]byte("42"))
        return err

        // return nil
    }); err != nil {
        logger.Log("update",err)
    }

上面代码保存”answer”->”42”保存到bucket”kes”中,为了获取这个值,可以使用 Bucket.Get() 函数。

if err := db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("kes"))
        v := b.Get([]byte("answer"))
        fmt.Printf("the anser is :%sn",v)

        return nil
    }); err != nil {
        logger.Log("view",err)
    }

    if err := db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("kes"))
        v := b.Get([]byte("answernotexist"))
        fmt.Printf("the anser is :%sn",err)
    }

看到输出

the anser is :42
the anser is :

Get()函数不会返回错误,如果key存在,则返回byte slice值,如果不存在就会返回nil。

key不存在和key对应的值是0长度的值是不一样的。

bucket自动递增整数

通过使用NextSequence(),你可以让Bolt生成一个key/value的唯一标记。

for i := 0; i < 3; i++ {
        if err := db.Update(func(tx *bolt.Tx) error {
            b := tx.Bucket([]byte("kes"))
            id,_ := b.NextSequence()
            err := b.Put(u64tob(id),[]byte("sds"))
            return err
        }); err != nil {
            logger.Log("view",err)
        }

    }

迭代便利keys

Bolt在bucket中按照byte-sorted order存储key,遍历这些key非常快。通过使用Cursor遍历key。

db.View(func(tx *bolt.Tx) error {

        b := tx.Bucket([]byte("kes"))
        c := b.Cursor()
        for k,v := c.First(); k != nil; k,v = c.Next() {
            fmt.Printf("key = %s,value = %sn",v)
        }
        return nil

    })

bolt允许Cursor移动到特定的一点,当迭代到最后一个key的时候,再次调用Next会返回nil.

First()  Move to the first key.
Last()   Move to the last key.
Seek()   Move to a specific key.
Next()   Move to the next key.
Prev()   Move to the previous key.

在迭代的过程中,如果key是non-nil但是value是nil,这就意味着key指向一个bucker而不是一个value,这时候调用Buckert.Bucket()来访问sub-bucker.

prefix scans

搜索含有特定前缀的key,结合Seekbytes.HasPrefix来实现。

db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    c := tx.Bucket([]byte("MyBucket")).Cursor()

    prefix := []byte("1234")
    for k,v := c.Seek(prefix); k != nil && bytes.HasPrefix(k,prefix); k,v = c.Next() {
        fmt.Printf("key=%s,value=%sn",v)
    }

    return nil
})

范围搜索

db.View(func(tx *bolt.Tx) error {
    // Assume our events bucket exists and has RFC3339 encoded time keys.
    c := tx.Bucket([]byte("Events")).Cursor()

    // Our time range spans the 90's decade.
    min := []byte("1990-01-01T00:00:00Z")
    max := []byte("2000-01-01T00:00:00Z")

    // Iterate over the 90's.
    for k,v := c.Seek(min); k != nil && bytes.Compare(k,max) <= 0; k,v = c.Next() {
        fmt.Printf("%s: %sn",v)
    }

    return nil
})

在前面讲过,bolt的key是依据byte-order排序的,所以seek的查询比较特别,参见下面示例代码:

if err := db.Update(func(tx *bolt.Tx) error {

        if _,err := tx.CreateBucketIfNotExists([]byte("testseek")); err != nil {
            logger.Log("create failed",err)
            return err
        }

        b := tx.Bucket([]byte("testseek"))
        // fmt.Println("bucket")

        if err := b.Put([]byte("cata"),[]byte("0001")); err != nil {
            logger.Log(err)

            return err
        }

        if err := b.Put([]byte("catc"),[]byte("0002")); err != nil {
            logger.Log(err)

            return err
        }

        if err := b.Put([]byte("catd"),[]byte("0003")); err != nil {
            logger.Log(err)

            return err
        }
        return nil
    }); err != nil {
        logger.Log("update",err)
    }
    // fmt.Printf("updaten")
    // db.View(func(tx *bolt.Tx) error {
    // b := tx.Bucket([]byte("testseek"))
    // b.ForEach(func(k,v []byte) error {
    // fmt.Printf("for each key = %s,value = %xn",v)
    // return nil
    // })
    // return nil
    // })
    // fmt.Printf("foreachn")

    //seek 借鉴seek_test的代码,在前面讲过,bolt的key是依据byte-order排序的,所以seek的查询比较特别
    if err := db.View(func(tx *bolt.Tx) error {
        c := tx.Bucket([]byte("testseek")).Cursor()

        //精确匹配,就定位到"cata"
        k,v := c.Seek([]byte("cata"))
        fmt.Printf("%s %sn",v)

        //非精确匹配,返回byte-order在"catb"之后的第一个key,就是"catc"
        k,v = c.Seek([]byte("catb"))
        fmt.Printf("%s %sn",v)

        //非精确匹配,"cat"小于当前bucket中的任何一个key,则返回第一个key
        k,v = c.Seek([]byte("cat"))
        fmt.Printf("%s %sn",v)

        //非精确匹配,""小于当前bucket中的任何一个key,则返回第一个key
        k,v = c.Seek([]byte(""))
        fmt.Printf("seek empty -> %s %sn",v)

        //非精确匹配,"catz"小于当前bucket中的任何一个key,则返回nil
        k,v = c.Seek([]byte("catz"))
        fmt.Printf("seek catz -> %s %sn",v)

        return nil
    }); err != nil {
        logger.Log(err)
    }

ForEach

可以通过ForEach()来便利bucket的所有key。

db.View(func(tx *bolt.Tx) error {
    // Assume bucket exists and has keys
    b := tx.Bucket([]byte("MyBucket"))

    b.ForEach(func(k,v []byte) error {
        fmt.Printf("key=%s,value=%sn",v)
        return nil
    })
    return nil
})

foreach返回的key/value只再transaction中有效,那么如果要在transaction之外使用,要调用copy来把数据拷贝到别的slice中。

嵌套 buckets

可以在bucket里面存储多个bucket

func createUser(accountID int,u *User) error {
    // Start the transaction.
    tx,err := db.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Retrieve the root bucket for the account.
    // Assume this has already been created when the account was set up.
    root := tx.Bucket([]byte(strconv.FormatUint(accountID, 10)))

    // Setup the users bucket.
    bkt,err := root.CreateBucketIfNotExists([]byte("USERS"))
    if err != nil {
        return err
    }

    // Generate an ID for the new user.
    userID,err := bkt.NextSequence()
    if err != nil {
        return err
    }
    u.ID = userID

    // Marshal and save the encoded user.
    if buf,err := json.Marshal(u); err != nil {
        return err
    } else if err := bkt.Put([]byte(strconv.FormatUint(u.ID, 10)),buf); err != nil {
        return err
    }

    // Commit the transaction.
    if err := tx.Commit(); err != nil {
        return err
    }

    return nil
}

上面是官网给的示例代码,比较负责,自己就写了一个简单的方便大家理解

if err := db.Update(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("kes"))
        bkt,err := b.CreateBucketIfNotExists([]byte("user"))
        if err != nil {
            return err
        }
        bkt.Put([]byte("nest key"),[]byte("nest value"))
        return err
    }); err != nil {
        logger.Log("view",err)
    }
    db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket([]byte("kes"))
        b.ForEach(func(k,v []byte) error {
            fmt.Printf("for each key = %s,v)
            return nil
        })
        return nil
    })

在上面代码中,原始的bucket “kes”中嵌套了bucket “user”,然后遍历key/value得到

for each key = answer,value = 42
for each key = newkye,value = sds
for each key = user,value =

前两个是之前添加的真正的key/value,最后一个嵌套的bucket “user”的名称也作为key被遍历出来。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读