sqlite在多线程下的应用
这几天研究了一下SQLite这个嵌入式数据库在多线程环境下的应用,感觉里面的学问还挺多,于是就在此分享一下。 3.4.0 - iPhone OS 2.2.1当然,你也可以自己编译最新版本。 不过这个线程安全仍然是有限制的,在这篇 《Is SQLite thread-safe?》 里有详细的解释。 另一篇重要的文档就是 《SQLite And Multiple Threads》 。它指出SQLite支持3种线程模式:
另一个要说明的是prepared statement,它是由数据库连接(的pager)来管理的,使用它也可看成使用这个数据库连接。因此在多线程模式下,并发对同一个数据库连接调用sqlite3_prepare_v2()来创建prepared statement,或者对同一个数据库连接的任何prepared statement并发调用sqlite3_bind_*()和sqlite3_step()等函数都会出错(在iOS上,该线程会出现EXC_BAD_ACCESS而中止)。这种错误无关读写,就是只读也会出错。文档中给出的安全使用规则是:没有事务正在等待执行,所有prepared statement都被 finalized 。 顺带一提,调用sqlite3_threadsafe()可以获得编译期的SQLITE_THREADSAFE参数。标准发行版是1,也就是串行模式;而iOS上是2,也就是多线程模式;Python的sqlite3模块也默认使用串行模式,可以用sqlite3.threadsafety来配置。但是默认情况下,一个线程只能使用当前线程打开的数据库连接,除非在连接时设置了check_same_thread=False参数。 现在3种模式都有所了解了,清楚SQLite并不是对多线程无能为力后,接下来就了解下 事务 吧。 数据库只有在事务中才能被更改。所有更改数据库的命令(除SELECT以外的所有SQL命令)都会自动开启一个新事务,并且当最后一个查询完成时自动提交。 而BEGIN命令可以手动开始事务,并关闭自动提交。当下一条COMMIT命令执行时,自动提交再次打开,事务中所做的更改也被写入数据库。当COMMIT失败时,自动提交仍然关闭,以便让用户尝试再次提交。若执行的是ROLLBACK命令,则也打开自动提交,但不保存事务中的更改。关闭数据库或遇到错误时,也会自动回滚事务。 经常有人抱怨 SQLite的插入太慢 ,实际上它可以做到每秒插入几万次,但是每秒只能提交几十次事务。因此在插入大批数据时,可以通过禁用自动提交来提速。 事务在改写数据库文件时,会先生成一个rollback journal(回滚日志),记录初始状态(其实就是备份),所有改动都是在数据库文件上进行的。当事务需要回滚时,可以将备份文件的内容还原到数据库文件;提交成功时,默认的delete模式下会直接删除这个日志。这个日志也可以帮助解决事务执行过程中断电,导致数据库文件损坏的问题。但如果操作系统或文件系统有bug,或是磁盘损坏,则仍有可能无法恢复。 而从3.7.0版本(对应iOS 4.3)开始,SQLite还提供了 Write-Ahead Logging 模式。与delete模式相比,WAL模式在大部分情况下更快,并发性更好,读和写之间互不阻塞;而其缺点对于iPhone这种嵌入式设备来说可以忽略,只需注意不要以只读方式打开WAL模式的数据库即可。 使用WAL模式时,改写操是附加(append)到WAL文件,而不改动数据库文件,因此数据库文件可以被同时读取。当执行checkpoint操作时,WAL文件的内容会被写回数据库文件。当WAL文件达到SQLITE_DEFAULT_WAL_AUTOCHECKPOINT(默认值是1000)页(默认大小是1KB)时,会自动使用当前COMMIT的线程来执行checkpoint操作。也可以关闭自动checkpoint,改为手动定期checkpoint。 为了避免读取的数据不一致,查询时也需要读取WAL文件,并记录一个结尾标记(end mark)。这样的代价就是读取会变得稍慢,但是写入会变快很多。要提高查询性能的话,可以减小WAL文件的大小,但写入性能也会降低。 需要注意的是,低版本的SQLite不能读取高版本的SQLite生成的WAL文件,但是数据库文件是通用的。这种情况在用户进行iOS降级时可能会出现,可以把模式改成delete,再改回WAL来修复。 要对一个数据库连接启用WAL模式,需要执行“PRAGMA journal_mode=WAL;”这条命令,它的默认值是“journal_mode=DELETE”。执行后会返回新的journal_mode字符串值,即成功时为"wal",失败时为之前的模式(例如"delete")。一旦启用WAL模式后,数据库会保持这个模式,这样下次打开数据库时仍然是WAL模式。 要停止自动checkpoint,可以使用 wal_autocheckpoint 指令或 sqlite3_wal_checkpoint() 函数。手动执行checkpoint可以使用 wal_checkpoint 函数。 还有一个很重要的知识点需要强调:事务是和数据库连接相关的,每个数据库连接(使用pager来)维护自己的事务,且同时只能有一个事务(但是可以用 SAVEPOINT 来实现内嵌事务)。 也就是说,事务与线程无关,一个线程里可以同时用多个数据库连接来完成多个事务,而多个线程也可以同时(非并发)使用一个数据库连接来共同完成一个事务。 下面用Python来演示一下: # -*- coding: utf-8 -*- import sqlite3 import threading def f(): con.rollback() con = sqlite3.connect('test.db',check_same_thread=False) # 允许在其他线程中使用这个连接 cu = con.cursor() cu.execute('CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY)') print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 0 cu.execute('INSERT INTO test VALUES (NULL)') print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 1 thread = threading.Thread(target=f) thread.start() thread.join() print cu.execute('SELECT count(*) FROM test').fetchone()[0] # 0 cu.close() con.close()在这个例子中,虽然是在子线程中执行rollback,但由于和主线程用的是同一个数据库连接,所以主线程所做的更改也被回滚了。 而如果是用不同的数据库连接,每个连接都不能读取其他连接中未提交的数据,除非使用 read-uncommitted 模式。 而要实现事务,就不得不用到 锁 一个SQLite数据库文件有5种锁的状态:
另外,read-uncommitted和WAL模式会影响这个锁的机制。在这2种模式下,读线程不会被写线程阻塞,即使写线程持有PENDING或EXCLUSIVE锁。 提到锁就不得不说到死锁的问题,而SQLite也可能出现死锁。 下面举个例子: 连接1:BEGIN (UNLOCKED)现在2个连接都在等待对方释放锁,于是就死锁了。当然,实际情况并没那么糟糕,任何一方选择不继续等待,回滚事务就行了。 不过要更好地解决这个问题,就必须更深入地了解事务了。 实际上BEGIN语句可以有3种起始状态: 现在考虑2个事务在开始时都使用IMMEDIATE方式: 连接1:BEGIN IMMEDIATE (RESERVED) 连接1:SELECT ... (RESERVED) 连接1:INSERT ... (RESERVED) 连接2:BEGIN IMMEDIATE (尝试获取RESERVED锁,但已有RESERVED锁未释放,因此事务开始失败,返回SQLITE_BUSY,等待用户重试) 连接1:COMMIT (EXCLUSIVE,写入完成后释放) 连接2:BEGIN IMMEDIATE (RESERVED) 连接2:SELECT ... (RESERVED) 连接2:INSERT ... (RESERVED) 连接2:COMMIT (EXCLUSIVE,写入完成后释放) 这样死锁就被避免了。 而EXCLUSIVE方式则更为严苛,即使其他连接以DEFERRED方式开启事务也不会死锁: 连接1:BEGIN EXCLUSIVE (EXCLUSIVE) 连接1:SELECT ... (EXCLUSIVE) 连接1:INSERT ... (EXCLUSIVE) 连接2:BEGIN (UNLOCKED) 连接2:SELECT ... (尝试获取SHARED锁,但已有EXCLUSIVE锁未释放,返回SQLITE_BUSY,等待用户重试) 连接1:COMMIT (EXCLUSIVE,写入完成后释放) 连接2:SELECT ... (SHARED) 连接2:INSERT ... (RESERVED) 连接2:COMMIT (EXCLUSIVE,写入完成后释放) 不过在并非很高的情况下,直接获取EXCLUSIVE锁的难度比较大;而且为了避免EXCLUSIVE状态长期阻塞其他请求,最好的方式还是让所有写事务都以IMMEDIATE方式开始。 顺带一提,要实现重试的话,可以使用sqlite3_busy_timeout()或sqlite3_busy_handler()函数。 由此可见,要想保证线程安全的话,可以有这4种方式: 接下来就一一测试这几种方式在iPhone 4(iOS 4.3.3,SQLite 3.7.2)上的性能表现。 第一种方式太过麻烦,需要线程间通信,这里我就忽略了。 第二种方式可以用dispatch_queue_create()来创建一个serial queue,或者用一个maxConcurrentOperationCount为1的NSOperationQueue来实现。 这种方式的缺点就是事务必须在一个block或operation里完成,否则会乱序;而耗时较长的事务会阻塞队列。另外,没法利用多核CPU的优势。 先初始化数据库: #import <sqlite3.h> static char dbPath[200]; static sqlite3 *database; static sqlite3 *openDb() { if (sqlite3_open(dbPath,&database) != SQLITE_OK) { sqlite3_close(database); NSLog(@"Failed to open database: %s",sqlite3_errmsg(database)); } return database; } - (void)viewDidLoad { [super viewDidLoad]; sqlite3_config(SQLITE_CONFIG_SINGLETHREAD); NSLog(@"%d",sqlite3_threadsafe()); NSLog(@"%s",sqlite3_libversion()); NSArray *paths = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory,NSUserDomainMask,YES); NSString *documentsDirectory = [paths objectAtIndex:0]; strcpy(dbPath,[[documentsDirectory stringByAppendingPathComponent:@"data.sqlite3"] UTF8String]); database = openDb(); char *errorMsg; if (sqlite3_exec(database,"CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY AUTOINCREMENT,value INTEGER);",NULL,&errorMsg) != SQLITE_OK) { NSLog(@"Failed to create table: %s",errorMsg); } } 再插入1000条测试数据: static void insertData() { char *errorMsg; if (sqlite3_exec(database,"BEGIN TRANSACTION",NULL,&errorMsg) != SQLITE_OK) { NSLog(@"Failed to begin transaction: %s",errorMsg); } static const char *insert = "INSERT INTO test VALUES (NULL,?);"; sqlite3_stmt *stmt; if (sqlite3_prepare_v2(database,insert,-1,&stmt,NULL) == SQLITE_OK) { for (int i = 0; i < 1000; ++i) { sqlite3_bind_int(stmt,1,arc4random()); if (sqlite3_step(stmt) != SQLITE_DONE) { --i; NSLog(@"Error inserting table: %s",sqlite3_errmsg(database)); } sqlite3_reset(stmt); } sqlite3_finalize(stmt); } if (sqlite3_exec(database,"COMMIT TRANSACTION",&errorMsg) != SQLITE_OK) { NSLog(@"Failed to commit transaction: %s",errorMsg); } static const char *query = "SELECT count(*) FROM test;"; if (sqlite3_prepare_v2(database,query,NULL) == SQLITE_OK) { if (sqlite3_step(stmt) == SQLITE_ROW) { NSLog(@"Table size: %d",sqlite3_column_int(stmt,0)); } else { NSLog(@"Failed to read table: %s",sqlite3_errmsg(database)); } sqlite3_finalize(stmt); } } 然后创建一个串行队列: static dispatch_queue_t queue; - (void)viewDidLoad { // ... queue = dispatch_queue_create("net.keakon.db",NULL); } 再设置一个计数器,每秒执行一次: static int lastReadCount = 0; static int readCount = 0; static int lastWriteCount = 0; static int writeCount = 0; - (void)count { int lastRead = lastReadCount; int lastWrite = lastWriteCount; lastReadCount = readCount; lastWriteCount = writeCount; NSLog(@"%d,%d",lastReadCount - lastRead,lastWriteCount - lastWrite); } - (void)viewDidLoad { // ... [NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(count) userInfo:nil repeats:YES]; } 这样就可以开始测试select和update了: static void readData() { static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;"; void (^ __block readBlock)() = Block_copy(^{ sqlite3_stmt *stmt; if (sqlite3_prepare_v2(database,NULL) == SQLITE_OK) { sqlite3_bind_int(stmt,arc4random()); int returnCode = sqlite3_step(stmt); if (returnCode == SQLITE_ROW || returnCode == SQLITE_DONE) { ++readCount; } sqlite3_finalize(stmt); } else { NSLog(@"Failed to prepare statement: %s",sqlite3_errmsg(database)); } dispatch_async(queue,readBlock); }); dispatch_async(queue,readBlock); } static void writeData() { static const char *update = "UPDATE test SET value = ? WHERE id = ?;"; void (^ __block writeBlock)() = Block_copy(^{ sqlite3_stmt *stmt; if (sqlite3_prepare_v2(database,update,arc4random()); sqlite3_bind_int(stmt,2,arc4random() % 1000 + 1); if (sqlite3_step(stmt) == SQLITE_DONE) { ++writeCount; } sqlite3_finalize(stmt); } else { NSLog(@"Failed to prepare statement: %s",writeBlock); }); dispatch_async(queue,writeBlock); } 这里是用dispatch_async()来异步地递归调用block。 因为block是在栈里生成的,异步执行时已经被销毁,所以需要copy到堆。因为需要一直执行,所以我就没release了。 此外,光copy的话还是无法正常执行,但是把block本身的存储类型设为__block后就正常了,原因我也不清楚。 测试结果为只读时平均每秒165次,只写时每秒68次,同时读写时每秒各47次。换成多线程或串行模式时,效率也差不多。 接着试试WAL模式: if (sqlite3_exec(database,"PRAGMA journal_mode=WAL;",&errorMsg) != SQLITE_OK) { NSLog(@"Failed to set WAL mode: %s",errorMsg); } sqlite3_wal_checkpoint(database,NULL); // 每次测试前先checkpoint,避免WAL文件过大而影响性能 测试结果为只读时平均每秒166次,只写时每秒244次,同时读写时每秒各97次。并发性增加了1倍有木有!更夸张的是写入比读取还快了。 在自编译的3.7.8版中,同时读写为每秒各102次,加上SQLITE_THREADSAFE=0参数后为每秒各104次,性能稍有提升。 第三种方式需要打开和关闭数据库连接,所以会额外消耗一些时间。此外还要维持各个连接间的互斥,事务也比较容易冲突,但能确保事务正确执行。 首先需要移除全局的database变量,并修改openDb()函数: static sqlite3 *openDb() { sqlite3 *database = NULL; if (sqlite3_open(dbPath,&database) != SQLITE_OK) { sqlite3_close(database); NSLog(@"Failed to open database: %s",sqlite3_errmsg(database)); } return database; } 再配置成多线程模式: sqlite3_config(SQLITE_CONFIG_MULTITHREAD); 队列改成可以乱序执行的: queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND,0); 然后是访问数据库: static void readData() { static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;"; dispatch_async(queue,^{ sqlite3 *database = openDb(); sqlite3_stmt *stmt; if (sqlite3_prepare_v2(database,NULL) == SQLITE_OK) { while (YES) { sqlite3_bind_int(stmt,arc4random()); int returnCode = sqlite3_step(stmt); if (returnCode == SQLITE_ROW || returnCode == SQLITE_DONE) { ++readCount; } sqlite3_reset(stmt); } sqlite3_finalize(stmt); } else { NSLog(@"Failed to prepare statement: %s",sqlite3_errmsg(database)); } sqlite3_close(database); }); } static void writeData() { static const char *update = "UPDATE test SET value = ? WHERE id = ?;"; dispatch_async(queue,nil) == SQLITE_OK) { while (YES) { sqlite3_bind_int(stmt,arc4random()); sqlite3_bind_int(stmt,arc4random() % 1000 + 1); if (sqlite3_step(stmt) == SQLITE_DONE) { ++writeCount; } sqlite3_reset(stmt); } sqlite3_finalize(stmt); } else { NSLog(@"Failed to prepare statement: %s",sqlite3_errmsg(database)); } sqlite3_close(database); }); } 这里就无需递归调用了,直接在子线程中循环即可。 测试结果为只读时平均每秒164次,只写时每秒68次,同时读写时分别为每秒14和30次(波动很大)。此外,这种方式因为最初启动的几个线程持续访问数据库,后加入的线程会滞后几秒才启动,且很难打开数据库连接或创建prepare statement。调试时发现只会启用2个线程,但是随队列中block数目的增加,读性能增高,写性能降低。读写各3个block时分别为每秒35和14次。 WAL模式下甚至连初始时启动2个线程都会被lock,因此只能改成不断重试: static void readData() { static const char *query = "SELECT value FROM test WHERE value < ? ORDER BY value DESC LIMIT 1;"; dispatch_async(queue,^{ sqlite3 *database = openDb(); sqlite3_stmt *stmt; while (sqlite3_prepare_v2(database,NULL) != SQLITE_OK); while (YES) { sqlite3_bind_int(stmt,arc4random()); int returnCode = sqlite3_step(stmt); if (returnCode == SQLITE_ROW || returnCode == SQLITE_DONE) { ++readCount; } sqlite3_reset(stmt); } sqlite3_finalize(stmt); sqlite3_close(database); }); } static void writeData() { static const char *update = "UPDATE test SET value = ? WHERE id = ?;"; dispatch_async(queue,nil) != SQLITE_OK); while (YES) { sqlite3_bind_int(stmt,arc4random() % 1000 + 1); if (sqlite3_step(stmt) == SQLITE_DONE) { ++writeCount; } sqlite3_reset(stmt); } sqlite3_finalize(stmt); sqlite3_close(database); }); } 结果为只读时平均每秒169次,只写时每秒246次,同时读写时每秒分别为90和57次(波动较大)。并发效率有了显著提升,但仍不及第二种方式。 第四种方式相当于让SQLite来维护队列,只不过SQL的执行是乱序的,因此无法保证事务性。 先恢复全局的database变量,然后配置成串行模式: sqlite3_config(SQLITE_CONFIG_SERIALIZED); 再是访问数据库: if (sqlite3_prepare_v2(database,sqlite3_errmsg(database)); } }); } static void writeData() { static const char *update = "UPDATE test SET value = ? WHERE id = ?;"; dispatch_async(queue,sqlite3_errmsg(database)); } }); } 测试结果为只读时平均每秒164次,只写时每秒68次,同时读写时每秒分别为57和43次。读线程比写线程的速率更高,而且新线程的加入不需要等待。
转载自http://www.keakon.net/2011/10/25/SQLite%E5%9C%A8%E5%A4%9A%E7%BA%BF%E7%A8%8B%E7%8E%AF%E5%A2%83%E4%B8%8B%E7%9A%84%E5%BA%94%E7%94%A8 参考: http://sqlite.org/pragma.html#pragma_read_uncommitted http://www.sqlite.org/threadsafe.html (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |