wordpress 英文站Wordpress 百度多久收录
wordpress 英文站,Wordpress 百度多久收录,有专业做网站,免费设计房屋的软件背景#xff1a;
之前在使用spark operator的时候必须指定executor的个数#xff0c;在将任务发布到spark operator后#xff0c;k8s会根据指定的个数启动executor#xff0c;但是对于某些spark sql可能并不需要用到那么多executor#xff0c;在此时executor的数量就不好…背景
之前在使用spark operator的时候必须指定executor的个数在将任务发布到spark operator后k8s会根据指定的个数启动executor但是对于某些spark sql可能并不需要用到那么多executor在此时executor的数量就不好控制了。而executor的多少代表了集群资源的多少如果不提前指定executor能够动态扩展那将是最好的策略。在查询了资料后得知spark3.0已经支持了executor的动态分配。而且用法也很简单所以在之前的spark operator发布k8s的基础上又做了动态生成executor的功能。 本文参考之前一篇的文章做了部分修改以支持该功能。
【Java Kubernates】Java调用kubernates提交Yaml到SparkOperator-CSDN博客文章浏览阅读1.1k次点赞18次收藏18次。最终我选择了fabric8io因为我们需要使用k8s的自定义资源sparkApplication对于自定义资源kubernetes-client/java需要创建各个k8s对象的pojo比较麻烦。这里提一下我在重新使用spark operator的时候发现原来官方的google的spark operator镜像已经不能拉取了貌似是google发现它的两个镜像存在漏洞所以关闭了开源镜像。目前查询框架使用的是trino但是trino也有其局限性需要准备一个备用的查询框架。https://blog.csdn.net/w8998036/article/details/135821058?spm1001.2014.3001.5501
一 删除yaml中executor指定个数的配置
//测试spark 3.0的动态分配instanceprivate static String buildSparkApplicationYAMLDynamic(String taskName, String sparkImage, String sparkJarFile, String mainClass, String instance,String driverCpu, String driverMemory, String executorCpu, String executorMemory, String dynamicSQLQuery) {return String.format(apiVersion: \sparkoperator.k8s.io/v1beta2\\n kind: SparkApplication\n metadata:\n name: %s\n namespace: spark-app\n spec:\n type: Scala\n mode: cluster\n image: \%s\\n imagePullPolicy: Always\n imagePullSecrets: [\harbor\]\n mainClass: \%s\\n mainApplicationFile: \%s\\n sparkVersion: \3.3.1\\n restartPolicy:\n type: Never\n volumes:\n - name: nfs-spark-volume\n persistentVolumeClaim:\n claimName: sparkcode\n driver:\n cores: %s\n coreLimit: \1200m\\n memory: \%s\\n labels:\n version: 3.3.1\n serviceAccount: spark-svc-account\n volumeMounts:\n - name: nfs-spark-volume\n mountPath: \/app/sparkcode\\n env:\n - name: AWS_REGION\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_REGION\n - name: AWS_ACCESS_KEY_ID\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_ACCESS_KEY_ID\n - name: AWS_SECRET_ACCESS_KEY\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_SECRET_ACCESS_KEY\n - name: MINIO_ENDPOINT\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: MINIO_ENDPOINT\n - name: MINIO_HOST\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: MINIO_HOST\n - name: BUCKET_NAME\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: BUCKET_NAME\n executor:\n cores: %s\n
############去除该配置#############################################################// instances: %s\n
################################################################################## memory: \%s\\n labels:\n version: 3.3.1\n volumeMounts:\n - name: nfs-spark-volume\n mountPath: \/app/sparkcode\\n env:\n - name: AWS_REGION\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_REGION\n - name: AWS_ACCESS_KEY_ID\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_ACCESS_KEY_ID\n - name: AWS_SECRET_ACCESS_KEY\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: AWS_SECRET_ACCESS_KEY\n - name: MINIO_ENDPOINT\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: MINIO_ENDPOINT\n - name: MINIO_HOST\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: MINIO_HOST\n - name: BUCKET_NAME\n valueFrom:\n secretKeyRef:\n name: minio-secret\n key: BUCKET_NAME\n sparkConf:\n spark.query.sql: \%s\,taskName,sparkImage,mainClass,sparkJarFile,driverCpu,driverMemory,executorCpu,executorMemory,dynamicSQLQuery);} 二 配置动态参数
//测试spark3.0 动态分配executor的instancepublic static void sparkQueryForFhcS3DynamicExecutor() throws Exception{System.out.println(1);String warehouseLocation new File(spark-warehouse).getAbsolutePath();System.out.println(2);String metastoreUri thrift://wuxdihadl09b.seagate.com:9083;SparkConf sparkConf new SparkConf();sparkConf.set(fs.s3a.impl, org.apache.hadoop.fs.s3a.S3AFileSystem);sparkConf.set(fs.s3a.access.key, apPeWWr5KpXkzEW2jNKW);sparkConf.set(spark.hadoop.fs.s3a.path.style.access, true);sparkConf.set(spark.hadoop.fs.s3a.connection.ssl.enabled, true);sparkConf.set(fs.s3a.secret.key, cRt3inWAhDYtuzsDnKGLGg9EJSbJ083ekuW7PejM);sparkConf.set(fs.s3a.endpoint, wuxdimiov001.seagate.com:9000); // 替换为实际的 S3 存储的地址sparkConf.set(spark.hadoop.fs.s3a.impl, org.apache.hadoop.fs.s3a.S3AFileSystem);sparkConf.set(spark.sql.metastore.uris, metastoreUri);sparkConf.set(spark.sql.warehouse.dir, warehouseLocation);sparkConf.set(spark.sql.catalogImplementation, hive);sparkConf.set(hive.metastore.uris, metastoreUri);#####################################################添加动态参数####################### //#总开关是否开启动态资源配置根据工作负载来衡量是否应该增加或减少executor默认falsesparkConf.set(spark.dynamicAllocation.enabled, true);//#spark3新增之前没有官方支持的on k8s的Dynamic Resouce Allocation。启用shuffle文件跟踪此配置不会回收保存了shuffle数据的executorsparkConf.set(spark.dynamicAllocation.shuffleTracking.enabled, true);//#启用shuffleTracking时控制保存shuffle数据的executor超时时间默认使用GC垃圾回收控制释放。如果有时候GC不及时配置此参数后即使executor上存在shuffle数据也会被回收。sparkConf.set(spark.dynamicAllocation.shuffleTracking.timeout, 60s);//#动态分配最小executor个数在启动时就申请好的默认0sparkConf.set(spark.dynamicAllocation.minExecutors, 1);//#动态分配最大executor个数默认infinitysparkConf.set(spark.dynamicAllocation.maxExecutors, 10);//#动态分配初始executor个数默认值spark.dynamicAllocation.minExecutorssparkConf.set(spark.dynamicAllocation.initialExecutors, 2);//#当某个executor空闲超过这个设定值就会被kill默认60ssparkConf.set(spark.dynamicAllocation.executorIdleTimeout, 60s);//#当某个缓存数据的executor空闲时间超过这个设定值就会被kill默认infinitysparkConf.set(spark.dynamicAllocation.cachedExecutorIdleTimeout, 240s);//#任务队列非空资源不够申请executor的时间间隔默认1s第一次申请sparkConf.set(spark.dynamicAllocation.schedulerBacklogTimeout, 3s);//#同schedulerBacklogTimeout是申请了新executor之后继续申请的间隔默认schedulerBacklogTimeout第二次及之后sparkConf.set(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, 30s);//#开启推测执行对长尾task会在其他executor上启动相同task先运行结束的作为结果sparkConf.set(spark.specution, true);####################################################################################### //Class.forName(org.apache.hadoop.fs.s3a.S3AFileSystem);long zhenyang2 System.currentTimeMillis();SparkSession sparkSession SparkSession.builder().appName(Fhc Spark Query).config(sparkConf).enableHiveSupport().getOrCreate();System.out.println(sparkSession create cost:(System.currentTimeMillis()-zhenyang2));System.out.println(3);// 获取 SparkConf 对象String exesql sparkSession.sparkContext().getConf().get(spark.query.sql);System.out.println(3.1:exesql);System.out.println(Hive Metastore URI: sparkConf.get(spark.sql.metastore.uris));System.out.println(Hive Warehouse Directory: sparkConf.get(spark.sql.warehouse.dir));System.out.println(SHOW DATABASES3.1:exesql);sparkSession.sql(SHOW DATABASES).show();long zhenyang3 System.currentTimeMillis();DatasetRow sqlDF sparkSession.sql(exesql);System.out.println(sparkSession sql:(System.currentTimeMillis()-zhenyang3));System.out.println(4);//System.out.println(sqlDF count:sqlDF.count());//sqlDF.show();long zhenyang5 System.currentTimeMillis();ListRow jaList sqlDF.javaRDD().collect();System.out.println(rdd collect cost:(System.currentTimeMillis()-zhenyang5));System.out.println(jaList list:jaList.size());ListTaskListModel list new ArrayListTaskListModel();long zhenyang4 System.currentTimeMillis();AtomicInteger i new AtomicInteger(0);jaList.stream().forEachOrdered(result - {i.incrementAndGet();//System.out.println(serial_num is :result.getString(1));});System.out.println(for each times:i.get());System.out.println(SparkDemo foreach cost:(System.currentTimeMillis()-zhenyang4));System.out.println(5);sparkSession.close();sparkSession.stop();}
三 发布一二中的程序逻辑见前面的博客文章
四 测试
首先提交一个简单sql: select * from cimarronbp_n.p_vbar_metric_summary limit 10
查看k8s spark operator生成的pod 根据pod启动的时间可以看出先生成了2个executor在16s后又生成了1个最后完成可以看出executor确实根据任务的执行情况动态生成了。而之前文章中的executor 20个是同一时间生成的 再测试一个join的sql
select distinct t1.serial_num,t1.trans_seq,t2.state_name,t2.p_vbar_metric_summary,t1.event_date from cimarronbp_n.p_vbar_metric_summary t1 left join cimarronbp_n.p_vbar_metric_summary t2 on t1.serial_num t2.serial_num AND t1.trans_seq t2.trans_seq where t1.event_date20231204 and t1.family2TJ and t1.operationCAL2
查看k8s spark operator生成的pod executor从2个到3个3个到4个是动态的 五 本文参考链接
「Spark从精通到重新入门(二)」Spark中不可不知的动态资源分配-阿里云开发者社区资源是影响 Spark 应用执行效率的一个重要因素。Spark 应用中真正执行 task 的组件是 Executor可以通过spark.executor.instances 指定 Spark 应用的 Executor 的数量。在运行过程中无论 Executor上是否有 task 在执行都会被一直占有直到此 Spark 应用结束。https://developer.aliyun.com/article/832482
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/86441.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!