请编程实现以下功能:
(1)createTable(String tableName, String[] fields)
创建表,参数tableName为表的名称,字符串数组fields为存储记录各个字段名称的数组。要求当HBase已经存在名为tableName的表的时候,先删除原有的表,然后再创建新的表。
(2)addRecord(String tableName, String row, String[] fields, String[] values)
向表tableName、行row(用S_Name表示)和字符串数组fields指定的单元格中添加对应的数据values。其中,fields中每个元素如果对应的列族下还有相应的列限定符的话,用“columnFamily:column”表示。例如,同时向“Math”、“Computer Science”、“English”三列添加成绩时,字符串数组fields为{“Score:Math”, “Score:Computer Science”, “Score:English”},数组values存储这三门课的成绩。
(3)scanColumn(String tableName, String column)
浏览表tableName某一列的数据,如果某一行记录中该列数据不存在,则返回null。要求当参数column为某一列族名称时,如果底下有若干个列限定符,则要列出每个列限定符代表的列的数据;当参数column为某一列具体名称(例如“Score:Math”)时,只需要列出该列的数据。
(4)modifyData(String tableName, String row, String column)
修改表tableName,行row(可以用学生姓名S_Name表示),列column指定的单元格的数据。
(5)deleteRow(String tableName, String row)
删除表tableName中row指定的行的记录。
代码如下:
导入依赖:
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-shaded-client</artifactId><version>2.1.0</version>
</dependency>
HBaseConnectionManager
package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Holds one lazily created HBase {@link Connection} in a static field and
 * hands it out to callers.
 *
 * <p>All accessors are {@code synchronized} on the class, so creation and
 * closing of the shared connection are serialized.
 */
public class HBaseConnectionManager {

    /** The shared connection; {@code null} until first use and after closing. */
    private static Connection connection = null;

    /**
     * Returns the shared connection, creating it when none exists or the
     * previous one has been closed.
     *
     * @return the open connection, or {@code null} when it could not be
     *         created (the failure is printed to {@code System.err});
     *         callers must check for {@code null}
     */
    public static synchronized Connection getConnection() {
        if (connection == null || connection.isClosed()) {
            // Drop any stale (closed) reference first so that a failed
            // reconnect yields null rather than a closed connection.
            connection = null;
            try {
                Configuration config = HBaseConfiguration.create();
                config.set("hbase.zookeeper.quorum", "node1,node2,node3");
                config.set("hbase.client.log.scanner", "false");
                config.set("hbase.client.log.batch", "false");
                config.set("hbase.security.authentication", "simple");
                config.set("hbase.rpc.protection", "authentication");
                connection = ConnectionFactory.createConnection(config);
                System.out.println("HBase连接已建立");
            } catch (Exception e) {
                System.err.println("连接失败: " + e.getMessage());
                e.printStackTrace();
            }
        }
        return connection;
    }

    /**
     * Closes the shared connection if it is open and always resets the field
     * to {@code null}, so a later {@link #getConnection()} creates a new one.
     * Calling it repeatedly is harmless.
     */
    public static synchronized void closeConnection() {
        if (connection != null) {
            try {
                if (!connection.isClosed()) {
                    connection.close();
                    System.out.println("HBase连接已关闭");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                connection = null;
            }
        }
    }

    /** Call on program exit; simply delegates to {@link #closeConnection()}. */
    public static void shutdown() {
        closeConnection();
    }
}
HBaseCRUDPooled
package com.example;import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.util.*;public class HBaseCRUDPooled {/*** (1) 创建表 - 如果表已存在则先删除再创建*/public static void createTable(String tableName, String[] fields) {try (Admin admin = HBaseConnectionManager.getConnection().getAdmin()) {TableName tn = TableName.valueOf(tableName);// 如果表已存在,先删除if (admin.tableExists(tn)) {System.out.println("表 " + tableName + " 已存在,正在删除...");admin.disableTable(tn);admin.deleteTable(tn);System.out.println("表 " + tableName + " 删除成功");}// 从fields中提取列族(去重)Set<String> columnFamilies = new HashSet<>();for (String field : fields) {if (field.contains(":")) {String family = field.split(":")[0];columnFamilies.add(family);} else {// 如果没有冒号,整个字段作为列族columnFamilies.add(field);}}// 创建表描述器TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(tn);// 添加列族List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>();for (String family : columnFamilies) {ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(family);familyDescriptors.add(familyDesc);}tableDescBuilder.setColumnFamilies(familyDescriptors);TableDescriptor tableDesc = tableDescBuilder.build();// 创建表admin.createTable(tableDesc);System.out.println("表 " + tableName + " 创建成功,列族: " + columnFamilies);} catch (IOException e) {System.err.println("创建表失败: " + e.getMessage());e.printStackTrace();}}/*** (2) 添加记录*/public static void addRecord(String tableName, String row, String[] fields, String[] values) {if (fields.length != values.length) {System.err.println("错误:字段数量和值数量不匹配");return;}// 检查行键是否为空if (row == null || row.trim().isEmpty()) {System.err.println("错误:行键不能为空");return;}try (Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf(tableName))) {Put put = new Put(Bytes.toBytes(row));for (int i = 0; i < fields.length; i++) {String field = fields[i];String value = values[i];if (field.contains(":")) {// 有列限定符的情况 "family:qualifier"String[] parts = field.split(":", 2);String family = parts[0];String qualifier = parts[1];put.addColumn(Bytes.toBytes(family), 
Bytes.toBytes(qualifier), Bytes.toBytes(value));} else {// 只有列族的情况,使用字段名作为列限定符put.addColumn(Bytes.toBytes(field), Bytes.toBytes("value"), Bytes.toBytes(value));}}table.put(put);System.out.println("成功向表 " + tableName + " 的行 " + row + " 添加记录");} catch (IOException e) {System.err.println("添加记录失败: " + e.getMessage());e.printStackTrace();}}/*** (3) 浏览某一列的数据*/public static void scanColumn(String tableName, String column) {try (Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf(tableName))) {Scan scan = new Scan();if (column.contains(":")) {// 具体列 "family:qualifier"String[] parts = column.split(":", 2);String family = parts[0];String qualifier = parts[1];scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));} else {// 整个列族scan.addFamily(Bytes.toBytes(column));}ResultScanner scanner = table.getScanner(scan);System.out.println("=== 表 " + tableName + " 的列 " + column + " 数据 ===");boolean hasData = false;for (Result result : scanner) {hasData = true;System.out.print("行键: " + Bytes.toString(result.getRow()) + " -> ");if (column.contains(":")) {// 具体列的情况String[] parts = column.split(":", 2);String family = parts[0];String qualifier = parts[1];byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));if (value != null) {System.out.println(Bytes.toString(value));} else {System.out.println("null");}} else {// 列族的情况NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes(column));if (familyMap != null && !familyMap.isEmpty()) {for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {String qualifier = Bytes.toString(entry.getKey());String value = Bytes.toString(entry.getValue());System.out.print(column + ":" + qualifier + "=" + value + " ");}System.out.println();} else {System.out.println("null");}}}if (!hasData) {System.out.println("没有找到数据");}} catch (IOException e) {System.err.println("浏览列数据失败: " + e.getMessage());e.printStackTrace();}}/*** (4) 修改指定单元格的数据*/public static void modifyData(String 
tableName, String row, String column, String newValue) {try (Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf(tableName))) {if (!column.contains(":")) {System.err.println("错误:列参数必须包含列族和列限定符,格式为 'family:qualifier'");return;}String[] parts = column.split(":", 2);String family = parts[0];String qualifier = parts[1];Put put = new Put(Bytes.toBytes(row));put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(newValue));table.put(put);System.out.println("成功修改表 " + tableName + " 行 " + row + " 列 " + column + " 的值为: " + newValue);} catch (IOException e) {System.err.println("修改数据失败: " + e.getMessage());e.printStackTrace();}}/*** (5) 删除指定行*/public static void deleteRow(String tableName, String row) {try (Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf(tableName))) {Delete delete = new Delete(Bytes.toBytes(row));table.delete(delete);System.out.println("成功删除表 " + tableName + " 的行: " + row);} catch (IOException e) {System.err.println("删除行失败: " + e.getMessage());e.printStackTrace();}}/*** 辅助方法:显示表的所有数据*/public static void displayTable(String tableName) {try (Table table = HBaseConnectionManager.getConnection().getTable(TableName.valueOf(tableName))) {Scan scan = new Scan();ResultScanner scanner = table.getScanner(scan);System.out.println("\n=== 表 " + tableName + " 的所有数据 ===");boolean hasData = false;for (Result result : scanner) {hasData = true;System.out.println("行键: " + Bytes.toString(result.getRow()));for (Cell cell : result.rawCells()) {String family = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength());String qualifier = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());String value = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());System.out.println(" " + family + ":" + qualifier + " = " + value);}System.out.println("---");}if (!hasData) {System.out.println("表为空");}} catch 
(IOException e) {System.err.println("显示表数据失败: " + e.getMessage());e.printStackTrace();}}/*** 辅助方法:检查表是否存在*/public static boolean tableExists(String tableName) {try (Admin admin = HBaseConnectionManager.getConnection().getAdmin()) {return admin.tableExists(TableName.valueOf(tableName));} catch (IOException e) {System.err.println("检查表存在失败: " + e.getMessage());return false;}}
}
TestHBaseCRUD
package com.example;import java.util.Scanner;public class TestHBaseCRUD {private static Scanner scanner = new Scanner(System.in);public static void main(String[] args) {System.out.println("=== HBase CRUD 操作测试系统 ===");// 注册关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {HBaseConnectionManager.shutdown();scanner.close();}));try {// 预先建立连接System.out.println("正在初始化HBase连接...");HBaseConnectionManager.getConnection();while (true) {showMenu();int choice = getIntInput("请选择操作: ");switch (choice) {case 1:testCreateTable();break;case 2:testAddRecord();break;case 3:testScanColumn();break;case 4:testModifyData();break;case 5:testDeleteRow();break;case 6:testDisplayTable();break;case 0:System.out.println("感谢使用,再见!");HBaseConnectionManager.shutdown();return;default:System.out.println("无效选择,请重新输入");}System.out.println("\n按回车键继续...");scanner.nextLine();}} finally {HBaseConnectionManager.shutdown();}}private static void showMenu() {System.out.println("\n========== 菜单 ==========");System.out.println("1. 创建表");System.out.println("2. 添加记录");System.out.println("3. 浏览列数据");System.out.println("4. 修改数据");System.out.println("5. 删除行");System.out.println("6. 显示表所有数据");System.out.println("0. 
退出");System.out.println("==========================");}private static void testCreateTable() {System.out.println("\n--- 创建表测试 ---");String tableName = getNonEmptyInput("请输入表名: ");String fieldsInput = getNonEmptyInput("请输入字段数组(用逗号分隔,如 'info:name,info:age,score:math'): ");String[] fields = fieldsInput.split(",");HBaseCRUDPooled.createTable(tableName, fields);}private static void testAddRecord() {System.out.println("\n--- 添加记录测试 ---");String tableName = getNonEmptyInput("请输入表名: ");if (!HBaseCRUDPooled.tableExists(tableName)) {System.out.println("表 " + tableName + " 不存在,请先创建表");return;}String row = getNonEmptyInput("请输入行键: ");String fieldsInput = getNonEmptyInput("请输入字段数组(用逗号分隔): ");String valuesInput = getNonEmptyInput("请输入值数组(用逗号分隔): ");String[] fields = fieldsInput.split(",");String[] values = valuesInput.split(",");if (fields.length != values.length) {System.out.println("错误:字段数量和值数量不匹配");return;}HBaseCRUDPooled.addRecord(tableName, row, fields, values);}private static void testScanColumn() {System.out.println("\n--- 浏览列数据测试 ---");String tableName = getNonEmptyInput("请输入表名: ");if (!HBaseCRUDPooled.tableExists(tableName)) {System.out.println("表 " + tableName + " 不存在");return;}String column = getNonEmptyInput("请输入列族或列(如 'info' 或 'score:math'): ");HBaseCRUDPooled.scanColumn(tableName, column);}private static void testModifyData() {System.out.println("\n--- 修改数据测试 ---");String tableName = getNonEmptyInput("请输入表名: ");if (!HBaseCRUDPooled.tableExists(tableName)) {System.out.println("表 " + tableName + " 不存在");return;}String row = getNonEmptyInput("请输入行键: ");String column = getNonEmptyInput("请输入列(格式 'family:qualifier'): ");String newValue = getNonEmptyInput("请输入新值: ");HBaseCRUDPooled.modifyData(tableName, row, column, newValue);}private static void testDeleteRow() {System.out.println("\n--- 删除行测试 ---");String tableName = getNonEmptyInput("请输入表名: ");if (!HBaseCRUDPooled.tableExists(tableName)) {System.out.println("表 " + tableName + " 不存在");return;}String row = 
getNonEmptyInput("请输入要删除的行键: ");HBaseCRUDPooled.deleteRow(tableName, row);}private static void testDisplayTable() {System.out.println("\n--- 显示表所有数据 ---");String tableName = getNonEmptyInput("请输入表名: ");if (!HBaseCRUDPooled.tableExists(tableName)) {System.out.println("表 " + tableName + " 不存在");return;}HBaseCRUDPooled.displayTable(tableName);}private static String getStringInput(String prompt) {System.out.print(prompt);return scanner.nextLine().trim();}private static String getNonEmptyInput(String prompt) {while (true) {System.out.print(prompt);String input = scanner.nextLine().trim();if (!input.isEmpty()) {return input;}System.out.println("错误:输入不能为空,请重新输入");}}private static int getIntInput(String prompt) {while (true) {try {System.out.print(prompt);int result = scanner.nextInt();scanner.nextLine(); // 消耗换行符return result;} catch (Exception e) {System.out.println("请输入有效的数字");scanner.nextLine(); // 清除无效输入}}}
}