Go语言8-goroutine和channel
Goroutine
Go语言从语言层面上就支持了并发,这与其他语言大不一样。Go语言中有个概念叫做goroutine,这类似我们熟知的线程,但是更轻。
创新互联公司是网站建设技术企业,为成都企业提供专业的做网站、成都网站制作,网站设计,网站制作,网站改版等技术服务。拥有十载丰富建站经验和众多成功案例,为您定制适合企业的网站。十载品质,值得信赖!
进程、线程、协程
进程和线程
进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行。
所以程序的类型可以分为以下几种:
- 一个进程,它只有一个线程,就是单线程程序
- 一个进程,它又多个线程,就是多线程程序
- 一个进程,它可能还会fork多个子进程,就是多进程程序
并发和并行的区别
- 多线程程序在单核的cou上运行,这是并发(concurrency)。
- 多线程程序在多个核的cpu上运行(真正的同时运行),这才是并行(parallelism)。
并发,在微观上,任意时刻只有一个程序在运行。因为线程已经是CPU调度的最小单元,一个CPU一次只能处理一个线程。但是宏观上这些程序时同时在那里执行的,所以这个只是并发。
所以在python里,貌似讲的都是高并发,似乎没听过并行的概念。
协程和线程
协程,独一的栈空间,共享堆空间,调度由用户自己控制。本质上类似于用户级线程,这些用户级线程的调度也是自己实现的。
线程,一个线程上可以跑多个协程,协程是轻量级的线程。
goroutine 调度模型
Go的调度器模型:G-P-M模型。
- G代表goroutine,它通过go关键字调用runtime.newProc创建。
- P代表processer,可以理解为上下文。
- M表示machine,可以理解为操作系统的线程。
设置Golang运行的cpu核数
设置当前的程序运行在多少核上,下面的例子是获取CPU的核数,然后运行在所有核上:
package main
import (
"fmt"
"runtime"
)
func main() {
num := runtime.NumCPU()
runtim.GOMAXPROCS(num)
fmt.Println(num)
}
上面P的数目就是这里GOMAXPROCS设置的数目,通常设置为CPU核数。
1.8版本以上的Golang,是不需要做上面的设置的,默认就是运行在所有的核上。当然还是可以设置一下,比如限制只能使用多少核。
goroutine的示例:
package main
import (
"fmt"
"time"
)
func example() {
var i int
for {
fmt.Println(i)
i++
time.Sleep(time.Millisecond * 30)
}
}
func main() {
go example() // 起一个goroutine
var j int
for j > -100 {
fmt.Println(j)
j--
time.Sleep(time.Millisecond * 100)
}
fmt.Println("运行结束")
}
Channel
不同goroutine之间要进行通讯,有下面2种方法:
- 全局变量和锁同步
- Channel
先讲管道(channel),然后讲 goroutine 和 channel 结合的一些用法。
这篇的channel可以参考下:
https://www.jianshu.com/p/24ede9e90490
全局变量的实现示例
在下面的例子里定义了变量 m 来实现goroutine之间的通讯:
package main
import (
"fmt"
"time"
"sync"
)
var (
m = make(map[int]uint64)
lock sync.Mutex
)
type task struct {
n int
}
func calc(t *task) {
var res uint64
res = 1
for i := 1; i <= t.n; i++ {
res *= uint64(i)
}
lock.Lock()
m[t.n] = res // 变量m用来存放结果,这样主线程里就能拿到m的值,操作要加锁
lock.Unlock()
}
func main() {
for i := 0; i < 100; i++ {
t := &task{i}
go calc(t)
}
for j := 0; j < 10; j++ {
fmt.Printf("\r已运行%d秒", j)
time.Sleep(time.Second)
}
fmt.Println("\r运行完毕,输出结果:")
lock.Lock()
for k, v := range m {
if v != 0 {
fmt.Printf("%d! = %v\n", k, v)
}
}
lock.Unlock()
}
channel 概念
channel的概念如下:
- 类型Unix中的管道(Pipe)
- 先进先出
- 线程安全,多个goroutine同时访问,不需要加锁
- channel是有类型的,一个整数的channel只能存放整数
channel 声明
var 变量名 chan 类型
var test1 chan int
var test2 chan string
var tesr3 chan map[string]string
var test4 chan stu
var test5 chan *stu
只是声明还不够,使用前还要make,分配内存空间:
package main
import "fmt"
func main() {
var intChan chan int // 声明
intChan = make(chan int, 10) // 初始化,长度是10
intChan <- 10 // 存入管道
n := <- intChan // 取出
fmt.Println(n)
}
定义信号(空结构体)
有一些场景中,一些 goroutine 需要一直循环处理信息,直到收到 quit 信号。作为信号,只需要随便传点什么,并不关注具体的值。那么可以选择使用空结构体,像下面这样定义了2个信号:
msgCh := make(chan struct{})
quitCh := make(chan struct{})
// 传信号的方法
msgCh <- struct{}{} // 前面的 struct{} 是变量的类型,后面的 {} 则是做初始化传入空值生成实例
quitCh <- struct{}{}
通过channel实现通讯
起一个goroutine往管道里存,再起一个goroutine从管道里把数据取出:
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
var i int
for {
ch <- i
i ++
time.Sleep(time.Millisecond)
}
}
func read(ch chan int) {
for {
b := <- ch
fmt.Println(b)
}
}
func main() {
intChan := make(chan int, 10)
go write(intChan)
go read(intChan)
time.Sleep(time.Second * 5)
}
channel 的类型和阻塞
channel 分为不带缓存的 channel 和带缓存的 channel。
channel 一定要初始化后才能进行读写操作,否则会永久阻塞。这个不是这里要讲的重点,顺便带一下。
无缓存的channle
初始化make的时候不传入第二个参数设置容量就是:
ch := make(chan int)
从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;同理,向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。
有缓存的 channel
有缓存的 channel 的声明方式为指定 make 函数的第二个参数,该参数为 channel 缓存的容量:
ch := make(chan int, 10)
有缓存的 channel 类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;
相应的,当 channel 中消息不为空时,读取消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。
缓冲区的大小
通过 len 函数可以获得 chan 中的元素个数,通过 cap 函数可以得到 channel 的缓冲区长度。
无缓存和缓冲区是1的差别
无缓存的 channel 的 len和cap 始终都是0。
通过无缓存的 channel 进行通信时,接收者收到数据 happens before 发送者 goroutine 唤醒
上面这句不好理解,不过可以先看下现象。
下面的这2行函数会报错,说是死锁。但是如果设置了 channel 的容量哪怕是1,就不会报错的:
func main() {
ch := make(chan int)
ch <- 1
}
虽然容量1的channel也只能存1个数,但是无缓冲区的channel似乎1个数都存不了,除非马上能取走:
func main() {
ch := make(chan int, 1)
// 要起一个goroutine可以马上接收channel里的数据
go func () {
fmt.Println(<- ch)
}()
ch <- 1
time.Sleep(time.Second) // 要给goroutine执行完成的时间
}
小结:无缓存的channel需要有一个goroutine可以把channel里的数据马上取走。
channel之间的同步
在学习关闭channel之前,先看下下面的例子。由于没有关闭channel,是会有问题的,不过例子里都解决了。先看下不用关闭channel可以怎么搞,然后再接着看关闭channel的用法:
package main
import (
"time"
"fmt"
)
func calc(taskChan chan int, resChan chan int) {
for v := range taskChan {
// 判断是不是素数
flag := true
for i := 2; i < v; i++ {
if v % i == 0 {
flag = false
break
}
}
if flag {
resChan <- v
}
}
}
func main() {
intChan := make(chan int, 1000)
// 这个也是个goroutine
go func(){
for i := 2; i < 100000; i++ {
intChan <- i
}
}() // 管道满了之后,这个匿名函数会阻塞,但是不影响程序继续往下走
resultChan := make(chan int, 1000)
// 同时起8个goroutine
for i := 0; i < 8; i++ {
go calc(intChan, resultChan)
}
// 再起一个取结果的goroutine,不阻塞主线程
go func(){
for v := range resultChan{
fmt.Println("素数:", v)
}
}()
// 给上面的匿名函数几秒钟来输出结果
time.Sleep(time.Second * 5)
}
上面的例子里用了2个匿名函数,也都是起的goroutine。如果是在主线程里直接for循环的话,那个for循环就会变成死锁,程序不会自己往下走。所以运行在goroutine里的死循环,在主线程退出后也就结束了,不会有问题。后一个匿名函数是对channel的进行遍历,channel取空后,会进入阻塞,如果是运行在主线程里的话也会形成死锁。
range 遍历
channel 也可以使用 range 取值,并且会一直从 channel 中读取数据,直到有 goroutine 对改 channel 执行 close 操作,循环才会结束。
关闭 channel
golang 提供了内置的 close 函数对 channel 进行关闭操作:
ch := make(chan int)
close(ch)
关于 channel 的关闭,有以下的特点:
- 关闭一个未初始化(nil) 的 channel 会产生 panic
- 重复关闭同一个 channel 会产生 panic
- 向一个已关闭的 channel 中发送消息会产生 panic
- 可以从已关闭的 channel 里继续读取消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭
- 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息
有2种方式可以把管道里的数据都取出来,但是都需要把管道关闭:
- 判断管道已关闭并且取完了
- 遍历管道
关闭channel然后读取的示例:
package main
import "fmt"
func main() {
var ch chan int
ch = make(chan int, 5)
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
for {
var b int
b, ok := <- ch
fmt.Println(b, ok)
if ok == false {
break
}
}
}
/* 执行结果
PS H:\Go\src\go_dev\day8\channel\close_chan> go run main.go
0 true
1 true
2 true
3 true
4 true
0 false
PS H:\Go\src\go_dev\day8\channel\close_chan>
*/
上面输出的最后一条,就是channel已经空了,读出来的就是类型的0值,并且ok变false了。
遍历channel的示例:
package main
import "fmt"
func main() {
var ch chan int
ch = make(chan int) // 这个管道没有无缓存
// 这个goroutine一次存一个,再存会阻塞,直到主线程后面的for循环遍历的时候取走数据
// 存完100个数后,这里的for循环结束,会关闭管道。主线程后面的for循环的遍历就能正常结束了
go func () {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
}
判断子线程结束
学到这里,再也不需要用Sleep等待子线程结束了,可以通过管道实现。可以单独定义一个专门用来判断子线程结束的管道。子线程完成任务后,就传个值给管道,主线程就阻塞的读管道里的信息,一旦读到信息,就说明子线程完成了,可以继续执行或者退出了。如果起了多个子线程,则主线程就是用for循环多读几次,就能判断出有多少子线程已经结束了。
channel 只读、只写
声明只读的channel:
var ch <-chan int
声明只写的channel:
var ch chan<- int
应用场景,管道需要能够可读可写。但是可以限制它在某个函数里的功能,也就是在定义函数的参数的时候,把管道的类型设置为只读或只写。或者把管道传给结构体,结构体里限制管道的读写限制?
下面是之前的一个例子,仅仅只是把2个函数在设置参数类型的时候把管道的读写限制加上了:
package main
import (
"fmt"
"time"
)
func write(ch chan<- int) {
var i int
for {
ch <- i
i ++
time.Sleep(time.Millisecond)
}
}
func read(ch <-chan int) {
for {
b := <- ch
fmt.Println(b)
}
}
func main() {
intChan := make(chan int, 10)
go write(intChan)
go read(intChan)
time.Sleep(time.Second * 5)
}
配合 select 使用
select 用法类似IO多路复用,可以同时监听多个 channel 的消息状态,用法如下:
select {
case <- ch2:
...
case <- ch3:
...
case ch4 <- 10;
...
default:
...
}
select 可以同时监听多个 channel 的写入或读取:
- 若只有一个 case 通过(不阻塞),则执行这个 case 块
- 若有多个 case 通过,则随机挑选一个 case 执行
- 若所有 case 均阻塞,则执行 default 块。若未定义 default 块,则 select 语句阻塞,直到有 case 被唤醒
- 使用 break 会跳出 select 块
- select 不会循环,就只会执行一个块然后就继续往后执行了
select只会执行一次
这个例子只会输出一次,随机是1或者是2,然后接结束了:
package main
import "fmt"
func main() {
ch2 := make(chan int, 1)
ch2 <- 1
ch3 := make(chan int, 1)
ch3 <- 2
select {
case v := <- ch2:
fmt.Println(v)
case v := <- ch3:
fmt.Println(v)
default:
fmt.Println(0)
}
}
所以如果要把管道里的数取完,或者取多次,就要再套一层for循环。
for循环和break的效果
在select外面用for套了一层死循环,这样就是反复的执行select。不过break在这里就没效果了:
package main
import (
"fmt"
"time"
)
func main() {
var ch2, ch3 chan int
ch2 = make(chan int, 10)
ch3 = make(chan int, 10)
for i := 0; i < cap(ch2); i++ {
ch2 <- i
ch3 <- i * i
}
// LABEL1:
for {
select {
case v := <- ch2:
fmt.Println("ch2", v)
case v := <- ch3:
fmt.Println("ch3", v)
default:
fmt.Println("所有元素都已经取完")
break // 这个break没有意义,因为值是跳出select,而不是for循环
// break LABEL1 // 这个break可以直接跳出for循环
}
time.Sleep(time.Second)
}
}
如果要跳出for循环,可以配合标签。上面的代码里已经写好了只是注释掉了。
定时器
定时器是在 time 包里的,
package main
import (
"fmt"
"time"
)
func main() {
t := time.NewTicker(time.Second)
for v := range t.C {
fmt.Println(v)
}
}
上面调用的NewTicker()方法返回的是个结构体,如下:
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
// contains filtered or unexported fields
}
上面的例子里遍历了 t.C 就是一个channel。time包内部应该是会产生一个goroutine,每隔一段时间就传一个数据进去。
设置超时时间
还有一个After()方法,和上面的方法是一样的。不过这个方法直接返回管道,即 NewTimer(d).C 。而NewTimer()方法的管道在返回的结构体的属性C里。这个After()方法用起来更方便。结合select正好可以做成一个设置任务超时时间的功能:
package main
import (
"fmt"
"time"
)
func task(ch chan struct{}) {
time.Sleep(time.Second * 3)
ch <- struct{}{}
}
func main() {
ch := make(chan struct{}) // 定义好信号的管道,传递空结构体
go task(ch) // 启动一个任务
select {
case <- ch:
fmt.Println("任务执行结束")
case <- time.After(time.Second * 2): // 2秒后超时
fmt.Println("任务超时")
}
}
goroutine 中使用 recover
程序里起的gorountine中如果panic了,并且这个goroutine里面没有捕获错误的话,整个程序就会挂掉。
下面的程序会报错(Panic),是gorountine里的产生的错误:
package main
func divideZero(ch chan int) {
zero := 0
ch <- 1 / zero
}
func main() {
ch := make(chan int)
go divideZero(ch)
<- ch
}
在gorountine中运行错误了,是可以不影响其他线程和主线程的继续执行的。所以,好的习惯是每当产生一个goroutine,就在开头用defer插入recover, 这样在出现panic的时候,就只是自己退出而不影响整个程序。下面是优化后的代码,加入了recover来捕获错误:
package main
import "fmt"
func divideZero(ch chan int) {
defer func () {
if err := recover(); err != nil {
fmt.Println(err)
// 要给管道传值,否则主线程从空管道里取值会阻塞,形成死锁
ch <- 0
}
}()
zero := 0
ch <- 1 / zero
}
func main() {
ch := make(chan int)
go divideZero(ch)
<- ch
}
单元测试
测试用例的文件名必须以_test.go结尾,测试的函数也必须以Test开头。符合命名规则,使用 go test 命令的时候就能自动运行测试用例。
这篇的单元测试比较粗糙,不过基本怎么用,以及用法示例都简单记下来了。
被测试的函数
先准备一个需要被测试的函数:
package main
import "fmt"
func get_fullname(first, last string) (fullname string) {
fullname = first + " " + last
return
}
func main() {
fullname := get_fullname("Barry", "Allen")
fmt.Println(fullname)
}
上面的 get_fullname() 函数就是接下来要进行单元测试的函数。
测试用例
package main
import "testing"
func TestName(t *testing.T) {
r := get_fullname("Sara", "Lance")
expect := "Sara Lance"
if r != expect {
t.Fatalf("ERROR: get_fullname expect: %s actual: %s", expect, r)
}
t.Log("测试成功")
}
执行测试
写完测试用例,就可以执行测试了,使用命令 go test。输出如下:
PS H:\Go\src\go_dev\day8\unit_test\name> go test
PASS
ok go_dev/day8/unit_test/name 0.058s
PS H:\Go\src\go_dev\day8\unit_test\name>
看到PASS了,但是t.Log()并没有输出,要看到更多信息,要用带上-v参数。使用命令 go test -v ,输出如下:
PS H:\Go\src\go_dev\day8\unit_test\name> go test -v
=== RUN TestName
--- PASS: TestName (0.00s)
name_test.go:11: 测试成功
PASS
ok go_dev/day8/unit_test/name 0.053s
PS H:\Go\src\go_dev\day8\unit_test\name>
直接用go test命令,只显示测试的结果。如果有多个测试用例,也只有一个结果。可以用-v参数看到详细的信息,每个测试用例的的结果都会打印出来。
如果某个测试失败了,就会直接退出,不会继续测试下去。
网页标题:Go语言8-goroutine和channel
地址分享:http://scyanting.com/article/pjidoo.html