[Spark / HDFS small-file merge] Merging Small HDFS Files with Spark

Contents:
1) Import dependencies
2) Code implementation
  2.1.HDFSUtils
  2.2.MergeFilesApplication

Requirements:
1. Use Spark to merge and compress the small files.
2. In real production use, the related configuration, logs, and per-run details can be recorded in MySQL (a sketch follows this list).
3. Place core-site.xml, hdfs-site.xml, hive-site.xml, yarn-site.xml, etc. in the project's resources directory for authentication.
4. The example below extracts only the core logic; a concrete implementation should combine the HDFS utility class with MySQL-backed configuration, logging, and run details, and merge files according to its own business rules.
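For item 2, the following is a minimal sketch, not part of the original article, of how job configuration and run logs might be stored in MySQL through plain JDBC (the mysql-connector-java dependency is declared in the pom in section 1). The table names merge_job_config / merge_job_log and their columns are assumptions for illustration only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Sketch of reading merge-job configuration from MySQL and writing a run log.
 * The tables (merge_job_config, merge_job_log) and columns are hypothetical.
 */
public class MergeJobConfigDao {
    private final String url;       // e.g. jdbc:mysql://host:3306/merge_meta (placeholder)
    private final String user;
    private final String password;

    public MergeJobConfigDao(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /** Reads the source directory configured for one job id. */
    public String findSourceDir(long jobId) throws Exception {
        String sql = "SELECT source_dir FROM merge_job_config WHERE job_id = ?";
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, jobId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }

    /** Records the outcome of one merge run (file counts before/after, status). */
    public void logRun(long jobId, long filesBefore, long filesAfter, String status) throws Exception {
        String sql = "INSERT INTO merge_job_log (job_id, files_before, files_after, status) VALUES (?, ?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, jobId);
            ps.setLong(2, filesBefore);
            ps.setLong(3, filesAfter);
            ps.setString(4, status);
            ps.executeUpdate();
        }
    }
}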
1) Import dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test.cn.suitcase</groupId>
    <artifactId>mergefiles</artifactId>
    <version>4.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!--<spark.version>3.0.2</spark.version>-->
        <spark.version>2.4.8</spark.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.20.0</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.2</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.2</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.3.2</version></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.20.0</version></dependency>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-launcher_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency>
        <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.14.2</version></dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.1</version>
                <configuration>
                    <groups>IntegrationTest</groups>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.1</version>
            </plugin>
        </plugins>
    </build>
</project>

2) Code implementation
2.1.HDFSUtils
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

/** Helper methods for working with HDFS: size/count queries, reads, renames, and deletes. */
public class HDFSUtils {

    private static Logger logger = LoggerFactory.getLogger(HDFSUtils.class);
    private static final Configuration hdfsConfig = new Configuration();
    private static FileSystem fs;

    /** Loads core-site.xml / hdfs-site.xml from the classpath and opens the FileSystem. */
    public static void init() {
        System.out.println(Thread.currentThread().getContextClassLoader());
        try {
            hdfsConfig.addResource(Thread.currentThread().getContextClassLoader().getResource("./core-site.xml"));
            hdfsConfig.addResource(Thread.currentThread().getContextClassLoader().getResource("./hdfs-site.xml"));
            fs = FileSystem.get(hdfsConfig);
        } catch (FileNotFoundException fnfe) {
            fnfe.printStackTrace();
            logger.error("Load properties failed.");
        } catch (IOException ioe) {
            ioe.printStackTrace();
            logger.error(String.format("IOException: %s", ioe.getMessage()));
        }
    }

    /** Total size in bytes of all files under the directory. */
    public static long getDirectorySize(String directoryPath) {
        final Path path = new Path(directoryPath);
        long size = 0;
        try {
            size = fs.getContentSummary(path).getLength();
        } catch (IOException ex) {
            logger.error("Get directory size failed: " + ex.getMessage());
        }
        return size;
    }

    /** Number of files under the directory. */
    public static long getFileCount(String directoryPath) {
        final Path path = new Path(directoryPath);
        long count = 0;
        try {
            count = fs.getContentSummary(path).getFileCount();
        } catch (IOException ex) {
            logger.error("Get file count failed: " + ex.getMessage());
        }
        return count;
    }

    public static long getBlockSize() {
        return fs.getDefaultBlockSize(fs.getHomeDirectory());
    }

    /** Reads a file from HDFS and returns its content as a UTF-8 string. */
    public static String getFile(String filePath) {
        final Path path = new Path(filePath);
        FSDataInputStream dis = null;
        String fileName = null;
        try {
            if (fs.exists(path) && fs.isFile(path)) {
                dis = fs.open(path);
                StringWriter stringWriter = new StringWriter();
                IOUtils.copy(dis, stringWriter, "UTF-8");
                fileName = stringWriter.toString();
                return fileName;
            } else {
                throw new FileNotFoundException();
            }
        } catch (IOException ioException) {
            logger.error("Get file from hdfs failed: " + ioException.getMessage());
        } finally {
            if (dis != null) {
                try {
                    dis.close();
                } catch (IOException ex) {
                    logger.error("close FSDataInputStream failed: " + ex.getMessage());
                }
            }
        }
        return fileName;
    }

    public static Boolean exists(String filePath) {
        Path path = new Path(filePath);
        Boolean ifExists = false;
        try {
            ifExists = fs.exists(path);
            return ifExists;
        } catch (IOException ex) {
            logger.error(String.format("hdfs file %s not exists", filePath));
        }
        return ifExists;
    }

    /** Renames a directory; fails if the target name already exists. */
    public static boolean renameDir(String existingName, String newName) {
        final Path existingPath = new Path(existingName);
        final Path finalName = new Path(newName);
        try {
            if (exists(newName)) {
                logger.error(String.format("Path %s already exists when try to rename %s to %s.", newName, existingName, newName));
                return false;
            }
            return fs.rename(existingPath, finalName);
        } catch (IOException ex) {
            logger.error("Rename hdfs directory failed: " + ex.getMessage());
        }
        return false;
    }

    /** Recursively deletes a directory without moving it to the trash. */
    public static boolean removeDirSkipTrash(String dir) {
        Path path = new Path(dir);
        boolean rv = false;
        try {
            if (exists(dir)) {
                if (fs.delete(path, true)) {
                    logger.info(String.format("Directory %s deleted successfully.", path));
                    rv = true;
                }
            } else {
                logger.error(String.format("Directory to delete %s does not exist.", dir));
                return false;
            }
        } catch (IOException ex) {
            logger.error(String.format("Directory %s exists but deletion failed.", dir));
        }
        return rv;
    }

    /** Lists directories matching the given glob pattern. */
    public static List<String> listDirs(String baseDir) {
        Path path = new Path(baseDir);
        List<String> dirs = new ArrayList<>();
        try {
            FileStatus[] fileStatuses = fs.globStatus(path);
            for (int i = 0; i < fileStatuses.length; i++) {
                dirs.add(fileStatuses[i].getPath().toUri().getRawPath());
            }
        } catch (Exception ex) {
            logger.error(String.format("List directories under %s failed.", baseDir));
        }
        return dirs;
    }

    public static void close() {
        try {
            fs.close();
        } catch (IOException ex) {
            logger.error("hdfs file system close failed: " + ex.getMessage());
        }
    }
}
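For orientation, here is a minimal usage sketch, not taken from the original article, showing how the utility class above might be used to decide whether a directory needs merging. The directory path and the "quarter of a block" threshold are made-up examples.

public class HDFSUtilsUsageExample {
    public static void main(String[] args) {
        HDFSUtils.init();                                  // load core-site.xml / hdfs-site.xml and open the FileSystem

        String dir = "/warehouse/tablespace/external/hive/db.db/some_table";  // hypothetical directory
        long sizeBytes = HDFSUtils.getDirectorySize(dir);  // total bytes under the directory
        long fileCount = HDFSUtils.getFileCount(dir);      // number of files under the directory
        long blockSize = HDFSUtils.getBlockSize();         // default HDFS block size

        // Simple heuristic: merge when the average file is far smaller than one block.
        if (fileCount > 0 && sizeBytes / fileCount < blockSize / 4) {
            System.out.println(dir + " looks like a small-file directory: " + fileCount + " files.");
            // ... submit the Spark merge job for this directory ...
        }

        HDFSUtils.close();
    }
}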
2.2.MergeFilesApplication

The example below extracts only the core logic. In a concrete implementation, combine it with the HDFS utility class above, use MySQL for configuration, logging, and run details, and merge files according to your own business rules.
import java.util.Arrays;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class MergeFilesApplication {

    public static void main(String[] args) {
        System.out.println(Arrays.asList(args));

        // Run as the hdfs user
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        System.setProperty("user.name", "hdfs");

        // In production these would typically come from MySQL-backed job configuration;
        // here they are taken from the command line.
        String sourceDir = args[0];                  // directory containing the small files
        String targetMergedDir = args[1];            // directory to write the merged files to
        int partitions = Integer.parseInt(args[2]);  // number of output files after coalesce

        // Build the SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .config("spark.scheduler.mode", "FAIR")                                    // scheduling mode
                .config("spark.sql.warehouse.dir", "/warehouse/tablespace/external/hive")  // warehouse directory
                .appName("MergeFilesApplication")
                .getOrCreate();

        // Merge the files
        sparkSession.read()                           // read
                .parquet(sourceDir)                   // the source directory
                .coalesce(partitions)                 // reduce to the desired number of partitions
                .sortWithinPartitions("col1", "col2") // sort within each partition by the needed columns ("col1"/"col2" are placeholders)
                .write()                              // write
                .mode(SaveMode.Append)                // in append mode
                .option("compression", "gzip")        // gzip compression
                .parquet(targetMergedDir);            // to the target directory
    }
}
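The article leaves the surrounding workflow to the reader. One common pattern, shown here as a sketch rather than as the original author's code, is to write the merged output to a temporary directory, swap it in with the HDFSUtils methods from section 2.1, and record the before/after file counts (in production, in MySQL). Directory names are hypothetical.

public class MergeSwapExample {
    public static void main(String[] args) {
        String sourceDir = "/data/events/dt=2023-10-01";   // original small-file directory (placeholder)
        String mergedDir = sourceDir + "_merged";          // output of MergeFilesApplication
        String backupDir = sourceDir + "_backup";          // temporary name for the original data

        HDFSUtils.init();
        long filesBefore = HDFSUtils.getFileCount(sourceDir);
        long filesAfter = HDFSUtils.getFileCount(mergedDir);

        // Only swap when the merged output is present and non-empty.
        if (HDFSUtils.exists(mergedDir) && filesAfter > 0) {
            if (HDFSUtils.renameDir(sourceDir, backupDir) && HDFSUtils.renameDir(mergedDir, sourceDir)) {
                HDFSUtils.removeDirSkipTrash(backupDir);   // drop the old small files
                System.out.println("Merged " + filesBefore + " files into " + filesAfter + ".");
                // In production, record filesBefore/filesAfter and the job status in MySQL here.
            }
        }
        HDFSUtils.close();
    }
}

Since the pom declares spark-launcher, the merge job can also be submitted programmatically from a scheduler instead of through spark-submit. A minimal sketch under that assumption; the jar path, master, Spark settings, and arguments are placeholders.

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class MergeJobSubmitter {
    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("/opt/jobs/mergefiles-4.0.0.jar")   // fat jar built by the shade plugin (placeholder path)
                .setMainClass("MergeFilesApplication")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.executor.memory", "4g")
                .addAppArgs("/data/events/dt=2023-10-01",           // sourceDir
                            "/data/events/dt=2023-10-01_merged",    // targetMergedDir
                            "8")                                    // partitions
                .startApplication();

        // Poll until the application reaches a terminal state.
        while (!handle.getState().isFinal()) {
            Thread.sleep(5000);
        }
        System.out.println("Merge job finished with state: " + handle.getState());
    }
}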