目录并发介绍goroutineruntime包信道channelGoroutine池定时器select并发安全和锁系统监控sync原子操作GMP原理与调度爬虫并发介绍进程和线程进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。线程是
每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是go高并发的根本原因。goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。
Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
可增长的栈
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度
GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
package main
import (
"fmt"
"runtime"
)
// runtime.Gosched()
func main() {
go func(s string) {
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("world")
// 主协程
for i := 0; i < 2; i++ {
// 切一下,再次分配任务
runtime.Gosched()
fmt.Println("hello")
}
}
// runtime.Goexit() 退出当前协程
func main() {
go func() {
defer fmt.Println("A.defer")
func() {
defer fmt.Println("B.defer")
// 结束协程
runtime.Goexit()
defer fmt.Println("C.defer")
fmt.Println("B")
}()
fmt.Println("A")
}()
for {
}
}
// runtime.GOMAXPROCS Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel是一种类型,一种引用类型。声明通道类型的格式如下:var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
通道有发送(send)、接收(receive)和关闭(close)三种操作。
ch := make(chan int) // 定义一个通道
ch <- 10 // 发送 把10发送到ch中
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
close(ch) // 关闭
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
无缓冲的通道必须有接收才能发送。
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
worker pool(goroutine池)
package mainimport ( "fmt" "math/rand" )
type Job struct { // id Id int // 需要计算的随机数 RandNum int }
type Result struct { // 这里必须传对象实例 job *Job // 求和 sum int }
func main() { // 需要2个管道 // 1.job管道 jobChan := make(chan Job, 128) // 2.结果管道 resultChan := make(chan Result, 128) // 3.创建工作池 createPool(64, jobChan, resultChan) // 4.开个打印的协程 go func(resultChan chan *Result) { // 遍历结果管道打印 for result := range resultChan { fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id, result.job.RandNum, result.sum) } }(resultChan) var id int // 循环创建job,输入到管道 for { id++ // 生成随机数 r_num := rand.Int() job := &Job{ Id: id, RandNum: r_num, } jobChan <- job } }
// 创建工作池 // 参数1:开几个协程 func createPool(num int, jobChan chan Job, resultChan chan Result) { // 根据开协程个数,去跑运行 for i := 0; i < num; i++ { go func(jobChan chan Job, resultChan chan Result) { // 执行运算 // 遍历job管道所有数据,进行相加 for job := range jobChan { // 随机数接过来 r_num := job.RandNum // 随机数每一位相加 // 定义返回值 var sum int for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } // 想要的结果是Result r := &Result{ job: job, sum: sum, } //运算结果扔到管道 resultChan <- r } }(jobChan, resultChan) } }
## 定时器
```go
// Timer:时间到了,执行只执行1次
package main
import (
"fmt"
"time"
)
func main() {
// 1.timer基本使用
//timer1 := time.NewTimer(2 * time.Second)
//t1 := time.Now()
//fmt.Printf("t1:%v\n", t1)
//t2 := <-timer1.C
//fmt.Printf("t2:%v\n", t2)
// 2.验证timer只能响应1次
//timer2 := time.NewTimer(time.Second)
//for {
// <-timer2.C
// fmt.Println("时间到")
//}
// 3.timer实现延时的功能
//(1)
//time.Sleep(time.Second)
//(2)
//timer3 := time.NewTimer(2 * time.Second)
//<-timer3.C
//fmt.Println("2秒到")
//(3)
//<-time.After(2*time.Second)
//fmt.Println("2秒到")
// 4.停止定时器
//timer4 := time.NewTimer(2 * time.Second)
//go func() {
// <-timer4.C
// fmt.Println("定时器执行了")
//}()
//b := timer4.Stop()
//if b {
// fmt.Println("timer4已经关闭")
//}
// 5.重置定时器
timer5 := time.NewTimer(3 * time.Second)
timer5.Reset(1 * time.Second)
fmt.Println(time.Now())
fmt.Println(<-timer5.C)
for {
}
}
// Timer:时间到了,多次执行
package main
import (
"fmt"
"time"
)
func main() {
// 1.获取ticker对象
ticker := time.NewTicker(1 * time.Second)
i := 0
// 子协程
go func() {
for {
//<-ticker.C
i++
fmt.Println(<-ticker.C)
if i == 5 {
//停止
ticker.Stop()
}
}
}()
for {
}
}
select多路复用, Go内置了select关键字,可以同时响应多个通道的操作。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
select可以同时监听一个或多个channel,直到其中一个channel ready
package main
import (
"fmt"
"time"
)
func test1(ch chan string) {
time.Sleep(time.Second * 5)
ch <- "test1"
}
func test2(ch chan string) {
time.Sleep(time.Second * 2)
ch <- "test2"
}
func main() {
// 2个管道
output1 := make(chan string)
output2 := make(chan string)
// 跑2个子协程,写数据
go test1(output1)
go test2(output2)
// 用select监控
select {
case s1 := <-output1:
fmt.Println("s1=", s1)
case s2 := <-output2:
fmt.Println("s2=", s2)
}
}
如果多个channel同时ready,则随机选择一个执行
package main
import (
"fmt"
)
func main() {
// 创建2个管道
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
go func() {
//time.Sleep(2 * time.Second)
int_chan <- 1
}()
go func() {
string_chan <- "hello"
}()
select {... 如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!