goroutine学习
Flow

学习参考自:

goroutine是什么

理解为go的一个机制,类似线程,如果是其他语言,程序员需要去包装,编排一个又一个的线程,这个过程也花费心力。

而goroutine中的任务可以被go智能合理地分配到各个GPU,也就是说程序员不需要再一个一个去维护,只需要把想要执行的任务定义成一个goroutine

Go 会自动把 goroutine 分配到多个核心上跑(由 GOMAXPROCS 控制),所以多核 CPU 下确实能“同时执行”。

tips:一个goroutine必定对应一个函数,可以创建多个goroutine执行同一个函数

启动单个goroutine

原来的代码,可以知道是串行的(一个一个来),会按顺序打印两个句子

1
2
3
4
5
6
7
8
func helloWorld() {
fmt.Println("Hello, World!")
}

func main() {
helloWorld()
fmt.Println("main goroutine done!")
}

现在把代码改成

1
2
3
4
func main() {
helloWorld()
fmt.Println("main goroutine done!")
}

这次执行指挥输出”main goroutine done!”

原因是:程序启动时,go程序会为main()创建一个默认的goroutine。go helloWorld()理解为它启动一个子 goroutine,但主 goroutine 立刻执行并结束,进程随之退出,子 goroutine 来不及打印。因此只看到 “main goroutine done!”。

如果非要看,那就需要使用sleep

1
2
3
4
5
func main() {
go helloWorld()
fmt.Println("main goroutine done!")
time.Sleep(1 * time.Second)
}

这次会看到先输出”main goroutine done!”再输出”Hello, World!”,因为在等待

多个goroutine

直接看下面的代码,注释解释的还是挺清晰的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 定义一个全局的 WaitGroup 变量wg,用于控制等待一组goroutine完成。
var wg sync.WaitGroup

func hello(i int) {
// defer语句表示在当前函数返回前执行 wg.Done()。
// wg.Done()会将WaitGroup的计数器减1,表示一个goroutine完成了。
defer wg.Done()
fmt.Println("Hello Goroutine!", i)
}
func main() {
// 启动10个goroutine,每个处理不同的i值。
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
// wg.Wait()会阻塞主协程,直到WaitGroup计数器归零(即所有goroutine都调用了wg.Done())。
wg.Wait()
}

运行打印会看到输出的数字不是按顺序的,因为10个goroutine是并发执行的,而goroutine的调度是随机的。

channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

go的channel(通道)当成一个特殊的类型,像一个传输通道或队列,遵循先入先出FIFO,可以保证数据收发的顺序,每一个通道都是一个具体类型的导管,所以定义一个channel需要写好对应的类型

声明channel

1
2
3
var 变量名 chan 元素类型
eg:
var ch1 chan int // 声明一个传递整型的通道

声明好chennel后需要make函数初始化创建channel后才能使用,写好类型,缓冲大小(可选)

1
make(chan 类型, [缓冲大小])

channel操作分为send,receive,close

1
2
3
4
5
ch := make(chan int) // 先定义
ch <- 10 // 把10发送到ch中(这个箭头好形象)
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
close(ch) // 关闭

关于关闭有几个要点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。
关于缓冲

这两个图片超形象,可以理解一下,我觉得区别就是通道允不允许同时有多个事情的区别

  • 无缓冲:发送与接收必须“同时”发生(同步点),常用于严格的交接。
  • 有缓冲:发送在缓冲未满时不会阻塞,接收在缓冲为空时阻塞。
select:在多个 channel 间等待

类似switch case,有多个case,只要其中有一个接收到信心,就选择执行然后结束

1
2
3
4
5
6
7
8
9
10
select {
case v := <-ch1:
fmt.Println("got", v)
case ch2 <- 42:
fmt.Println("sent 42")
case <-time.After(500 * time.Millisecond):
fmt.Println("timeout")
default:
fmt.Println("no op (non-blocking)")
}
  • select 会随机选择就绪的分支执行。
  • 配合 time.After 可做超时控制。
  • default 分支让 select 变为非阻塞操作(谨慎使用,避免忙轮询)。

并发安全和锁

有时候会出现多个goroutine同时操作一块资源的情况,比如下面的例子同时开两个goroutine去操作x的值,两个goroutine访问x的时候会存在数据竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var x int64
var wg sync.WaitGroup

func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
互斥锁Mutex

使用互斥锁实现同一时间只有一个goroutine进入临界区,其他goroutine等待锁

当前goroutine解锁后,其他goroutine才会进入执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
读写互斥锁

前面介绍的互斥锁是完全互斥的,但是大多是时候是读多写少,有时候是并发读取一个资源,并不会修改所以没有必要加那么严格的锁,那么读写锁是更好的选择,sync包中的RWMutex类型

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)

func write() {
// lock.Lock() // 加互斥锁
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwlock.Unlock() // 解写锁
// lock.Unlock() // 解互斥锁
wg.Done()
}

func read() {
// lock.Lock() // 加互斥锁
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
// lock.Unlock() // 解互斥锁
wg.Done()
}

func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}

for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}

wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}

sync.WaitGroup

有时候不知道一个goroutine要执行多久才结束,又担心会和其他线程冲突,直接生硬地用time.sleep是不合理的

所以在go里面使用sync.WaitGroup来实现并发任务的同步

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

后面写个例子更加深入理解WaitGroup的作用

1
2
3
4
5
6
7
8
9
10
11
12
var wg sync.WaitGroup

func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}

go写爬虫

主要参考:https://drun1baby.top/2023/03/08/Golang-%E7%BC%96%E5%86%99%E7%88%AC%E8%99%AB

基础爬豆瓣top250的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main  

import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"time")

func fetch(url string) string {
fmt.Println("Fetch Url", url)
client := &http.Client{}
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)")
// 发送请求
resp, err := client.Do(req) // 获取响应
if err != nil {
fmt.Println("Http get err:", err) // 打印错误
return ""
}
if resp.StatusCode != 200 {
fmt.Println("Http status code:", resp.StatusCode)
return ""
}
defer resp.Body.Close()
// 读取响应体
body, err := ioutil.ReadAll(resp.Body) // 读取响应体
if err != nil {
fmt.Println("Read error", err) // 打印错误
return ""
}
return string(body)
}

func parseUrls(url string) {
// 使用正则提取想要的信息
body := fetch(url)
body = strings.Replace(body, "\n", "", -1)
rp := regexp.MustCompile(`<div class="hd">(.*?)</div>`)
titleRe := regexp.MustCompile(`<span class="title">(.*?)</span>`)
idRe := regexp.MustCompile(`<a href="https://movie.douban.com/subject/(\d+)/"`)
items := rp.FindAllStringSubmatch(body, -1)
// 遍历打印
for _, item := range items {
fmt.Println(idRe.FindStringSubmatch(item[1])[1],
titleRe.FindStringSubmatch(item[1])[1])
}
}

func main() {
start := time.Now()
for i := 0; i < 10; i++ {
parseUrls("https://movie.douban.com/top250?start=" + strconv.Itoa(25*i)) // 传入基础url
}
elapsed := time.Since(start) // 计时
fmt.Printf("Took %s", elapsed)
}

目前这样写还是10个循环一个一个来,效率低,换成goroutine来实现并发

1
2
3
4
5
6
7
8
9
func main() {  
start := time.Now()
for i := 0; i < 10; i++ {
go parseUrls("https://movie.douban.com/top250?start=" + strconv.Itoa(25*i)) // 传入基础url
}
time.Sleep(time.Second * 4) // 等待4秒
elapsed := time.Since(start) // 计时
fmt.Printf("Took %s", elapsed)
}

除了在parseUrls调用前添加go,还要sleep等待一下,因为可能10个线程发送完main执行结束,但是还没收到结果

现在这样就可以看到10个请求并发发出,最终运行时间

1
Took 4.001206041s%    

但是这样就很蠢,处理不一样的任务你也不知道要sleep多久,sleep多了浪费,所以这里有两种方法

使用channel通信

用channel通信,10个通道都接受完信息才能结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func parseUrls(url string, ch chan bool) {
...
ch <- true
}

func main() {
start := time.Now()
ch := make(chan bool)
for i := 0; i < 10; i++ {
go parseUrls("https://movie.douban.com/top250?start="+strconv.Itoa(25*i), ch)
}

for i := 0; i < 10; i++ {
<-ch
}

elapsed := time.Since(start)
fmt.Printf("Took %s", elapsed)
}

parseUrl的签名也改了,多传递一个ch,for循环里面接受通信结果,10个都接受到了(相当于任务完成)再结束

运行结果快了很多

1
Took 367.481917ms%
使用sync.WaitGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {  
start := time.Now()
var wg sync.WaitGroup
wg.Add(10) // 添加要等待10个goroutine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done() // 完成一个goroutine后,调用Done(),确保有10个goroutine完成
parseUrls("https://movie.douban.com/top250?start=" + strconv.Itoa(25*i))
}()
}
wg.Wait() // 等待所有goroutine完成
elapsed := time.Since(start) // 计时
fmt.Printf("Took %s", elapsed)
}

由于要执行 wg.Done 和 parseUrls2 件事,所以不能直接用 go 关键字,需要把语句包一下

所以WaitGroup 相当于是一个协程安全的并发计数器:调用 Add 增加计数,调用 Done 减少计数。调用 Wait 会阻塞并等待至计数器归零。这样也实现了并发和等待全部 goroutine 执行完成

大概就给自己实际记录这些知识,期待后面用go写东西的时候遇到新的知识点再运用

 评论
评论插件加载失败
正在加载评论插件
由 Hexo 驱动 & 主题 Keep