Main program
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Read the path of the properties file from the command line and load it through the Hadoop FileSystem API.
    ParameterTool propertiesargs = ParameterTool.fromArgs(args);
    String fileName = propertiesargs.get("CephConfPath");
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    FileSystem fs = FileSystem.get(URI.create(fileName), conf);
    ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(
            fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());
    // Make the properties visible to every operator (the async function reads them in open()).
    env.getConfig().setGlobalJobParameters(propertiesFile);
    // Initialize CephConfig from the loaded properties.
    new CephConfig(propertiesFile);

    // Checkpointing and restart strategy.
    env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));
    env.enableCheckpointing(10000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(100000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

    // Kafka source.
    DataStreamSource<String> dataStream =
            env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));

    // ………… (elided: the omitted steps parse and filter dataStream into validDS)

    // Asynchronously enrich each record with dimension data from HBase.
    SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(
            validDS,
            new DimAsyncFunction<CephAccessRecord>() {
                @Override
                public String getKey(CephAccessRecord record) {
                    return record.access_key;
                }
            },
            60, TimeUnit.SECONDS);

    // Daily buckets; a part file rolls after one day or once it reaches 1 GB.
    BucketAssigner<String, String> assigner =
            new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
    StreamingFileSink<String> fileSink = StreamingFileSink
            .<String>forRowFormat(new Path(HDFS_FILE_PATH), new SimpleStringEncoder<>("UTF-8"))
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    .withRolloverInterval(TimeUnit.DAYS.toMillis(1))
                    .withInactivityInterval(TimeUnit.DAYS.toMillis(1))
                    .withMaxPartSize(1024 * 1024 * 1024)
                    .build())
            .withBucketAssigner(assigner)
            .build();
    // Serialize the enriched records to JSON and write them to HDFS.
    record.map(line -> JSON.toJSONString(line)).addSink(fileSink);

    env.execute();
}
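The helpers referenced above (CephConfig, which exposes FSSTATEBACKEND, KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP and HDFS_FILE_PATH as static constants, and KafkaUtils.getKafkaSource) are not shown in the post. Below is a minimal sketch of what they might look like; the property keys, the KAFKA_BROKERS constant and the consumer settings are assumptions for illustration, not the project's actual code.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

// Hypothetical sketch of the configuration holder: the constructor copies values
// from the ParameterTool into static fields that main() references directly.
public class CephConfig {
    public static String FSSTATEBACKEND;
    public static String KAFKA_BROKERS;
    public static String KAFKA_SOURCE_TOPIC;
    public static String KAFKA_SOURCE_GROUP;
    public static String HDFS_FILE_PATH;

    public CephConfig(ParameterTool params) {
        FSSTATEBACKEND     = params.getRequired("fs.state.backend.path");   // assumed key
        KAFKA_BROKERS      = params.getRequired("kafka.brokers");           // assumed key
        KAFKA_SOURCE_TOPIC = params.getRequired("kafka.source.topic");      // assumed key
        KAFKA_SOURCE_GROUP = params.getRequired("kafka.source.group");      // assumed key
        HDFS_FILE_PATH     = params.getRequired("hdfs.file.path");          // assumed key
    }
}

// Hypothetical sketch of the Kafka source factory, assuming a plain String-typed
// FlinkKafkaConsumer that starts from the committed group offsets.
public class KafkaUtils {
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", CephConfig.KAFKA_BROKERS);
        props.setProperty("group.id", groupId);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();
        return consumer;
    }
}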
Async dimension-join function
package com.data.ceph.function;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.Collections;
import java.util.Map;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {

    private org.apache.hadoop.hbase.client.Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        System.setProperty("zookeeper.sasl.client", "false");
        // Read the ZooKeeper quorum from the global job parameters set in main(),
        // falling back to the hard-coded addresses if the key is missing.
        Map<String, String> stringStringMap =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        String hbase = stringStringMap.get("hbase_zookeeper_quorum");
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set(HConstants.ZOOKEEPER_QUORUM,
                hbase != null ? hbase : "172.16.23.37,172.16.23.38,172.16.23.39");
        hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
        table = connection.getTable(TableName.valueOf("cloud:user_info"));
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        // Look up the dimension row by the key supplied by the concrete subclass.
        // Note: table.get() blocks the calling thread; see the sketch below for a
        // variant that runs the lookup on a dedicated thread pool.
        Get get = new Get(Bytes.toBytes(getKey(input)));
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            // Copy every column of the dimension row onto the matching field of the record.
            BeanUtils.setProperty(input, column, value);
        }
        resultFuture.complete(Collections.singletonList(input));
    }

    @Override
    public void close() throws Exception {
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("TimeOut:" + input);
    }
}
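The DimAsyncJoinFunction interface that DimAsyncFunction implements, and whose getKey the main program overrides, is also not shown. Judging from its use it only needs to expose the HBase row key for a record; a sketch under that assumption:

public interface DimAsyncJoinFunction<T> {
    // Returns the HBase row key used to look up the dimension row for this record.
    String getKey(T input);
}

As noted in the comment in asyncInvoke, the table.get() call above runs synchronously on the thread that calls asyncInvoke, so the operator gains little from Flink's async I/O. A common pattern, sketched below as a drop-in replacement for that method, is to offload the lookup to a thread pool created in open() (and shut down in close()) and complete the ResultFuture from the callback; it additionally needs java.io.IOException, java.util.concurrent.CompletableFuture and java.util.concurrent.ExecutorService imports.

    private transient ExecutorService executor;   // e.g. Executors.newFixedThreadPool(10) in open()

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            try {
                // Blocking HBase lookup, now executed on the dedicated pool.
                return table.get(new Get(Bytes.toBytes(getKey(input))));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, executor).thenAccept(result -> {
            try {
                for (Cell cell : result.rawCells()) {
                    String column = Bytes.toString(cell.getQualifierArray(),
                            cell.getQualifierOffset(), cell.getQualifierLength());
                    String value = Bytes.toString(cell.getValueArray(),
                            cell.getValueOffset(), cell.getValueLength());
                    BeanUtils.setProperty(input, column, value);
                }
                resultFuture.complete(Collections.singletonList(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }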