Go語言學習之12 etcd、context、kafka消費實例、logagent

本節內容:
    1. etcd介紹與使用
    2. ElasticSearch介紹與使用

1. etcd介紹與使用
    概念:高可用的分佈式key-value存儲,可用於配置共享和服務發現
    相似項目:zookeeper和consul
    開發語言:Go
    接口:提供restful的http接口,使用簡單
    實現算法:基於raft算法的強一致性、高可用的服務存儲目錄

2. etcd的應用場景
    a. 服務發現和服務註冊
    b. 配置中心
    c. 分佈式存儲
    d. master選舉

3. etcd搭建
    a. 下載etcd release版本:https://github.com/etcd-io/etcd/releases
    b. 解壓後,進入到etcd的根目錄,直接執行 ./etcd 即可啓動etcd
    c. 使用etcdctl工具更改配置

4. context使用介紹
    a. 如何控制goroutine
    b. 如何保存上下文數據

 (1)使用context處理超時
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

    示例通過設置ctx超時時間爲2s,如果2s內無法收到baidu的請求返回,則超時報錯。

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "io/ioutil"
 7     "net/http"
 8     "time"
 9 )
// Result bundles the outcome of an HTTP round trip so that it can be handed
// from the requesting goroutine to the caller over a channel.
type Result struct {
	r   *http.Response
	err error
}

// process fetches http://www.baidu.com with a 2-second deadline and prints
// either the response body or the reason the request failed.
//
// The request is bound to the context, so the transport aborts the in-flight
// call by itself once the deadline passes; no explicit cancel call on the
// transport is needed.
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", "http://www.baidu.com", nil)
	if err != nil {
		fmt.Println("http request failed, err:", err)
		return
	}

	client := &http.Client{Transport: &http.Transport{}}
	// Buffered so the goroutine can always deliver its result and exit,
	// even if nobody is receiving any more.
	c := make(chan Result, 1)
	go func() {
		resp, err := client.Do(req)
		c <- Result{r: resp, err: err}
	}()

	select {
	case <-ctx.Done():
		// Do returns promptly once ctx is done. Wait for it so the
		// goroutine has finished and a late response body gets closed.
		res := <-c
		if res.r != nil {
			res.r.Body.Close()
		}
		fmt.Println("Timeout! err:", res.err)
	case res := <-c:
		// On failure the response is nil, so the error must be checked
		// before the body is touched.
		if res.err != nil {
			fmt.Println("http request failed, err:", res.err)
			return
		}
		defer res.r.Body.Close()
		out, err := ioutil.ReadAll(res.r.Body)
		if err != nil {
			fmt.Println("read response failed, err:", err)
			return
		}
		fmt.Printf("Server Response: %s", out)
	}
}

// main runs the timeout demo once.
func main() {
	process()
}
ctx_timeout

(2) 使用context保存上下文
    利用context來保存上下文值:

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6 )
 7 
 8 func process(ctx context.Context) {
 9     ret,ok := ctx.Value("trace_id").(int)
10     if !ok {
11         ret = 21342423
12     }
13 
14     fmt.Printf("ret:%d\n", ret)
15 
16     s , _ := ctx.Value("session").(string)
17     fmt.Printf("session:%s\n", s)
18 }
19 
20 func main() {
21     ctx := context.WithValue(context.Background(), "trace_id", 13483434)
22     ctx = context.WithValue(ctx, "session", "sdlkfjkaslfsalfsafjalskfj")
23     process(ctx)
24 }
ctx_value

    同時還有context的 ctx_cancel 和 ctx_deadline 示例:

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 )
 8 
 9 func gen(ctx context.Context) <-chan int {
10     dst := make(chan int)
11     n := 1
12     go func() {
13         for {
14             select {
15             case <-ctx.Done():  //當43行的test函數執行結束以後,執行defer cancel(),則會觸發該行
16                 fmt.Println("i exited")
17                 return // returning not to leak the goroutine
18             case dst <- n:
19                 n++
20             }
21         }
22     }()
23     return dst
24 }
25 
26 func test() {
27     // gen generates integers in a separate goroutine and
28     // sends them to the returned channel.
29     // The callers of gen need to cancel the context once
30     // they are done consuming generated integers not to leak
31     // the internal goroutine started by gen.
32     ctx, cancel := context.WithCancel(context.Background())
33     defer cancel() // cancel when we are finished consuming integers
34     intChan := gen(ctx)
35     for n := range intChan {
36         fmt.Println(n)
37         if n == 5 {
38             break
39         }
40     }
41 }
42 func main() {
43     test()
44     time.Sleep(time.Hour)
45 }
ctx_cancel
 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 )
 8 
 9 func main() {
10     d := time.Now().Add(50 * time.Millisecond)
11     ctx, cancel := context.WithDeadline(context.Background(), d)
12 
13     // Even though ctx will be expired, it is good practice to call its
14     // cancelation function in any case. Failure to do so may keep the
15     // context and its parent alive longer than necessary.
16     defer cancel()
17 
18     select {
19     case <-time.After(1 * time.Second):
20         fmt.Println("overslept")
21     case <-ctx.Done():
22         fmt.Println(ctx.Err())  //context deadline exceeded
23     }
24 
25 }
ctx_deadline

5. etcd介紹與使用
    etcd使用示例 (因爲虛擬機出現問題,後面的程序全在Windows上面操作):

(1)客戶端鏈接 etcd server端

 1 package main
 2 
 3 import (
 4     "fmt"
 5     //etcd_client "github.com/coreos/etcd/clientv3"
 6     etcd_client "go.etcd.io/etcd/clientv3"
 7     "time"
 8 )
 9 
10 func main() {
11 
12     cli, err := etcd_client.New(etcd_client.Config{
13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
14         DialTimeout: 5 * time.Second,
15     })
16     if err != nil {
17         fmt.Println("connect failed, err:", err)
18         return
19     }
20 
21     fmt.Println("connect succ")
22     defer cli.Close()
23 }
etcd_conn

(2)put 和 get

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "go.etcd.io/etcd/clientv3"
 7     "time"
 8 )
 9 
10 func main() {
11 
12     cli, err := clientv3.New(clientv3.Config{
13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
14         DialTimeout: 5 * time.Second,
15     })
16     if err != nil {
17         fmt.Println("connect failed, err:", err)
18         return
19     }
20 
21     fmt.Println("connect succ")
22     defer cli.Close()
23     //設置1秒超時,訪問etcd有超時控制
24     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
25     //操做etcd
26     _, err = cli.Put(ctx, "/logagent/conf/", "192.168.0.1")
27     //操做完畢,取消etcd
28     cancel()
29     if err != nil {
30         fmt.Println("put failed, err:", err)
31         return
32     }
33     //取值,設置超時爲1秒
34     ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
35     resp, err := cli.Get(ctx, "/logagent/conf/")
36     cancel()
37     if err != nil {
38         fmt.Println("get failed, err:", err)
39         return
40     }
41     for _, ev := range resp.Kvs {
42         fmt.Printf("%s : %s\n", ev.Key, ev.Value)
43     }
44 }
put_get

(3)watch(觀測key值發生變化)

 1 package main
 2 
 3 import (
 4     "context"
 5     "fmt"
 6     "time"
 7 
 8     etcd_client "go.etcd.io/etcd/clientv3"
 9 )
10 
11 func main() {
12 
13     cli, err := etcd_client.New(etcd_client.Config{
14         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
15         DialTimeout: 5 * time.Second,
16     })
17     if err != nil {
18         fmt.Println("connect failed, err:", err)
19         return
20     }
21     defer cli.Close()
22 
23     fmt.Println("connect succ")
24     
25     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
26     _, err = cli.Put(ctx, "/logagent/conf/", "99999")
27     if err != nil {
28         fmt.Println("put failed, err:", err)
29         return
30     }
31     cancel()
32     
33     fmt.Println("put succ")
34 
35     for {
36         rch := cli.Watch(context.Background(), "/logagent/conf/")
37         for wresp := range rch {
38             for _, ev := range wresp.Events {
39                 fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
40             }
41         }
42     }
43 }
watch

    運行上面的watch程序監控key(/logagent/conf/)的變化,然後再運行(2)的程序,watch程序會輸出類似下面的結果:PUT "/logagent/conf/" : "192.168.0.1"

    kafka消費示例代碼:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "strings"
 6     "sync"
 7 
 8     "github.com/Shopify/sarama"
 9 )
10 
11 var (
12     wg sync.WaitGroup
13 )
14 
15 func main() {
16 
17     consumer, err := sarama.NewConsumer(strings.Split("192.168:30.136:9092", ","), nil)
18     if err != nil {
19         fmt.Println("Failed to start consumer: %s", err)
20         return
21     }
22     partitionList, err := consumer.Partitions("nginx_log")
23     if err != nil {
24         fmt.Println("Failed to get the list of partitions: ", err)
25         return
26     }
27 
28     fmt.Println(partitionList)
29 
30     for partition := range partitionList {
31         pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
32         if err != nil {
33             fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
34             return
35         }
36         defer pc.AsyncClose()
37         
38         go func(pc sarama.PartitionConsumer) {
39             wg.Add(1)
40             for msg := range pc.Messages() {
41                 fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
42                 fmt.Println()
43             }
44             wg.Done()
45         }(pc)
46     }
47     //time.Sleep(time.Hour)
48     wg.Wait()
49     consumer.Close()
50 }
kafka消費示例代碼

6. sync.WaitGroup介紹
1)等待一組goroutine結束
2)使用Add方法將等待的數量加1
3)使用Done方法將等待的數量減1
4)當等待的數量等於0,Wait函數返回

sync.WaitGroup實例:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6     "time"
 7 )
 8 
 9 func main() {
10     wg := sync.WaitGroup{}
11     for i := 0; i < 10; i++ {
12         wg.Add(1)
13         go calc(&wg, i)
14     }
15 
16     wg.Wait() //阻塞,等待全部groutine結束
17     fmt.Println("all goroutine finish")
18 }
19 
20 func calc(w *sync.WaitGroup, i int) {
21     //注意: wg.Add(1) 放到這會有問題,也就是main函數結束比wg.Add(1)要快
22     fmt.Println("calc:", i)
23     time.Sleep(time.Second)
24     w.Done()
25 }
waitGroup示例

7. ElasticSearch安裝及go操作es

(1)安裝 es
   1)下載ES,下載地址:https://www.elastic.co/products/elasticsearch,我下載的是 elasticsearch-6.7.1.zip。
   2)修改在解壓後根目錄下的 /config/elasticsearch.yml 配置:

    放開註釋並將 youIP 換成你自己機器的 ip

cluster.name: my-application
node.name: node-1
network.host: youIP
http.port: 9200

   3)修改 /config/jvm.options 文件,當然如果機器性能好也可以不用修改:

-Xms512m
-Xmx512m

   4)進入根目錄,啓動es,.\bin\elasticsearch.bat

(2)go 操做 es 示例

    安裝第三方插件:

go get gopkg.in/olivere/elastic.v2

    示例:注意將程序裏面的 url = "http://yourIP:9200/",yourIP替換爲你安裝es機器的 ip:

 1 package main
 2 
 3 import (
 4     "fmt"
 5 
 6     elastic "gopkg.in/olivere/elastic.v2"
 7 )
 8 
 9 type Tweet struct {
10     User    string
11     Message string
12 }
13 
14 var (
15     url = "http://yourIP:9200/"
16 )
17 
18 func main() {
19     client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(url))
20     if err != nil {
21         fmt.Println("connect es error", err)
22         return
23     }
24 
25     fmt.Println("conn es succ")
26 
27     tweet := Tweet{User: "olivere", Message: "Take Five"}
28     _, err = client.Index().
29         Index("twitter").
30         Type("tweet").
31         Id("1").
32         BodyJson(tweet).
33         Do()
34     if err != nil {
35         // Handle error
36         panic(err)
37         return
38     }
39 
40     fmt.Println("insert succ")
41 }
es示例

    鏈式調用(每個方法都返回對象本身的指針,所以可以像上面的es客戶端那樣連續調用):

 1 package main
 2 
 3 import "fmt"
 4 
 5 type Stu struct {
 6     Name string
 7     Age  int
 8 }
 9 
10 func (p *Stu) SetName(name string) *Stu {
11     p.Name = name
12     return p
13 }
14 
15 func (p *Stu) SetAge(age int) *Stu {
16     p.Age = age
17     return p
18 }
19 
20 func (p *Stu) Print() {
21     fmt.Printf("age:%d name:%s\n", p.Age, p.Name)
22 }
23 
24 func main() {
25     stu := &Stu{}
26     stu.SetAge(12).SetName("stu01").Print()
27     //stu.SetName("stu01")
28     //stu.Print()
29 }
鏈式存儲示例
相關文章
相關標籤/搜索