@@ -62,44 +62,35 @@ def parse_labels(labels_string: str, openmetrics: bool = False) -> Dict[str, str
62
62
# The label name is before the equal, or if there's no equal, that's the
63
63
# metric name.
64
64
65
- term , sub_labels = _next_term (sub_labels , openmetrics )
66
- if not term :
65
+ name_term , value_term , sub_labels = _next_term (sub_labels , openmetrics )
66
+ if not value_term :
67
67
if openmetrics :
68
68
raise ValueError ("empty term in line: " + labels_string )
69
69
continue
70
70
71
- quoted_name = False
72
- operator_pos = _next_unquoted_char (term , '=' )
73
- if operator_pos == - 1 :
74
- quoted_name = True
75
- label_name = "__name__"
76
- else :
77
- value_start = _next_unquoted_char (term , '=' )
78
- label_name , quoted_name = _unquote_unescape (term [:value_start ])
79
- term = term [value_start + 1 :]
71
+ label_name , quoted_name = _unquote_unescape (name_term )
80
72
81
73
if not quoted_name and not _is_valid_legacy_metric_name (label_name ):
82
74
raise ValueError ("unquoted UTF-8 metric name" )
83
75
84
76
# Check for missing quotes
85
- term = term .strip ()
86
- if not term or term [0 ] != '"' :
77
+ if not value_term or value_term [0 ] != '"' :
87
78
raise ValueError
88
79
89
80
# The first quote is guaranteed to be after the equal.
90
- # Find the last unescaped quote.
81
+ # Make sure that the next unescaped quote is the last character .
91
82
i = 1
92
- while i < len (term ):
93
- i = term .index ('"' , i )
94
- if not _is_character_escaped (term [:i ], i ):
83
+ while i < len (value_term ):
84
+ i = value_term .index ('"' , i )
85
+ if not _is_character_escaped (value_term [:i ], i ):
95
86
break
96
87
i += 1
97
-
98
88
# The label value is between the first and last quote
99
89
quote_end = i + 1
100
- if quote_end != len (term ):
90
+ if quote_end != len (value_term ):
101
91
raise ValueError ("unexpected text after quote: " + labels_string )
102
- label_value , _ = _unquote_unescape (term [:quote_end ])
92
+
93
+ label_value , _ = _unquote_unescape (value_term )
103
94
if label_name == '__name__' :
104
95
_validate_metric_name (label_name )
105
96
else :
@@ -112,11 +103,10 @@ def parse_labels(labels_string: str, openmetrics: bool = False) -> Dict[str, str
112
103
raise ValueError ("Invalid labels: " + labels_string )
113
104
114
105
115
- def _next_term (text : str , openmetrics : bool ) -> Tuple [str , str ]:
116
- """Extract the next comma-separated label term from the text.
117
-
118
- Returns the stripped term and the stripped remainder of the string,
119
- including the comma.
106
+ def _next_term (text : str , openmetrics : bool ) -> Tuple [str , str , str ]:
107
+ """Extract the next comma-separated label term from the text. The results
108
+ are stripped terms for the label name, label value, and then the remainder
109
+ of the string including the final , or }.
120
110
121
111
Raises ValueError if the term is empty and we're in openmetrics mode.
122
112
"""
@@ -125,41 +115,48 @@ def _next_term(text: str, openmetrics: bool) -> Tuple[str, str]:
125
115
if text [0 ] == ',' :
126
116
text = text [1 :]
127
117
if not text :
128
- return "" , ""
118
+ return "" , "" , ""
129
119
if text [0 ] == ',' :
130
120
raise ValueError ("multiple commas" )
131
- splitpos = _next_unquoted_char (text , ',}' )
121
+
122
+ splitpos = _next_unquoted_char (text , '=,}' )
123
+ if splitpos >= 0 and text [splitpos ] == "=" :
124
+ labelname = text [:splitpos ]
125
+ text = text [splitpos + 1 :]
126
+ splitpos = _next_unquoted_char (text , ',}' )
127
+ else :
128
+ labelname = "__name__"
129
+
132
130
if splitpos == - 1 :
133
131
splitpos = len (text )
134
132
term = text [:splitpos ]
135
133
if not term and openmetrics :
136
134
raise ValueError ("empty term:" , term )
137
135
138
- sublabels = text [splitpos :]
139
- return term .strip (), sublabels .strip ()
136
+ rest = text [splitpos :]
137
+ return labelname , term .strip (), rest .strip ()
140
138
141
139
142
- def _next_unquoted_char (text : str , chs : str , startidx : int = 0 ) -> int :
140
+ def _next_unquoted_char (text : str , chs : Optional [ str ] , startidx : int = 0 ) -> int :
143
141
"""Return position of next unquoted character in tuple, or -1 if not found.
144
142
145
143
It is always assumed that the first character being checked is not already
146
144
inside quotes.
147
145
"""
148
- i = startidx
149
146
in_quotes = False
150
147
if chs is None :
151
148
chs = string .whitespace
152
- while i < len (text ):
153
- if text [i ] == '"' and not _is_character_escaped (text , i ):
149
+
150
+ for i , c in enumerate (text [startidx :]):
151
+ if c == '"' and not _is_character_escaped (text , startidx + i ):
154
152
in_quotes = not in_quotes
155
153
if not in_quotes :
156
- if text [i ] in chs :
157
- return i
158
- i += 1
154
+ if c in chs :
155
+ return startidx + i
159
156
return - 1
160
157
161
158
162
- def _last_unquoted_char (text : str , chs : str ) -> int :
159
+ def _last_unquoted_char (text : str , chs : Optional [ str ] ) -> int :
163
160
"""Return position of last unquoted character in list, or -1 if not found."""
164
161
i = len (text ) - 1
165
162
in_quotes = False
@@ -253,7 +250,7 @@ def _parse_sample(text):
253
250
value , timestamp = _parse_value_and_timestamp (remaining_text )
254
251
return Sample (name , {}, value , timestamp )
255
252
name = text [:label_start ].strip ()
256
- label_end = _next_unquoted_char (text , '}' )
253
+ label_end = _next_unquoted_char (text [ label_start :] , '}' ) + label_start
257
254
labels = parse_labels (text [label_start + 1 :label_end ], False )
258
255
if not name :
259
256
# Name might be in the labels
0 commit comments