如何利用 Goroutines 和 Channels 构建强大的并发数据处理流程
liebian365 2024-12-31 12:46 16 浏览 0 评论
最近在研究高性能数据处理时,我们发现 Go 语言的并发机制是一个强大的工具。今天,我们就来一起深入探讨如何使用 Goroutines 和 Channels 构建高效的数据处理管道,让你的程序在并发的世界里如鱼得水。
并发编程的重要性
在当今多核处理器盛行的时代,充分利用并发能力是提升程序性能的关键。Go 语言以其轻量级的 Goroutines 和强大的 Channels 而闻名,为开发者提供了构建高效并发程序的强大工具。
本文的核心:数据处理管道
我们将通过一个具体的代码示例,演示如何使用 Goroutines 和 Channels 构建一个简单但实用的数据处理管道。这个管道包含三个主要阶段:
- 输入处理 (InputHandler): 负责从预定义的数据源读取数据,并将数据发送到处理管道中。
- 计算 (Calculator): 从输入通道接收数据,执行计算操作,并将结果发送到输出通道。
- 结果收集 (ResultCollector): 接收计算结果,并将它们显示到控制台。
代码示例:构建并发数据处理管道
首先,我们来看一下整个程序的结构:
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
// MasterFunction orchestrates all tasks with goroutines.
func MasterFunction() {
runtime.GOMAXPROCS(runtime.NumCPU()) // Optimize for available CPU cores.
fmt.Printf(" Using %d CPU cores\n", runtime.NumCPU())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var wg sync.WaitGroup
inputChan := make(chan [2]int)
resultChan := make(chan string)
// Launch InputHandler.
wg.Add(1)
go InputHandler(ctx, inputChan, &wg)
// Launch Calculator.
wg.Add(1)
go Calculator(ctx, inputChan, resultChan, &wg)
// Launch ResultCollector.
wg.Add(1)
go ResultCollector(resultChan, &wg)
// Wait for all goroutines to complete.
wg.Wait()
close(resultChan) // Close result channel after all processing is done.
fmt.Println("? All tasks completed!")
}
func main() {
MasterFunction()
}
在 main 函数中,我们调用 MasterFunction 来启动整个数据处理管道。MasterFunction 做了这些事情:
- 设置 GOMAXPROCS 以利用所有可用的 CPU 核心。
- 创建一个带超时时间的 context。
- 创建 inputChan 和 resultChan 用于 Goroutines 之间的数据传递。
- 启动三个 Goroutines:InputHandler、Calculator 和 ResultCollector。
- 等待所有 Goroutines 完成后,关闭 resultChan 并打印完成消息。
数据输入:InputHandler
InputHandler 的职责是将数据发送到 inputChan。 关键代码如下:
func InputHandler(ctx context.Context, inputChan chan<- [2]int, wg *sync.WaitGroup) {
defer wg.Done()
defer close(inputChan) // Close the channel after sending all inputs.
inputs := [][]int{
{4, 2}, {6, 3}, {9, 5}, {10, 7},
}
for _, input := range inputs {
select {
case inputChan <- input:
fmt.Printf(" Sent input: %v\n", input)
case <-ctx.Done():
fmt.Println("X InputHandler stopped: Timeout reached")
return
}
}
}
- defer close(inputChan): 在函数结束时关闭 inputChan,这很重要,因为接收端(Calculator)会通过通道是否关闭来判断是否还有数据。
- select 语句: 这是 Go 中处理并发的强大工具。我们使用它来:
- 将数据发送到 inputChan。
- 监听 ctx.Done(),在超时或取消时优雅地退出。
数据处理:Calculator
Calculator Goroutine 从 inputChan 接收数据,进行简单的加减运算,并将结果发送到 resultChan:
func Calculator(ctx context.Context, inputChan <-chan [2]int, resultChan chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case input, ok := <-inputChan:
if !ok {
return // Input channel closed; exit the loop.
}
a, b := input[0], input[1]
add := a + b
sub := a - b
result := fmt.Sprintf("Input: %d %d | Add: %d | Sub: %d", a, b, add, sub)
select {
case resultChan <- result:
case <-ctx.Done():
fmt.Println("X Calculator stopped: Timeout reached")
return
}
case <-ctx.Done():
fmt.Println("X Calculator stopped: Timeout reached")
return
}
}
}
- 使用 for 循环:不断地从 inputChan 读取数据。
- input, ok := <-inputChan: 从 inputChan 中接收数据,并且检查通道是否已关闭。如果通道关闭,退出循环。
- 嵌套 select:
- 尝试将结果写入 resultChan。
- 监听 ctx.Done(),处理超时或取消的情况。
结果收集:ResultCollector
ResultCollector Goroutine 从 resultChan 接收结果,并将它们打印到控制台:
func ResultCollector(resultChan <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for result := range resultChan {
fmt.Println("M Result:", result)
}
}
- 使用 range resultChan 循环,可以不断地从 resultChan 接收结果,直到通道被关闭。
WaitGroup 和 context 的作用
- sync.WaitGroup: 用于等待所有 Goroutines 执行完成。我们通过 wg.Add(1) 来增加计数器,通过 wg.Done() 来减少计数器,最后使用 wg.Wait() 来阻塞,直到计数器归零。
- context.Context: 用于控制 Goroutines 的生命周期,特别是在处理超时和取消信号时非常有用。我们可以使用 select 语句监听 ctx.Done() 来优雅地退出 Goroutines。
总结
通过本文的示例,我们深入了解了如何使用 Go 语言的 Goroutines 和 Channels 构建强大的并发数据处理管道。
- Channels 是 Goroutines 之间通信的桥梁,它们安全地传递数据,避免了竞争条件。
- WaitGroup 管理 Goroutines 的生命周期,确保所有 Goroutines 完成后主程序才能退出。
- Context 用于处理超时和取消信号,确保程序的健壮性。
通过这种方式,我们可以构建出高效、可靠的并发数据处理流程。现在,请你思考以下问题:
- 如何增加数据处理的阶段?比如在 Calculator 后增加一个数据清洗的环节
- 如何处理 Calculator 中出现的错误?
- 如果数据源非常庞大,如何设计更高效的数据读取方式?
希望你能通过这篇文章,掌握在 Go 中进行并发编程的精髓。
相关推荐
- 4万多吨豪华游轮遇险 竟是因为这个原因……
-
(观察者网讯)4.7万吨豪华游轮搁浅,竟是因为油量太低?据观察者网此前报道,挪威游轮“维京天空”号上周六(23日)在挪威近海发生引擎故障搁浅。船上载有1300多人,其中28人受伤住院。经过数天的调...
- “菜鸟黑客”必用兵器之“渗透测试篇二”
-
"菜鸟黑客"必用兵器之"渗透测试篇二"上篇文章主要针对伙伴们对"渗透测试"应该如何学习?"渗透测试"的基本流程?本篇文章继续上次的分享,接着介绍一下黑客们常用的渗透测试工具有哪些?以及用实验环境让大家...
- 科幻春晚丨《震动羽翼说“Hello”》两万年星间飞行,探测器对地球的最终告白
-
作者|藤井太洋译者|祝力新【编者按】2021年科幻春晚的最后一篇小说,来自大家喜爱的日本科幻作家藤井太洋。小说将视角放在一颗太空探测器上,延续了他一贯的浪漫风格。...
- 麦子陪你做作业(二):KEGG通路数据库的正确打开姿势
-
作者:麦子KEGG是通路数据库中最庞大的,涵盖基因组网络信息,主要注释基因的功能和调控关系。当我们选到了合适的候选分子,单变量研究也已做完,接着研究机制的时便可使用到它。你需要了解你的分子目前已有哪些...
- 知存科技王绍迪:突破存储墙瓶颈,详解存算一体架构优势
-
智东西(公众号:zhidxcom)编辑|韦世玮智东西6月5日消息,近日,在落幕不久的GTIC2021嵌入式AI创新峰会上,知存科技CEO王绍迪博士以《存算一体AI芯片:AIoT设备的算力新选择》...
- 每日新闻播报(September 14)_每日新闻播报英文
-
AnOscarstatuestandscoveredwithplasticduringpreparationsleadinguptothe87thAcademyAward...
- 香港新巴城巴开放实时到站数据 供科技界研发使用
-
中新网3月22日电据香港《明报》报道,香港特区政府致力推动智慧城市,鼓励公私营机构开放数据,以便科技界研发使用。香港运输署21日与新巴及城巴(两巴)公司签署谅解备忘录,两巴将于2019年第3季度,开...
- 5款不容错过的APP: Red Bull Alert,Flipagram,WifiMapper
-
本周有不少非常出色的app推出,鸵鸟电台做了一个小合集。亮相本周榜单的有WifiMapper's安卓版的app,其中包含了RedBull的一款新型闹钟,还有一款可爱的怪物主题益智游戏。一起来看看我...
- Qt动画效果展示_qt显示图片
-
今天在这篇博文中,主要实践Qt动画,做一个实例来讲解Qt动画使用,其界面如下图所示(由于没有录制为gif动画图片,所以请各位下载查看效果):该程序使用应用程序单窗口,主窗口继承于QMainWindow...
- 如何从0到1设计实现一门自己的脚本语言
-
作者:dong...
- 三年级语文上册 仿写句子 需要的直接下载打印吧
-
描写秋天的好句好段1.秋天来了,山野变成了美丽的图画。苹果露出红红的脸庞,梨树挂起金黄的灯笼,高粱举起了燃烧的火把。大雁在天空一会儿写“人”字,一会儿写“一”字。2.花园里,菊花争奇斗艳,红的似火,粉...
- C++|那些一看就很简洁、优雅、经典的小代码段
-
目录0等概率随机洗牌:1大小写转换2字符串复制...
- 二年级上册语文必考句子仿写,家长打印,孩子照着练
-
二年级上册语文必考句子仿写,家长打印,孩子照着练。具体如下:...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- wireshark怎么抓包 (75)
- qt sleep (64)
- cs1.6指令代码大全 (55)
- factory-method (60)
- sqlite3_bind_blob (52)
- hibernate update (63)
- c++ base64 (70)
- nc 命令 (52)
- wm_close (51)
- epollin (51)
- sqlca.sqlcode (57)
- lua ipairs (60)
- tv_usec (64)
- 命令行进入文件夹 (53)
- postgresql array (57)
- statfs函数 (57)
- .project文件 (54)
- lua require (56)
- for_each (67)
- c#工厂模式 (57)
- wxsqlite3 (66)
- dmesg -c (58)
- fopen参数 (53)
- tar -zxvf -c (55)
- 速递查询 (52)