目录并发介绍goroutineruntime包信道channelGoroutine池定时器select并发安全和锁系统监控sync原子操作GMP原理与调度爬虫并发介绍进程和线程进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。线程是
每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是go高并发的根本原因。goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单粗暴。
Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
可增长的栈
OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度
GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
package main
import (
"fmt"
"runtime"
)
// runtime.Gosched()
func main() {
go func(s string) {
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("world")
// 主协程
for i := 0; i < 2; i++ {
// 切一下,再次分配任务
runtime.Gosched()
fmt.Println("hello")
}
}
// runtime.Goexit() 退出当前协程
func main() {
go func() {
defer fmt.Println("A.defer")
func() {
defer fmt.Println("B.defer")
// 结束协程
runtime.Goexit()
defer fmt.Println("C.defer")
fmt.Println("B")
}()
fmt.Println("A")
}()
for {
}
}
// runtime.GOMAXPROCS Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel是一种类型,一种引用类型。声明通道类型的格式如下:var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
通道有发送(send)、接收(receive)和关闭(close)三种操作。
ch := make(chan int) // 定义一个通道
ch <- 10 // 发送 把10发送到ch中
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
close(ch) // 关闭
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
无缓冲的通道必须有接收才能发送。
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
worker pool(goroutine池)
package main
import ( "fmt" "math/rand" )
type Job struct { // id Id int // 需要计算的随机数 RandNum int }
type Result struct { // 这里必须传对象实例 job *Job // 求和 sum int }
func main() { // 需要2个管道 // 1.job管道 jobChan := make(chan Job, 128) // 2.结果管道 resultChan := make(chan Result, 128) // 3.创建工作池 createPool(64, jobChan, resultChan) // 4.开个打印的协程 go func(resultChan chan *Result) { // 遍历结果管道打印 for result := range resultChan { fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id, result.job.RandNum, result.sum) } }(resultChan) var id int // 循环创建job,输入到管道 for { id++ // 生成随机数 r_num := rand.Int() job := &Job{ Id: id, RandNum: r_num, } jobChan <- job } }
// 创建工作池 // 参数1:开几个协程 func createPool(num int, jobChan chan Job, resultChan chan Result) { // 根据开协程个数,去跑运行 for i := 0; i < num; i++ { go func(jobChan chan Job, resultChan chan Result) { // 执行运算 // 遍历job管道所有数据,进行相加 for job := range jobChan { // 随机数接过来 r_num := job.RandNum // 随机数每一位相加 // 定义返回值 var sum int for r_num != 0 { tmp := r_num % 10 sum += tmp r_num /= 10 } // 想要的结果是Result r := &Result{ job: job, sum: sum, } //运算结果扔到管道 resultChan <- r } }(jobChan, resultChan) } }
## 定时器
```go
// Timer:时间到了,执行只执行1次
package main
import (
"fmt"
"time"
)
func main() {
// 1.timer基本使用
//timer1 := time.NewTimer(2 * time.Second)
//t1 := time.Now()
//fmt.Printf("t1:%v\n", t1)
//t2 := <-timer1.C
//fmt.Printf("t2:%v\n", t2)
// 2.验证timer只能响应1次
//timer2 := time.NewTimer(time.Second)
//for {
// <-timer2.C
// fmt.Println("时间到")
//}
// 3.timer实现延时的功能
//(1)
//time.Sleep(time.Second)
//(2)
//timer3 := time.NewTimer(2 * time.Second)
//<-timer3.C
//fmt.Println("2秒到")
//(3)
//<-time.After(2*time.Second)
//fmt.Println("2秒到")
// 4.停止定时器
//timer4 := time.NewTimer(2 * time.Second)
//go func() {
// <-timer4.C
// fmt.Println("定时器执行了")
//}()
//b := timer4.Stop()
//if b {
// fmt.Println("timer4已经关闭")
//}
// 5.重置定时器
timer5 := time.NewTimer(3 * time.Second)
timer5.Reset(1 * time.Second)
fmt.Println(time.Now())
fmt.Println(<-timer5.C)
for {
}
}
// Timer:时间到了,多次执行
package main
import (
"fmt"
"time"
)
func main() {
// 1.获取ticker对象
ticker := time.NewTicker(1 * time.Second)
i := 0
// 子协程
go func() {
for {
//<-ticker.C
i++
fmt.Println(<-ticker.C)
if i == 5 {
//停止
ticker.Stop()
}
}
}()
for {
}
}
select多路复用, Go内置了select关键字,可以同时响应多个通道的操作。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
select可以同时监听一个或多个channel,直到其中一个channel ready
package main
import (
"fmt"
"time"
)
func test1(ch chan string) {
time.Sleep(time.Second * 5)
ch <- "test1"
}
func test2(ch chan string) {
time.Sleep(time.Second * 2)
ch <- "test2"
}
func main() {
// 2个管道
output1 := make(chan string)
output2 := make(chan string)
// 跑2个子协程,写数据
go test1(output1)
go test2(output2)
// 用select监控
select {
case s1 := <-output1:
fmt.Println("s1=", s1)
case s2 := <-output2:
fmt.Println("s2=", s2)
}
}
如果多个channel同时ready,则随机选择一个执行
package main
import (
"fmt"
)
func main() {
// 创建2个管道
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
go func() {
//time.Sleep(2 * time.Second)
int_chan <- 1
}()
go func() {
string_chan <- "hello"
}()
select {
case value := <-int_chan:
fmt.Println("int:", value)
case value := <-string_chan:
fmt.Println("string:", value)
}
fmt.Println("main结束")
}
可以用于判断管道是否存满
package main
import (
"fmt"
"time"
)
// 判断管道有没有存满
func main() {
// 创建管道
output1 := make(chan string, 10)
// 子协程写数据
go write(output1)
// 取数据
for s := range output1 {
fmt.Println("res:", s)
time.Sleep(time.Second)
}
}
func write(ch chan string) {
for {
select {
// 写数据
case ch <- "hello":
fmt.Println("write hello")
default:
fmt.Println("channel full")
}
time.Sleep(time.Millisecond * 500)
}
}
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥锁
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwlock.Unlock() // 解写锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func read() {
// lock.Lock() // 加互斥锁
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"runtime"
"time"
)
func worker() {
for {
time.Sleep(time.Second)
}
}
func main() {
go http.ListenAndServe(":8080", nil)
for i := 0; i < 1000; i++ {
go worker()
}
for {
fmt.Printf("当前 Goroutine 数量: %d\n", runtime.NumGoroutine())
time.Sleep(time.Second)
}
}
package main
import (
"fmt"
"runtime"
"runtime/pprof"
"time"
)
func main() {
f, err := os.Create("cpu.pprof")
if err != nil {
log.Fatal(err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal(err)
}
defer pprof.StopCPUProfile()
for i := 0; i < 1000; i++ {
go func() {
for j := 0; j < 1000000; j++ {
x := make([]byte, 1024*1024)
}
}()
}
time.Sleep(5 * time.Second)
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
fmt.Printf("总分配内存: %v\n", memStats.TotalAlloc)
fmt.Printf("当前使用内存: %v\n", memStats.Sys-memStats.Frees)
}
package main
import (
"fmt"
"runtime"
"time"
)
func worker(ch chan int) {
for {
<-ch
fmt.Println("Worker running")
}
}
func main() {
ch := make(chan int, 1)
go worker(ch)
time.Sleep(time.Second)
fmt.Printf("当前阻塞 Goroutine 数量: %d\n", len(runtime.BlockingProfileRate()))
}
Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
方法名 功能
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
sync.Once 说在前面的话:这是一个进阶知识点。
在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once。
sync.Once只有一个Do方法,其签名如下:
func (o *Once) Do(f func()) {}
延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
使用sync.Once改造的示例代码如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。
var x int64
var l sync.Mutex
var wg sync.WaitGroup
// 普通版加函数
func add() {
// x = x + 1
x++ // 等价于上面的操作
wg.Done()
}
// 互斥锁版加函数
func mutexAdd() {
l.Lock()
x++
l.Unlock()
wg.Done()
}
// 原子操作版加函数
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
// go add() // 普通版add函数 不是并发安全的
// go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}
package main
import ( "fmt" "io/ioutil" "net/http" "regexp" )
//这个只是一个简单的版本只是获取QQ邮箱并且没有进行封装操作,另外爬出来的数据也没有进行去重操作
var (
// \d是数字
reQQEmail = (\d+)@qq.com
)
// 爬邮箱 func GetEmail() { // 1.去网站拿数据 resp, err := http.Get("https://tieba.baidu.com/p/6051076813?red_tag=1573533731") HandleError(err, "http.Get url") defer resp.Body.Close() // 2.读取页面内容 pageBytes, err := ioutil.ReadAll(resp.Body) HandleError(err, "ioutil.ReadAll") // 字节转字符串 pageStr := string(pageBytes) //fmt.Println(pageStr) // 3.过滤数据,过滤qq邮箱 re := regexp.MustCompile(reQQEmail) // -1代表取全部 results := re.FindAllStringSubmatch(pageStr, -1) //fmt.Println(results)
// 遍历结果 for _, result := range results { fmt.Println("email:", result[0]) fmt.Println("qq:", result[1]) } }
// 处理异常 func HandleError(err error, why string) { if err != nil { fmt.Println(why, err) } } func main() { GetEmail() }
### 正则表达式
- 文档:https://studygolang.com/pkgdoc
- API
- re := regexp.MustCompile(reStr),传入正则表达式,得到正则表达式对象
- ret := re.FindAllStringSubmatch(srcStr,-1):用正则对象,获取页面页面,srcStr是页面内容,-1代表取全部
- 爬邮箱
- 方法抽取
- 爬超链接
- 爬手机号
- http://www.zhaohaowang.com/ 如果连接失效了自己找一个有手机号的就好了
- 爬身份证号
- http://henan.qq.com/a/20171107/069413.htm 如果连接失效了自己找一个就好了
- 爬图片链接
```go
package main
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
)
var (
// w代表大小写字母+数字+下划线
reEmail = `\w+@\w+\.\w+`
// s?有或者没有s
// +代表出1次或多次
//\s\S各种字符
// +?代表贪婪模式
reLinke = `href="(https?://[\s\S]+?)"`
rePhone = `1[3456789]\d\s?\d{4}\s?\d{4}`
reIdcard = `[123456789]\d{5}((19\d{2})|(20[01]\d))((0[1-9])|(1[012]))((0[1-9])|([12]\d)|(3[01]))\d{3}[\dXx]`
reImg = `https?://[^"]+?(\.((jpg)|(png)|(jpeg)|(gif)|(bmp)))`
)
// 处理异常
func HandleError(err error, why string) {
if err != nil {
fmt.Println(why, err)
}
}
func GetEmail2(url string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reEmail)
results := re.FindAllStringSubmatch(pageStr, -1)
for _, result := range results {
fmt.Println(result)
}
}
// 抽取根据url获取内容
func GetPageStr(url string) (pageStr string) {
resp, err := http.Get(url)
HandleError(err, "http.Get url")
defer resp.Body.Close()
// 2.读取页面内容
pageBytes, err := ioutil.ReadAll(resp.Body)
HandleError(err, "ioutil.ReadAll")
// 字节转字符串
pageStr = string(pageBytes)
return pageStr
}
func main() {
// 2.抽取的爬邮箱
// GetEmail2("https://tieba.baidu.com/p/6051076813?red_tag=1573533731")
// 3.爬链接
//GetLink("http://www.baidu.com/s?wd=%E8%B4%B4%E5%90%A7%20%E7%95%99%E4%B8%8B%E9%82%AE%E7%AE%B1&rsv_spt=1&rsv_iqid=0x98ace53400003985&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=baiduhome_pg&rsv_enter=1&rsv_dl=ib&rsv_sug2=0&inputT=5197&rsv_sug4=6345")
// 4.爬手机号
//GetPhone("https://www.zhaohaowang.com/")
// 5.爬身份证号
//GetIdCard("https://henan.qq.com/a/20171107/069413.htm")
// 6.爬图片
// GetImg("http://image.baidu.com/search/index?tn=baiduimage&ps=1&ct=201326592&lm=-1&cl=2&nc=1&ie=utf-8&word=%E7%BE%8E%E5%A5%B3")
}
func GetIdCard(url string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reIdcard)
results := re.FindAllStringSubmatch(pageStr, -1)
for _, result := range results {
fmt.Println(result)
}
}
// 爬链接
func GetLink(url string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reLinke)
results := re.FindAllStringSubmatch(pageStr, -1)
for _, result := range results {
fmt.Println(result[1])
}
}
//爬手机号
func GetPhone(url string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(rePhone)
results := re.FindAllStringSubmatch(pageStr, -1)
for _, result := range results {
fmt.Println(result)
}
}
func GetImg(url string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reImg)
results := re.FindAllStringSubmatch(pageStr, -1)
for _, result := range results {
fmt.Println(result[0])
}
}
下面的两个是即将要爬的网站,如果网址失效自己换一个就好了
https://www.bizhizu.cn/shouji/tag-%E5%8F%AF%E7%88%B1/1.html
package main
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
func HandleError(err error, why string) {
if err != nil {
fmt.Println(why, err)
}
}
// 下载图片,传入的是图片叫什么
func DownloadFile(url string, filename string) (ok bool) {
resp, err := http.Get(url)
HandleError(err, "http.get.url")
defer resp.Body.Close()
bytes, err := ioutil.ReadAll(resp.Body)
HandleError(err, "resp.body")
filename = "E:/topgoer.com/src/github.com/student/3.0/img/" + filename
// 写出数据
err = ioutil.WriteFile(filename, bytes, 0666)
if err != nil {
return false
} else {
return true
}
}
// 并发爬思路:
// 1.初始化数据管道
// 2.爬虫写出:26个协程向管道中添加图片链接
// 3.任务统计协程:检查26个任务是否都完成,完成则关闭数据管道
// 4.下载协程:从管道里读取链接并下载
var (
// 存放图片链接的数据管道
chanImageUrls chan string
waitGroup sync.WaitGroup
// 用于监控协程
chanTask chan string
reImg = `https?://[^"]+?(\.((jpg)|(png)|(jpeg)|(gif)|(bmp)))`
)
func main() {
// myTest()
// DownloadFile("http://i1.shaodiyejin.com/uploads/tu/201909/10242/e5794daf58_4.jpg", "1.jpg")
// 1.初始化管道
chanImageUrls = make(chan string, 1000000)
chanTask = make(chan string, 26)
// 2.爬虫协程
for i := 1; i < 27; i++ {
waitGroup.Add(1)
go getImgUrls("https://www.bizhizu.cn/shouji/tag-%E5%8F%AF%E7%88%B1/" + strconv.Itoa(i) + ".html")
}
// 3.任务统计协程,统计26个任务是否都完成,完成则关闭管道
waitGroup.Add(1)
go CheckOK()
// 4.下载协程:从管道中读取链接并下载
for i := 0; i < 5; i++ {
waitGroup.Add(1)
go DownloadImg()
}
waitGroup.Wait()
}
// 下载图片
func DownloadImg() {
for url := range chanImageUrls {
filename := GetFilenameFromUrl(url)
ok := DownloadFile(url, filename)
if ok {
fmt.Printf("%s 下载成功\n", filename)
} else {
fmt.Printf("%s 下载失败\n", filename)
}
}
waitGroup.Done()
}
// 截取url名字
func GetFilenameFromUrl(url string) (filename string) {
// 返回最后一个/的位置
lastIndex := strings.LastIndex(url, "/")
// 切出来
filename = url[lastIndex+1:]
// 时间戳解决重名
timePrefix := strconv.Itoa(int(time.Now().UnixNano()))
filename = timePrefix + "_" + filename
return
}
// 任务统计协程
func CheckOK() {
var count int
for {
url := <-chanTask
fmt.Printf("%s 完成了爬取任务\n", url)
count++
if count == 26 {
close(chanImageUrls)
break
}
}
waitGroup.Done()
}
// 爬图片链接到管道
// url是传的整页链接
func getImgUrls(url string) {
urls := getImgs(url)
// 遍历切片里所有链接,存入数据管道
for _, url := range urls {
chanImageUrls <- url
}
// 标识当前协程完成
// 每完成一个任务,写一条数据
// 用于监控协程知道已经完成了几个任务
chanTask <- url
waitGroup.Done()
}
// 获取当前页图片链接
func getImgs(url string) (urls []string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reImg)
results := re.FindAllStringSubmatch(pageStr, -1)
fmt.Printf("共找到%d条结果\n", len(results))
for _, result := range results {
url := result[0]
urls = append(urls, url)
}
return
}
// 抽取根据url获取内容
func GetPageStr(url string) (pageStr string) {
resp, err := http.Get(url)
HandleError(err, "http.Get url")
defer resp.Body.Close()
// 2.读取页面内容
pageBytes, err := ioutil.ReadAll(resp.Body)
HandleError(err, "ioutil.ReadAll")
// 字节转字符串
pageStr = string(pageBytes)
return pageStr
}
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!