Go语言并发编程

目录并发介绍goroutineruntime包信道channelGoroutine池定时器select并发安全和锁系统监控sync原子操作GMP原理与调度爬虫并发介绍进程和线程进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。线程是

目录


并发介绍

进程和线程

  1. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。
  2. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
  3. 一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

并发和并行

  1. 多线程程序在一个核的cpu上运行,就是并发。
  2. 多线程程序在多个核的cpu上运行,就是并行。

协程和线程

  1. 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。
  2. 线程:一个线程上可以跑多个协程,协程是轻量级的线程。

goroutine 只是由官方实现的超级”线程池”。

每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是go高并发的根本原因。goroutine 奉行通过通信来共享内存,而不是共享内存来通信。

goroutine

goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

启动单个goroutine

func hello() {
  fmt.Println("Hello Goroutine!")
}
func main() {
  go hello() // 启动另外一个goroutine去执行hello函数
  fmt.Println("main goroutine done!")
  time.Sleep(time.Second)
}

启动多个goroutine

var wg sync.WaitGroup

func hello(i int) {
  defer wg.Done() // goroutine结束就登记-1
  fmt.Println("Hello Goroutine!", i)
}
func main() {
  for i := 0; i < 10; i++ {
    wg.Add(1) // 启动一个goroutine就登记+1
    go hello(i)
  }
  wg.Wait() // 等待所有登记的goroutine都结束
}

goroutine与线程

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  1. G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  2. P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  3. M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;

runtime包

package main

import (
    "fmt"
    "runtime"
)

// runtime.Gosched()
func main() {
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")
    // 主协程
    for i := 0; i < 2; i++ {
        // 切一下,再次分配任务
        runtime.Gosched()
        fmt.Println("hello")
    }
}

// runtime.Goexit() 退出当前协程
func main() {
    go func() {
        defer fmt.Println("A.defer")
        func() {
            defer fmt.Println("B.defer")
            // 结束协程
            runtime.Goexit()
            defer fmt.Println("C.defer")
            fmt.Println("B")
        }()
        fmt.Println("A")
    }()
    for {
    }
}

// runtime.GOMAXPROCS Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}  

信道channel

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

channel是一种类型,一种引用类型。声明通道类型的格式如下:var 变量 chan 元素类型

var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道    

channel 实例

通道有发送(send)、接收(receive)和关闭(close)三种操作。

ch := make(chan int)     // 定义一个通道
ch <- 10 // 发送 把10发送到ch中
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果  
close(ch)   //  关闭

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

无缓冲的通道

无缓冲的通道必须有接收才能发送。

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}  

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

有缓冲的通道

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。

  ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
  ch <- 10
  fmt.Println("发送成功")
} 

单向通道

func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}   

Goroutine池

worker pool(goroutine池)

  1. 本质上是生产者消费者模型
  2. 可以有效控制goroutine数量,防止暴涨
  3. 需求:
    • 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
    • 随机生成数字进行计算
      
      package main

import ( "fmt" "math/rand" )

type Job struct { // id Id int // 需要计算的随机数 RandNum int }

type Result struct { // 这里必须传对象实例 job *Job // 求和 sum int }

func main() { // 需要2个管道 // 1.job管道 jobChan := make(chan Job, 128) // 2.结果管道 resultChan := make(chan Result, 128) // 3.创建工作池 createPool(64, jobChan, resultChan) // 4.开个打印的协程 go func(resultChan chan *Result) { // 遍历结果管道打印 for result := range resultChan { fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id, result.job.RandNum, result.sum) } }(resultChan) var id int // 循环创建job,输入到管道 for { id++ // 生成随机数 r_num := rand.Int() job := &Job{ Id: id, RandNum: r_num, } jobChan <- job } }

// 创建工作池 // 参数1:开几个协程 func createPool(num int, jobChan chan Job, resultChan chan Result) { // 根据开协程个数,去跑运行 for i := 0; i < num; i++ { go func(jobChan chan Job, resultChan chan Result) { // 执行运算 // 遍历job管道所有数据,进行相加 for job := range jobChan { // 随机数接过来 r_num := job.RandNum // 随机数每一位相加 // 定义返回值 var sum int for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } // 想要的结果是Result r := &Result{ job: job, sum: sum, } //运算结果扔到管道 resultChan <- r } }(jobChan, resultChan) } }


## 定时器
```go
// Timer:时间到了,执行只执行1次
package main

import (
    "fmt"
    "time"
)

func main() {
    // 1.timer基本使用
    //timer1 := time.NewTimer(2 * time.Second)
    //t1 := time.Now()
    //fmt.Printf("t1:%v\n", t1)
    //t2 := &lt;-timer1.C
    //fmt.Printf("t2:%v\n", t2)

    // 2.验证timer只能响应1次
    //timer2 := time.NewTimer(time.Second)
    //for {
    // &lt;-timer2.C
    // fmt.Println("时间到")
    //}

    // 3.timer实现延时的功能
    //(1)
    //time.Sleep(time.Second)
    //(2)
    //timer3 := time.NewTimer(2 * time.Second)
    //&lt;-timer3.C
    //fmt.Println("2秒到")
    //(3)
    //&lt;-time.After(2*time.Second)
    //fmt.Println("2秒到")

    // 4.停止定时器
    //timer4 := time.NewTimer(2 * time.Second)
    //go func() {
    // &lt;-timer4.C
    // fmt.Println("定时器执行了")
    //}()
    //b := timer4.Stop()
    //if b {
    // fmt.Println("timer4已经关闭")
    //}

    // 5.重置定时器
    timer5 := time.NewTimer(3 * time.Second)
    timer5.Reset(1 * time.Second)
    fmt.Println(time.Now())
    fmt.Println(&lt;-timer5.C)

    for {
    }
}

// Timer:时间到了,多次执行
package main

import (
    "fmt"
    "time"
)

func main() {
    // 1.获取ticker对象
    ticker := time.NewTicker(1 * time.Second)
    i := 0
    // 子协程
    go func() {
        for {
            //&lt;-ticker.C
            i++
            fmt.Println(&lt;-ticker.C)
            if i == 5 {
                //停止
                ticker.Stop()
            }
        }
    }()
    for {
    }
}

select

select多路复用, Go内置了select关键字,可以同时响应多个通道的操作。

select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

select {
  case &lt;-chan1:
      // 如果chan1成功读到数据,则进行该case处理语句
  case chan2 &lt;- 1:
      // 如果成功向chan2写入数据,则进行该case处理语句
  default:
      // 如果上面都没有成功,则进入default处理流程
}

select可以同时监听一个或多个channel,直到其中一个channel ready

package main

import (
   "fmt"
   "time"
)

func test1(ch chan string) {
   time.Sleep(time.Second * 5)
   ch &lt;- "test1"
}
func test2(ch chan string) {
   time.Sleep(time.Second * 2)
   ch &lt;- "test2"
}

func main() {
   // 2个管道
   output1 := make(chan string)
   output2 := make(chan string)
   // 跑2个子协程,写数据
   go test1(output1)
   go test2(output2)
   // 用select监控
   select {
   case s1 := &lt;-output1:
      fmt.Println("s1=", s1)
   case s2 := &lt;-output2:
      fmt.Println("s2=", s2)
   }
}

如果多个channel同时ready,则随机选择一个执行


package main

import (
   "fmt"
)

func main() {
   // 创建2个管道
   int_chan := make(chan int, 1)
   string_chan := make(chan string, 1)
   go func() {
      //time.Sleep(2 * time.Second)
      int_chan &lt;- 1
   }()
   go func() {
      string_chan &lt;- "hello"
   }()
   select {...

剩余50%的内容订阅专栏后可查看

点赞 1
收藏 2
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论