1.概述
在NIFI 中,可以 ListenHTTP 组件 启动一个HTTP服务,通过HTTP 服务接收 客户端 发送的信息,后续可以增加处理器,对请求进行处理。
我做了一个示例
- 通过 ListenHTTP 接收信息
- 通过 ExecuteGroovyScript 对数据进行处理
- 通过 PutFile 对数据进行验证,查看数据是否有效
2.配置
2.1 配置ListenHTTP 组件

配置后,我们 就可以通过 将数据 post 到 http://ip:8080/data 上进行数据接收
2.2 ExecuteGroovyScript 处理脚本
import groovy.json.JsonSlurper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.core.JsonGenerator
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsetsdef flowFile = session.get()
if (!flowFile) returntry {def text = ''session.read(flowFile, { inputStream ->text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback)// 用 Groovy 解析 JSON(支持宽松语法)def jsonSlurper = new JsonSlurper()def data = jsonSlurper.parseText(text)data['age'] = 20// 使用 Jackson 序列化,不转义非 ASCII 字符(保留中文)def mapper = new ObjectMapper()mapper.getFactory().setCodec(mapper)// 关键:禁用 Unicode 转义def writer = mapper.writer().without(JsonGenerator.Feature.ESCAPE_NON_ASCII)def updatedJson = writer.writeValueAsString(data)flowFile = session.write(flowFile, { outputStream ->outputStream.write(updatedJson.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)flowFile = session.putAttribute(flowFile, "mime.type", "application/json; charset=utf-8")session.transfer(flowFile, REL_SUCCESS)} catch (Exception e) {log.error("处理FlowFile时发生错误", e)session.transfer(flowFile, REL_FAILURE)
}
这个代码的作用是读取 上游的数据,并通过脚本修改这个数据,再投放给下游。
2.3 配置 PutFile
这里只需要配置一下执行路径就可以了

2.4 测试
我们使用 apipost 工具给 http 服务发一个请求

我们可以查看生成的文件内容
{"name":"张飞","age":20},我们可以操作这个JSON.