A source-code walkthrough of the bolt classes in the storm-hbase jar

For a long time everyone implemented HBase access by hand inside their Storm bolts, both in plain topologies and in Trident. At some point (I'm not sure which release) the Storm distribution started shipping several extra jars that wrap access to HBase, MySQL, MongoDB, and other stores in ready-made frameworks, so we no longer have to implement it ourselves. This article walks through how the storm-hbase jar works in a plain topology.


    

package org.apache.storm.hbase.bolt;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.HBaseClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: AbstractHBaseBolt
 * @Description: Abstract HBase bolt. It provides no functionality of its own; it only
 * initializes the fields shared by its subclasses. The concrete behavior lives in the
 * subclasses: one handles writes, the other handles lookups.
 */
public abstract class AbstractHBaseBolt extends BaseRichBolt { // extends Storm's BaseRichBolt
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class);

    protected OutputCollector collector;
    // hBaseClient is transient and excluded from serialization. It is storm-hbase's own client
    // for connecting to and accessing HBase; it is covered later in this article.
    protected transient HBaseClient hBaseClient;
    // name of the HBase table this bolt operates on
    protected String tableName;
    // an interface defined by storm-hbase that maps an incoming tuple to an HBase rowkey and columns
    protected HBaseMapper mapper;
    // assigned in the subclasses: the key under which the HBase configuration lives in the Storm conf.
    // When using storm-hbase, you put the HBase-related settings into a map inside the Storm conf
    // while building the topology, and the bolt retrieves that map through this key.
    protected String configKey;

    public AbstractHBaseBolt(String tableName, HBaseMapper mapper) {
        Validate.notEmpty(tableName, "Table name can not be blank or null");
        Validate.notNull(mapper, "mapper can not be null");
        this.tableName = tableName;
        this.mapper = mapper;
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
        final Configuration hbConfig = HBaseConfiguration.create();
        Map<String, Object> conf = (Map<String, Object>) map.get(this.configKey);
        if (conf == null) {
            // when using storm-hbase, a map holding the HBase configuration must be placed in the Storm conf
            throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'");
        }
        if (conf.get("hbase.rootdir") == null) {
            LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");
        }
        for (String key : conf.keySet()) { // apply the HBase settings
            hbConfig.set(key, String.valueOf(conf.get(key)));
        }
        // conf is a persistent map, so copy it before handing it to the HBaseClient
        Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);
        // for backward compatibility, copy TOPOLOGY_AUTO_CREDENTIALS into the HBase configuration
        // (with this set, nimbus hands each worker its own credentials, and the worker uses those
        // credentials to access HBase; presumably used when Kerberos authentication is enabled)
        hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));
        this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName); // create the client
    }

    @Override
    public void cleanup() { // close the hBaseClient properly
        try {
            hBaseClient.close();
        } catch (IOException e) {
            LOG.error("HBase Client Close Failed ", e);
        }
    }
}

The code above is the abstract HBase bolt. Like the bolts we write ourselves, it extends BaseRichBolt (although I occasionally extend BaseBasicBolt instead). Its prepare method does the following:

1. It initializes the collector, because the collector instance only becomes available once prepare runs;

2. It initializes hBaseClient. This happens here because the field is marked transient and is not serialized, so when nimbus distributes the topology the field is null and can only be instantiated inside prepare. The underlying reason is that an HBaseClient generally cannot survive serialization across the network. To give one concrete example: the client builds proxy objects for RPC communication between the client and the cluster, so nimbus cannot ship an established RPC connection over the wire to a worker; even if a worker received the client object, no RPC connection would actually exist on its side.


The constructor, by contrast, assigns tableName and mapper, both of which can safely travel over the network.


The cleanup method is responsible for closing the hBaseClient safely.
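
To make the configKey mechanism concrete, here is roughly what the topology side looks like. This is a sketch: the key name "hbase.conf" and the addresses are placeholder values, not anything fixed by storm-hbase.

Config conf = new Config();
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir", "hdfs://namenode:8020/hbase");  // placeholder cluster address
hbConf.put("hbase.zookeeper.quorum", "node1,node2,node3");  // placeholder quorum
conf.put("hbase.conf", hbConf);  // stored under the key the bolt reads through configKey

The concrete bolts below are then given that key via withConfigKey("hbase.conf"), and prepare pulls the inner map back out of the Storm conf.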



Now let's look at its two subclasses, which write to and read from HBase respectively; their main logic lives in the execute methods.


(1) The bolt that writes data to HBase; the annotated source follows:


package org.apache.storm.hbase.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.List;
import java.util.LinkedList;

/**
 * Basic bolt for writing to HBase.
 *
 * Note: Each HBaseBolt defined in a topology is tied to a specific table.
 */
public class HBaseBolt extends AbstractHBaseBolt {
    private static final long serialVersionUID = 5638715724596618454L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; // default flush interval: 1 second

    // whether to write to the write-ahead log (in HBase the WAL ensures that data survives a power
    // failure or crash and is still there after a restart, at the cost of throughput)
    boolean writeToWAL = true;
    List<Mutation> batchMutations; // the pending writes: regular puts and/or counter increments
    int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
    int batchSize; // batch size used by the BatchHelper, 0 by default
    BatchHelper batchHelper; // Storm's built-in batching helper

    public HBaseBolt(String tableName, HBaseMapper mapper) {
        super(tableName, mapper);
        this.batchMutations = new LinkedList<>();
    }

    public HBaseBolt writeToWAL(boolean writeToWAL) {
        this.writeToWAL = writeToWAL;
        return this;
    }

    public HBaseBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HBaseBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // have Storm send this bolt a tick tuple every flushIntervalSecs seconds
        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            if (batchHelper.shouldHandle(tuple)) { // a business tuple, i.e. data that needs processing
                byte[] rowKey = this.mapper.rowKey(tuple); // derive the HBase rowkey via the mapper
                // map the tuple's fields to HBase columns (ColumnList is storm-hbase's wrapper covering
                // both regular columns and counter columns; more on it later)
                ColumnList cols = this.mapper.columns(tuple);
                // turn the wrapped columns into a list of Mutations, which is later saved in one batch
                // through the plain HBase API
                List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
                        writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
                batchMutations.addAll(mutations); // accumulate into the overall batch
                batchHelper.addBatch(tuple); // cache the tuple in the batch for later acking
            }
            if (batchHelper.shouldFlush()) {
                // flush to HBase: triggered by a tick tuple, or when the cached tuples reach batchSize
                this.hBaseClient.batchMutate(batchMutations); // batch save through the client
                LOG.debug("acknowledging tuples after batchMutate");
                batchHelper.ack(); // ack the whole batch
                batchMutations.clear();
            }
        } catch (Exception e) {
            batchHelper.fail(e); // fail the batch
            // I'm not entirely sure about this, but: after a failure the whole batch of tuples will
            // be replayed, so the pending mutations have to be cleared
            batchMutations.clear();
        }
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector); // required: the parent's prepare does most of the setup
        this.batchHelper = new BatchHelper(batchSize, collector); // initialize a BatchHelper with the configured batchSize
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
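
As a usage sketch (the table name, field names, and column family are placeholders, following the wiring described in the storm-hbase documentation; SimpleHBaseMapper is covered later in this article):

SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper)
        .withConfigKey("hbase.conf")
        .withBatchSize(100)
        .withFlushIntervalSecs(2);

builder.setBolt("hbase-bolt", hbaseBolt, 1).fieldsGrouping("count-bolt", new Fields("word"));

The fluent with* setters make the configuration read naturally; the batch size and flush interval directly control the BatchHelper and tick-tuple behavior seen in execute above.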


(2) The bolt that reads data from HBase:

package org.apache.storm.hbase.bolt;

import org.apache.commons.lang.Validate;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;

import com.google.common.collect.Lists;

/**
 * Basic bolt for querying from HBase.
 *
 * Note: Each HBaseBolt defined in a topology is tied to a specific table.
 */
public class HBaseLookupBolt extends AbstractHBaseBolt {
    private static final long serialVersionUID = 8253062226922790455L;

    // converts an HBase lookup Result into tuples and declares the output fields;
    // you implement this yourself, and it is introduced later
    private HBaseValueMapper rowToTupleMapper;
    // helper for the Get query; it wraps the columnFamily and columnFamily:qualifier entries to
    // fetch, so a Get can target just a column family or a specific column; introduced later
    private HBaseProjectionCriteria projectionCriteria;

    public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper) {
        super(tableName, mapper);
        Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
        this.rowToTupleMapper = rowToTupleMapper;
    }

    public HBaseLookupBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {
        this.projectionCriteria = projectionCriteria;
        return this;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) { // a tick tuple: skip processing
            collector.ack(tuple); // but ack it directly
            return;
        }
        byte[] rowKey = this.mapper.rowKey(tuple);
        Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); // build the query
        try {
            // the operation is really just table.get(get); the slightly odd part is that the batch-get
            // method is called for a single Get, which is done to fit storm-hbase's own wrapper
            // interface rather than adding yet another wrapper method
            Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
            for (Values values : rowToTupleMapper.toValues(tuple, result)) {
                // convert the Result to tuples and emit them one by one
                this.collector.emit(tuple, values);
            }
            this.collector.ack(tuple); // ack the input tuple
        } catch (Exception e) { // error handling
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        rowToTupleMapper.declareOutputFields(outputFieldsDeclarer); // declare the output fields
    }
}
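
A matching usage sketch (MyValueMapper stands for your own HBaseValueMapper implementation; an example of one closes this article):

HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria();
projectionCriteria.addColumnFamily("cf");  // fetch a whole column family...
projectionCriteria.addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"));  // ...or one specific column

HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, new MyValueMapper())
        .withConfigKey("hbase.conf")
        .withProjectionCriteria(projectionCriteria);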



Next, the annotated HBaseClient code:

package org.apache.storm.hbase.common;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
import org.apache.storm.hbase.security.HBaseSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;

public class HBaseClient implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);

    private HTable table; // the HBase table handle

    public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) {
        try {
            // log in through HBase's secure-login path
            UserProvider provider = HBaseSecurityUtil.login(map, configuration);
            this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {
                @Override
                public HTable run() throws IOException {
                    return new HTable(configuration, tableName); // open the HBase table
                }
            });
        } catch (Exception e) {
            throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);
        }
    }

    // build a list of Mutations for inserts/updates
    public List<Mutation> constructMutationReq(byte[] rowKey, ColumnList cols, Durability durability) {
        List<Mutation> mutations = Lists.newArrayList();
        if (cols.hasColumns()) { // there are standard columns to save
            Put put = new Put(rowKey); // create the Put
            put.setDurability(durability); // set the durability
            for (ColumnList.Column col : cols.getColumns()) {
                if (col.getTs() > 0) { // include the timestamp when one is set
                    put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
                } else { // otherwise leave the timestamp out
                    put.add(col.getFamily(), col.getQualifier(), col.getValue());
                }
            }
            mutations.add(put); // queue the Put
        }
        if (cols.hasCounters()) { // there are counter columns to save
            Increment inc = new Increment(rowKey); // create an Increment
            inc.setDurability(durability); // set the durability
            for (ColumnList.Counter cnt : cols.getCounters()) {
                inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement());
            }
            mutations.add(inc);
        }
        if (mutations.isEmpty()) {
            // neither counters nor standard columns: add an empty Put, i.e. save only the rowkey
            mutations.add(new Put(rowKey));
        }
        return mutations;
    }

    // submit the inserts/updates in the mutation list as one batch
    public void batchMutate(List<Mutation> mutations) throws Exception {
        Object[] result = new Object[mutations.size()];
        try {
            table.batch(mutations, result);
        } catch (InterruptedException e) {
            LOG.warn("Error performing a mutation to HBase.", e);
            throw e;
        } catch (IOException e) {
            LOG.warn("Error performing a mutation to HBase.", e);
            throw e;
        }
    }

    // build a Get request
    public Get constructGetRequests(byte[] rowKey, HBaseProjectionCriteria projectionCriteria) {
        Get get = new Get(rowKey); // create an HBase Get
        if (projectionCriteria != null) {
            for (byte[] columnFamily : projectionCriteria.getColumnFamilies()) { // column families to fetch
                get.addFamily(columnFamily);
            }
            for (HBaseProjectionCriteria.ColumnMetaData columnMetaData : projectionCriteria.getColumns()) { // specific columns to fetch
                get.addColumn(columnMetaData.getColumnFamily(), columnMetaData.getQualifier());
            }
        }
        return get;
    }

    // execute several Gets in one batch
    public Result[] batchGet(List<Get> gets) throws Exception {
        try {
            return table.get(gets);
        } catch (Exception e) {
            LOG.warn("Could not perform HBASE lookup.", e);
            throw e;
        }
    }

    @Override
    public void close() throws IOException { // shut down
        table.close();
    }
}


HBaseClient uses HBaseSecurityUtil, whose annotated code follows:

package org.apache.storm.hbase.security;

import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;

import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class provides util methods for storm-hbase connector communicating
 * with secured HBase.
 */
public class HBaseSecurityUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);

    public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";
    public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";
    private static UserProvider legacyProvider = null;

    @SuppressWarnings("rawtypes")
    public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
        // Allowing keytab based login for backward compatibility.
        if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||
                !(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {
            // security is enabled and automatic credential push is not configured, so log in via keytab
            LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);
            // insure that if keytab is used only one login per process executed
            if (legacyProvider == null) {
                synchronized (HBaseSecurityUtil.class) {
                    if (legacyProvider == null) { // initialize a UserProvider
                        legacyProvider = UserProvider.instantiate(hbaseConfig);
                        // the rest is standard keytab login code; read on if you are interested
                        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
                        if (keytab != null) {
                            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
                        }
                        String userName = (String) conf.get(STORM_USER_NAME_KEY);
                        if (userName != null) {
                            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
                        }
                        legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
                                InetAddress.getLocalHost().getCanonicalHostName());
                    }
                }
            }
            return legacyProvider;
        } else {
            // security is disabled, or automatic credentials are in use: just instantiate a UserProvider
            return UserProvider.instantiate(hbaseConfig);
        }
    }
}
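
Given the two keys above, a keytab-based setup would carry something like the following in the topology configuration (the path and principal are placeholders):

Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir", "hdfs://namenode:8020/hbase");               // placeholder
hbConf.put("storm.keytab.file", "/etc/security/keytabs/storm.keytab");  // STORM_KEYTAB_FILE_KEY
hbConf.put("storm.kerberos.principal", "storm@EXAMPLE.COM");            // STORM_USER_NAME_KEY
conf.put("hbase.conf", hbConf);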


Next, the annotated HBaseMapper code:

package org.apache.storm.hbase.bolt.mapper;

import org.apache.storm.tuple.Tuple;
import org.apache.storm.hbase.common.ColumnList;

import java.io.Serializable;

/**
 * Maps a <code>org.apache.storm.tuple.Tuple</code> object
 * to a row in an HBase table.
 *
 * This is an interface you implement yourself. It defines two methods: how to derive the
 * rowkey from a tuple, and how to derive the columns from a tuple. A simple built-in
 * implementation appears in the code below.
 */
public interface HBaseMapper extends Serializable {

    /**
     * Given a tuple, return the HBase rowkey.
     */
    byte[] rowKey(Tuple tuple);

    /**
     * Given a tuple, return a list of HBase columns to insert.
     */
    ColumnList columns(Tuple tuple);
}

Its simple built-in implementation follows:

package org.apache.storm.hbase.bolt.mapper;

import static org.apache.storm.hbase.common.Utils.toBytes;
import static org.apache.storm.hbase.common.Utils.toLong;

import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SimpleHBaseMapper implements HBaseMapper {
    private static final long serialVersionUID = 1L;

    private String rowKeyField; // the tuple field that holds the HBase rowkey
    private byte[] columnFamily; // the column family
    private Fields columnFields; // the tuple fields mapped to regular HBase columns
    private Fields counterFields; // the tuple fields mapped to HBase counter columns

    public SimpleHBaseMapper() {
    }

    // the with* methods below are just setters, though chaining them feels quite nice
    public SimpleHBaseMapper withRowKeyField(String rowKeyField) {
        this.rowKeyField = rowKeyField;
        return this;
    }

    public SimpleHBaseMapper withColumnFields(Fields columnFields) {
        this.columnFields = columnFields;
        return this;
    }

    public SimpleHBaseMapper withCounterFields(Fields counterFields) {
        this.counterFields = counterFields;
        return this;
    }

    public SimpleHBaseMapper withColumnFamily(String columnFamily) {
        this.columnFamily = columnFamily.getBytes();
        return this;
    }

    @Override
    public byte[] rowKey(Tuple tuple) { // return the rowkey value
        Object objVal = tuple.getValueByField(this.rowKeyField);
        return toBytes(objVal);
    }

    @Override
    public ColumnList columns(Tuple tuple) { // ColumnList is storm-hbase's own type, shown below
        ColumnList cols = new ColumnList();
        if (this.columnFields != null) {
            // for each regular column field, add a column entry {family -> qualifier -> value}
            for (String field : this.columnFields) {
                cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
            }
        }
        if (this.counterFields != null) {
            // for each counter field, add a counter entry {family -> qualifier -> increment (must be a long)}
            for (String field : this.counterFields) {
                cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
            }
        }
        return cols;
    }
}

ColumnList is used in many places; its annotated code follows:

package org.apache.storm.hbase.common;

import java.util.ArrayList;
import java.util.List;

/**
 * Represents a list of HBase columns.
 *
 * There are two types of columns, <i>standard</i> and <i>counter</i>.
 *
 * Standard columns have <i>column family</i> (required), <i>qualifier</i> (optional),
 * <i>timestamp</i> (optional), and a <i>value</i> (optional) values.
 *
 * Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional),
 * and an <i>increment</i> (optional, but recommended) values.
 *
 * Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code>
 * methods.
 */
public class ColumnList {

    // an abstract column carrying the shared family and qualifier
    public static abstract class AbstractColumn {
        byte[] family, qualifier;

        AbstractColumn(byte[] family, byte[] qualifier) {
            this.family = family;
            this.qualifier = qualifier;
        }

        public byte[] getFamily() {
            return family;
        }

        public byte[] getQualifier() {
            return qualifier;
        }
    }

    // a standard column
    public static class Column extends AbstractColumn {
        byte[] value;
        long ts = -1; // default timestamp is -1

        Column(byte[] family, byte[] qualifier, long ts, byte[] value) {
            super(family, qualifier);
            this.value = value;
            this.ts = ts;
        }

        public byte[] getValue() {
            return value;
        }

        public long getTs() {
            return ts;
        }
    }

    // a counter column
    public static class Counter extends AbstractColumn {
        long incr = 0; // default increment is 0

        Counter(byte[] family, byte[] qualifier, long incr) {
            super(family, qualifier);
            this.incr = incr;
        }

        public long getIncrement() {
            return incr;
        }
    }

    private ArrayList<Column> columns;
    private ArrayList<Counter> counters;

    private ArrayList<Column> columns() {
        if (this.columns == null) {
            this.columns = new ArrayList<Column>();
        }
        return this.columns;
    }

    private ArrayList<Counter> counters() {
        if (this.counters == null) {
            this.counters = new ArrayList<Counter>();
        }
        return this.counters;
    }

    /**
     * Add a standard HBase column.
     */
    public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value) {
        columns().add(new Column(family, qualifier, ts, value));
        return this;
    }

    /**
     * Add a standard HBase column.
     */
    public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value) {
        columns().add(new Column(family, qualifier, -1, value));
        return this;
    }

    /**
     * Add a standard HBase column given an instance of a class that implements
     * the <code>IColumn</code> interface.
     */
    public ColumnList addColumn(IColumn column) {
        return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
    }

    /**
     * Add an HBase counter column.
     */
    public ColumnList addCounter(byte[] family, byte[] qualifier, long incr) {
        counters().add(new Counter(family, qualifier, incr));
        return this;
    }

    /**
     * Add an HBase counter column given an instance of a class that implements the
     * <code>ICounter</code> interface.
     */
    public ColumnList addCounter(ICounter counter) {
        return this.addCounter(counter.family(), counter.qualifier(), counter.increment());
    }

    /**
     * Query to determine if we have column definitions.
     */
    public boolean hasColumns() {
        return this.columns != null;
    }

    /**
     * Query to determine if we have counter definitions.
     */
    public boolean hasCounters() {
        return this.counters != null;
    }

    /**
     * Get the list of column definitions.
     */
    public List<Column> getColumns() {
        return this.columns;
    }

    /**
     * Get the list of counter definitions.
     */
    public List<Counter> getCounters() {
        return this.counters;
    }
}
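
A quick usage sketch of ColumnList (family and qualifier names are placeholders):

ColumnList cols = new ColumnList();
// a standard column: {family -> qualifier -> value}
cols.addColumn("cf".getBytes(), "word".getBytes(), "apple".getBytes());
// a counter column: {family -> qualifier -> increment}
cols.addCounter("cf".getBytes(), "count".getBytes(), 1L);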

ColumnList also has methods that add standard and counter columns through the IColumn and ICounter interfaces, which are defined as follows:

package org.apache.storm.hbase.common;

/**
 * Interface definition for classes that support being written to HBase as
 * a regular column.
 */
public interface IColumn {
    byte[] family();
    byte[] qualifier();
    byte[] value();
    long timestamp();
}

package org.apache.storm.hbase.common;

/**
 * Interface definition for classes that support being written to HBase as
 * a counter column.
 */
public interface ICounter {
    byte[] family();
    byte[] qualifier();
    long increment();
}
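
A hypothetical IColumn implementation, just to show how addColumn(IColumn) gets fed (this class is illustrative and not part of storm-hbase):

public class EventColumn implements IColumn {
    private final byte[] value;

    public EventColumn(byte[] value) {
        this.value = value;
    }

    @Override
    public byte[] family() { return "cf".getBytes(); }        // placeholder family

    @Override
    public byte[] qualifier() { return "event".getBytes(); }  // placeholder qualifier

    @Override
    public byte[] value() { return value; }

    @Override
    public long timestamp() { return System.currentTimeMillis(); }
}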

Next, HBaseProjectionCriteria. Below is a minimal sketch reconstructed from the way constructGetRequests uses it; the details are inferred from that usage rather than copied verbatim from the storm-hbase source:

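package org.apache.storm.hbase.bolt.mapper;

import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.List;

// Sketch reconstructed from usage; not the verbatim storm-hbase source.
// Wraps the column families and family:qualifier pairs a Get should fetch.
public class HBaseProjectionCriteria implements Serializable {
    private List<byte[]> columnFamilies = Lists.newArrayList();
    private List<ColumnMetaData> columns = Lists.newArrayList();

    // a fully qualified column: family plus qualifier
    public static class ColumnMetaData implements Serializable {
        private byte[] columnFamily;
        private byte[] qualifier;

        public ColumnMetaData(String columnFamily, String qualifier) {
            this.columnFamily = columnFamily.getBytes();
            this.qualifier = qualifier.getBytes();
        }

        public byte[] getColumnFamily() { return columnFamily; }

        public byte[] getQualifier() { return qualifier; }
    }

    // fetch every column of this family
    public HBaseProjectionCriteria addColumnFamily(String columnFamily) {
        this.columnFamilies.add(columnFamily.getBytes());
        return this;
    }

    // fetch only this specific column
    public HBaseProjectionCriteria addColumn(ColumnMetaData column) {
        this.columns.add(column);
        return this;
    }

    public List<byte[]> getColumnFamilies() { return columnFamilies; }

    public List<ColumnMetaData> getColumns() { return columns; }
}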

Finally, the annotated HBaseValueMapper code:

package org.apache.storm.hbase.bolt.mapper;

import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import org.apache.hadoop.hbase.client.Result;

import java.io.Serializable;
import java.util.List;

// An interface that defines how an HBase lookup result is mapped into tuple values,
// and how the output fields of the lookup bolt are declared.
public interface HBaseValueMapper extends Serializable {

    /**
     * @param input tuple.
     * @param result HBase lookup result instance.
     * @return list of values that should be emitted by the lookup bolt.
     */
    List<Values> toValues(ITuple input, Result result) throws Exception;

    /**
     * Declares the output fields for the lookup bolt.
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);
}
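
As an example, here is a sketch in the spirit of the WordCountValueMapper that ships with the storm-hbase examples (the emitted field names are placeholders; it assumes the looked-up cells hold long counter values):

package org.apache.storm.hbase.topology;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;

public class WordCountValueMapper implements HBaseValueMapper {

    @Override
    public List<Values> toValues(ITuple input, Result result) throws Exception {
        List<Values> values = new ArrayList<Values>();
        // emit one Values per cell in the Result: (qualifier name, counter value)
        for (Cell cell : result.rawCells()) {
            values.add(new Values(
                    Bytes.toString(CellUtil.cloneQualifier(cell)),
                    Bytes.toLong(CellUtil.cloneValue(cell))));
        }
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("columnName", "columnValue"));
    }
}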


