使用AirFlow调度MaxCompute

简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

  • Python 2.7.5  PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0  # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding('utf8')

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

cfg = ConfigParser()

cfg.read("odps.ini")

print(cfg.items())

odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

default_args = {

   'owner': 'airflow',

   'depends_on_past': False,

   'retry_delay': timedelta(minutes=5),

   'start_date':datetime(2020,1,15)

   # 'email': ['airflow@example.com'],

   # 'email_on_failure': False,

   # 'email_on_retry': False,

   # 'retries': 1,

   # 'queue': 'bash_queue',

   # 'pool': 'backfill',

   # 'priority_weight': 10,

   # 'end_date': datetime(2016, 1, 1),

}

dag = DAG(

   'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):

   with io.open(sqlfile, encoding='utf-8', mode='r') as f:

       sql=f.read()

   f.closed

   return sql

def get_time():

   print '当前时间是{}'.format(time.time())

   return time.time()

def mc_job ():

   project = odps.get_project()  # 取到默认项目。

   instance=odps.run_sql("select * from long_chinese;")

   print(instance.get_logview_address())

   instance.wait_for_success()

   with instance.open_reader() as reader:

       count = reader.count

   print("查询表数据条数:{}".format(count))

   for record in reader:

       print record

   return count

t1 = PythonOperator (

   task_id = 'get_time' ,

   provide_context = False ,

   python_callable = get_time,

   dag = dag )

t2 = PythonOperator (

   task_id = 'mc_job' ,

   provide_context = False ,

   python_callable = mc_job ,

   dag = dag )

t2.set_upstream(t1)

2.提交

python Airiflow_MC.py

3.进行测试

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks Airiflow_MC

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

03.png

5.查看任务运行结果

1.点击view log

04.png

2.查看结果

 原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512726.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css让背景图片显示透明遮罩_CSS项目测试(支持深色模式)

*事先声明:本文章教程教学、文章封面来源自[CSS]聚光灯项目 by CodingStartup起码课,且已经CodingStartup起码课授权允许转载!为了保持原有风格,本文始终保持与CodingStartup起码课的视频风格一致*在出现同样的代码块时&#xff0…

一文读懂阿里云直播技术是如何实现的

简介: 东京奥运会已落下帷幕。比赛期间,全球亿万观众蜂拥至各大转播平台观看奥运赛事,平台直播能力显的尤为重要。阿里云作为视频直播平台的技术提供商,凭借在产品技术、资源带宽、服务保障等方面优势,可为各大转播平台…

低代码发展专访系列之七:低代码的火爆需要不一样的声音么?

编辑 | 曹芊芊话题:低代码发展系列专访前言:2019年开始,低代码爆火。有人认为它是第四代编程语言,有人认为它是开发模式的颠覆,也有人认为是企业管理模式的变革……有很多声音,社区讨论很热烈。CSDN随后展开…

java 自定义 钩子_使用现有钩子的函数不能使用自定义do_action钩子

我知道add_action用于在钩子的位置调用函数 . 我使用了下面的add_action,它给了我需要的输出(这是WooCommerce中产品的价格):add_action( woocommerce_before_variations_form, woocommerce_single_variation, 10 );我在我构建的自定义插件中使用了上面的…

启动、内存、卡顿三大分析,用户体验就用它?

简介: 启动分析支持通过预置采集和个性化自定义两种方式定义启动阶段,可以分别查询首次启动、冷启动、热启动的情况效果,并可以与设备、系统、版本、地域等维度做交叉筛选查询。 随着大量应用涌入市场加入“App内卷之战”,终端用…

adb echo shell 覆盖_一次写shell脚本的经历记录

点击上方“我的小碗汤”,选择“置顶公众号”精品文章,第一时间送达redis在容器化的过程中,涉及到纵向扩pod实例cpu、内存以及redis实例的maxmemory值,statefulset管理的pod需要重启。所以把redis集群的状态检查放到了健康检查中&a…

当新零售遇上 Serverless

简介: Serverless 的出现给传统企业数字化转型带了更多机遇。 某零售商超行业的龙头企业,其主要业务涵盖购物中心、大卖场、综合超市、标准超市、精品超市、便利店及无人值守智慧商店等零售业态,涉及全渠道零售、仓储物流、餐饮、消费服务、…

如果还不懂如何使用 Consumer 接口,就来看这篇!

作者 | 阿Q来源 | 阿Q说代码背景在开发过程中我遇到这么一个问题:表结构:一张主表A ,一张关联表B ,表 A 中存储着表 B 记录的状态。场景:第一步创建主表数据,插入A表;第二步调用第三方接口插入B…

java实验册_Java实验报告册Java实验报告册.doc

Java实验报告册Java实验报告册《面向对象程序设计》实验报告与习题册2013 / 2014 学年 第1学期系 别 计算机科学与技术系专 业 班 级 姓 名 指导教师目 录项 目成 绩批改时间实验一 开发环境的搭建及使用实验二 JAVA程序设计基本语法实验三 面向对象程序设计实验四 输入和输出流…

学python就业到底如何_学习python后,就业如何?

Python技术相关人才,正是迎合了目前人工智能时代的发展趋势,Python作为人工智能的首选语言,其发展前景那是大大的好。学习python原因1、python是脚本语言,作为程序员至少应该掌握一本通用脚本语言,因为脚本语言与编译语…

京东:Flink SQL 优化实战

简介: 本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施。 本文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为&#xff1…

Spring Boot参数校验以及分组校验的使用

简介: 做web开发基本上每个接口都要对参数进行校验,如果参数比较少,还比较容易处理,一但参数比较多了的话代码中就会出现大量的if-else语句。虽然这种方式简单直接,但会大大降低开发效率和代码可读性。所以我们可以使用…

fetch() php,PHP:使用fetch()发送数据

我试图使用fetch()发送一些数据,但作为回报,我得到SyntaxError:意外的标记,在JSON中的位置23这就是我要做的fetch(/api.php, {method: POST,body: JSON.stringify({nom : "Issa",prenom: "Oule"}),headers : {"Content-Type" : "applicatio…

python商品总价_【Python基础 | 列表】小实验:实现显示商品,选择商品,将商品加入购物车,得到总价格...

B站学习《Python爬虫技术5天速成》时遇到的小作业,原视频点我题目如下:代码如下:products [["iPhone", 6888], ["MBP", 14800], ["Coffee", 31], ["xiaomi6", 2499], ["Book", 60], [&qu…

长文解析:作为容器底层技术的半壁江山, cgroup如何突破并发创建瓶颈?

简介: io_uring 作为一种新型高性能异步编程框架,代表着 Linux 内核未来的方向,当前仍处于快速发展中。阿里云联合 InfoQ 发起《io_uring 介绍及应用实践》的技术公开课,围绕 OpenAnolis 龙蜥社区 Anolis OS 8 全方位解析高性能存…

Orion:谷歌的新一代SDN控制器

作者 | 魏煌松来源 | 鲜枣课堂时至今日,谷歌在2015年公布的成果,“利用SDN将广域网带宽利用率提升至接近100%”,仍然是SDN的一个标杆案列,也是难以逾越的巅峰。但事实上,当时使用的SDN控制器Onix,早已退出了…

移动云正式发布基于龙蜥 Anolis OS 的 BC-Linux V8.2 通用版操作系统

简介: 2020年12月CentOS项目组宣布CentOS 8将于2021年12月31日结束支持,这意味着从2022年开始,使用CentOS 8的用户,将无法得到来自官方的新硬件支持、bug修复和安全补丁。针对这一情况,移动云大云操作系统团队基于国内…

php _trait,php Trait的使用

1、php中的trait是啥?看上去既像类又像接口,其实都不是,Trait可以看做类的部分实现,可以混入一个或多个现有的PHP类中,其作用有两个:表明类可以做什么;提供模块化实现。Trait是一种代码复用技术&#xff0…

python业余项目_学会这8个优秀 Python 库用于业余项目,将大大减少程序员耗费的精力...

在数据库中即时保存数据:Dataset当我们想要在不知道最终数据库表长什么样的情况下,快速收集数据并保存到数据库中的时候,Dataset 库将是我们的最佳选择。Dataset 库有一个简单但功能强大的 API,因此我们可以很容易的把数据保存下来…

java 绘图

文章目录 Java绘图 Java绘图类 绘图颜色与画笔属性 设置颜色 设置画笔 绘制文本 显示图片 图像处理 1、放大与缩小 2、图像翻转 3、图像旋转 4、图像倾斜 End Java绘图 Java绘图是指在Java程序中创建和显示图形的过程。Java提供了许多类和方法来支持绘图。 Jav…