一、HDFS dao接口
package cn.mk.dao;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;

/**
 * Data-access contract for basic file and directory operations on HDFS.
 *
 * <p>All paths are passed as strings; how they are resolved (relative or
 * fully qualified) is up to the implementation.
 */
public interface HDFSDao {

    /**
     * Creates a directory.
     *
     * @param path directory to create
     * @return {@code true} if the directory was created
     * @throws IOException if the file system reports a failure
     */
    boolean mkDirs(String path) throws IOException;

    /**
     * Removes a path together with everything below it.
     *
     * @param path file or directory to remove
     * @return {@code true} if the deletion succeeded
     * @throws IOException if the file system reports a failure
     */
    boolean rmr(String path) throws IOException;

    /**
     * Removes a directory. Whether a non-empty directory is accepted is
     * defined by the implementation.
     *
     * @param path directory to remove
     * @return {@code true} if the deletion succeeded
     * @throws IOException if the file system reports a failure
     */
    boolean rmdir(String path) throws IOException;

    /**
     * Renames (moves) a path.
     *
     * @param src existing path
     * @param dst new path
     * @return {@code true} if the rename succeeded
     * @throws IOException if the file system reports a failure
     */
    boolean rename(String src, String dst) throws IOException;

    /**
     * Lists the status entries found at a path.
     *
     * @param path file or directory to list
     * @return the status entries for {@code path}
     * @throws FileNotFoundException if {@code path} does not exist
     * @throws IOException if the file system reports a failure
     */
    FileStatus[] ls(String path) throws FileNotFoundException, IOException;

    /**
     * Creates a file.
     *
     * @param file path of the file to create
     * @param content content associated with the file.
     *        NOTE(review): how this differs from
     *        {@link #createNewFile(String, String)} is not visible from the
     *        signatures alone - confirm the intended contract.
     * @return {@code true} if the file was created
     * @throws IOException if the file system reports a failure
     */
    boolean createFile(String file, String content) throws IOException;

    /**
     * Copies a local file to the remote file system.
     *
     * @param local source path on the local file system
     * @param remote destination path on the remote file system
     * @throws IOException if the copy fails
     */
    void copyFile(String local, String remote) throws IOException;

    /**
     * Copies a remote file to the local file system.
     *
     * @param remote source path on the remote file system
     * @param local destination path on the local file system
     * @throws IOException if the copy fails
     */
    void download(String remote, String local) throws IOException;

    /**
     * Reads a remote file into a string.
     *
     * @param remoteFile path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be read
     */
    String cat(String remoteFile) throws IOException;

    /**
     * Looks up the block locations of a path.
     *
     * @param path file to inspect
     * @return the block locations reported for {@code path}
     * @throws IOException if the file system reports a failure
     */
    BlockLocation[] location(String path) throws IOException;

    /**
     * Creates a file with the given content.
     *
     * @param file path of the file to create
     * @param content text to store in the file
     * @return {@code true} if the file was created
     * @throws IOException if the file system reports a failure
     */
    boolean createNewFile(String file, String content) throws IOException;
}
二、HDFS dao实现类
package cn.mk.dao.impl;import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils;import cn.mk.dao.HDFSDao;public class HDFSDaoImpl implements HDFSDao {private static final String HDFS_PATH = "hdfs://master:9000/";private final FileSystem fileSystem;public HDFSDaoImpl(Configuration conf) throws IOException {this(HDFS_PATH, conf);}public HDFSDaoImpl(String hdfs, Configuration conf) throws IOException {fileSystem = FileSystem.get(URI.create(hdfs), conf);}@Overridepublic boolean mkDirs(String path) throws IOException {Path p = new Path(path);if (!fileSystem.exists(p)) {return fileSystem.mkdirs(p);}return false;}@Overridepublic boolean rmr(String path) throws IOException {Path p = new Path(path);return fileSystem.delete(p, true);}@Overridepublic boolean rmdir(String path) throws IOException {Path p = new Path(path);return fileSystem.delete(p, true);}@Overridepublic boolean rename(String src, String dst) throws IOException {Path p1 = new Path(src);Path p2 = new Path(dst);return fileSystem.rename(p1, p2);}@Overridepublic FileStatus[] ls(String path) throws FileNotFoundException, IOException {Path p = new Path(path);return fileSystem.listStatus(p);}@Overridepublic boolean createFile(String file, String content) throws IOException {Path p = new Path(file);return fileSystem.createNewFile(p);}@Overridepublic boolean createNewFile(String file, String content) throws IOException {Path p = new Path(file);boolean b= fileSystem.createNewFile(p);if(!b)return false;FSDataOutputStream os = null;try {byte[] bytes=content.getBytes();os = fileSystem.create(p);os.write(bytes, 
0,bytes.length);}finally{if(os!=null)os.close();}return true;}@Overridepublic void copyFile(String local, String remote) throws IOException {Path p1 =new Path(local);Path p2 =new Path(remote);fileSystem.copyFromLocalFile(p1, p2);}@Overridepublic void download(String remote, String local) throws IOException {Path p1 =new Path(local);Path p2 =new Path(remote);fileSystem.copyToLocalFile(p2, p1);}@Overridepublic String cat(String remoteFile) throws IOException {Path p =new Path(remoteFile);FSDataInputStream in = null;String content=null;try {in=fileSystem.open(p);ByteArrayOutputStream bos =new ByteArrayOutputStream();IOUtils.copyBytes(in, bos, 4096,false);} catch (Exception e) {IOUtils.closeStream(in);}return content;}@Overridepublic BlockLocation[] location(String path) throws IOException {Path p =new Path(path);FileStatus fStatus=fileSystem.getFileStatus(p);return fileSystem.getFileBlockLocations(fStatus, 0, fStatus.getLen());}@Overrideprotected void finalize() throws Throwable {fileSystem.close();}}