[flink] flink macm1pro 快速使用从零到一

文章目录

  • 快速使用

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {public static void main(String[] args) throws Exception {// 设置Flink执行环境 StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka参数 Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// Source FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),properties);DataStream<String> stream = env.addSource(consumer);// Transformation // 使用Flink  API对输入流的文本进行操作 // 按空格切词、计数、分区、设置时间窗口、聚合 DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("\\s");// 输出结果  for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// Sink wordCount.print();// execute env.execute("kafka streaming word count");}
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/777484.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日一题】盛水容器

问题描述 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能倾斜容…

产品经理的自我修养

点击下载《产品经理的自我修养》 1. 前言 在产品领域取得成功的关键在于持续的激情。只有保持热情不减,我们才能克服各种困难,打造出卓越的产品。 如果你真心渴望追求产品之路,我强烈建议你立即行动起来,亲自参与实际的产品创作。无论是建立一个网站、创建一个社群,还是…

【前端学习——js篇】4.浅拷贝与深拷贝

具体可见https://github.com/febobo/web-interview 4.浅拷贝与深拷贝 ①栈内存与堆内存 栈内存&#xff08;Stack Memory&#xff09; 栈内存用于存储基本类型的变量和引用类型的变量引用&#xff08;即指向堆内存中实际数据的指针&#xff09;。当一个函数被调用时&#xf…

Mysql的日志管理,备份与回复

目录 一、Mysql日志管理 1、日志的默认位置及配置文件 2、日志分类 2.1错误日志 2.2通用查询日志 2.3二进制日志 2.4慢查询日志 2.5中继日志 3、日志配置 4、日志查询 4.1查询通用日志是否开启 4.2查询二进制日志是否开启 4.3查看慢查询日志是否开启 4.4查询慢查…

Vivado Lab Edition

Vivado Lab Edition 是完整版 Vivado Design Suite 的独立安装版本 &#xff0c; 包含在生成比特流后对赛灵思 FPGA 进行编程和 调试所需的所有功能。通常适用于在如下实验室环境内进行编程和调试&#xff1a; 实验室环境中的机器所含磁盘空间、内存和连 接资源较少。Vivad…

python数据实时传给unity工程并绘制出来

python # 服务器端代码 import socket import random import struct import time# 创建一个服务器Socket server_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 监听的地址和端口 host 127.0.0.1 port 12345# 绑定地址和端口 server_socket.bind((host, port…

纯分享万岳外卖跑腿系统客户端源码uniapp目录结构示意图

系统买的是商业版&#xff0c;使用非常不错有三端uniapp开源代码&#xff0c;自从上次分享uniapp后有些网友让我分享下各个端的uniapp下的各个目录结构说明 我就截图说以下吧&#xff0c;

【Java - 框架 - Lombok】(1) 普通Java项目通过Lombok+Logback完成日志的创建使用 - 快速上手

普通Java项目通过"Lombok""Logback"完成日志的创建使用 - 快速上手&#xff1b; 步骤A 说明 创建"Maven"项目&#xff1b; 图片 步骤B 说明 添加相关依赖项&#xff1b; 图片 代码 <!-- "Lombok"依赖项--> <dependency>&…

c++核心学习--继承2

4.6.7多继承语法 4.6.8菱形继承 利用虚继承解决菱形继承的问题&#xff1a;继承之前加上关键字virtual变为虚继承

经纬恒润RTaW-Pegase:车载网络通信建模与时间特性分析工具

▎RTaW简介 RTaW-Pegase是由法国国家信息与自动化研究所&#xff08;INRIA&#xff09;旗下的RTaW公司开发的产品。它主要用于构建和优化汽车、航空航天以及工业领域的通信网络&#xff0c;包括时间敏感网络&#xff08;TSN&#xff09;、CAN&#xff08;FD&#xff0c;XL&…

react-navigation:

我的仓库地址&#xff1a;https://gitee.com/ruanjianbianjing/bj-hybrid react-navigation&#xff1a; 学习文档&#xff1a;https://reactnavigation.org 安装核心包: npm install react-navigation/native 安装react-navigation/native本身依赖的相关包: react-nativ…

开源 | 电动自行车充换电解决方案,从智能硬件到软件系统,全部自主研发

文章目录 一、产品功能部分截图1.手机端&#xff08;小程序、安卓、ios&#xff09;2.PC端 二、小程序体验账号以及PC后台体验账号1.小程序体验账号2.PC后台体验账号关注公众号获取最新资讯 三、产品简介&#xff1f;1. 充电桩云平台&#xff08;含硬件充电桩&#xff09;&…

Codeforces Round 841 (Div. 2) C. Even Subarrays

题目 思路&#xff1a; #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back #define fi first #define se second #define lson p << 1 #define rson p << 1 | 1 const int maxn 1e6 5, inf 1e9, maxm 4e4 5; co…

Java学习之方法

目录 方法 方法声明格式&#xff1a; 调用方式&#xff1a; 详细说明 示例 --方法的声明及调用 语句块 练习 方法的重载(overload) 构成条件 示例 --方法重载 递归结构 缺陷 方法 方法(method)&#xff1a;一段用于完成特定功能的代码片段&#xff0c;类似于其他语…

opencv 十九 python下实现多线程间rtsp直播流的复用

在多线程拉流的任务场景中&#xff0c;有时需要将一个rtsp拉取多次&#xff0c;每重新打开一次rtsp视频流就要多消耗一次带宽&#xff0c;为此基于类的静态对象实现rtsp视频流的复用。 1、实现代码 import threading import cv2,time #接收摄影机串流影像&#xff0c;采用多线…

【嵌入式机器学习开发实战】(七)—— 政安晨:通过ARM-Linux掌握基本技能【环境准备:树莓派】

ARM-Linux是一种针对ARM架构的操作系统&#xff0c;它的设计目标是在低功耗、低成本的硬件平台上运行。ARM-Linux可以运行在多种ARM处理器上&#xff0c;包括树莓派。 树莓派&#xff08;Raspberry Pi&#xff09;是一款基于ARM架构的单板计算机&#xff0c;由英国的树莓派基金…

【系统架构师】-第12章-信息系统架构

信息系统架构(ISA)是指对某一特定内容里的信息进行统筹、规划、设计、安排等一系列有机处理的活动。 为了更好地理解信息系统架构的定义&#xff0c; 特作如下说明: (1)架构是对系统的抽象&#xff0c;它通过描述元素、元素的外部可见属性及元素之间的关系来反映这种抽象。因此…

open_clip仓库成分与模型文件model.py 介绍

起因&#xff1a; 在DA-CLIP的开源库的DA-CLIP.md中自述该项目基于CLIP 和open_clip&#xff0c;在之前的退化类型检测中 我一度以为仓库只是使用了CLIP 的源码&#xff0c; 然而当发现缺少da-clip的模型名称时&#xff0c;我发现DA-CLIP使用的完全是open_clip的代码版本&#…

16.JRE和JDK

程序员在编写代码的时候其实是需要一些环境&#xff0c;例如我们之前写的HelloWorld。我们需要的东西有JVM、核心类库、开发工具。 1、JVM&#xff08;Java Virtual Machine&#xff09;&#xff1a;Java虚拟机&#xff0c;真正运行Java程序的地方。没有虚拟机&#xff0c;代码…

C语言例4-9:格式字符s的使用例子

代码如下&#xff1a; //格式字符s的使用例子 #include<stdio.h> int main(void) {printf("%s,%5s,%-5s\n","Internet","Internet","Internet");//以三种不同格式&#xff0c;输出字符串printf("%10.5s,%-10.5s,%4.5s\n&q…