–
慕课视频 go远程调试 https://blog.csdn.net/fkewksai/article/details/83062157 https://coding.imooc.com/learn/list/281.html 1-1 知识铺垫 1、执行shell命令 2、cron表达式 3、etcd协调服务 4、mongodb分布式 实战 1、分布式crontab架构 2、master内部设计 3、worker内部设计 环境 windows10 golang 1.10 mongodb 4.0.0 etcd 3.3.8 centos 7.1部署 需要有一定的go基础 2-1 原理介绍 bash执行过程讲解 2-2 执行任务 windows需要下载,然后才能执行bash命令 https://cygwin.com/install.html 代码 package main import ( "os/exec" "fmt" ) func main() { var ( cmd *exec.Cmd err error ) //cmd = exec.Command("/bin/bash", "-c", "echo 1;echo2;") cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "echo 1") err = cmd.Run() fmt.Println(err) } 2-3 捕获任务输出 代码,output是字节类型,要转换为str类型 package main import ( "os/exec" "fmt" ) func main() { var ( cmd *exec.Cmd output []byte err error ) // 生成Cmd //cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "/usr/bin/python xxx.py") cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "sleep 5; ls -l") // 执行了命令, 捕获了子进程的输出( pipe ) if output, err = cmd.CombinedOutput(); err != nil { fmt.Println(err) return } // 打印子进程的输出 fmt.Println(string(output)) } 2-4 强制结束任务 代码 package main import ( "os/exec" "context" "time" "fmt" ) type result struct { err error output []byte } func main() { // 执行1个cmd, 让它在一个协程里去执行, 让它执行2秒: sleep 2; echo hello; // 1秒的时候, 我们杀死cmd var ( ctx context.Context cancelFunc context.CancelFunc cmd *exec.Cmd resultChan chan *result res *result ) // 创建了一个结果队列 resultChan = make(chan *result, 1000) // context: chan byte // cancelFunc: close(chan byte) ctx, cancelFunc = context.WithCancel(context.TODO()) go func() { var ( output []byte err error ) cmd = exec.CommandContext(ctx, "C:\\cygwin64\\bin\\bash.exe", "-c", "sleep 2;echo hello;") // 执行任务, 捕获输出 output, err = cmd.CombinedOutput() // 把任务输出结果, 传给main协程 resultChan <- &result{ err: err, output: output, } }() // 继续往下走 time.Sleep(1 * time.Second) // 取消上下文 cancelFunc() // 在main协程里, 等待子协程的退出,并打印任务执行结果 res = <- resultChan // 打印任务执行结果 fmt.Println(res.err, string(res.output)) } 3-1 cron表达式原理 开源的cronexpr库 Parse():解析与校验cron表达式 Next():根据当前时间,计算下次调度时间 3-2 开源cron解析库 代码,最小时间单位是s, package main import ( "github.com/gorhill/cronexpr" "fmt" "time" ) func main() { var ( expr *cronexpr.Expression err error now time.Time nextTime time.Time ) // linux crontab // 秒粒度, 年配置(2018-2099) // 哪一分钟(0-59),哪小时(0-23),哪天(1-31),哪月(1-12),星期几(0-6) // 每隔5分钟执行1次 if expr, err = cronexpr.Parse("*/5 * * * * * *"); err != nil { fmt.Println(err) return } // 0, 6, 12, 18, .. 48.. // 当前时间 now = time.Now() // 下次调度时间 nextTime = expr.Next(now) // 等待这个定时器超时 time.AfterFunc(nextTime.Sub(now), func() { fmt.Println("被调度了:", nextTime) }) time.Sleep(5 * time.Second) } 3-3 调度多个cron 代码 package main import ( "github.com/gorhill/cronexpr" "time" "fmt" ) // 代表一个任务 type CronJob struct { expr *cronexpr.Expression nextTime time.Time // expr.Next(now) } func main() { // 需要有1个调度协程, 它定时检查所有的Cron任务, 谁过期了就执行谁 var ( cronJob *CronJob expr *cronexpr.Expression now time.Time scheduleTable map[string]*CronJob // key: 任务的名字, ) scheduleTable = make(map[string]*CronJob) // 当前时间 now = time.Now() // 1, 我们定义2个cronjob expr = cronexpr.MustParse("*/5 * * * * * *") cronJob = &CronJob{ expr: expr, nextTime: expr.Next(now), } // 任务注册到调度表 scheduleTable["job1"] = cronJob expr = cronexpr.MustParse("*/5 * * * * * *") cronJob = &CronJob{ expr: expr, nextTime: expr.Next(now), } // 任务注册到调度表 scheduleTable["job2"] = cronJob // 启动一个调度协程 go func() { var ( jobName string cronJob *CronJob now time.Time ) // 定时检查一下任务调度表 for { now = time.Now() for jobName, cronJob = range scheduleTable { // 判断是否过期 if cronJob.nextTime.Before(now) || cronJob.nextTime.Equal(now) { // 启动一个协程, 执行这个任务 go func(jobName string) { fmt.Println("执行:", jobName) }(jobName) // 计算下一次调度时间 cronJob.nextTime = cronJob.expr.Next(now) fmt.Println(jobName, "下次执行时间:", cronJob.nextTime) } } // 睡眠100毫秒 select { case <- time.NewTimer(100 * time.Millisecond).C: // 将在100毫秒可读,返回 } } }() time.Sleep(100 * time.Second) } 4-1 etcd功能和原理上 核心特性 1、将数据存储在集群中的高可用k-v存储 2、允许应用实时监听存储中的k-v变化 3、能够容忍单点故障,能够应对网络分区 raft是强一致的集群日志同步算法 etcd是一个分布式kv存储 etcd利用raft算法在集群中同步kv quorum模型,集群需要2N+1个节点 4-2 etcd功能和原理 1、选举leader需要半数以上节点参与 2、节点commit日志最多的允许选举为leader 3、commit日志同样多,则term,index越大的允许选举为leader 重要特性 1、底层存储是按key有序排列的,可以顺序遍历 2、因为key有序,etcd支持按目录结构高效遍历 3、支持复杂事务,提供类似if...then...else...的事务 4、基于租约机制实现key的ttl过期 每个key的更改都会有提交版本,在etcd中单调递增 可以监听key的变化,通过watch机制 4-3 搭建和连接etcd 直接使用老师的依赖包,下载不方便 使用v3版本的api来操作 ETCDCTL_API=3 etcdctl put "name" "andy" OK ETCDCTL_API=3 etcdctl get "name" name andy ETCDCTL_API=3 etcdctl del "name" 1 ETCDCTL_API=3 etcdctl get "name" ETCDCTL_API=3 etcdctl get "/cron" --prefix 查找以/cron开头的所有key ETCDCTL_API=3 etcdctl watch "/cron" --prefix 监听以/cron开头的所有key事件 4-4 put写入kv 代码 package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV putResp *clientv3.PutResponse ) config = clientv3.Config{ Endpoints: []string{"192.168.170.20:2379"}, // 集群列表 DialTimeout: 5 * time.Second, } // 建立一个客户端 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // 用于读写etcd的键值对 kv = clientv3.NewKV(client) if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV()); err != nil { fmt.Println(err) } else { fmt.Println("Revision:", putResp.Header.Revision) if putResp.PrevKv != nil { // 打印hello fmt.Println("PrevValue:", string(putResp.PrevKv.Value)) } } } 4-5 get读取kv 学习方法:godoc.org里面有库的详细说明,以及常用的例子,看上面的是最好的 代码 // 用于读写etcd的键值对 kv = clientv3.NewKV(client) if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/); err != nil { fmt.Println(err) } else { fmt.Println(getResp.Kvs, getResp.Count) } 4-6 get目录下所有kv 代码 // 用于读写etcd的键值对 kv = clientv3.NewKV(client) // 写入另外一个Job kv.Put(context.TODO(), "/cron/jobs/job2", "{...}") // 读取/cron/jobs/为前缀的所有key if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil { fmt.Println(err) } else { // 获取成功, 我们遍历所有的kvs fmt.Println(getResp.Kvs) } 4-7 delete删除key 代码 // 用于读写etcd的键值对 kv = clientv3.NewKV(client) // 删除KV if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2)); err != nil { fmt.Println(err) return } // 被删除之前的value是什么 if len(delResp.PrevKvs) != 0 { for _, kvpair = range delResp.PrevKvs { fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value)) } } 4-8 lease租约实现kv过期 代码 package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseGrantResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID putResp *clientv3.PutResponse getResp *clientv3.GetResponse keepResp *clientv3.LeaseKeepAliveResponse keepRespChan <-chan *clientv3.LeaseKeepAliveResponse kv clientv3.KV ) config = clientv3.Config{ Endpoints: []string{"192.168.170.20:2379"}, // 集群列表 DialTimeout: 5 * time.Second, } // 建立一个客户端 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // 申请一个lease(租约) lease = clientv3.NewLease(client) // 申请一个10秒的租约 if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil { fmt.Println(err) return } // 拿到租约的ID leaseId = leaseGrantResp.ID // 5秒后会取消自动续租 if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil { fmt.Println(err) return } // 处理续约应答的协程 go func() { for { select { case keepResp = <- keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了") goto END } else { // 每秒会续租一次, 所以就会受到一次应答 fmt.Println("收到自动续租应答:", keepResp.ID) } } } END: }() // 获得kv API子集 kv = clientv3.NewKV(client) // Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期 if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } fmt.Println("写入成功:", putResp.Header.Revision) // 定时的看一下key过期了没有 for { if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv过期了") break } fmt.Println("还没过期:", getResp.Kvs) time.Sleep(2 * time.Second) } } 4-9 watch监听目录变化 4-10 op取代get,put操作 4-11 事务txn实现分布式锁(上) 4-12 事务txn实现分布式锁 (下) 6-1 架构分析 6-2 master-worker整体架构 9-6 分布式部署到linux服务器(上) 1、配置要修改好 2、配置systemd文件 3、master通过nginx+keepalived代理和高可用 9-7 分布式部署到linux服务器(中) 9-8 分布式部署到linux服务器(下) 服务器ip:192.168.170.20 系统:centos7.2 需要首先安装软件:mongodb-server,etcd,安装好后启动服务 [root@master-20 ~]#go get github.com/owenliang/crontab/master [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#ll total 28 drwxr-xr-x 2 root root 4096 Aug 28 23:08 common -rw-r--r-- 1 root root 527 Aug 28 23:08 cron-master.service -rw-r--r-- 1 root root 383 Aug 28 23:08 crontab.conf -rw-r--r-- 1 root root 527 Aug 28 23:08 cron-worker.service drwxr-xr-x 3 root root 4096 Aug 28 23:08 master -rw-r--r-- 1 root root 9 Aug 28 23:08 README.md drwxr-xr-x 3 root root 4096 Aug 28 23:08 worker 将etcd以及其他的依赖库放到gopath当中 [root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#ls master.go master.json webroot [root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#go build [root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#ll -h total 20M -rwxr-xr-x 1 root root 20M Aug 28 23:15 main -rw-r--r-- 1 root root 1.1K Aug 28 23:08 master.go -rw-r--r-- 1 root root 661 Aug 28 23:08 master.json drwxr-xr-x 2 root root 4.0K Aug 28 23:08 webroot [root@master-20 /data/gopath/src/github.com/owenliang/crontab/master/main]#cd ../../worker/main/ [root@master-20 /data/gopath/src/github.com/owenliang/crontab/worker/main]#go build [root@master-20 /data/gopath/src/github.com/owenliang/crontab/worker/main]#ll -h total 20M -rwxr-xr-x 1 root root 20M Aug 28 23:15 main -rw-r--r-- 1 root root 1.1K Aug 28 23:08 worker.go -rw-r--r-- 1 root root 581 Aug 28 23:08 worker.json [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#mkdir -p /usr/local/crontab/{master,worker} [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cp -r master/main/{main,master.json,webroot} /usr/local/crontab/master/ [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cp -r worker/main/{main,worker.json} /usr/local/crontab/worker/ 修改配置文件为自己机器的ip地址 [root@master-20 /usr/local/crontab]#cat master/master.json { "API接口服务端口": "提供任务增删改查服务", "apiPort": 8070, "API接口读超时": "单位是毫秒", "apiReadTimeout": 5000, "API接口写超时": "单位是毫秒", "apiWriteTimeout": 5000, "etcd的集群列表": "配置多个, 避免单点故障", "etcdEndpoints": ["127.0.0.1:2379"], "etcd的连接超时": "单位毫秒", "etcdDialTimeout": 5000, "web页面根目录": "静态页面,前后端分离开发", "webroot": "./webroot", "mongodb地址": "采用mongodb URI", "mongodbUri": "mongodb://127.0.0.1:27017", "mongodb连接超时时间": "单位毫秒", "mongodbConnectTimeout": 5000 } [root@master-20 /usr/local/crontab]#cat worker/worker.json { "etcd的集群列表": "配置多个, 避免单点故障", "etcdEndpoints": ["127.0.0.1:2379"], "etcd的连接超时": "单位毫秒", "etcdDialTimeout": 5000, "mongodb地址": "采用mongodb URI", "mongodbUri": "mongodb://127.0.0.1:27017", "mongodb连接超时时间": "单位毫秒", "mongodbConnectTimeout": 5000, "日志批次大小": "为了减少mongodb网络往返, 打包成一批写入", "jobLogBatchSize": 100, "日志自动提交超时": "在批次未达到阀值之前, 超时会自动提交batch", "jobLogCommitTimeout": 1000 } 修改system文件中路径 [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cat cron-master.service [Unit] # 服务描述 Description=crontab master # 要求必须执行网络 Requires=network-online.target # 在网络启动之后启动 After=network-online.target [Service] # 简单服务 Type=simple # 运行用户与用户组 User=work Group=work # 进程退出立即重启 Restart=always # 执行命令 ExecStart=/usr/local/crontab/master/main -config=/usr/local/crontab/master/master.json # 进程工作目录 WorkingDirectory=/usr/local/crontab/master [Install] # 在系统启动后加载UNIT WantedBy=multi-user.target [root@master-20 /data/gopath/src/github.com/owenliang/crontab]#cat cron-worker.service [Unit] # 服务描述 Description=crontab worker # 要求必须执行网络 Requires=network-online.target # 在网络启动之后启动 After=network-online.target [Service] # 简单服务 Type=simple # 运行用户与用户组 User=work Group=work # 进程退出立即重启 Restart=always # 执行命令 ExecStart=/usr/local/crontab/worker/main -config=/usr/local/crontab/worker/worker.json # 进程工作目录 WorkingDirectory=/usr/local/crontab/worker [Install] # 在系统启动后加载UNIT WantedBy=multi-user.target 添加用户 [root@master-20 ~]#useradd worker 启动服务 [root@master-20 ~]#systemctl daemon-reload && systemctl enable cron-master && systemctl restart cron-master Created symlink from /etc/systemd/system/multi-user.target.wants/cron-master.service to /etc/systemd/system/cron-master.service. [root@master-20 ~]#systemctl daemon-reload && systemctl enable cron-worker && systemctl restart cron-worker Created symlink from /etc/systemd/system/multi-user.target.wants/cron-worker.service to /etc/systemd/system/cron-worker.service. [root@master-20 ~]#systemctl status cron-master ● cron-master.service - crontab master Loaded: loaded (/etc/systemd/system/cron-master.service; enabled; vendor preset: disabled) Active: active (running) since Wed 2019-08-28 23:32:02 UTC; 7s ago Main PID: 54072 (main) Memory: 6.4M CGroup: /system.slice/cron-master.service └─54072 /usr/local/crontab/master/main -config=/usr/local/crontab/master/master.json Aug 28 23:32:02 master-20 systemd[1]: Started crontab master. Aug 28 23:32:02 master-20 systemd[1]: Starting crontab master... 访问网页:http://192.168.170.20:8070/ 高可用查看nginx配置文件
–
–
评论前必须登录!
注册