Skip to content

patterns.go

文件信息

  • 📄 原文件:03_patterns.go
  • 🔤 语言:go

完整代码

go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ============================================================
//                      常见并发模式
// ============================================================
// Go 的并发模式基于 goroutine 和 channel
// 这些模式可以组合使用来解决复杂的并发问题

func main03() {
	fmt.Println("\n==================== 03_patterns ====================")

	// ----------------------------------------------------------
	// 模式1: Worker Pool(工作池)
	// ----------------------------------------------------------
	fmt.Println("=== Worker Pool ===")

	jobs := make(chan int, 10)
	results := make(chan int, 10)

	// 启动 3 个 worker
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}

	// 发送 5 个任务
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)

	// 收集结果
	for r := 1; r <= 5; r++ {
		result := <-results
		fmt.Println("结果:", result)
	}

	// ----------------------------------------------------------
	// 模式2: Fan-out/Fan-in(扇出/扇入)
	// ----------------------------------------------------------
	fmt.Println("\n=== Fan-out/Fan-in ===")

	input := generateNumbers(1, 5)

	// Fan-out: 多个 goroutine 从同一个 channel 读取
	c1 := squareWorker(input)
	c2 := squareWorker(input)

	// Fan-in: 合并多个 channel 到一个
	merged := fanIn(c1, c2)

	// 收集结果
	for range 5 {
		fmt.Println("平方结果:", <-merged)
	}

	// ----------------------------------------------------------
	// 模式3: Pipeline(管道)
	// ----------------------------------------------------------
	fmt.Println("\n=== Pipeline ===")

	// 创建管道: 生成 -> 翻倍 -> 加10
	nums := generate(1, 2, 3, 4, 5)
	doubled := double(nums)
	final := addTen(doubled)

	for n := range final {
		fmt.Println("管道结果:", n)
	}

	// ----------------------------------------------------------
	// 模式4: Done Channel(退出信号)
	// ----------------------------------------------------------
	fmt.Println("\n=== Done Channel ===")

	done := make(chan struct{})

	go func() {
		for {
			select {
			case <-done:
				fmt.Println("收到退出信号,goroutine 退出")
				return
			default:
				fmt.Println("工作中...")
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()

	time.Sleep(120 * time.Millisecond)
	close(done) // 发送退出信号
	time.Sleep(50 * time.Millisecond)

	// ----------------------------------------------------------
	// 模式5: Context(上下文控制)
	// ----------------------------------------------------------
	fmt.Println("\n=== Context ===")

	// 带超时的 context
	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()

	go longRunningTask(ctx)

	time.Sleep(200 * time.Millisecond)

	// 手动取消
	ctx2, cancel2 := context.WithCancel(context.Background())

	go func() {
		for {
			select {
			case <-ctx2.Done():
				fmt.Println("Context 被取消:", ctx2.Err())
				return
			default:
				time.Sleep(30 * time.Millisecond)
			}
		}
	}()

	time.Sleep(80 * time.Millisecond)
	cancel2() // 手动取消
	time.Sleep(50 * time.Millisecond)

	// ----------------------------------------------------------
	// 模式6: Semaphore(信号量,限制并发数)
	// ----------------------------------------------------------
	fmt.Println("\n=== Semaphore ===")

	// 使用带缓冲的 channel 作为信号量
	sem := make(chan struct{}, 2) // 最多 2 个并发

	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			sem <- struct{}{}        // 获取信号量
			defer func() { <-sem }() // 释放信号量

			fmt.Printf("任务 %d 开始(并发限制)\n", id)
			time.Sleep(50 * time.Millisecond)
			fmt.Printf("任务 %d 完成\n", id)
		}(i)
	}
	wg.Wait()

	// ----------------------------------------------------------
	// 模式7: Once(只执行一次)
	// ----------------------------------------------------------
	fmt.Println("\n=== sync.Once ===")

	var once sync.Once
	var wg2 sync.WaitGroup

	initFunc := func() {
		fmt.Println("初始化只执行一次")
	}

	for i := range 5 {
		wg2.Add(1)
		go func(n int) {
			defer wg2.Done()
			fmt.Printf("Goroutine %d 尝试初始化\n", n)
			once.Do(initFunc) // 只有第一个执行
		}(i)
	}
	wg2.Wait()

	// ----------------------------------------------------------
	// 模式8: 超时与重试
	// ----------------------------------------------------------
	fmt.Println("\n=== 超时与重试 ===")

	result, err := doWithRetry(3, 50*time.Millisecond, func() error {
		// 模拟可能失败的操作
		return fmt.Errorf("操作失败")
	})

	if err != nil {
		fmt.Println("最终失败:", err)
	} else {
		fmt.Println("成功:", result)
	}

	// ----------------------------------------------------------
	// 模式9: Rate Limiting(限流)
	// ----------------------------------------------------------
	fmt.Println("\n=== Rate Limiting ===")

	// 简单限流:每 50ms 处理一个请求
	limiter := time.Tick(50 * time.Millisecond)

	start := time.Now()
	for i := 1; i <= 3; i++ {
		<-limiter // 等待令牌
		fmt.Printf("请求 %d 处理于 %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}

// ============================================================
//                      辅助函数
// ============================================================

// Worker Pool worker
func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d 处理任务 %d\n", id, job)
		time.Sleep(20 * time.Millisecond)
		results <- job * 2
	}
}

// 生成数字
func generateNumbers(start, count int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < count; i++ {
			out <- start + i
		}
		close(out)
	}()
	return out
}

// 计算平方
func squareWorker(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Fan-in: 合并多个 channel
func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// Pipeline: 生成
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Pipeline: 翻倍
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * 2
		}
		close(out)
	}()
	return out
}

// Pipeline: 加10
func addTen(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + 10
		}
		close(out)
	}()
	return out
}

// 长时间运行的任务
func longRunningTask(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("任务超时:", ctx.Err())
			return
		default:
			fmt.Println("长任务执行中...")
			time.Sleep(50 * time.Millisecond)
		}
	}
}

// 带重试的操作
func doWithRetry(maxRetries int, delay time.Duration, fn func() error) (string, error) {
	var err error
	for i := range maxRetries {
		fmt.Printf("尝试 %d/%d\n", i+1, maxRetries)
		err = fn()
		if err == nil {
			return "成功", nil
		}
		time.Sleep(delay)
	}
	return "", fmt.Errorf("重试 %d 次后失败: %w", maxRetries, err)
}

// ============================================================
//                      重要注意事项
// ============================================================
//
// 1. 【Worker Pool】
//    - 控制并发数量
//    - 复用 goroutine,减少创建开销
//
// 2. 【Fan-out/Fan-in】
//    - Fan-out: 多个 goroutine 读取同一 channel
//    - Fan-in: 多个 channel 合并到一个
//
// 3. 【Pipeline】
//    - 数据流经多个处理阶段
//    - 每个阶段是独立的 goroutine
//    - 使用 channel 连接
//
// 4. 【Context】
//    - 传递取消信号
//    - 设置超时和截止时间
//    - 传递请求范围的值
//    - 始终调用 cancel 释放资源
//
// 5. 【信号量】
//    - 带缓冲 channel 可作为信号量
//    - 容量 = 最大并发数
//
// 6. 【sync.Once】
//    - 确保初始化只执行一次
//    - 线程安全
//
// 7. 【goroutine 泄漏】
//    - 确保所有 goroutine 有退出路径
//    - 使用 done channel 或 context

💬 讨论

使用 GitHub 账号登录后即可参与讨论

基于 MIT 许可发布