喵♂呜 的博客

一个刚毕业就当爹的程序猿 正在迷雾中寻找道路...

通过 Channel 实现 Goroutine Pool

最近用到了 Go 从 Excel 导数据到服务器内部 用的是 http 请求
但是发现一个问题 从文件读取之后 新开 Goroutine 会无限制新增
导致全部卡在初始化请求 于是乎就卡死了

问题模拟

  • 模拟代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
pool := sync.WaitGroup{}
for i := 0; i < 500; i++ {
pool.Add(1)
go func(i int) {
resp, err := http.Get("http://ip.3322.org")
if err != nil {
fmt.Println(i, err)
} else {
defer resp.Body.Close()
result, _ := ioutil.ReadAll(resp.Body)
fmt.Println(i, string(result))
}
pool.Done()
}(i)
}
pool.Wait()
}
  • 数量小的情况下 没有问题 但是数量比较大的情况 就会发现程序直接卡死 一段时间之后报错 并且没有发出任何请求

问题解决

  • 实际上看的出来 是应为同时发起了太多的HTTP请求 导致系统卡死 数据没有发送
  • 想到我在Java中用Thread提交请求 我就考虑 可不可限制 Goroutine 的数量
  • 使用强大的百度 果然找到了大佬已经写好的协程池
  • 代码如下 我加上了注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package gopool

import (
"sync"
)

// Pool Goroutine Pool
type Pool struct {
queue chan int
wg *sync.WaitGroup
}

// New 新建一个协程池
func New(size int) *Pool {
if size <= 0 {
size = 1
}
return &Pool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}

// Add 新增一个执行
func (p *Pool) Add(delta int) {
// delta为正数就添加
for i := 0; i < delta; i++ {
p.queue <- 1
}
// delta为负数就减少
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}

// Done 执行完成减一
func (p *Pool) Done() {
<-p.queue
p.wg.Done()
}

// Wait 等待Goroutine执行完毕
func (p *Pool) Wait() {
p.wg.Wait()
}
  • 然后修改刚才的测试方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"io/ioutil"
"log"
"net/http"
"yumc.pw/cloud/lib/gopool"
)

func main() {
// 这里限制5个并发
pool := gopool.New(5)// sync.WaitGroup{}
for i := 0; i < 500; i++ {
pool.Add(1)
go func(i int) {
resp, err := http.Get("http://ip.3322.org")
if err != nil {
fmt.Println(i, err)
} else {
defer resp.Body.Close()
result, _ := ioutil.ReadAll(resp.Body)
fmt.Println(i, string(result))
}
pool.Done()
}(i)
}
pool.Wait()
}
  • 完美解决

欢迎关注我的其它发布渠道