19
19
package org .apache .flink .yarn ;
20
20
21
21
import org .apache .flink .util .FlinkException ;
22
+ import org .apache .flink .util .jackson .JacksonMapperFactory ;
23
+
24
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonFactoryBuilder ;
25
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonParser ;
26
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
27
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
28
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
29
+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .ObjectNode ;
22
30
23
31
import org .apache .hadoop .fs .FileStatus ;
24
32
import org .apache .hadoop .fs .Path ;
25
33
import org .apache .hadoop .yarn .api .records .LocalResource ;
26
34
import org .apache .hadoop .yarn .api .records .LocalResourceType ;
27
35
import org .apache .hadoop .yarn .api .records .LocalResourceVisibility ;
36
+ import org .slf4j .Logger ;
37
+ import org .slf4j .LoggerFactory ;
28
38
29
39
import java .util .Objects ;
30
- import java .util .regex .Matcher ;
31
- import java .util .regex .Pattern ;
32
40
33
41
import static org .apache .flink .util .Preconditions .checkNotNull ;
34
42
38
46
* {@link Utils#createTaskExecutorContext}.
39
47
*/
40
48
class YarnLocalResourceDescriptor {
49
+ private static final Logger LOG = LoggerFactory .getLogger (YarnLocalResourceDescriptor .class );
41
50
42
- private static final String STRING_FORMAT =
43
- "YarnLocalResourceDescriptor{"
44
- + "key=%s, path=%s, size=%d, modificationTime=%d, visibility=%s, type=%s}" ;
45
- private static final Pattern LOCAL_RESOURCE_DESC_FORMAT =
46
- Pattern .compile (
47
- "YarnLocalResourceDescriptor\\ {"
48
- + "key=(\\ S+), path=(\\ S+), size=([\\ d]+), modificationTime=([\\ d]+), visibility=(\\ S+), type=(\\ S+)}" );
51
+ private static final ObjectMapper OBJECT_MAPPER =
52
+ JacksonMapperFactory .createObjectMapper (
53
+ new JsonFactoryBuilder ().quoteChar ('\'' ).build ())
54
+ .enable (JsonParser .Feature .ALLOW_SINGLE_QUOTES );
49
55
50
56
private final String resourceKey ;
51
57
private final Path path ;
@@ -98,21 +104,33 @@ LocalResourceType getResourceType() {
98
104
}
99
105
100
106
static YarnLocalResourceDescriptor fromString (String desc ) throws Exception {
101
- Matcher m = LOCAL_RESOURCE_DESC_FORMAT .matcher (desc );
102
- boolean mat = m .find ();
103
- if (mat ) {
107
+ try {
108
+ JsonNode node = OBJECT_MAPPER .readTree (desc );
109
+ if (!validate (node )) {
110
+ throw new FlinkException ("Error to parse YarnLocalResourceDescriptor from " + desc );
111
+ }
104
112
return new YarnLocalResourceDescriptor (
105
- m . group ( 1 ),
106
- new Path (m . group ( 2 )),
107
- Long . parseLong ( m . group ( 3 ) ),
108
- Long . parseLong ( m . group ( 4 ) ),
109
- LocalResourceVisibility .valueOf (m . group ( 5 )),
110
- LocalResourceType .valueOf (m . group ( 6 )));
111
- } else {
112
- throw new FlinkException ("Error to parse YarnLocalResourceDescriptor from " + desc );
113
+ node . get ( "resourceKey" ). asText ( ),
114
+ new Path (node . get ( "path" ). asText ( )),
115
+ node . get ( "size" ). asLong ( ),
116
+ node . get ( "modificationTime" ). asLong ( ),
117
+ LocalResourceVisibility .valueOf (node . get ( "visibility" ). asText ( )),
118
+ LocalResourceType .valueOf (node . get ( "resourceType" ). asText ( )));
119
+ } catch ( JsonProcessingException e ) {
120
+ throw new FlinkException ("Error to parse YarnLocalResourceDescriptor from " + desc , e );
113
121
}
114
122
}
115
123
124
+ private static boolean validate (JsonNode node ) {
125
+ return !node .isNull ()
126
+ && node .hasNonNull ("resourceKey" )
127
+ && node .hasNonNull ("path" )
128
+ && node .hasNonNull ("size" )
129
+ && node .hasNonNull ("modificationTime" )
130
+ && node .hasNonNull ("visibility" )
131
+ && node .hasNonNull ("resourceType" );
132
+ }
133
+
116
134
static YarnLocalResourceDescriptor fromFileStatus (
117
135
final String key ,
118
136
final FileStatus fileStatus ,
@@ -132,14 +150,20 @@ static YarnLocalResourceDescriptor fromFileStatus(
132
150
133
151
@ Override
134
152
public String toString () {
135
- return String .format (
136
- STRING_FORMAT ,
137
- resourceKey ,
138
- path .toString (),
139
- size ,
140
- modificationTime ,
141
- visibility ,
142
- resourceType );
153
+ try {
154
+ ObjectNode node = OBJECT_MAPPER .createObjectNode ();
155
+ node .put ("resourceKey" , resourceKey );
156
+ node .put ("path" , path .toString ());
157
+ node .put ("size" , size );
158
+ node .put ("modificationTime" , modificationTime );
159
+ node .put ("visibility" , visibility .toString ());
160
+ node .put ("resourceType" , resourceType .toString ());
161
+ return OBJECT_MAPPER .writeValueAsString (node );
162
+ } catch (JsonProcessingException e ) {
163
+ LOG .error ("Could not serialize YarnLocalResourceDescriptor to String." , e );
164
+ throw new RuntimeException (
165
+ "Could not serialize YarnLocalResourceDescriptor[%s] to String." , e );
166
+ }
143
167
}
144
168
145
169
@ Override
0 commit comments