Skip to content

Commit 67cb0c2

Browse files
authored
Using couler-proj/couler (#3075)
* migrate couler-proj/couler * remove old code * update * fix ci * update * update * remove unnecessary file * debug ci * update * test ci * fix ci * fix ci * fix ci * fix invalid env * escape couler env value
1 parent 7cc284f commit 67cb0c2

File tree

8 files changed

+127
-126
lines changed

8 files changed

+127
-126
lines changed

docker/dev/build.sh

+9-4
Original file line number | Diff line number | Diff line change
@@ -29,10 +29,6 @@ echo "Build sqlflowserver, sqlflow, and step into $SQLFLOW_BIN ..."
2929
go generate ./...
3030
GOBIN=$SQLFLOW_BIN go install ./...
3131

32-
echo "Build $SQLFLOWPATH/python/couler into $SQLFLOW_BIN ..."
33-
cd $SQLFLOWPATH/python/couler
34-
python setup.py bdist_wheel -q --dist-dir $SQLFLOW_BIN > /dev/null
35-
3632
echo "Build Fluid ..."
3733
cd $SQLFLOW_BIN
3834
if [[ ! -d fluid ]]; then
@@ -88,6 +84,15 @@ git fetch origin # The residual local repo might not be on a branch.
8884
git checkout 5dc6421f562ea447e501fa355a48a6ee89856a1d
8985
python setup.py bdist_wheel -q --dist-dir $SQLFLOW_BIN > /dev/null
9086

87+
echo "Build couler-proj/couler ..."
88+
cd $SQLFLOW_BIN
89+
if [[ ! -d couler ]]; then
90+
git clone https://github.com/couler-proj/couler.git
91+
fi
92+
cd couler
93+
git checkout 9c93f72791ebf5411fc5bec7d68890de753d8431
94+
python setup.py bdist_wheel -q --dist-dir $SQLFLOW_BIN > /dev/null
95+
9196
echo "Convert tutorials from Markdown to IPython notebooks ..."
9297
mkdir -p $SQLFLOW_BIN/tutorial
9398
for file in $SQLFLOWPATH/doc/tutorial/*.md; do

go/codegen/experimental/codegen_couler.go

+12-9
Original file line number | Diff line number | Diff line change
@@ -131,9 +131,9 @@ func CodeGenCouler(stepList []*stepContext, session *pb.Session) (string, error)
131131
func GetPyFuncBody(program string, funcName string) (string, error) {
132132
const coulerGetPyFuncCodeImpl = `
133133
%s
134-
135-
import couler.pyfunc as pyfunc
136-
print(pyfunc.body(%s))
134+
from couler.core.utils import body as pybody
135+
import sys
136+
print(pybody(%s), file=sys.stderr)
137137
`
138138

139139
tmpFile, err := ioutil.TempFile("/tmp", "sqlflow-couler-tmp")
@@ -157,20 +157,24 @@ print(pyfunc.body(%s))
157157
if err := cmd.Run(); err != nil {
158158
return "", fmt.Errorf("%v: %s\nCode is:\n%s", err, stderr, coulerCode)
159159
}
160-
return strings.TrimSpace(stdout.String()), nil
160+
return strings.TrimSpace(stderr.String()), nil
161161
}
162162

163163
const coulerCodeTmpl = `
164164
import couler.argo as couler
165-
import couler.pyfunc as pyfunc
165+
from couler.core.utils import body as pybody
166166
from os import path
167167
import json
168168
import re
169169
170170
datasource = "{{ .DataSource }}"
171+
workflow_ttl = {{.WorkflowTTL}}
171172
173+
# This works around a bug in the couler project: integer environment variable values must be wrapped in "" to avoid the
174+
# workflow failing with: "invalid spec: cannot convert int64 to string"
175+
# issue: https://github.com/couler-proj/couler/issues/108
172176
step_envs = dict()
173-
{{range $k, $v := .StepEnvs}}step_envs["{{$k}}"] = '''{{$v}}'''
177+
{{range $k, $v := .StepEnvs}}step_envs["{{$k}}"] = '''"{{$v}}"'''
174178
{{end}}
175179
176180
sqlflow_secret = None
@@ -184,8 +188,6 @@ resources = None
184188
if '''{{.Resources}}''' != "":
185189
resources=json.loads('''{{.Resources}}''')
186190
187-
couler.clean_workflow_after_seconds_finished({{.WorkflowTTL}})
188-
189191
step_log_file = "{{.StepLogFile}}"
190192
step_exit_time_wait = {{.StepExitTimeWait}}
191193
@@ -194,7 +196,7 @@ step_exit_time_wait = {{.StepExitTimeWait}}
194196
195197
codes = [
196198
"python <<EOF",
197-
pyfunc.body(step_entry_{{.StepIndex}}),
199+
pybody(step_entry_{{.StepIndex}}),
198200
"EOF",
199201
]
200202
@@ -216,6 +218,7 @@ if step_log_file:
216218
]
217219
218220
couler.run_script(image="{{.Image}}", command="bash", source="\n".join(codes), env=step_envs, resources=resources)
221+
couler.config_workflow(time_to_clean=workflow_ttl)
219222
{{end}}
220223
`
221224

go/workflow/couler/codegen.go

+30-22
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ import (
1717
"bytes"
1818
"encoding/json"
1919
"fmt"
20+
"io/ioutil"
2021
"os"
2122
"os/exec"
2223
"reflect"
@@ -46,8 +47,6 @@ func fillEnvFromSession(envs *map[string]string, session *pb.Session) {
4647
fillMapIfValueNotEmpty(*envs, "SQLFLOW_HADOOP_USER", session.HdfsUser)
4748
fillMapIfValueNotEmpty(*envs, "SQLFLOW_HADOOP_PASS", session.HdfsUser)
4849
fillMapIfValueNotEmpty(*envs, "SQLFLOW_submitter", session.Submitter)
49-
fillMapIfValueNotEmpty(*envs, "SQLFLOW_WORKFLOW_STEP_LOG_FILE", os.Getenv("SQLFLOW_WORKFLOW_STEP_LOG_FILE"))
50-
fillMapIfValueNotEmpty(*envs, "SQLFLOW_WORKFLOW_EXIT_TIME_WAIT", os.Getenv("SQLFLOW_WORKFLOW_EXIT_TIME_WAIT"))
5150
}
5251

5352
// GetStepEnvs returns a map of envs used for couler workflow.
@@ -100,6 +99,14 @@ func GetSecret() (string, string, error) {
10099
return name, string(value), nil
101100
}
102101

102+
func escapeStepSQL(sql string) string {
103+
sql = strings.Replace(sql, `\`, `\\\`, -1)
104+
sql = strings.Replace(sql, `"`, `\\\"`, -1)
105+
sql = strings.Replace(sql, "`", "\\`", -1)
106+
sql = strings.Replace(sql, "$", "\\$", -1)
107+
return sql
108+
}
109+
103110
// GenFiller generates Filler to fill the template
104111
func GenFiller(programIR []ir.SQLFlowStmt, session *pb.Session) (*Filler, error) {
105112
stepEnvs, err := GetStepEnvs(session)
@@ -123,13 +130,14 @@ func GenFiller(programIR []ir.SQLFlowStmt, session *pb.Session) (*Filler, error)
123130
stepLogFile := os.Getenv("SQLFLOW_WORKFLOW_STEP_LOG_FILE")
124131

125132
r := &Filler{
126-
DataSource: session.DbConnStr,
127-
StepEnvs: stepEnvs,
128-
WorkflowTTL: workflowTTL,
129-
SecretName: secretName,
130-
SecretData: secretData,
131-
Resources: os.Getenv(envResource),
132-
StepLogFile: stepLogFile,
133+
DataSource: session.DbConnStr,
134+
StepEnvs: stepEnvs,
135+
WorkflowTTL: workflowTTL,
136+
SecretName: secretName,
137+
SecretData: secretData,
138+
Resources: os.Getenv(envResource),
139+
StepLogFile: stepLogFile,
140+
ClusterConfigFn: os.Getenv("SQLFLOW_WORKFLOW_CLUSTER_CONFIG"),
133141
}
134142
// NOTE(yancey1989): does not use ModelImage here since the Predict statement
135143
// does not contain the ModelImage field in SQL Program IR.
@@ -138,11 +146,12 @@ func GenFiller(programIR []ir.SQLFlowStmt, session *pb.Session) (*Filler, error)
138146
}
139147

140148
for _, sqlIR := range programIR {
149+
escapedSQL := escapeStepSQL(sqlIR.GetOriginalSQL())
141150
switch i := sqlIR.(type) {
142151
case *ir.NormalStmt, *ir.PredictStmt, *ir.ExplainStmt, *ir.EvaluateStmt:
143152
// TODO(typhoonzero): get model image used when training.
144153
sqlStmt := &sqlStatement{
145-
OriginalSQL: sqlIR.GetOriginalSQL(), IsExtendedSQL: sqlIR.IsExtended(),
154+
OriginalSQL: escapedSQL, IsExtendedSQL: sqlIR.IsExtended(),
146155
DockerImage: defaultDockerImage}
147156
r.SQLStatements = append(r.SQLStatements, sqlStmt)
148157
case *ir.TrainStmt:
@@ -158,20 +167,20 @@ func GenFiller(programIR []ir.SQLFlowStmt, session *pb.Session) (*Filler, error)
158167
r.SQLStatements = append(r.SQLStatements, sqlStmt)
159168
} else {
160169
sqlStmt := &sqlStatement{
161-
OriginalSQL: sqlIR.GetOriginalSQL(), IsExtendedSQL: sqlIR.IsExtended(),
170+
OriginalSQL: escapedSQL, IsExtendedSQL: sqlIR.IsExtended(),
162171
DockerImage: stepImage}
163172
r.SQLStatements = append(r.SQLStatements, sqlStmt)
164173
}
165174
case *ir.ShowTrainStmt, *ir.OptimizeStmt:
166175
sqlStmt := &sqlStatement{
167-
OriginalSQL: sqlIR.GetOriginalSQL(),
176+
OriginalSQL: escapedSQL,
168177
IsExtendedSQL: sqlIR.IsExtended(),
169178
DockerImage: defaultDockerImage,
170179
}
171180
r.SQLStatements = append(r.SQLStatements, sqlStmt)
172181
case *ir.RunStmt:
173182
sqlStmt := &sqlStatement{
174-
OriginalSQL: sqlIR.GetOriginalSQL(),
183+
OriginalSQL: escapedSQL,
175184
IsExtendedSQL: sqlIR.IsExtended(),
176185
DockerImage: i.ImageName,
177186
Select: i.Select,
@@ -199,16 +208,15 @@ func GenCode(programIR []ir.SQLFlowStmt, session *pb.Session) (string, error) {
199208

200209
// GenYAML translate the Couler program into Argo YAML
201210
func GenYAML(coulerProgram string) (string, error) {
202-
cmdline := bytes.Buffer{}
203-
fmt.Fprintf(&cmdline, "couler run --mode argo --workflow_name sqlflow ")
204-
if c := os.Getenv("SQLFLOW_WORKFLOW_CLUSTER_CONFIG"); len(c) > 0 {
205-
fmt.Fprintf(&cmdline, "--cluster_config %s ", c)
211+
file, e := ioutil.TempFile("/tmp", "sqlflow.py")
212+
if e != nil {
213+
return "", e
206214
}
207-
fmt.Fprintf(&cmdline, "--file -")
208-
209-
coulerExec := strings.Split(cmdline.String(), " ")
210-
// execute command: `cat sqlflow.couler | couler run --mode argo --workflow_name sqlflow --file -`
211-
cmd := exec.Command(coulerExec[0], coulerExec[1:]...)
215+
defer os.Remove(file.Name())
216+
if e := ioutil.WriteFile(file.Name(), []byte(coulerProgram), 0644); e != nil {
217+
return "", e
218+
}
219+
cmd := exec.Command("python", file.Name())
212220
cmd.Env = append(os.Environ())
213221
cmd.Stdin = strings.NewReader(coulerProgram)
214222
out, err := cmd.CombinedOutput()

go/workflow/couler/codegen_test.go

+17-24
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
package couler
1515

1616
import (
17+
"fmt"
1718
"io/ioutil"
1819
"os"
1920
"regexp"
@@ -31,11 +32,11 @@ class K8s(object):
3132
def __init__(self):
3233
pass
3334
34-
def with_pod(self, template):
35+
def config_pod(self, template):
3536
self._with_tolerations(template)
3637
return template
3738
38-
def with_workflow_spec(self, spec):
39+
def config_workflow(self, spec):
3940
spec["hostNetwork"] = True
4041
return spec
4142
@@ -67,9 +68,12 @@ spec:
6768
container:
6869
image: docker/whalesay
6970
command:
70-
- bash
71-
- -c
7271
- 'echo "SQLFlow bridges AI and SQL engine."'
72+
env:
73+
- name: NVIDIA_VISIBLE_DEVICES
74+
value: ""
75+
- name: NVIDIA_DRIVER_CAPABILITIES
76+
value: ""
7377
tolerations:
7478
- effect: NoSchedule
7579
key: key
@@ -81,6 +85,7 @@ spec:
8185
var testCoulerProgram = `
8286
import couler.argo as couler
8387
couler.run_container(image="docker/whalesay", command='echo "SQLFlow bridges AI and SQL engine."')
88+
couler.config_workflow(cluster_config_file="%s")
8489
`
8590

8691
func TestCoulerCodegen(t *testing.T) {
@@ -92,20 +97,15 @@ func TestCoulerCodegen(t *testing.T) {
9297
defer os.Unsetenv("SQLFLOW_OSS_AK")
9398
code, err := GenCode(sqlIR, &pb.Session{})
9499
a.NoError(err)
95-
96-
r, e := regexp.Compile(`steps.sqlflow\(sql=r'''(.*);''', `)
97-
a.NoError(e)
98-
a.Equal(r.FindStringSubmatch(code)[1], "SELECT * FROM iris.train limit 10")
99-
a.True(strings.Contains(code, `step_envs["SQLFLOW_OSS_AK"] = '''oss_key'''`))
100+
a.True(strings.Contains(code, `SELECT * FROM iris.train limit 10`))
101+
a.True(strings.Contains(code, `step_envs["SQLFLOW_OSS_AK"] = '''"%s"''' % escape_env('''oss_key''')`))
100102
a.False(strings.Contains(code, `step_envs["SQLFLOW_WORKFLOW_SECRET"]`))
101-
a.True(strings.Contains(code, `couler.clean_workflow_after_seconds_finished(86400)`))
102-
a.True(strings.Contains(code, `couler.secret(secret_data, name="sqlflow-secret", dry_run=True)`))
103+
a.True(strings.Contains(code, `couler.create_secret(secret_data, name="sqlflow-secret", dry_run=True)`))
103104
a.True(strings.Contains(code, `resources=json.loads('''{"memory": "32Mi", "cpu": "100m"}''')`))
104105

105-
_, e = GenYAML(code)
106106
yaml, e := GenYAML(code)
107107
a.NoError(e)
108-
r, e = regexp.Compile(`step -e "(.*);"`)
108+
r, e := regexp.Compile(`step -e "(.*);"`)
109109
a.NoError(e)
110110
a.Equal("SELECT * FROM iris.train limit 10", r.FindStringSubmatch(yaml)[1])
111111
a.NoError(e)
@@ -116,7 +116,6 @@ func TestCoulerCodegen(t *testing.T) {
116116
a.NoError(err)
117117
r, e = regexp.Compile(`step_log_file = "(.*)"`)
118118
a.True(strings.Contains(code, `step_log_file = "/home/admin/logs/step.log"`))
119-
a.True(strings.Contains(code, "log_file=step_log_file"))
120119

121120
yaml, e = GenYAML(code)
122121
a.NoError(e)
@@ -125,10 +124,6 @@ func TestCoulerCodegen(t *testing.T) {
125124
a.Equal("/home/admin/logs", r.FindStringSubmatch(yaml)[1])
126125
a.Equal("/home/admin/logs/step.log", r.FindStringSubmatch(yaml)[3])
127126
a.NoError(e)
128-
129-
r, e = regexp.Compile("- name: SQLFLOW_WORKFLOW_STEP_LOG_FILE\n.*value: '(.*)'")
130-
a.NoError(e)
131-
a.Equal("/home/admin/logs/step.log", r.FindStringSubmatch(yaml)[1])
132127
}
133128

134129
func TestCoulerCodegenSpecialChars(t *testing.T) {
@@ -158,8 +153,9 @@ func TestStringInStringSQL(t *testing.T) {
158153
a.NoError(err)
159154
yaml, e := GenYAML(code)
160155
a.NoError(e)
161-
expect := `validation.select=\\\"select * from iris.train where\
162-
\ name like \\\\\\\"Versicolor\\\\\\\";\\\"`
156+
expect := `validation.select=\\\"select * from iris.train where\`
157+
a.True(strings.Contains(yaml, expect))
158+
expect = `\ name like \\\\\\\"Versicolor\\\\\\\";\\\"`
163159
a.True(strings.Contains(yaml, expect))
164160
}
165161

@@ -177,11 +173,8 @@ func TestCompileCoulerProgram(t *testing.T) {
177173
a.NoError(e)
178174
defer os.Remove(cfFileName)
179175

180-
os.Setenv("SQLFLOW_WORKFLOW_CLUSTER_CONFIG", cfFileName)
181-
defer os.Unsetenv("SQLFLOW_WORKFLOW_CLUSTER_CONFIG")
182-
out, e := GenYAML(testCoulerProgram)
176+
out, e := GenYAML(fmt.Sprintf(testCoulerProgram, cfFileName))
183177
a.NoError(e)
184-
185178
a.Equal(expectedArgoYAML, out)
186179
}
187180

0 commit comments

Comments
 (0)