Kafka REST Proxy for MapR Streams入门

介绍

MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能:

  • MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供RESTful接口,以使用和生成消息并执行管理操作。
  • Kafka Connect for MapR Streams是一个实用程序,用于在MapR Streams与Apache Kafka和其他存储系统之间流式传输数据。

MapR生态系统软件包(MEP)是一种提供与核心升级脱钩的生态系统升级的方法-允许您独立于聚合数据平台升级工具。 您可以在本文中进一步了解MEP 2.0。

在此博客中,我们描述了如何使用REST代理向MapR Streams发布消息和从MapR Streams使用消息。 REST代理是对MapR融合数据平台的重要补充,允许任何编程语言使用MapR流。

MapR Streams工具随附的Kafka REST Proxy可以与MapR Streams一起使用(默认),也可以与Apache Kafka混合使用。 在本文中,我们将重点介绍MapR流。 <!–更多–>

先决条件

  • 具有MEP 2.0的MapR融合数据平台5.2
    • 使用MapR Streams工具
  • curl,wget或任何HTTP / REST客户端工具

创建MapR流和主题

流是主题的集合,您可以通过以下方式将其作为一个组进行管理:

  1. 设置适用于该流中所有主题的安全策略
  2. 为流中创建的每个新主题设置默认的分区数
  3. 为流中每个主题中的消息设置生存时间

您可以在文档中找到有关MapR Streams概念的更多信息。

在您的Mapr群集或沙盒上,运行以下命令:

$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

启动Kafka控制台的生产者和消费者

打开两个终端窗口,并使用以下命令运行使用者的Kafka实用程序:

消费者

  • 主题传感器-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • 主题传感器二进制
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

这两个终端窗口可让您查看有关不同主题的消息

使用Kafka REST代理

检查主题元数据

端点/topics/[topic_name]允许您获取有关该主题的一些信息。 在MapR Streams中,主题是路径标识的流的一部分; 要使用REST API使用主题,您必须使用完整路径,并将其编码在URL中; 例如:

  • /apps/iot-stream:sensor-json将使用%2Fapps%2Fiot-stream%3Asensor-json进行编码

运行以下命令,以获取有关sensor-json主题的信息

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

注意:为简单起见,我从运行Kafka REST代理的节点上运行命令,因此可以使用localhost

您可以通过添加以下Python命令,以一种漂亮的方式打印JSON:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

默认流

如上所述,流路径是您必须在命令中使用的主题名称的一部分。 但是可以将MapR Kafka REST代理配置为使用默认流。 为此,您应该在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下属性:

  • streams.default.stream=/apps/iot-stream更改Kafka REST代理配置时,必须使用maprcli或MCS重新启动服务。使用streams.default.stream属性的streams.default.stream是简化URL使用的URL。以应用为例
    • 通过streams.default.stream ,可以使用curl -X GET http://localhost:8082/topics/

    在本文中,所有URL都包含编码的流名称,就像您可以开始使用Kafka REST代理而无需更改配置,也可以将其用于其他流。

发布消息

用于MapR流的Kafka REST代理允许应用程序将消息发布到MapR流。 消息可以作为JSON或二进制内容(base64编码)发送。

要发送JSON消息:

  • 查询应该是HTTP POST
  • 内容类型应为: application/vnd.kafka.json.v1+json
  • 身体:
{"records":[{"value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

您应该在运行/apps/iot-stream:sensor-json使用者的终端窗口中看到打印的消息。

要发送二进制消息:

  • 查询应该是HTTP POST
  • 内容类型应为: application/vnd.kafka.binary.v1+json
  • 身体:
{"records":[{"value":"SGVsbG8gV29ybGQ="}]
}

请注意, SGVsbG8gV29ybGQ=是在Base64中编码的字符串“ Hello World”。

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您应该在运行/apps/iot-stream:sensor-binary使用者的终端窗口中看到打印的消息。

发送多条消息

HTTP正文的records字段允许您发送多个消息,例如,您可以发送:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

该命令将发送2条消息,并将偏移量增加2。您可以对二进制内容执行相同的操作,只需在JSON数组中添加新元素即可; 例如:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您可能知道,可以为消息设置密钥,以确保所有具有相同密钥的消息都将到达同一分区。 为此,将key属性添加到消息中,如下所示:

{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

既然您知道如何使用REST代理将消息发布到MapR Stream主题,那么让我们看看如何使用消息。

消费信息

REST代理还可以用于消费主题消息。 为此,您需要:

  1. 创建使用者实例。
  2. 使用第一次调用返回的URL来阅读消息。
  3. 如果需要,请删除所引用的使用者。

创建使用者实例

以下请求创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

服务器的响应如下所示:

{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer"
}

请注意,我们已使用/consumers/[topic_name]创建使用者。

后续请求将使用base_uri从主题获取消息。 与任何MapR Streams / Kafka使用者一样, auto.offset.reset定义其行为。 在此示例中,该值设置为earliest ,这意味着使用者将从头开始阅读消息。 您可以在MapR Streams文档中找到有关使用者配置的更多信息。

消费信息

要使用这些消息,只需将Mapr Streams主题添加到使用者实体的URL。

以下请求使用了该主题的消息:

curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

此调用返回JSON文档中的消息:

[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

每次对API的调用都会根据上一次调用的偏移量返回发布的新消息。

请注意,消费者将被销毁:

  • consumer.instance.timeout.ms实例。超时。毫秒设置的空闲时间后(默认值设置为300000毫秒/ 5分钟)
  • 使用REST API调用销毁它(见下文)。

消费二进制格式的消息

如果需要使用二进制消息,则需要更改格式并接受标头,该方法是相同的。

调用此URL为二进制主题创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

然后使用消息,accept标头设置为application/vnd.kafka.binary.v1+json

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

该调用返回JSON文档中的消息,并且该值在Base64中编码

[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

删除使用者实例

如前所述,使用者将根据REST Proxy的consumer.instance.timeout.ms配置自动销毁。 也可以使用使用者实例URI和HTTP DELETE调用销毁实例,如下所示:

curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

结论

在本文中,您学习了如何对MapR流使用Kafka REST代理,该代理允许任何应用程序使用在MapR融合数据平台中发布的消息。

您可以在MapR文档和以下资源中找到有关Kafka REST代理的更多信息:

  • MapR Streams入门
  • Ted Dunning和Ellen Friedman撰写的“流式传输体系结构:使用Apache Kafka和MapR流的新设计”电子书

翻译自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350047.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开张大吉

在Windows的天地中做了五年开发之后&#xff0c;又跑到了开源的Linux/Java世界遨游了五年&#xff0c;于最近半年&#xff0c;又重新回到M$阵营。这半年来&#xff0c;迫于项目进度的压力&#xff0c;较少与他人交流&#xff0c;项目中虽然采用了VS2003&#xff0c;但是运用的思…

mysql四种输入_mysql四种事务隔离级别

mysql事务并发问题ACID什么的就不啰嗦了。mysql多个事务并发的时候&#xff0c;可能会出现如下问题&#xff1a;1. 更新丢失即两个事务同时更新某一条数据&#xff0c;后执行的更新操作会覆盖先执行的更新操作&#xff0c;导致先执行的更新结果丢失。2. 脏读即一个事务会读到另…

apache hadoop_春天遇见Apache Hadoop

apache hadoopSpringSource 刚刚宣布了适用于Apache Hadoop的Spring的第一个GA版本 。 该项目的目的是简化基于Hadoop的应用程序的开发。 您可以下载该项目在这里 &#xff0c;并检查了Maven的文物在这里 。 Apache Hadoop的Spring诞生是为了解决Hadoop应用程序构建不良的问题…

sinacloud webpy mysql_Mysqldb和webpy的安装

1.首先安装mysqlsudo apt-get install mysql-server2.然后安装libmysqld-dev和libmysqlclient-dev&#xff0c;否则在安装Mysqldb的时候会报找不到mysql_config文件sudo apt-get install libmysqld-devsudo apt-get install libmysqlclient-dev修改site.cfg中的mysql_config的配…

消费者驱动的Pact和Spring Boot测试

最近&#xff0c;我的一位同事偶然发现了Pact.io &#xff0c;我们的当前应用程序已发展到50多种服务&#xff0c;并且我们开始出现一些集成测试失败和脆弱的开发/验收测试环境。 因此&#xff0c;我们决定研究尝试与此相关的方法。 我从阅读开始&#xff1a; https : //docs.…

python空格怎么加密_使用Python的RSA加密

如果您想使用python高效地编码RSA加密&#xff0c;我的github存储库肯定会理解和解释python中RSA的数学定义RSA密钥生成def keyGen(): Generate Keypair i_prandint(0,20)i_qrandint(0,20)# Instead of Asking the user for the prime Number which in case is not feasible,# …

MySQL中字符串函数详细介绍

MySQL字符串函数对于针对字符串位置的操作&#xff0c;第一个位置被标记为1。 ASCII(str)返回字符串str的 最左面字符的ASCII代码值。如果str是空字符串&#xff0c; 返回0。如果str是NULL&#xff0c;返回NULL。 mysql> select ASCII(2);-> 50mysql> select ASCII(2)…

java 转储快照分析_分析Java核心转储

java 转储快照分析在本文中&#xff0c;我将向您展示如何调试Java核心文件&#xff0c;以查看导致JVM崩溃的原因。 我将使用在上一篇文章&#xff1a; 生成Java Core Dump中生成的核心文件。 您可以通过以下几种方法来诊断JVM崩溃&#xff1a; hs_err_pid日志文件 当JVM中发生…

zbox mysql_20190213云服务器部署禅道

1.系统环境&#xff1a;腾讯云服务器&#xff1b;Centos 7.02.工具&#xff1a;禅道的压缩包(需要是tar.gz文件名的)、Xshell、Xftp&#xff1b;3.安全组规则的设置&#xff1b;4.端口号的设置以下为详细步骤&#xff1a;需要在空白的服务器上去进行操作。还需要再琢磨的。1、x…

Java编程语言的历史和未来

通过AppDynamics解决应用程序问题的速度提高了10倍–以最小的开销在代码级深度监视生产应用程序。 开始免费试用&#xff01; 作为Internet上著名的编程语言 &#xff0c;Java对人们如何浏览数字世界产生了深远的影响。 Java功能设置了用户对他们访问互联网的设备的性能期望的…

fmdb和mysql的区别_FMDB

什么是数据库数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,我们可以很方便的对数据库中的数据进行增、删、改、查操作数据库类型数据库可以分为2大种类关系型数据库(主流)关系型数据库(主流)对象型数据库常用关系型数据库PC端&#xff1a;Oracle、MySQL、SQL S…

RabbiqMQ快速入门

RabbitMQ 官网地址: https://www.rabbitmq.com/ 一个遵循AMQP协议&#xff0c;开源面向消息的中间件,支持多种编程语言。 Rabbitmq 能做什么? 逻辑解耦&#xff0c;异步的消息任务消息持久化&#xff0c;重启不影响削峰&#xff0c;大规模的消息处理主要的特点 可靠性&#xf…

Java命令行界面(第13部分):JArgs

JArgs 1.0的区别在于&#xff0c;这是我的第13篇文章的主题&#xff0c;该文章是关于Java命令行参数解析的。 JArgs是一个开放源代码&#xff08; BSD许可证 &#xff09;库&#xff0c;主要由Steve Purcell和Ewan Mellor等 不同的贡献者支持。 事实证明&#xff0c;这在第一次…

pthread vs openMP之我见

前两天看了些并行计算的文章&#xff0c;了解了一些并行计算的方法和原理。然后发现多线程实现里面还有个openMP&#xff0c;这个以前从来没见过&#xff08;火星了&#xff09;&#xff0c;之前只是知道pthread线程库和微软也实现了一套线程。又看了看openMP的一些教程才知道它…

线程池默认多少个线程_我需要多少个线程?

线程池默认多少个线程这取决于您的应用程序。 但是&#xff0c;对于那些希望对如何从生产站点购买的所有昂贵内核中挤出大量资金的人&#xff0c;请多多包涵&#xff0c;我将阐明围绕多线程 Java应用程序的奥秘。 内容针对最典型的Java EE应用程序进行了“优化”&#xff0c;该…

mysql error writing_MySQL:Error writing file (Errcode: 28)解决方法

问题描述&#xff1a;在执行创建表语句时提示&#xff1a;mysql> CREATE TABLE cash_request (id int(11) NOT NULL auto_increment,dev_id int(11) NOT NULL,bank_account_info varchar(255) NOT NULL,money int(11) NOT NULL,status tinyint(1) NOT NULL default 1,is_fan…

[暑假集训Day4T3]曲线

三分模板。 三分法求单峰函数最优值,之后每次取所有二次函数最优值即可 #pragma GCC optimize(3,"Ofast","inline") #include<iostream> #include<cstdio> #define N 100005 #define eps 1e-9 using namespace std; int read() {int x0,f1;cha…

模拟Spring Security上下文进行单元测试

今天&#xff0c;在为一种Java方法编写单元测试用例时&#xff0c;如下所示&#xff1a; public ApplicationUser getApplicationUser() {ApplicationUser applicationUser (ApplicationUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal();return…

mysql semi-synchronous_MySQL Semisynchronous Replication介绍

前言MySQL 5.5版本之前默认的复制是异步(Asynchronous )模式的, MySQL 5.5 以plugins的方式提供了Semisynchronous Replication 模式。在介绍 semi sync 之前,我们先了解&#xff1a;半同步 Asynchronous 和 同步 Synchronous 。异步复制模式主库将已经提交的事务event 写入bin…

Jquery屏蔽回车键

1 $(function(){2 3 $(“#tagForm input”).keypress(4 5 function(event){6 7 if(event.keyCode 13){8 9 returnfalse;10 11 }12 13 });14 15 })转载于:https://www.cnblogs.com/pfs1314/archive/2011/04/19/2020706.html