Flink异步io关联Hbase

主程序

    public static void main(String[] args) throws Exception {//1.获取流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置动态参数ParameterTool propertiesargs = ParameterTool.fromArgs(args);String fileName = propertiesargs.get("CephConfPath");//从hdfs获取动态参数配置文件org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();FileSystem fs = FileSystem.get(URI.create(fileName), conf);fs.open(new org.apache.hadoop.fs.Path(fileName));ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());// 注册给环境变量(HBASE使用)env.getConfig().setGlobalJobParameters(propertiesFile);new CephConfig(propertiesFile);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));env.enableCheckpointing(10000);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(100000);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s//3.从kafka中读取日志信息,将将每行数据转换为JavaBean对象 主流DataStreamSource<String> dataStream = env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));…………//8.读取HBase中user表,进行维度关联SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(validDS,new DimAsyncFunction<CephAccessRecord>() {@Overridepublic String getKey(CephAccessRecord record) {return record.access_key;}},60, TimeUnit.SECONDS);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(HDFS_FILE_PATH),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.DAYS.toMillis(1))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.DAYS.toMillis(1 ))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();// 将record-->过滤上传数据-->转换成jsonstring-->写入到hdfs
//        allDataDS.filter(log->log.event_type.equals("upload")).map(line->JSON.toJSONString(line)).addSink(fileSink);dataStream.map(line->JSON.toJSONString(line)).addSink(fileSink);//10.流环境执行env.execute();

异步关联程序

package com.data.ceph.function;import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;import java.util.Collections;
import java.util.Map;public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {private org.apache.hadoop.hbase.client.Connection connection = null;private ResultScanner rs = null;private Table table = null;@Overridepublic void open(Configuration parameters) throws Exception {//不启用安全认证System.setProperty("zookeeper.sasl.client", "false");Map<String, String> stringStringMap = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();String hbase = stringStringMap.get("hbase_zookeeper_quorum");org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();hconf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.23.37,172.16.23.38,172.16.23.39");
//        hconf.set(HConstants.ZOOKEEPER_QUORUM, hbase);hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");//指定用户名为hbase的用户去访问hbase服务UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));table = connection.getTable(TableName.valueOf("cloud:user_info"));}@Overridepublic void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {Get get = new Get(Bytes.toBytes(getKey(input)));Result rs = table.get(get);for (Cell cell : rs.rawCells()) {String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());BeanUtils.setProperty(input, column, value);}resultFuture.complete(Collections.singletonList(input));}@Overridepublic void close() throws Exception {if (rs != null) rs.close();if (table != null) table.close();if (connection != null) connection.close();}@Overridepublic void timeout(T input, ResultFuture<T> resultFuture) throws Exception {System.out.println("TimeOut:" + input);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/746091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机视觉——目标检测(R-CNN、Fast R-CNN、Faster R-CNN )

前言、相关知识 1.闭集和开集 开集&#xff1a;识别训练集不存在的样本类别。闭集&#xff1a;识别训练集已知的样本类别。 2.多模态信息融合 文本和图像&#xff0c;文本的语义信息映射成词向量&#xff0c;形成词典&#xff0c;嵌入到n维空间。 图片内容信息提取特征&…

【北京大学】徐高《金融经济学二十五讲》

一、经济的任务 经济的任务之一是确保有效地分配稀缺资源&#xff0c;这是经济学中的一个核心问题。资源是有限的&#xff0c;而需求是无限的&#xff0c;因此经济系统需要通过合理的机制来分配资源以满足社会的需求。以下是关于经济分配资源的几个方面&#xff1a; 1. 资源配…

CentOS7.9 Nginx + EMQX集群组建 MQTTS平台

前面我们有介绍过单机版EMQX的安装 CentOS7 安装 EMQX&#xff08;MQTT&#xff09;&#xff0c;今天我们来讲一下实际项目里用的到MQTTS平台。 一、EMQX单机配置 简单部署两个节点&#xff0c;修改对应配置文件 (/usr/local/emqx/etc/emqx.conf) 中的node内容&#xff1a; nam…

HTML 学习笔记(十)块和内联

每个HTML元素都有一个默认的显示值&#xff0c;显示值又可以再分为block(块)和inline(内联) 一、块元素 通过F12进入浏览器开发者模式查看该元素会发现其所占宽度为整个网页的宽度 1.div标签 通过div标签将一些元素装进"盒子"&#xff0c;从而对盒子中的全部元素…

HDFS的架构优势与基本操作

目录 写在前面一、 HDFS概述1.1 HDFS简介1.2 HDFS优缺点1.2.1 优点1.2.2 缺点 1.3 HDFS组成架构1.4 HDFS文件块大小 二、HDFS的Shell操作&#xff08;开发重点&#xff09;2.1 基本语法2.2 命令大全2.3 常用命令实操2.3.1 上传2.3.2 下载2.3.3 HDFS直接操作 三、HDFS的API操作3…

提前十分钟!有方法论的人和没有方法论的人,谁更从容?弱者不应被错误引导——早读(逆天打工人爬取热门微信文章解读)

熬夜不熬夜&#xff0c;取决于你的生活态度 引言Python 代码第一篇 人民日报 提前十分钟&#xff0c;人生大不同第二篇 人民日报 来啦 新闻早班车要闻社会政策 结尾 君子如潜龙&#xff0c;藏器待时发 紧握时间的脉搏&#xff0c;提前规划十分钟 既显对他人的敬意&#xff0c;亦…

C++ 4种类型转换运算符

C语言中的强制转换在C中对基础数据类型也是适用的&#xff0c;但是对于类对象就不够严格。于是C中增加了4种类型转换运算符&#xff0c;使得转换过程更规范&#xff1a; dynamic_cast; const_cast; static_cast; reinterpret_cast; 1、dynamic_cast动态类型转换     dynam…

【SysBench】Linux 安装 sysbench-1.20

安装目的是为了对 MySQL 8.0.x 、PostgreSQL 进行基准测试。 0、sysbench 简介 sysbench 是一个可编写脚本的多线程基准测试工具&#xff0c;基于 LuaJIT 。 它最常用于数据库基准测试&#xff0c;但也可以 用于创建任意不涉及数据库服务器的复杂工作负载。 sysbench 附带以…

【诚信3·15】广州流辰信息|诚信至上,始终如一!

每一个承诺&#xff0c;广州流辰信息皆倾心对待&#xff1b;每一份期待&#xff0c;广州流辰信息亦用心守护。近十年用专业缔造好品质&#xff0c;用服务追求好口碑。在为客户服务的路上&#xff0c;流辰信息始终无惧考验&#xff0c;保持初心。在3.15国际消费者权益日&#xf…

SpringBoot3学习记录(有ssm基础)

目录 一、SpringBoot3 介绍 SpringBoot3 简介 SpringBoot3 快速入门 入门总结 1.为什么依赖不需要写版本 2.Startrer&#xff08;启动器&#xff09;是什么 3.SpringBootApplication 二、SpringBoot3 配置文件 统一配置管理 使用yaml配置文件&#xff08;推荐&#x…

Seatunnel系列之:部署Seatunnel

Seatunnel系列之&#xff1a;部署Seatunnel 一、步骤一&#xff1a;准备环境二、步骤二&#xff1a;下载SeaTunnel三、步骤三&#xff1a;安装连接器插件四、quick-start-seatunnel-engine1.添加作业配置文件来定义作业2.运行 SeaTunnel 应用程序 五、quick-start-flink1.部署和…

Gateway网关在url参数带有特殊字符的情况下转发失败(响应400)

本文主要分享了&#xff0c;SpringCloud Gateway网关在url参数带有空格或者特殊字符的情况下&#xff0c;转发失败导致响应错误码400的解决方案。 响应400错误码的2种场景&#xff1a; 1.参数带空格&#xff0c;Gateway会误认为该空格是切割符&#xff0c;如?phone 135****6…

QT使用dumpcpp为COM生成h及cpp的方式,COM是C#的dll注册的

目录 1.C#的dll注册为COM&#xff0c;采用bat的方式 2.通过qt的dumpcpp来生成h及cpp文件 3.h文件和cpp文件处理。 台达数控系统的C#的dll dumpcpp用的tlb文件 dumpcpp生成的原生h文件 dumpcpp生成的原生cpp dump生成后的的原生cpp文件修改后的cpp文资源 dump生成后的的…

AI短视频矩阵系统介绍|罐头鱼AI视频批量生成

智能化管理&#xff0c;轻松批量剪辑短视频&#xff01; 近年来&#xff0c;随着短视频营销行业的发展&#xff0c;我们推出了一款AI短视频矩阵系统&#xff0c;旨在帮助用户管理、剪辑和发布短视频内容&#xff0c;从而提升品牌影响力。让我们来看看这款系统都提供了哪些功能&…

C:数据结构王道

初始化顺序表&#xff08;顺序表中元素为整型&#xff09;&#xff0c;里边的元素是1,2,3&#xff0c;然后通过scanf读取一个元素&#xff08;假如插入的是6&#xff09;&#xff0c;插入到第2个位置&#xff0c;打印输出顺序表&#xff0c;每个元素占3个空格&#xff0c;格式为…

在哪些领域中最需要使用 OCR 识别技术?

光学字符识别&#xff08;OCR&#xff09;技术能够将不同格式的印刷或手写文本转换为机器编码的文本&#xff0c;这一技术在许多领域都发挥着重要作用&#xff0c;尤其是在需要大量处理文档和图像数据的场景中。以下是一些最需要使用OCR识别技术的领域&#xff1a; 1. 办公自动…

Windows Server 各版本搭建终端服务器实现远程访问(03~19)

一、Windows Server 2003 左下角开始➡管理工具➡管理您的服务器&#xff0c;点击添加或删除角色 点击下一步 勾选自定义&#xff0c;点击下一步 点击终端服务器&#xff0c;点击下一步 点击确定 重新登录后点击确定 点击开始➡管理工具➡计算机管理&#xff0c;展开本地用户…

openssl3.2 - 官方demo学习 - encode - ec_encode.c

文章目录 openssl3.2 - 官方demo学习 - encode - ec_encode.c概述笔记产生ecc私钥产生ecc公钥测试工程备注备注END openssl3.2 - 官方demo学习 - encode - ec_encode.c 概述 官方demos/encode 目录中给了2个例子工程 功能是载入(RSA/ECC)公钥, 然后自己就可以拿内存中的公钥对…

mupdf渲染过程(一):颜色

mupdf除了解析PDF功能之外&#xff0c;还有一个强大的功能就是渲染文字和图像&#xff0c;本文介绍mupdf渲染过程中涉及到的颜色问题&#xff1a;包括颜色空间&#xff0c;颜色转换&#xff0c;lcms的使用。 1.初始化 mupdf初始化第一步是实例化fz_context *ctx&#xff0c;fz…

利用适配器模式使用第三方库

文章目录 一、为什么要使用适配器模式二、适配器模式使用第三方库示例 一、为什么要使用适配器模式 适配器模式是一种设计模式&#xff0c;用于将一个类的接口转换成客户端所期望的另一个接口。适配器模式的主要目的是使不兼容的接口能够一起工作。以下是一些使用适配器的原因&…