[每周一更]-(第133期):Go中MapReduce架构思想的使用场景

在这里插入图片描述

文章目录

      • **MapReduce 工作流程**
      • Go 中使用 MapReduce 的实现方式:
      • **Go MapReduce 的特点**
      • **哪些场景适合使用 MapReduce?**
      • 使用场景
        • 1. 数据聚合
        • 2. 数据过滤
        • 3. 数据排序
        • 4. 数据转换
        • 5. 数据去重
        • 6. 数据分组
        • 7. 数据统计
        • 8.**统计文本中单词出现次数**
          • **代码实现**
      • MapReduce vs. 扇入/扇出
        • 示例1:爬取多个网页
        • 示例2:多个 goroutine 计算结果,并聚合
      • 参考
      • 注意事项

新年开工,2025重新出发

为什么需要 MapReduce

在 Go 中,虽然没有内置的 MapReduce 框架,但我们可以利用 Go 的并发特性(如 goroutines 和 channels)来实现 MapReduce。

在 Go 语言中,MapReduce 是一种编程模型,用于处理和生成大规模数据集。它将任务分解为两个主要阶段:Map(映射)和 Reduce(归约),并通过并行处理提高效率。MapReduce 模型最初由 Google 提出,广泛应用于大数据处理、分布式计算等领域。

它的核心思想是将问题分解成多个较小的子问题并行处理,然后将结果合并。MapReduce 分为两个主要步骤:

  1. Map 阶段:将输入数据映射到中间结果。这个阶段将输入数据拆分成小块,分配给不同的处理单元,并对每个数据项应用一个映射函数。
  2. Reduce 阶段:将 Map 阶段的中间结果进行合并。通常是通过聚合或汇总中间结果,生成最终输出。

MapReduce 工作流程

  1. 输入数据:将大规模数据分成多个小块。
  2. Map(映射):对数据进行并行处理,并生成中间结果。
  3. Shuffle(洗牌,可选):对中间结果进行归类,按 key 组织数据。
  4. Reduce(归约):合并和处理 Map 阶段的中间结果,得出最终结果。

Go 中使用 MapReduce 的实现方式:

Go 提供了 goroutine 和 channel,这使得它非常适合实现并行计算的场景。一个简单的 Go 实现通常会使用以下步骤:

  1. Map:通过 goroutine 处理每个数据块。
  2. Shuffle(可选):将中间结果通过 channel 或其他方式传递到 Reduce 阶段。
  3. Reduce:聚合结果,得到最终输出。

通过 Go 的并发模型,可以利用多个 CPU 核心实现 MapReduce 的并行计算。

Go MapReduce 的特点

  1. 高并发
    • 通过 goroutine 并行执行 Map 和 Reduce 操作,提升计算效率。
    • Go 的 goroutine 轻量级,支持大规模并发执行 Map 任务,不会像 Java 线程那样占用大量内存。
  2. 无锁数据传输
    • channel 作为数据流通管道,避免手动加锁,提高代码可读性和安全性。
    • Go 提供了 sync.WaitGroupsync.Map 等并发工具,可以更简单地管理 MapReduce 任务。
  3. 适用于大规模数据处理
    • 适合处理日志分析、数据聚合、分布式计算等任务。

哪些场景适合使用 MapReduce?

场景Map 阶段Reduce 阶段
日志分析读取大量日志,提取关键字段统计访问次数、错误率等
搜索引擎索引解析网页,提取关键词统计关键词出现次数
基因数据分析解析 DNA 序列,计算某个基因的出现频率归并统计结果,得出全局基因分布
机器学习计算训练数据的特征训练模型,计算最终的回归参数
推荐系统计算用户的浏览、点击数据归并计算得到推荐结果
并行图像处理处理图像的每个区域合并所有区域结果,生成完整图像

常见使用场景:

  • 大规模数据处理: MapReduce 适用于批量处理大量数据,例如日志分析。

  • 并发数据处理: 在需要并发处理的场景中,例如查询数据库,MapReduce 可以将任务拆分成并发请求,从而减少处理时间并提高性能。处理结果可以被聚合起来。

  • 分布式数据处理和合并: MapReduce 用于以分布式方式处理和合并数据。大型数据集被分成较小的部分,由不同的机器或线程处理,然后合并。

使用场景

1. 数据聚合

场景:统计日志文件中不同状态码的出现次数。

拆解

  • Map阶段:读取日志文件,提取状态码,生成键值对(状态码, 1)。
  • Reduce阶段:汇总相同状态码的计数,生成最终结果(状态码, 总次数)。
func mapFunc(line string) map[string]int {parts := strings.Split(line, " ")statusCode := parts[8] // 假设状态码在第9个字段return map[string]int{statusCode: 1}
}func reduceFunc(statusCode string, counts []int) int {return sum(counts)
}
2. 数据过滤

场景:从大量数据中筛选出符合特定条件的记录。

拆解

  • Map阶段:检查每条记录是否满足条件,满足则输出(记录, 1)。
  • Reduce阶段:汇总符合条件的记录。
func mapFunc(record Record) map[Record]int {if record.Age > 30 {return map[Record]int{record: 1}}return nil
}func reduceFunc(record Record, counts []int) Record {return record
}
3. 数据排序

场景:对大规模数据集进行排序。

拆解

  • Map阶段:将数据分片并局部排序。
  • Reduce阶段:合并各分片的排序结果。
func mapFunc(data []int) []int {sort.Ints(data)return data
}func reduceFunc(sortedSlices [][]int) []int {return mergeSortedSlices(sortedSlices)
}
4. 数据转换

场景:将数据从一种格式转换为另一种格式。

拆解

  • Map阶段:将原始数据转换为目标格式。
  • Reduce阶段:合并转换后的数据。
func mapFunc(input InputType) OutputType {return transform(input)
}func reduceFunc(outputs []OutputType) OutputType {return combine(outputs)
}
5. 数据去重

场景:去除数据集中的重复记录。

拆解

  • Map阶段:将每条记录作为键输出(记录, 1)。
  • Reduce阶段:合并相同记录,输出唯一记录。
func mapFunc(record Record) map[Record]int {return map[Record]int{record: 1}
}func reduceFunc(record Record, counts []int) Record {return record
}
6. 数据分组

场景:按某个字段对数据进行分组。

拆解

  • Map阶段:根据分组字段生成键值对(分组字段, 记录)。
  • Reduce阶段:将相同分组字段的记录合并。
func mapFunc(record Record) map[string]Record {return map[string]Record{record.GroupField: record}
}func reduceFunc(groupField string, records []Record) []Record {return records
}
7. 数据统计

场景:计算数据集的平均值、最大值、最小值等统计信息。

拆解

  • Map阶段:计算局部统计信息。
  • Reduce阶段:合并局部统计信息,生成全局统计结果。
func mapFunc(data []int) Stat {return calculateLocalStat(data)
}func reduceFunc(stats []Stat) Stat {return combineStats(stats)
}
8.统计文本中单词出现次数
  • 同步 Map 阶段
    • 通过 sync.WaitGroup 确保所有 mapFunction 任务完成后才关闭 mapChannel,避免 Reduce 过早读取导致数据丢失。
  • 使用 go func() 异步关闭 channel
    • mapWG.Wait() 结束后,关闭 mapChannel,确保 Reduce 读取完整数据。
  • Reduce 处理改进
    • reduceFunction 直接从 channel 读取数据,并合并为最终的 map[string]int 结果。
代码实现
package mainimport ("fmt""strings""sync"
)// Map 阶段:统计部分数据中的单词频率
func mapFunction(text string, out chan<- map[string]int, wg *sync.WaitGroup) {defer wg.Done()wordCount := make(map[string]int)words := strings.Fields(text)for _, word := range words {wordCount[word]++}out <- wordCount
}// Reduce 阶段:合并多个 map 结果
func reduceFunction(in <-chan map[string]int) map[string]int {result := make(map[string]int)for partialMap := range in {for word, count := range partialMap {result[word] += count}}return result
}func main() {// 输入数据texts := []string{"hello world","go is great","hello go","map reduce in go","go go go",}// 创建 channel 传输 map 结果mapChannel := make(chan map[string]int, len(texts))var mapWG sync.WaitGroup// 启动多个 Map 任务for _, text := range texts {mapWG.Add(1)go mapFunction(text, mapChannel, &mapWG)}// 确保所有 map 任务完成后再关闭 channelgo func() {mapWG.Wait()close(mapChannel)}()// Reduce 阶段:合并 map 结果result := reduceFunction(mapChannel)// 输出最终结果fmt.Println("Word Count Result:", result)
}

MapReduce vs. 扇入/扇出

历史文章:[每周一更]-(第24期):Go的并发模型,提到过Go 并发模式:扇入、扇出,这里简单对比一下

MapReduce 和 Go 的 扇入(Fan-in)/扇出(Fan-out) 在并发模型上是类似的,但它们的侧重点和应用场景有所不同

  • 如果只是单机并发任务(如 API 调用、爬虫),用 扇入/扇出

  • 如果要处理大数据(如日志分析、搜索索引),用 MapReduce

特性MapReduce扇入(Fan-in)/扇出(Fan-out)
核心思想拆分任务并行计算,再归并结果并行处理任务,聚合结果到一个 channel
Map 阶段 / 扇出并发执行多个子任务启动多个 goroutine 处理任务
Reduce 阶段 / 扇入归并多个子任务的结果读取多个 goroutine 结果并处理
数据流动方式Map → Reduce多个 goroutine → 单个 channel
适用场景大规模数据计算(如日志分析、搜索引擎索引)并发任务管理(如爬虫、API 并发请求)
是否涉及分布式适用于分布式计算主要用于单机并发任务
示例1:爬取多个网页
package mainimport ("fmt""net/http""sync"
)var urls = []string{"https://golang.org","https://go.dev","https://gophercises.com",
}// 扇出:启动多个 goroutine 并发爬取网页
func fetch(url string, wg *sync.WaitGroup) {defer wg.Done()resp, err := http.Get(url)if err != nil {fmt.Println("Error:", err)return}fmt.Println("Fetched:", url, "Status:", resp.Status)
}func main() {var wg sync.WaitGroupfor _, url := range urls {wg.Add(1)go fetch(url, &wg)}wg.Wait()fmt.Println("All requests finished!")
}
示例2:多个 goroutine 计算结果,并聚合
package mainimport ("fmt""sync"
)func worker(id int, out chan<- int, wg *sync.WaitGroup) {defer wg.Done()out <- id * id // 计算平方并发送
}func main() {out := make(chan int, 5)var wg sync.WaitGroup// 扇出:启动多个 goroutinefor i := 1; i <= 5; i++ {wg.Add(1)go worker(i, out, &wg)}// 等待所有任务完成后关闭 channelgo func() {wg.Wait()close(out)}()// 扇入:聚合所有 goroutine 的结果sum := 0for result := range out {sum += result}fmt.Println("Total Sum:", sum) // 计算最终结果
}

参考

  • go-zero中介绍MapReduce使用场景:
    • 介绍原理:go-zero/core/mr/readme-cn.md at master · zeromicro/go-zero
    • 示例:zero-examples/mapreduce at main · zeromicro/zero-examples

注意事项

  • 数据并行性: MapReduce适合数据并行处理的任务,即任务可以分解为多个独立的子任务。
  • 数据规模: 对于小规模数据,MapReduce可能引入不必要的开销,应根据数据规模选择合适的处理方式。
  • 实时性要求: MapReduce不适合实时处理要求很高的任务,因为它通常用于批处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/69328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Pandas】pandas Series var

Pandas2.2 Series Computations descriptive stats 方法描述Series.abs()用于计算 Series 中每个元素的绝对值Series.all()用于检查 Series 中的所有元素是否都为 True 或非零值&#xff08;对于数值型数据&#xff09;Series.any()用于检查 Series 中是否至少有一个元素为 T…

Http 的响应码有哪些? 分别代表的是什么?

HTTP 状态码分为多个类别&#xff0c;下面是常见的 HTTP 状态码及其含义&#xff0c;包括 3xx 重定向状态码的详细区别&#xff1a; &#x1f4cc; HTTP 状态码分类 分类状态码范围说明1xx100-199信息性状态码&#xff0c;表示请求已被接收&#xff0c;继续处理2xx200-299成功…

【工具篇】深度剖析 Veo2 工具:解锁 AI 视频创作新境界

在当下这个 AI 技术日新月异的时代,各种 AI 工具如雨后春笋般涌现,让人目不暇接。今天,我就来给大家好好说道说道谷歌旗下的 Veo2,这可是一款在 AI 视频创作领域相当有分量的工具。好多朋友都在问,Veo2 到底厉害在哪?好不好上手?能在哪些地方派上用场?别着急,今天我就…

slam学习笔记8---fastlio2运行效率高缘由

前言&#xff1a;lio里面&#xff0c;fastlio2的精度和速度表现很显眼。有必要总结一下运行效果高的缘由。参考各大家&#xff0c;从个人对fastlio2理解&#xff0c;汇总所得。 Fast-LIO2 运行速度快的主要原因可以归结为以下几个方面&#xff1a; &#x1f539; 1. 采用增量…

【C++高并发服务器WebServer】-13:多线程服务器开发

本文目录 一、多线程服务器开发二、TCP状态转换三、端口复用 一、多线程服务器开发 服务端代码如下。 #include <stdio.h> #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <pthread.h>s…

SpringCloud面试题----Nacos和Eureka的区别

功能特性 服务发现 Nacos&#xff1a;支持基于 DNS 和 RPC 的服务发现&#xff0c;提供了更为灵活的服务发现机制&#xff0c;能满足不同场景下的服务发现需求。Eureka&#xff1a;主要基于 HTTP 的 RESTful 接口进行服务发现&#xff0c;客户端通过向 Eureka Server 发送 HT…

在 Open WebUI+Ollama 上运行 DeepSeek-R1-70B 实现调用

在 Open WebUI Ollama 上运行 DeepSeek-R1-70B 实现调用 您可以使用 Open WebUI 结合 Ollama 来运行 DeepSeek-R1-70B 模型&#xff0c;并通过 Web 界面进行交互。以下是完整的部署步骤。 1. 安装 Ollama Ollama 是一个本地化的大模型管理工具&#xff0c;它可以在本地运行 …

免费地理位置信息查询接口

地理位置信息查询接口V1 1. 接口简介 本接口用于查询指定经纬度的地理位置信息&#xff0c;包括省、市、区、街道等详细信息。 报文编码格式&#xff1a;UTF-8接口分组&#xff1a;交通地理创建者&#xff1a;何生最后编辑人&#xff1a;何生更新时间&#xff1a;2025-01-16…

使用 Axios 进行高效的数据交互

一、前言 1. 项目背景与目标 Axios 的重要性: Axios 是一个基于 Promise 的 HTTP 客户端,用于浏览器和 Node.js,简化了与服务器的通信。Axios 提供了丰富的功能,如拦截器、并发请求管理、取消请求等。2. 环境搭建 开发工具准备: 推荐使用 VSCode 或 WebStorm。安装必要的…

「vue3-element-admin」告别 vite-plugin-svg-icons!用 @unocss/preset-icons 加载本地 SVG 图标

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall ︱vue3-element-admin︱youlai-boot︱vue-uniapp-template &#x1f33a; 仓库主页&#xff1a; GitCode︱ Gitee ︱ Github &#x1f496; 欢迎点赞 &#x1f44d; 收藏 ⭐评论 …

C#中深度解析BinaryFormatter序列化生成的二进制文件

C#中深度解析BinaryFormatter序列化生成的二进制文件 BinaryFormatter序列化时,对象必须有 可序列化特性[Serializable] 一.新建窗体测试程序BinaryDeepAnalysisDemo,将默认的Form1重命名为FormBinaryDeepAnalysis 二.新建测试类Test Test.cs源程序如下: using System; us…

Python进阶-在Ubuntu上部署Flask应用

随着云计算和容器化技术的普及&#xff0c;Linux 服务器已成为部署 Web 应用程序的主流平台之一。Python 作为一种简单易用的编程语言&#xff0c;适用于开发各种应用程序。本文将详细介绍如何在 Ubuntu 服务器上部署 Python 应用&#xff0c;包括环境准备、应用发布、配置反向…

mysql8 用C++源码角度看客户端发起sql网络请求,并处理sql命令

MySQL 8 的 C 源码中&#xff0c;处理网络请求和 SQL 命令的流程涉及多个函数和类。以下是关键的函数和类&#xff0c;以及它们的作用&#xff1a; 1. do_command 函数 do_command 函数是 MySQL 服务器中处理客户端命令的核心函数。它从客户端读取一个命令并执行。这个函数在…

深度学习在医疗影像分析中的应用

引言 随着人工智能技术的快速发展&#xff0c;深度学习在各个领域都展现出了巨大的潜力。特别是在医疗影像分析中&#xff0c;深度学习的应用不仅提高了诊断的准确性&#xff0c;还大大缩短了医生的工作时间&#xff0c;提升了医疗服务的质量。本文将详细介绍深度学习在医疗影像…

计算机领域QPM、TPM分别是什么并发指标,还有其他类似指标吗?

在计算机领域&#xff0c;QPM和TPM是两种不同的并发指标&#xff0c;它们分别用于衡量系统处理请求的能力和吞吐量。 QPM&#xff08;每分钟请求数&#xff09; QPM&#xff08;Query Per Minute&#xff09;表示每分钟系统能够处理的请求数量。它通常用于衡量系统在单位时间…

python基础入门:3.2字典(Dict)与集合(Set)

Python高效数据管理&#xff1a;字典与集合深度剖析 # 快速导航 config {"数据结构": "字典", "特性": ["键值对", "快速查找"]} unique_nums {1, 2, 3, 5, 8} # 集合自动去重一、字典核心操作全解 1. 键值对基础操作 …

celery

&#x1f525; 太棒了&#xff01;兄弟&#xff0c;你的学习欲望真的让我佩服得五体投地&#xff01;&#x1f680; 既然你已经完全掌握 background_tasks 了&#xff0c;那我们就来深入解析 Celery&#xff01;&#x1f331;&#x1f680; 1. Celery 解决了什么问题&#xff…

【安当产品应用案例100集】036-视频监控机房权限管理新突破:安当windows操作系统登录双因素认证解决方案

一、机房管理痛点&#xff1a;权限失控下的数据泄露风险 在智慧城市与数字化转型浪潮下&#xff0c;视频监控系统已成为能源、金融、司法等行业的核心安防设施。然而&#xff0c;传统机房管理模式中&#xff0c;值班人员通过单一密码即可解锁监控画面的操作漏洞&#xff0c;正…

Unity抖音云启动测试:如何用cmd命令行启动exe

相关资料&#xff1a;弹幕云启动&#xff08;原“玩法云启动能力”&#xff09;_直播小玩法_抖音开放平台 1&#xff0c;操作方法 在做云启动的时候&#xff0c;接完发现需要命令行模拟云环境测试启动&#xff0c;所以研究了下。 首先进入cmd命令&#xff0c;CD进入对应包的文件…

< OS 有关 > 利用 google-drive-ocamlfuse 工具,在 Ubuntu 24 系统上 加载 Google DRIVE 网盘

Created by Dave On 8Feb.2025 起因&#xff1a; 想下载 StableDiffusion&#xff0c;清理系统文件时把 i/o 搞到 100%&#xff0c;已经删除到 apt 缓存&#xff0c;还差 89MB&#xff0c;只能另想办法。 在网上找能不能挂在 Google 网盘&#xff0c;百度网盘&#xff0c;或 …