Commit 2d3e017

Add Airflow example with scale-out workers
1 parent e80633e commit 2d3e017

File tree

7 files changed: +201 -0 lines changed

aws-ts-airflow/Pulumi.yaml (+2)

name: airflow
runtime: nodejs
aws-ts-airflow/README.md (+1)

A Pulumi program to deploy an RDS Postgres instance and containerized Airflow.
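Deploying the stack requires the database password in Pulumi config, since index.ts calls config.require("dbPassword"). A hedged sketch of the setup commands (assuming the Pulumi CLI of this vintage; flag spellings may differ):

    pulumi config set airflow:dbPassword <your-password> --secret
    pulumi update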
aws-ts-airflow/airflow-container/Dockerfile (+3)

FROM puckel/docker-airflow

ADD dags /usr/local/airflow/dags
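The image extends puckel/docker-airflow and copies the DAG definitions below into the directory Airflow scans for DAGs. That base image's entrypoint dispatches on its first argument, which is why the services in index.ts pass command values of webserver, scheduler, flower, and worker.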
aws-ts-airflow/airflow-container/dags/tutorial.py (+54)

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# t1 is the shared upstream: t2 and t3 both wait for it to complete
t2.set_upstream(t1)
t3.set_upstream(t1)
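With t1 as the only upstream, print_date runs first and sleep and templated can then run in parallel. Under the EXECUTOR: "Celery" setting in index.ts below, the scheduler enqueues these tasks through the Redis broker and they execute on the separate worker service, not inside the scheduler container.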

aws-ts-airflow/index.ts (+98)

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
import * as awscloud from "@pulumi/cloud-aws";

let config = new pulumi.Config("airflow");
const dbPassword = config.require("dbPassword");

let securityGroupIds = [ awscloud.getCluster()!.securityGroupId! ];

let dbSubnets = new aws.rds.SubnetGroup("dbsubnets", {
    subnetIds: awscloud.getNetwork().subnetIds,
});

// Postgres instance that backs Airflow's metadata database.
let db = new aws.rds.Instance("postgresdb", {
    engine: "postgres",

    instanceClass: "db.t2.micro",
    allocatedStorage: 20,

    dbSubnetGroupName: dbSubnets.id,
    vpcSecurityGroupIds: securityGroupIds,

    name: "airflow",
    username: "airflow",
    password: dbPassword,

    skipFinalSnapshot: true,
});

let cacheSubnets = new aws.elasticache.SubnetGroup("cachesubnets", {
    subnetIds: awscloud.getNetwork().subnetIds,
});

// Redis node that serves as the Celery broker.
let cacheCluster = new aws.elasticache.Cluster("cachecluster", {
    clusterId: "cache-" + pulumi.getStack(),
    engine: "redis",

    nodeType: "cache.t2.micro",
    numCacheNodes: 1,

    subnetGroupName: cacheSubnets.id,
    securityGroupIds: securityGroupIds,
});

// Shared container environment: point Airflow at the Postgres metadata
// database and the Redis broker, and select the Celery executor.
let environment = {
    // db.endpoint is "host:port"; keep only the host.
    "POSTGRES_HOST": db.endpoint.apply(e => e.split(":")[0]),
    "POSTGRES_PASSWORD": dbPassword,

    "REDIS_HOST": cacheCluster.cacheNodes.apply(n => n[0].address),

    "EXECUTOR": "Celery",
};

// Controller service: the Airflow web server and scheduler.
let airflowController = new awscloud.Service("airflowcontroller", {
    containers: {
        "webserver": {
            build: "./airflow-container",
            ports: [{ port: 8080, external: true, protocol: "http" }],
            environment: environment,
            command: [ "webserver" ],
        },

        "scheduler": {
            build: "./airflow-container",
            environment: environment,
            command: [ "scheduler" ],
        },
    },
    replicas: 1,
});

let airflower = new awscloud.Service("airflower", {
    containers: {
        // If the container is named "flower", we create environment variables that start
        // with `FLOWER_`, and Flower tries and fails to parse them as configuration.
        "notflower": {
            build: "./airflow-container",
            ports: [{ port: 5555, external: true, protocol: "http" }],
            environment: environment,
            command: [ "flower" ],
        },
    },
});

// Scale-out worker fleet: three Celery workers that execute queued tasks.
let airflowWorkers = new awscloud.Service("airflowworkers", {
    containers: {
        "worker": {
            build: "./airflow-container",
            environment: environment,
            command: [ "worker" ],
            memory: 1024,
        },
    },
    replicas: 3,
});

export let airflowEndpoint = airflowController.defaultEndpoint.apply(e => e.hostname);
export let flowerEndpoint = airflower.defaultEndpoint.apply(e => e.hostname);
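One natural knob here is the hard-coded replicas: 3 on the worker service. A minimal sketch of making it per-stack configuration, reusing the config object from index.ts; the workerCount key is hypothetical, not part of this commit:

// Hypothetical "workerCount" config key (not in this commit); fall back to
// the 3 replicas used above when it is unset.
const workerCount = config.getNumber("workerCount") || 3;

let airflowWorkers = new awscloud.Service("airflowworkers", {
    containers: {
        "worker": {
            build: "./airflow-container",
            environment: environment,
            command: [ "worker" ],
            memory: 1024,
        },
    },
    replicas: workerCount,
});

A stack would then opt in with pulumi config set airflow:workerCount 5 before its next update.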

aws-ts-airflow/package.json (+19)

{
    "name": "airflow",
    "version": "0.1",
    "main": "bin/index.js",
    "typings": "bin/index.d.ts",
    "scripts": {
        "build": "tsc"
    },
    "devDependencies": {
        "@types/node": "^10.1.2",
        "typescript": "^2.8.3"
    },
    "dependencies": {
        "@pulumi/aws": "^0.13.0",
        "@pulumi/cloud": "^0.13.1",
        "@pulumi/cloud-aws": "^0.13.1",
        "@pulumi/pulumi": "^0.12.2"
    }
}

aws-ts-airflow/tsconfig.json (+24)

{
    "compilerOptions": {
        "outDir": "bin",
        "target": "es6",
        "lib": [
            "es6"
        ],
        "module": "commonjs",
        "moduleResolution": "node",
        "declaration": true,
        "sourceMap": true,
        "stripInternal": true,
        "experimentalDecorators": true,
        "pretty": true,
        "noFallthroughCasesInSwitch": true,
        "noImplicitAny": true,
        "noImplicitReturns": true,
        "forceConsistentCasingInFileNames": true,
        "strictNullChecks": true
    },
    "files": [
        "index.ts"
    ]
}
