加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang+数据库定时任务

发布时间:2020-12-16 18:42:42 所属栏目:大数据 来源:网络整理
导读:golang+数据库定时任务 项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错

golang+数据库定时任务


项目背景大致如下,楼主在用nodejs写项目时遇到一些需要定时去处理的事情,例如僵尸用户定时清除,一些产品定时下架,邮件定时发送等等! 期初使用nodejs setTimeOut递归嵌套实现,后来发现内存不断飙升,故而放弃,最终改用了性能不错的golang实现

数据库设计


字段名称 含义
id 编号
name 任务名称
create_at 创建时间
type 1. 执行一次 2.循环执行
separate_time 执行间隔
status 执行状态 0.未开始 1. 执行中 -1.执行失败 -2.手动暂停
remark 备注信息
fn 要执行的数据库存储过程或函数
start_time 开始执行时间
next_exec_time 下次执行时间
last_exec_time 上次执行时间
fn_type email,sql 等等

大致实现流程


  1. 需要有一个死循环,sleep 10s启动然后sleep 10 ...
for {
			time.Sleep(10 * time.Second)
			go execTask(*db) //使用子进程执行,防止卡死主进程
		}
  1. 开始执行,查找需要执行的任务
rows,err := db.Query("SELECT id,name,status,type,fn,fn_type,separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
  1. 执行任务
res,err := db.Exec(fn)
  1. 执行任务成功后,更新下次执行时间
func setTaskNextExecTime(db sql.DB,taskId string,separateTime int64) error {
	next_exec_time := time.Now().Unix() + separateTime
	nextTime := time.Unix(next_exec_time,999)
	res,err := db.Exec("UPDATE tasks set status = 1,last_exec_time=now(),next_exec_time=$2 WHERE id = $1::uuid",taskId,nextTime)
	res = nil
	log.Println(res)
	return err;
}

优缺点


优点:
        1. 所有任务执行状态都可以查询到,例如任务异常或者上次执行时间,下次执行时间
        2. 增加一个定时任务,只需要在数据库插入一条记录就OK
    缺点:
        1. 如果要绑定非数据库可操作任务,需要自己扩展

项目源码


// MTask project main.go
package main

import (
	"database/sql"
	_ "github.com/lib/pq"
	"log"
	"time"
	"os"
	"io/ioutil"
	"encoding/json"
)

//配置结构体
type Conf struct {
	Db map[string] string
}

//读取配置文件
func readConf(path string) (Conf,error) {
	var c Conf
	var err error
	
	fi,err := os.Open(path)
	if err != nil {
		return c,err 
	} else {
		defer fi.Close()
		
		//读取配置文件
		fd,err := ioutil.ReadAll(fi)
		if err != nil {
			return c,err
		} else {
			var c Conf
			err = json.Unmarshal(fd,&c)
			if err != nil {
				return c,err
			} else {
				return c,err
			}
		}
	}
	return c,err
}


func main() {
	c,err := readConf("./conf.json")
	if err != nil {
		log.Print(err)
		panic(err)
	}
	db,err := sql.Open("postgres",c.Db["postgres"])
	if err != nil {
		log.Print(err)
	} else {
		defer db.Close()
		for {
			time.Sleep(10 * time.Second)
			go execTask(*db)
		}
	}
}

func execTask(db sql.DB) {
	defer func() {
		if err := recover(); err != nil {
			log.Print(err)
			log.Printf("执行任务时发生错误:%s",err)
		}
	}();
	
	log.Println("开始执行任务.......")
	rows,separate_time FROM public.tasks where (status = 0 and start_time < now()) or (status = 1 and next_exec_time < now());")
	if err != nil {
		log.Print(err)
	} else {
		defer rows.Close()
		for rows.Next() {
			var id string
			var name string
			var status int
			var taskType int
			var separateTime int64
			var fn string
			var fnType string

			err = rows.Scan(&id,&name,&status,&taskType,&fn,&fnType,&separateTime)

			if err != nil {
				//记录错误,同时更新任务信息为异常
				log.Print(err)
				err = setTaskExecFail(db,id)
				if err != nil {
					log.Print(err)
				}
			} else {
				if (fnType == "sql") {
					res,err := db.Exec(fn)
					if err != nil {
						log.Print(err)
						err = setTaskExecFail(db,id)
						if err != nil {
							log.Print(err)
						}
						log.Printf("任务:%s执行时出错",name)
					} else {
						res = nil
						log.Println(res)
						
						if taskType == 1 {
							err = setTaskExecSuccess(db,id)
							if err != nil {
								log.Print(err)
							}
							log.Printf("任务:%s执行完成",name)
						} else {
							err = setTaskNextExecTime(db,id,separateTime)
							if err != nil {
								log.Print(err)
							}
						}
						log.Printf("任务:%s执行成功",name)
					}
				} else if (fnType == "bash") {
					log.Printf("这是一个bash任务")
				} else if (fnType == "python") {
					log.Printf("这是一个python任务")
				} else if (fnType == "email") {
					//发送email任务
					err = ExecEmailTask(db)
					if err != nil {
						handleFail(db,id)
						log.Println(err)
					} else {
						handleSuccess(db,id)
					}
					log.Printf("发送邮件任务")
					setTaskExecSuccess(db,id)
					setTaskNextExecTime(db,separateTime)
				} else if (fnType == "sms") {
					//发送短信任务
					log.Printf("发送短信任务")
				}
				
			}
		}

		err = rows.Err()
		if err != nil {
			log.Print(err)
		}
	}
	log.Println("结束执行任务....")
}

func setTaskExecFail(db sql.DB,taskId string) error {
	res,err := db.Exec("UPDATE tasks set status = -2 WHERE id = $1::uuid",taskId)
	err = nil
	log.Println(res)
	return err
}

func setTaskExecSuccess(db sql.DB,err := db.Exec("UPDATE tasks set status = 2 WHERE id = $1::uuid",taskId)
	err = nil
	log.Println(res)
	return err
}

func setTaskNextExecTime(db sql.DB,nextTime)
	res = nil
	log.Println(res)
	return err;
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读