路漫漫其修远兮
吾将上下而求索

go学习:分布式任务调度crontab

慕课视频
go远程调试
https://blog.csdn.net/fkewksai/article/details/83062157
https://coding.imooc.com/learn/list/281.html




1-1
知识铺垫
1、执行shell命令
2、cron表达式
3、etcd协调服务
4、mongodb分布式
实战
1、分布式crontab架构
2、master内部设计
3、worker内部设计

环境
windows10
golang 1.10
mongodb 4.0.0
etcd 3.3.8
centos 7.1部署
需要有一定的go基础




2-1 原理介绍
bash执行过程讲解




2-2 执行任务
windows需要下载,然后才能执行bash命令
https://cygwin.com/install.html

代码
package main

import (
    "os/exec"
    "fmt"
)

func main() {
    var (
        cmd *exec.Cmd
        err error
    )

    //cmd = exec.Command("/bin/bash", "-c", "echo 1;echo2;")

    cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "echo 1")

    err = cmd.Run()

    fmt.Println(err)
}




2-3 捕获任务输出
代码,output是字节类型,要转换为str类型
package main

import (
    "os/exec"
    "fmt"
)

func main() {
    var (
        cmd *exec.Cmd
        output []byte
        err error
    )

    // 生成Cmd
    //cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "/usr/bin/python xxx.py")
    cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "sleep 5; ls -l")

    // 执行了命令, 捕获了子进程的输出( pipe )
    if output, err = cmd.CombinedOutput(); err != nil {
        fmt.Println(err)
        return
    }

    // 打印子进程的输出
    fmt.Println(string(output))
}




2-4 强制结束任务
代码
package main

import (
    "os/exec"
    "context"
    "time"
    "fmt"
)

type result struct {
    err error
    output []byte
}

func main() {
    //  执行1个cmd, 让它在一个协程里去执行, 让它执行2秒: sleep 2; echo hello;
    // 1秒的时候, 我们杀死cmd
    var (
        ctx context.Context
        cancelFunc context.CancelFunc
        cmd *exec.Cmd
        resultChan chan *result
        res *result
    )

    // 创建了一个结果队列
    resultChan = make(chan *result, 1000)

    // context:   chan byte
    // cancelFunc:  close(chan byte)

    ctx, cancelFunc = context.WithCancel(context.TODO())

    go func() {
        var (
            output []byte
            err error
        )
        cmd = exec.CommandContext(ctx, "C:\\cygwin64\\bin\\bash.exe", "-c", "sleep 2;echo hello;")

        // 执行任务, 捕获输出
        output, err = cmd.CombinedOutput()

        // 把任务输出结果, 传给main协程
        resultChan <- &result{
            err: err,
            output: output,
        }
    }()

    // 继续往下走
    time.Sleep(1 * time.Second)

    // 取消上下文
    cancelFunc()

    // 在main协程里, 等待子协程的退出,并打印任务执行结果
    res = <- resultChan

    // 打印任务执行结果
    fmt.Println(res.err, string(res.output))
}




3-1 cron表达式原理

开源的cronexpr库
Parse():解析与校验cron表达式
Next():根据当前时间,计算下次调度时间




3-2 开源cron解析库
代码,最小时间单位是s,
package main

import (
    "github.com/gorhill/cronexpr"
    "fmt"
    "time"
)

func main() {
    var (
        expr *cronexpr.Expression
        err error
        now time.Time
        nextTime time.Time
    )

    // linux crontab
    // 秒粒度, 年配置(2018-2099)
    // 哪一分钟(0-59),哪小时(0-23),哪天(1-31),哪月(1-12),星期几(0-6)

    // 每隔5分钟执行1次
    if expr, err = cronexpr.Parse("*/5 * * * * * *"); err != nil {
        fmt.Println(err)
        return
    }

    // 0, 6, 12, 18, .. 48..

    // 当前时间
    now = time.Now()
    // 下次调度时间
    nextTime = expr.Next(now)

    // 等待这个定时器超时
    time.AfterFunc(nextTime.Sub(now), func() {
        fmt.Println("被调度了:", nextTime)
    })

    time.Sleep(5 * time.Second)
}




3-3 调度多个cron
代码
package main

import (
    "github.com/gorhill/cronexpr"
    "time"
    "fmt"
)

// 代表一个任务
type CronJob struct {
    expr *cronexpr.Expression
    nextTime time.Time  // expr.Next(now)
}

func main() {
    // 需要有1个调度协程, 它定时检查所有的Cron任务, 谁过期了就执行谁

    var (
        cronJob *CronJob
        expr *cronexpr.Expression
        now time.Time
        scheduleTable map[string]*CronJob   // key: 任务的名字,
    )

    scheduleTable = make(map[string]*CronJob)

    // 当前时间
    now = time.Now()

    // 1, 我们定义2个cronjob
    expr = cronexpr.MustParse("*/5 * * * * * *")
    cronJob = &CronJob{
        expr: expr,
        nextTime: expr.Next(now),
    }
    // 任务注册到调度表
    scheduleTable["job1"] = cronJob

    expr = cronexpr.MustParse("*/5 * * * * * *")
    cronJob = &CronJob{
        expr: expr,
        nextTime: expr.Next(now),
    }
    // 任务注册到调度表
    scheduleTable["job2"] = cronJob

    // 启动一个调度协程
    go func() {
        var (
            jobName string
            cronJob *CronJob
            now time.Time
        )
        // 定时检查一下任务调度表
        for {
            now = time.Now()

            for jobName, cronJob = range scheduleTable {
                // 判断是否过期
                if cronJob.nextTime.Before(now) || cronJob.nextTime.Equal(now) {
                    // 启动一个协程, 执行这个任务
                    go func(jobName string) {
                        fmt.Println("执行:", jobName)
                    }(jobName)

                    // 计算下一次调度时间
                    cronJob.nextTime = cronJob.expr.Next(now)
                    fmt.Println(jobName, "下次执行时间:", cronJob.nextTime)
                }
            }

            // 睡眠100毫秒
            select {
            case <- time.NewTimer(100 * time.Millisecond).C:    // 将在100毫秒可读,返回
            }
        }
    }()

    time.Sleep(100 * time.Second)
}




4-1 etcd功能和原理上
核心特性
1、将数据存储在集群中的高可用k-v存储
2、允许应用实时监听存储中的k-v变化
3、能够容忍单点故障,能够应对网络分区

raft是强一致的集群日志同步算法
etcd是一个分布式kv存储
etcd利用raft算法在集群中同步kv

quorum模型,集群需要2N+1个节点




4-2 etcd功能和原理

1、选举leader需要半数以上节点参与
2、节点commit日志最多的允许选举为leader
3、commit日志同样多,则term,index越大的允许选举为leader


重要特性
1、底层存储是按key有序排列的,可以顺序遍历
2、因为key有序,etcd支持按目录结构高效遍历
3、支持复杂事务,提供类似if...then...else...的事务
4、基于租约机制实现key的ttl过期

每个key的更改都会有提交版本,在etcd中单调递增
可以监听key的变化,通过watch机制




4-3 搭建和连接etcd
直接使用老师的依赖包,下载不方便
 
使用v3版本的api来操作
ETCDCTL_API=3 etcdctl put "name" "andy"
OK
ETCDCTL_API=3 etcdctl get "name"
name
andy
ETCDCTL_API=3 etcdctl del "name"
1
ETCDCTL_API=3 etcdctl get "name"

ETCDCTL_API=3 etcdctl get "/cron"  --prefix 查找以/cron开头的所有key

ETCDCTL_API=3 etcdctl watch "/cron" --prefix 监听以/cron开头的所有key事件



4-4 put写入kv
代码
package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        putResp *clientv3.PutResponse
    )

    config = clientv3.Config{
        Endpoints: []string{"192.168.170.20:2379"}, // 集群列表
        DialTimeout: 5 * time.Second,
    }

    // 建立一个客户端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // 用于读写etcd的键值对
    kv = clientv3.NewKV(client)

    if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV()); err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("Revision:", putResp.Header.Revision)
        if putResp.PrevKv != nil {  // 打印hello
            fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
        }
    }
}




4-5 get读取kv
学习方法:godoc.org里面有库的详细说明,以及常用的例子,看上面的是最好的
代码
// 用于读写etcd的键值对
    kv = clientv3.NewKV(client)

    if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/); err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(getResp.Kvs, getResp.Count)
    }




4-6 get目录下所有kv
代码 
// 用于读写etcd的键值对
    kv = clientv3.NewKV(client)

    // 写入另外一个Job
    kv.Put(context.TODO(), "/cron/jobs/job2", "{...}")

    // 读取/cron/jobs/为前缀的所有key
    if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
        fmt.Println(err)
    } else {    // 获取成功, 我们遍历所有的kvs
        fmt.Println(getResp.Kvs)
    }




4-7 delete删除key
代码
// 用于读写etcd的键值对
    kv = clientv3.NewKV(client)

    // 删除KV
    if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2)); err != nil {
        fmt.Println(err)
        return
    }

    // 被删除之前的value是什么
    if len(delResp.PrevKvs) != 0 {
        for _, kvpair = range delResp.PrevKvs {
            fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
        }
    }




4-8 lease租约实现kv过期
代码
package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        putResp *clientv3.PutResponse
        getResp *clientv3.GetResponse
        keepResp *clientv3.LeaseKeepAliveResponse
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
        kv clientv3.KV
    )

    config = clientv3.Config{
        Endpoints: []string{"192.168.170.20:2379"}, // 集群列表
        DialTimeout: 5 * time.Second,
    }

    // 建立一个客户端
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // 申请一个lease(租约)
    lease = clientv3.NewLease(client)

    // 申请一个10秒的租约
    if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
        fmt.Println(err)
        return
    }

    // 拿到租约的ID
    leaseId = leaseGrantResp.ID

    // 5秒后会取消自动续租
    if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
        fmt.Println(err)
        return
    }

    // 处理续约应答的协程
    go func() {
        for {
            select {
            case keepResp = <- keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("租约已经失效了")
                    goto END
                } else {    // 每秒会续租一次, 所以就会受到一次应答
                    fmt.Println("收到自动续租应答:", keepResp.ID)
                }
            }
        }
        END:
    }()

    // 获得kv API子集
    kv = clientv3.NewKV(client)

    // Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
    if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("写入成功:", putResp.Header.Revision)

    // 定时的看一下key过期了没有
    for {
        if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
            fmt.Println(err)
            return
        }
        if getResp.Count == 0 {
            fmt.Println("kv过期了")
            break
        }
        fmt.Println("还没过期:", getResp.Kvs)
        time.Sleep(2 * time.Second)
    }
}




4-9 watch监听目录变化

4-10 op取代get,put操作

4-11 事务txn实现分布式锁(上)

4-12 事务txn实现分布式锁 (下)






6-1 架构分析

6-2 master-worker整体架构




9-6 分布式部署到linux服务器(上)
1、配置要修改好
2、配置systemd文件
3、master通过nginx+keepalived代理和高可用




9-7 分布式部署到linux服务器(中)
9-8 分布式部署到linux服务器(下)
服务器ip:192.168.170.20
系统:centos7.2
需要首先安装软件:mongodb-server,etcd,安装好后启动服务

[root@master-20 ~]#go get github.com/owenliang/crontab/master
[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#ll
total 28
drwxr-xr-x 2 root root 4096 Aug 28 23:08 common
-rw-r--r-- 1 root root  527 Aug 28 23:08 cron-master.service
-rw-r--r-- 1 root root  383 Aug 28 23:08 crontab.conf
-rw-r--r-- 1 root root  527 Aug 28 23:08 cron-worker.service
drwxr-xr-x 3 root root 4096 Aug 28 23:08 master
-rw-r--r-- 1 root root    9 Aug 28 23:08 README.md
drwxr-xr-x 3 root root 4096 Aug 28 23:08 worker

将etcd以及其他的依赖库放到gopath当中

[root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#ls
master.go  master.json  webroot
[root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#go build
[root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#ll -h
total 20M
-rwxr-xr-x 1 root root  20M Aug 28 23:15 main
-rw-r--r-- 1 root root 1.1K Aug 28 23:08 master.go
-rw-r--r-- 1 root root  661 Aug 28 23:08 master.json
drwxr-xr-x 2 root root 4.0K Aug 28 23:08 webroot

[root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#cd ../../worker/main/
[root@master-20 /data/gopath/src/github.com/owenliang/crontab/worker/main]#go build 
[root@master-20 /data/gopath/src/github.com/owenliang/crontab/worker/main]#ll -h
total 20M
-rwxr-xr-x 1 root root  20M Aug 28 23:15 main
-rw-r--r-- 1 root root 1.1K Aug 28 23:08 worker.go
-rw-r--r-- 1 root root  581 Aug 28 23:08 worker.json



[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#mkdir -p /usr/local/crontab/{master,worker}
[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cp -r master/main/{main,master.json,webroot} /usr/local/crontab/master/
[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cp -r worker/main/{main,worker.json} /usr/local/crontab/worker/


修改配置文件为自己机器的ip地址
[root@master-20 /usr/local/crontab]#cat master/master.json 
{
  "API接口服务端口": "提供任务增删改查服务",
  "apiPort": 8070,

  "API接口读超时": "单位是毫秒",
  "apiReadTimeout": 5000,

  "API接口写超时": "单位是毫秒",
  "apiWriteTimeout": 5000,

  "etcd的集群列表": "配置多个, 避免单点故障",
  "etcdEndpoints": ["127.0.0.1:2379"],

  "etcd的连接超时": "单位毫秒",
  "etcdDialTimeout": 5000,

  "web页面根目录": "静态页面,前后端分离开发",
  "webroot": "./webroot",

  "mongodb地址": "采用mongodb URI",
  "mongodbUri": "mongodb://127.0.0.1:27017",

  "mongodb连接超时时间": "单位毫秒",
  "mongodbConnectTimeout": 5000
}
[root@master-20 /usr/local/crontab]#cat worker/worker.json 
{
  "etcd的集群列表": "配置多个, 避免单点故障",
  "etcdEndpoints": ["127.0.0.1:2379"],

  "etcd的连接超时": "单位毫秒",
  "etcdDialTimeout": 5000,

  "mongodb地址": "采用mongodb URI",
  "mongodbUri": "mongodb://127.0.0.1:27017",

  "mongodb连接超时时间": "单位毫秒",
  "mongodbConnectTimeout": 5000,

  "日志批次大小": "为了减少mongodb网络往返, 打包成一批写入",
  "jobLogBatchSize": 100,

  "日志自动提交超时": "在批次未达到阀值之前, 超时会自动提交batch",
  "jobLogCommitTimeout": 1000
}



修改system文件中路径
[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cat cron-master.service 
[Unit]
# 服务描述
Description=crontab master
# 要求必须执行网络
Requires=network-online.target
# 在网络启动之后启动
After=network-online.target

[Service]
# 简单服务
Type=simple
# 运行用户与用户组
User=work
Group=work
# 进程退出立即重启
Restart=always
# 执行命令
ExecStart=/usr/local/crontab/master/main -config=/usr/local/crontab/master/master.json
# 进程工作目录
WorkingDirectory=/usr/local/crontab/master

[Install]
# 在系统启动后加载UNIT
WantedBy=multi-user.target
[root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cat cron-worker.service 
[Unit]
# 服务描述
Description=crontab worker
# 要求必须执行网络
Requires=network-online.target
# 在网络启动之后启动
After=network-online.target

[Service]
# 简单服务
Type=simple
# 运行用户与用户组
User=work
Group=work
# 进程退出立即重启
Restart=always
# 执行命令
ExecStart=/usr/local/crontab/worker/main -config=/usr/local/crontab/worker/worker.json
# 进程工作目录
WorkingDirectory=/usr/local/crontab/worker

[Install]
# 在系统启动后加载UNIT
WantedBy=multi-user.target


添加用户
[root@master-20 ~]#useradd worker


启动服务
[root@master-20 ~]#systemctl daemon-reload && systemctl enable cron-master && systemctl restart cron-master
Created symlink from /etc/systemd/system/multi-user.target.wants/cron-master.service to /etc/systemd/system/cron-master.service.
[root@master-20 ~]#systemctl daemon-reload && systemctl enable cron-worker && systemctl restart cron-worker
Created symlink from /etc/systemd/system/multi-user.target.wants/cron-worker.service to /etc/systemd/system/cron-worker.service.


[root@master-20 ~]#systemctl status cron-master
● cron-master.service - crontab master
   Loaded: loaded (/etc/systemd/system/cron-master.service; enabled; vendor preset: disabled)
   Active: active (running) since Wed 2019-08-28 23:32:02 UTC; 7s ago
 Main PID: 54072 (main)
   Memory: 6.4M
   CGroup: /system.slice/cron-master.service
           └─54072 /usr/local/crontab/master/main -config=/usr/local/crontab/master/master.json

Aug 28 23:32:02 master-20 systemd[1]: Started crontab master.
Aug 28 23:32:02 master-20 systemd[1]: Starting crontab master...


访问网页:http://192.168.170.20:8070/

高可用查看nginx配置文件

未经允许不得转载:江哥架构师笔记 » go学习:分布式任务调度crontab

分享到:更多 ()

评论 抢沙发

评论前必须登录!