并发爬虫实战
本篇是你 Go 并发学习的 第 14 天:实战项目——并发爬虫。
目标:用
goroutine + channel + WaitGroup + context实现一个简易的 worker pool 爬虫。对标:Java 里的
ThreadPoolExecutor + BlockingQueue + Future。你的背景:Java 程序员
操作系统:Linux Mint XFCE
Go 版本:go1.22.2 linux/amd64
项目目录示例:
/home/liumangmang/GolandProjects/go-crawler
📌 标题
Go 并发实战:用 goroutine + channel 写一个简易并发爬虫(对标 Java ThreadPoolExecutor)
✅ 步骤 1:创建项目
cd /home/liumangmang/GolandProjects
mkdir go-crawler && cd go-crawler
go mod init go-crawler✅ 步骤 2:设计思路(Java 对比版)
Java 常见写法:
- 使用
ExecutorService pool = Executors.newFixedThreadPool(N); - 提交一批 URL 爬取任务
pool.submit(() -> fetch(url)); - 通过
Future或回调收集结果。
Go 对应思路:
- 一个
jobschannel:放要爬的 URL(像 Java 的任务队列)。 - 若干 worker goroutine:从
jobs里取 URL,执行fetch,把结果写到resultschannel。 WaitGroup等待所有 worker 结束;- 可选:
context控制总超时 / 取消。
✅ 步骤 3:最小可用版并发爬虫
创建 crawler_basic.go:
nano crawler_basic.go粘贴:
package main
import (
"fmt"
"io"
"net/http"
"regexp"
"sync"
)
// 抓取页面并提取 <title>
func fetchTitle(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
re := regexp.MustCompile("(?is)<title>(.*?)</title>")
matches := re.FindSubmatch(body)
if len(matches) >= 2 {
return string(matches[1]), nil
}
return "(no title)", nil
}
// worker 从 jobs 读取 URL,写结果到 results
func worker(id int, jobs <-chan string, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for url := range jobs { // channel 关闭后,range 自动结束
title, err := fetchTitle(url)
if err != nil {
results <- fmt.Sprintf("[worker-%d] %s ERROR: %v", id, url, err)
continue
}
results <- fmt.Sprintf("[worker-%d] %s => %s", id, url, title)
}
}
func main() {
urls := []string{
"https://golang.org",
"https://go.dev",
"https://www.baidu.com",
"https://www.bing.com",
}
jobs := make(chan string)
results := make(chan string)
var wg sync.WaitGroup
workerCount := 3
// 启动 worker
for i := 1; i <= workerCount; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
go func() {
for _, url := range urls {
jobs <- url
}
close(jobs) // 不再有新任务
}()
// 单独 goroutine 负责在所有 worker 结束后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 主 goroutine 消费结果
for res := range results {
fmt.Println(res)
}
}运行:
go run crawler_basic.go你会看到多个 worker-x 交错打印各个 URL 的 Title,这就是最基本的“并发爬虫 + worker pool”模型。
✅ 步骤 4:加入 context 控制总超时
有时你不希望爬虫无限等待,而是设置一个总超时时间,到了就整体停止。
创建 crawler_with_context.go:
nano crawler_with_context.go粘贴:
package main
import (
"context"
"fmt"
"io"
"net/http"
"regexp"
"sync"
"time"
)
func fetchTitleCtx(ctx context.Context, url string) (string, error) {
// 使用带 context 的请求
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
re := regexp.MustCompile("(?is)<title>(.*?)</title>")
matches := re.FindSubmatch(body)
if len(matches) >= 2 {
return string(matches[1]), nil
}
return "(no title)", nil
}
func workerCtx(ctx context.Context, id int, jobs <-chan string, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
results <- fmt.Sprintf("[worker-%d] 收到取消信号: %v", id, ctx.Err())
return
case url, ok := <-jobs:
if !ok {
return
}
// 为每个请求单独派生一个带超时的 ctx(例如单个请求 3 秒)
reqCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
title, err := fetchTitleCtx(reqCtx, url)
cancel()
if err != nil {
results <- fmt.Sprintf("[worker-%d] %s ERROR: %v", id, url, err)
continue
}
results <- fmt.Sprintf("[worker-%d] %s => %s", id, url, title)
}
}
}
func main() {
urls := []string{
"https://golang.org",
"https://go.dev",
"https://www.baidu.com",
"https://www.bing.com",
}
jobs := make(chan string)
results := make(chan string)
var wg sync.WaitGroup
workerCount := 3
// 整体爬虫最多运行 5 秒
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for i := 1; i <= workerCount; i++ {
wg.Add(1)
go workerCtx(ctx, i, jobs, results, &wg)
}
go func() {
for _, url := range urls {
select {
case <-ctx.Done():
return
case jobs <- url:
}
}
close(jobs)
}()
go func() {
wg.Wait()
close(results)
}()
for res := range results {
fmt.Println(res)
}
}如果你刻意加入一些“很慢甚至不响应”的 URL,就能看到 context 超时导致 worker 退出 的效果。
✅ 步骤 5:与 Java ThreadPoolExecutor 的对照
worker pool = 固定大小线程池:
- Java:
newFixedThreadPool(N); - Go:启动 N 个 goroutine,统一从
jobschannel 取任务。
- Java:
任务队列:
- Java:
BlockingQueue<Runnable>; - Go:
chan string(放 URL),chan Result(放结果)。
- Java:
关闭流程:
- Java:
pool.shutdown();+awaitTermination(...); - Go:关闭
jobschannel +WaitGroup.Wait(),再关闭results。
- Java:
超时 / 取消:
- Java:
Future.get(timeout)/ 自己维护取消令牌; - Go:
context.WithTimeout/WithCancel+ 在 goroutine 中select <-ctx.Done()。
- Java:
心法:把 ThreadPoolExecutor 想成“封装好的 worker pool + 队列 + 管理接口”,而在 Go 里你是直接用 goroutine + channel 原材料自己搭一个。
📚 今日小结与扩展练习
你已经完成:
- 用 goroutine + channel + WaitGroup 实现了一个简易并发爬虫;
- 用 context 控制了总超时时间和单个请求超时时间;
- 理解了 worker pool 模式与 Java 线程池的对应关系。
扩展练习:
- 给爬虫增加“深度”:解析页面中的链接,做一层或两层的递归抓取(注意不要无限爬)。
- 给每个 URL 定义一个结果结构体
struct { URL, Title string; Err error },通过results <- result返回,更贴近真实项目。 - 增加最大并发限制(例如最多同时 5 个请求),通过调整 worker 数量和 channel 容量体会性能差异。
到这里,你已经把 Go 并发从概念(goroutine、GPM、channel、select)一路练到实战(worker pool 并发爬虫)。接下来可以根据兴趣,深入某块(比如高性能网络、微服务、消息队列消费者等),用相同的并发模型继续扩展。
