golang+数据库定时任务
发布时间:2020-12-16 18:42:42 所属栏目:大数据 来源:网络整理
导读:golang+数据库定时任务 项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错
golang+数据库定时任务项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错的golang实现 数据库设计
大致实现流程
for { time.Sleep(10 * time.Second) go execTask(*db) //使用子进程执行,防止卡死主进程 }
rows,err := db.Query("SELECT id,name,status,type,fn,fn_type,separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
res,err := db.Exec(fn)
func setTaskNextExecTime(db sql.DB,taskId string,separateTime int64) error { next_exec_time := time.Now().Unix() + separateTime nextTime := time.Unix(next_exec_time,999) res,err := db.Exec("UPDATE tasks set status = 1,last_exec_time=now(),next_exec_time=$2 WHERE id = $1::uuid",taskId,nextTime) res = nil log.Println(res) return err; } 优缺点优点: 1. 所有任务执行状态都可以查询到,例如任务异常或者上次执行时间,下次执行时间 2. 增加一个定时任务,只需要在数据库插入一条记录就OK 缺点: 1. 如果要绑定非数据库可操作任务,需要自己扩展 项目源码// MTask project main.go package main import ( "database/sql" _ "github.com/lib/pq" "log" "time" "os" "io/ioutil" "encoding/json" ) //配置结构体 type Conf struct { Db map[string] string } //读取配置文件 func readConf(path string) (Conf,error) { var c Conf var err error fi,err := os.Open(path) if err != nil { return c,err } else { defer fi.Close() //读取配置文件 fd,err := ioutil.ReadAll(fi) if err != nil { return c,err } else { var c Conf err = json.Unmarshal(fd,&c) if err != nil { return c,err } else { return c,err } } } return c,err } func main() { c,err := readConf("./conf.json") if err != nil { log.Print(err) panic(err) } db,err := sql.Open("postgres",c.Db["postgres"]) if err != nil { log.Print(err) } else { defer db.Close() for { time.Sleep(10 * time.Second) go execTask(*db) } } } func execTask(db sql.DB) { defer func() { if err := recover(); err != nil { log.Print(err) log.Printf("执行任务时发生错误:%s",err) } }(); log.Println("开始执行任务.......") rows,separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());") if err != nil { log.Print(err) } else { defer rows.Close() for rows.Next() { var id string var name string var status int var taskType int var separateTime int64 var fn string var fnType string err = rows.Scan(&id,&name,&status,&taskType,&fn,&fnType,&separateTime) if err != nil { //记录错误,同时更新任务信息为异常 log.Print(err) err = setTaskExecFail(db,id) if err != nil { log.Print(err) } } else { if (fnType == "sql") { res,err := db.Exec(fn) if err != nil { log.Print(err) err = setTaskExecFail(db,id) if err != nil { log.Print(err) } log.Printf("任务:%s执行时出错",name) } else { res = nil log.Println(res) if taskType == 1 { err = setTaskExecSuccess(db,id) if err != nil { log.Print(err) } log.Printf("任务:%s执行完成",name) } else { err = setTaskNextExecTime(db,id,separateTime) if err != nil { log.Print(err) } } log.Printf("任务:%s执行成功",name) } } else if (fnType == "bash") { log.Printf("这是一个bash任务") } else if (fnType == "python") { log.Printf("这是一个python任务") } else if (fnType == "email") { //发送email任务 err = ExecEmailTask(db) if err != nil { handleFail(db,id) log.Println(err) } else { handleSuccess(db,id) } log.Printf("发送邮件任务") setTaskExecSuccess(db,id) setTaskNextExecTime(db,separateTime) } else if (fnType == "sms") { //发送短信任务 log.Printf("发送短信任务") } } } err = rows.Err() if err != nil { log.Print(err) } } log.Println("结束执行任务....") } func setTaskExecFail(db sql.DB,taskId string) error { res,err := db.Exec("UPDATE tasks set status = -2 WHERE id = $1::uuid",taskId) err = nil log.Println(res) return err } func setTaskExecSuccess(db sql.DB,err := db.Exec("UPDATE tasks set status = 2 WHERE id = $1::uuid",taskId) err = nil log.Println(res) return err } func setTaskNextExecTime(db sql.DB,nextTime) res = nil log.Println(res) return err; } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |