python基于mysql实现的简单队列以及跨进程锁实例详解
通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。 举个例子如下: 假设我们用mysql来实现一个任务队列,实现的过程如下: 1. 在Mysql中创建Job表,用于储存队列任务,如下: create table jobs( id auto_increment not null primary key,message text not null,job_status not null default 0 ); message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 insert into jobs(message) values('msg1'); 3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下: select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id 4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。 =========================分割线======================================= 说到跨进程的锁实现,我们主要有几种实现方式: (1)信号量 #!/usr/bin/env python2.7 # # -*- coding:utf-8 -*- # # Desc : # import logging,time import MySQLdb class Glock: def __init__(self,db): self.db = db def _execute(self,sql): cursor = self.db.cursor() try: ret = None cursor.execute(sql) if cursor.rowcount != 1: logging.error("Multiple rows returned in mysql lock function.") ret = None else: ret = cursor.fetchone() cursor.close() return ret except Exception,ex: logging.error("Execute sql "%s" failed! Exception: %s",sql,str(ex)) cursor.close() return None def lock(self,lockstr,timeout): sql = "SELECT GET_LOCK('%s',%s)" % (lockstr,timeout) ret = self._execute(sql) if ret[0] == 0: logging.debug("Another client has previously locked '%s'.",lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' was obtained successfully.",lockstr) return True else: logging.error("Error occurred!") return None def unlock(self,lockstr): sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) ret = self._execute(sql) if ret[0] == 0: logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).",lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' the lock was released.",lockstr) return True else: logging.error("The lock '%s' did not exist.",lockstr) return None #Init logging def init_logging(): sh = logging.StreamHandler() logger = logging.getLogger() logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') sh.setFormatter(formatter) logger.addHandler(sh) logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) def main(): init_logging() db = MySQLdb.connect(host='localhost',user='root',passwd='') lock_name = 'queue' l = Glock(db) ret = l.lock(lock_name,10) if ret != True: logging.error("Can't get lock! exit!") quit() time.sleep(10) logging.info("You can do some synchronization work across processes!") ##TODO ## you can do something in here ## l.unlock(lock_name) if __name__ == "__main__": main() 在main函数里: 2.假设有多个消费者进程,从job表中取排队信息,要做的操作如下: select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id 这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。 [@tj-10-47 test]# ./glock.py 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。 [@tj-10-47 test]# ./glock.py 2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. [@tj-10-47 test]# (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |