百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

如何利用 Goroutines 和 Channels 构建强大的并发数据处理流程

liebian365 2024-12-31 12:46 27 浏览 0 评论

最近在研究高性能数据处理时,我们发现 Go 语言的并发机制是一个强大的工具。今天,我们就来一起深入探讨如何使用 Goroutines 和 Channels 构建高效的数据处理管道,让你的程序在并发的世界里如鱼得水。

并发编程的重要性

在当今多核处理器盛行的时代,充分利用并发能力是提升程序性能的关键。Go 语言以其轻量级的 Goroutines 和强大的 Channels 而闻名,为开发者提供了构建高效并发程序的强大工具。

本文的核心:数据处理管道

我们将通过一个具体的代码示例,演示如何使用 Goroutines 和 Channels 构建一个简单但实用的数据处理管道。这个管道包含三个主要阶段:

  1. 输入处理 (InputHandler): 负责从预定义的数据源读取数据,并将数据发送到处理管道中。
  2. 计算 (Calculator): 从输入通道接收数据,执行计算操作,并将结果发送到输出通道。
  3. 结果收集 (ResultCollector): 接收计算结果,并将它们显示到控制台。

代码示例:构建并发数据处理管道

首先,我们来看一下整个程序的结构:

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

// MasterFunction orchestrates all tasks with goroutines.
func MasterFunction() {
	runtime.GOMAXPROCS(runtime.NumCPU()) // Optimize for available CPU cores.
	fmt.Printf(" Using %d CPU cores\n", runtime.NumCPU())

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	inputChan := make(chan [2]int)
	resultChan := make(chan string)

	// Launch InputHandler.
	wg.Add(1)
	go InputHandler(ctx, inputChan, &wg)

	// Launch Calculator.
	wg.Add(1)
	go Calculator(ctx, inputChan, resultChan, &wg)

	// Launch ResultCollector.
	wg.Add(1)
	go ResultCollector(resultChan, &wg)

	// Wait for all goroutines to complete.
	wg.Wait()
	close(resultChan) // Close result channel after all processing is done.
	fmt.Println("? All tasks completed!")
}

func main() {
	MasterFunction()
}

在 main 函数中,我们调用 MasterFunction 来启动整个数据处理管道。MasterFunction 做了这些事情:

  • 设置 GOMAXPROCS 以利用所有可用的 CPU 核心。
  • 创建一个带超时时间的 context。
  • 创建 inputChan 和 resultChan 用于 Goroutines 之间的数据传递。
  • 启动三个 Goroutines:InputHandler、Calculator 和 ResultCollector。
  • 等待所有 Goroutines 完成后,关闭 resultChan 并打印完成消息。

数据输入:InputHandler

InputHandler 的职责是将数据发送到 inputChan。 关键代码如下:

func InputHandler(ctx context.Context, inputChan chan<- [2]int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(inputChan) // Close the channel after sending all inputs.

	inputs := [][]int{
		{4, 2}, {6, 3}, {9, 5}, {10, 7},
	}

	for _, input := range inputs {
		select {
		case inputChan <- input:
			fmt.Printf(" Sent input: %v\n", input)
		case <-ctx.Done():
			fmt.Println("X InputHandler stopped: Timeout reached")
			return
		}
	}
}
  • defer close(inputChan): 在函数结束时关闭 inputChan,这很重要,因为接收端(Calculator)会通过通道是否关闭来判断是否还有数据。
  • select 语句: 这是 Go 中处理并发的强大工具。我们使用它来:
    • 将数据发送到 inputChan。
    • 监听 ctx.Done(),在超时或取消时优雅地退出。

数据处理:Calculator

Calculator Goroutine 从 inputChan 接收数据,进行简单的加减运算,并将结果发送到 resultChan:

func Calculator(ctx context.Context, inputChan <-chan [2]int, resultChan chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case input, ok := <-inputChan:
			if !ok {
				return // Input channel closed; exit the loop.
			}
			a, b := input[0], input[1]
			add := a + b
			sub := a - b
			result := fmt.Sprintf("Input: %d %d | Add: %d | Sub: %d", a, b, add, sub)
			select {
			case resultChan <- result:
			case <-ctx.Done():
				fmt.Println("X Calculator stopped: Timeout reached")
				return
			}
		case <-ctx.Done():
			fmt.Println("X Calculator stopped: Timeout reached")
			return
		}
	}
}
  • 使用 for 循环:不断地从 inputChan 读取数据。
  • input, ok := <-inputChan: 从 inputChan 中接收数据,并且检查通道是否已关闭。如果通道关闭,退出循环。
  • 嵌套 select:
    • 尝试将结果写入 resultChan。
    • 监听 ctx.Done(),处理超时或取消的情况。

结果收集:ResultCollector

ResultCollector Goroutine 从 resultChan 接收结果,并将它们打印到控制台:

func ResultCollector(resultChan <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for result := range resultChan {
		fmt.Println("M Result:", result)
	}
}
  • 使用 range resultChan 循环,可以不断地从 resultChan 接收结果,直到通道被关闭。

WaitGroup 和 context 的作用

  • sync.WaitGroup: 用于等待所有 Goroutines 执行完成。我们通过 wg.Add(1) 来增加计数器,通过 wg.Done() 来减少计数器,最后使用 wg.Wait() 来阻塞,直到计数器归零。
  • context.Context: 用于控制 Goroutines 的生命周期,特别是在处理超时和取消信号时非常有用。我们可以使用 select 语句监听 ctx.Done() 来优雅地退出 Goroutines。

总结

通过本文的示例,我们深入了解了如何使用 Go 语言的 Goroutines 和 Channels 构建强大的并发数据处理管道。

  • Channels 是 Goroutines 之间通信的桥梁,它们安全地传递数据,避免了竞争条件。
  • WaitGroup 管理 Goroutines 的生命周期,确保所有 Goroutines 完成后主程序才能退出。
  • Context 用于处理超时和取消信号,确保程序的健壮性。

通过这种方式,我们可以构建出高效、可靠的并发数据处理流程。现在,请你思考以下问题:

  1. 如何增加数据处理的阶段?比如在 Calculator 后增加一个数据清洗的环节
  2. 如何处理 Calculator 中出现的错误?
  3. 如果数据源非常庞大,如何设计更高效的数据读取方式?

希望你能通过这篇文章,掌握在 Go 中进行并发编程的精髓。

相关推荐

go语言也可以做gui,go-fltk让你做出c++级别的桌面应用

大家都知道go语言生态并没有什么好的gui开发框架,“能用”的一个手就能数的清,好用的就更是少之又少。今天为大家推荐一个go的gui库go-fltk。它是通过cgo调用了c++的fltk库,性能非常高...

旧电脑的首选系统:TinyCore!体积小+精简+速度极快,你敢安装吗

这几天老毛桃整理了几个微型Linux发行版,准备分享给大家。要知道可供我们日常使用的Linux发行版有很多,但其中的一些发行版经常会被大家忽视。其实这些微型Linux发行版是一种非常强大的创新:在一台...

codeblocks和VS2019下的fltk使用中文

在fltk中用中文有点问题。英文是这样。中文就成这个样子了。我查了查资料,说用UTF-8编码就行了。edit->Fileencoding->UTF-8然后保存文件。看下下边的编码指示确...

FLTK(Fast Light Toolkit)一个轻量级的跨平台Python GUI库

FLTK(FastLightToolkit)是一个轻量级的跨平台GUI库,特别适用于开发需要快速、高效且简单界面的应用程序。本文将介绍Python中的FLTK库,包括其特性、应用场景以及如何通过代...

中科院开源 RISC-V 处理器“香山”流片,已成功运行 Linux

IT之家1月29日消息,去年6月份,中科院大学教授、中科院计算所研究员包云岗,发布了开源高性能RISC-V处理器核心——香山。近日,包云岗在社交平台晒出图片,香山芯片已流片,回片后...

Linux 5.13内核有望合并对苹果M1处理器支持的初步代码

预计Linux5.13将初步支持苹果SiliconM1处理器,不过完整的支持工作可能还需要几年时间才能完全完成。虽然Linux已经可以在苹果SiliconM1上运行,但这需要通过一系列的补丁才能...

Ubuntu系统下COM口测试教程(ubuntu port)

1、在待测试的板上下载minicom,下载minicom有两种方法:方法一:在Ubuntu软件中心里面搜索下载方法二:按“Ctrl+Alt+T”打开终端,打开终端后输入“sudosu”回车;在下...

湖北嵌入式软件工程师培训怎么选,让自己脱颖而出

很多年轻人毕业即失业、面试总是不如意、薪酬不满意、在家躺平。“就业难”该如何应对,参加培训是否能改变自己的职业走向,在湖北,有哪些嵌入式软件工程师培训怎么选值得推荐?粤嵌科技在嵌入式培训领域有十几年经...

新阁上位机开发---10年工程师的Modbus总结

前言我算了一下,今年是我跟Modbus相识的第10年,从最开始的简单应用到协议了解,从协议开发到协议讲解,这个陪伴了10年的协议,它一直没变,变的只是我对它的理解和认识。我一直认为Modbus协议的存...

创建你的第一个可运行的嵌入式Linux系统-5

@ZHangZMo在MicrochipBuildroot中配置QT5选择Graphic配置文件增加QT5的配置修改根文件系统支持QT5修改output/target/etc/profile配置文件...

如何在Linux下给zigbee CC2530实现上位机

0、前言网友提问如下:粉丝提问项目框架汇总下这个网友的问题,其实就是实现一个网关程序,内容分为几块:下位机,通过串口与上位机相连;下位机要能够接收上位机下发的命令,并解析这些命令;下位机能够根据这些命...

Python实现串口助手 - 03串口功能实现

 串口调试助手是最核心的当然是串口数据收发与显示的功能,pzh-py-com借助的是pySerial库实现串口收发功能,今天痞子衡为大家介绍pySerial是如何在pzh-py-com发挥功能的。一、...

为什么选择UART(串口)作为调试接口,而不是I2C、SPI等其他接口

UART(通用异步收发传输器)通常被选作调试接口有以下几个原因:简单性:协议简单:UART的协议非常简单,只需设置波特率、数据位、停止位和校验位就可以进行通信。相比之下,I2C和SPI需要处理更多的通...

同一个类,不同代码,Qt 串口类QSerialPort 与各种外设通讯处理

串口通讯在各种外设通讯中是常见接口,因为各种嵌入式CPU中串口标配,工业控制中如果不够还通过各种串口芯片进行扩展。比如spi接口的W25Q128FV.对于软件而言,因为驱动接口固定,软件也相对好写,因...

嵌入式linux为什么可以通过PC上的串口去执行命令?

1、uboot(负责初始化基本硬bai件,如串口,网卡,usb口等,然du后引导系统zhi运行)2、linux系统(真正的操作系统)3、你的应用程序(基于操作系统的软件应用)当你开发板上电时,u...

取消回复欢迎 发表评论: