18
18
19
19
20
20
TARGET_SERVICE_NAME = b'host'
21
- FQDN = socket .getfqdn ().encode ('utf-8' )
21
+ FQDN = (
22
+ 'localhost' if sys .platform == 'darwin' else socket .getfqdn ()
23
+ ).encode ('utf-8' )
22
24
SERVICE_PRINCIPAL = TARGET_SERVICE_NAME + b'/' + FQDN
23
25
24
26
# disable error deferring to catch errors immediately
@@ -124,7 +126,8 @@ def setUp(self):
124
126
usage = 'both' )
125
127
def test_acquire_by_init (self , str_name , kwargs ):
126
128
creds = gsscreds .Credentials (name = self .name , ** kwargs )
127
- self .assertIsInstance (creds .lifetime , int )
129
+ if sys .platform != 'darwin' :
130
+ self .assertIsInstance (creds .lifetime , int )
128
131
del creds
129
132
130
133
@exist_perms (lifetime = 30 , mechs = [gb .MechType .kerberos ],
@@ -137,7 +140,8 @@ def test_acquire_by_method(self, str_name, kwargs):
137
140
creds , actual_mechs , ttl = cred_resp
138
141
self .assertIsInstance (creds , gsscreds .Credentials )
139
142
self .assertIn (gb .MechType .kerberos , actual_mechs )
140
- self .assertIsInstance (ttl , int )
143
+ if sys .platform != 'darwin' :
144
+ self .assertIsInstance (ttl , int )
141
145
142
146
del creds
143
147
@@ -165,9 +169,12 @@ def test_store_acquire(self):
165
169
self .assertIsNotNone (deleg_creds )
166
170
167
171
store_res = deleg_creds .store (usage = 'initiate' , set_default = True ,
172
+ mech = gb .MechType .kerberos ,
168
173
overwrite = True )
169
- self .assertEqual (store_res .usage , "initiate" )
170
- self .assertIn (gb .MechType .kerberos , store_res .mechs )
174
+ # While Heimdal doesn't fail it doesn't set the return values as exp.
175
+ if self .realm .provider .lower () != 'heimdal' :
176
+ self .assertEqual (store_res .usage , "initiate" )
177
+ self .assertIn (gb .MechType .kerberos , store_res .mechs )
171
178
172
179
reacquired_creds = gsscreds .Credentials (name = deleg_creds .name ,
173
180
usage = 'initiate' )
@@ -187,10 +194,18 @@ def test_store_into_acquire_from(self):
187
194
initial_creds = gsscreds .Credentials (name = None ,
188
195
usage = 'initiate' )
189
196
190
- store_res = initial_creds .store (store , overwrite = True )
197
+ acquire_kwargs = {}
198
+ expected_usage = 'initiate'
199
+ if self .realm .provider .lower () == 'heimdal' :
200
+ acquire_kwargs ['usage' ] = 'initiate'
201
+ acquire_kwargs ['mech' ] = gb .MechType .kerberos
202
+ expected_usage = 'both'
203
+
204
+ store_res = initial_creds .store (store , overwrite = True ,
205
+ ** acquire_kwargs )
191
206
self .assertIsNotNone (store_res .mechs )
192
207
self .assertGreater (len (store_res .mechs ), 0 )
193
- self .assertEqual (store_res .usage , "initiate" )
208
+ self .assertEqual (store_res .usage , expected_usage )
194
209
195
210
name = gssnames .Name (princ_name )
196
211
retrieved_creds = gsscreds .Credentials (name = name , store = store )
@@ -212,13 +227,14 @@ def test_inquire(self, str_name, kwargs):
212
227
else :
213
228
self .assertIsNone (resp .name )
214
229
215
- if kwargs ['lifetime' ]:
230
+ if kwargs ['lifetime' ] and sys . platform != 'darwin' :
216
231
self .assertIsInstance (resp .lifetime , int )
217
232
else :
218
233
self .assertIsNone (resp .lifetime )
219
234
220
235
if kwargs ['usage' ]:
221
- self .assertEqual (resp .usage , "both" )
236
+ expected = "accept" if sys .platform == "darwin" else "both"
237
+ self .assertEqual (resp .usage , expected )
222
238
else :
223
239
self .assertIsNone (resp .usage )
224
240
@@ -242,17 +258,21 @@ def test_inquire_by_mech(self, str_name, kwargs):
242
258
else :
243
259
self .assertIsNone (resp .init_lifetime )
244
260
245
- if kwargs ['accept_lifetime' ]:
261
+ if kwargs ['accept_lifetime' ] and sys . platform != "darwin" :
246
262
self .assertIsInstance (resp .accept_lifetime , int )
247
263
else :
248
264
self .assertIsNone (resp .accept_lifetime )
249
265
250
266
if kwargs ['usage' ]:
251
- self .assertEqual (resp .usage , "both" )
267
+ expected = "accept" if sys .platform == "darwin" else "both"
268
+ self .assertEqual (resp .usage , expected )
252
269
else :
253
270
self .assertIsNone (resp .usage )
254
271
255
272
def test_add (self ):
273
+ if sys .platform == 'darwin' :
274
+ self .skipTest ("macOS Heimdal broken" )
275
+
256
276
input_creds = gsscreds .Credentials (gb .Creds ())
257
277
name = gssnames .Name (SERVICE_PRINCIPAL )
258
278
new_creds = input_creds .add (name , gb .MechType .kerberos ,
@@ -265,18 +285,25 @@ def test_store_into_add_from(self):
265
285
KT = '{tmpdir}/other_keytab' .format (tmpdir = self .realm .tmpdir )
266
286
store = {'ccache' : CCACHE , 'keytab' : KT }
267
287
268
- princ_name = 'service /cs@' + self .realm .realm
288
+ princ_name = 'service_add_from /cs@' + self .realm .realm
269
289
self .realm .addprinc (princ_name )
270
290
self .realm .extract_keytab (princ_name , KT )
271
291
self .realm .kinit (princ_name , None , ['-k' , '-t' , KT ])
272
292
273
293
initial_creds = gsscreds .Credentials (name = None ,
274
294
usage = 'initiate' )
275
295
276
- store_res = initial_creds .store (store , overwrite = True )
296
+ store_kwargs = {}
297
+ expected_usage = 'initiate'
298
+ if self .realm .provider .lower () == 'heimdal' :
299
+ store_kwargs ['usage' ] = 'initiate'
300
+ store_kwargs ['mech' ] = gb .MechType .kerberos
301
+ expected_usage = 'both'
302
+
303
+ store_res = initial_creds .store (store , overwrite = True , ** store_kwargs )
277
304
self .assertIsNotNone (store_res .mechs )
278
305
self .assertGreater (len (store_res .mechs ), 0 )
279
- self .assertEqual (store_res .usage , "initiate" )
306
+ self .assertEqual (store_res .usage , expected_usage )
280
307
281
308
name = gssnames .Name (princ_name )
282
309
input_creds = gsscreds .Credentials (gb .Creds ())
@@ -286,26 +313,34 @@ def test_store_into_add_from(self):
286
313
287
314
@ktu .gssapi_extension_test ('cred_imp_exp' , 'credentials import-export' )
288
315
def test_export (self ):
289
- creds = gsscreds .Credentials (name = self .name )
316
+ creds = gsscreds .Credentials (name = self .name ,
317
+ mechs = [gb .MechType .kerberos ])
290
318
token = creds .export ()
291
319
self .assertIsInstance (token , bytes )
292
320
293
321
@ktu .gssapi_extension_test ('cred_imp_exp' , 'credentials import-export' )
294
322
def test_import_by_init (self ):
295
- creds = gsscreds .Credentials (name = self .name )
323
+ creds = gsscreds .Credentials (name = self .name ,
324
+ mechs = [gb .MechType .kerberos ])
296
325
token = creds .export ()
297
326
imported_creds = gsscreds .Credentials (token = token )
298
327
299
- self .assertEqual (imported_creds .lifetime , creds .lifetime )
328
+ # lifetime seems to be None in Heimdal
329
+ if self .realm .provider .lower () != 'heimdal' :
330
+ self .assertEqual (imported_creds .lifetime , creds .lifetime )
331
+
300
332
self .assertEqual (imported_creds .name , creds .name )
301
333
302
334
@ktu .gssapi_extension_test ('cred_imp_exp' , 'credentials import-export' )
303
335
def test_pickle_unpickle (self ):
304
- creds = gsscreds .Credentials (name = self .name )
336
+ creds = gsscreds .Credentials (name = self .name ,
337
+ mechs = [gb .MechType .kerberos ])
305
338
pickled_creds = pickle .dumps (creds )
306
339
unpickled_creds = pickle .loads (pickled_creds )
307
340
308
- self .assertEqual (unpickled_creds .lifetime , creds .lifetime )
341
+ # lifetime seems to be None in Heimdal
342
+ if self .realm .provider .lower () != 'heimdal' :
343
+ self .assertEqual (unpickled_creds .lifetime , creds .lifetime )
309
344
self .assertEqual (unpickled_creds .name , creds .name )
310
345
311
346
@exist_perms (lifetime = 30 , mechs = [gb .MechType .kerberos ],
@@ -381,8 +416,15 @@ def test_sasl_properties(self):
381
416
if mech .description :
382
417
self .assertIsInstance (mech .description , str )
383
418
384
- cmp_mech = gssmechs .Mechanism .from_sasl_name (mech .sasl_name )
385
- self .assertEqual (str (cmp_mech ), str (mech ))
419
+ # Heimdal fails with Unknown mech-code on sanon
420
+ if not (self .realm .provider .lower () == "heimdal" and
421
+ s == '1.3.6.1.4.1.5322.26.1.110' ):
422
+ cmp_mech = gssmechs .Mechanism .from_sasl_name (mech .sasl_name )
423
+
424
+ # For some reason macOS sometimes returns this for mechs
425
+ if not (sys .platform == 'darwin' and
426
+ str (cmp_mech ) == '1.2.752.43.14.2' ):
427
+ self .assertEqual (str (cmp_mech ), str (mech ))
386
428
387
429
@ktu .gssapi_extension_test ('rfc5587' , 'RFC 5587: Mech Inquiry' )
388
430
def test_mech_inquiry (self ):
@@ -441,6 +483,8 @@ def test_create_from_token(self):
441
483
self .assertEqual (name2 .name_type , gb .NameType .kerberos_principal )
442
484
443
485
@ktu .gssapi_extension_test ('rfc6680' , 'RFC 6680' )
486
+ @ktu .krb_provider_test (['mit' ], 'gss_display_name_ext as it is not '
487
+ 'implemented for krb5' )
444
488
def test_display_as (self ):
445
489
name = gssnames .Name (TARGET_SERVICE_NAME ,
446
490
gb .NameType .hostbased_service )
@@ -457,6 +501,8 @@ def test_display_as(self):
457
501
self .assertEqual (krb_name , princ_str )
458
502
459
503
@ktu .gssapi_extension_test ('rfc6680' , 'RFC 6680' )
504
+ @ktu .krb_provider_test (['mit' ], 'gss_canonicalize_name as it is not '
505
+ 'implemented for krb5' )
460
506
def test_create_from_composite_token_no_attrs (self ):
461
507
name1 = gssnames .Name (TARGET_SERVICE_NAME ,
462
508
gb .NameType .hostbased_service )
@@ -539,7 +585,16 @@ def test_canonicalize(self):
539
585
540
586
canonicalized_name = name .canonicalize (gb .MechType .kerberos )
541
587
self .assertIsInstance (canonicalized_name , gssnames .Name )
542
- self .assertEqual (bytes (canonicalized_name ), SERVICE_PRINCIPAL + b"@" )
588
+
589
+ expected = SERVICE_PRINCIPAL + b"@"
590
+ if sys .platform == 'darwin' :
591
+ # No idea - just go with it
592
+ expected = b"host/wellknown:org.h5l.hostbased-service@" \
593
+ b"H5L.HOSTBASED-SERVICE"
594
+ elif self .realm .provider .lower () == 'heimdal' :
595
+ expected += self .realm .realm .encode ('utf-8' )
596
+
597
+ self .assertEqual (bytes (canonicalized_name ), expected )
543
598
544
599
def test_copy (self ):
545
600
name1 = gssnames .Name (SERVICE_PRINCIPAL )
@@ -551,6 +606,7 @@ def test_copy(self):
551
606
# doesn't actually implement it
552
607
553
608
@ktu .gssapi_extension_test ('rfc6680' , 'RFC 6680' )
609
+ @ktu .krb_provider_test (['mit' ], 'Heimdal does not implemented for krb5' )
554
610
def test_is_mech_name (self ):
555
611
name = gssnames .Name (TARGET_SERVICE_NAME ,
556
612
gb .NameType .hostbased_service )
@@ -562,6 +618,7 @@ def test_is_mech_name(self):
562
618
self .assertEqual (canon_name .mech , gb .MechType .kerberos )
563
619
564
620
@ktu .gssapi_extension_test ('rfc6680' , 'RFC 6680' )
621
+ @ktu .krb_provider_test (['mit' ], 'Heimdal does not implemented for krb5' )
565
622
def test_export_name_composite_no_attrs (self ):
566
623
name = gssnames .Name (TARGET_SERVICE_NAME ,
567
624
gb .NameType .hostbased_service )
@@ -611,8 +668,13 @@ def setUp(self):
611
668
self .client_creds = gsscreds .Credentials (name = None ,
612
669
usage = 'initiate' )
613
670
614
- self .target_name = gssnames .Name (TARGET_SERVICE_NAME ,
615
- gb .NameType .hostbased_service )
671
+ if sys .platform == "darwin" :
672
+ spn = TARGET_SERVICE_NAME + b"@" + FQDN
673
+ self .target_name = gssnames .Name (spn ,
674
+ gb .NameType .hostbased_service )
675
+ else :
676
+ self .target_name = gssnames .Name (TARGET_SERVICE_NAME ,
677
+ gb .NameType .hostbased_service )
616
678
617
679
self .server_name = gssnames .Name (SERVICE_PRINCIPAL )
618
680
self .server_creds = gsscreds .Credentials (name = self .server_name ,
@@ -628,7 +690,12 @@ def _create_client_ctx(self, **kwargs):
628
690
def test_create_from_other (self ):
629
691
raw_client_ctx , raw_server_ctx = self ._create_completed_contexts ()
630
692
high_level_ctx = gssctx .SecurityContext (raw_client_ctx )
631
- self .assertEqual (high_level_ctx .target_name , self .target_name )
693
+
694
+ expected = self .target_name
695
+ if self .realm .provider .lower () == "heimdal" :
696
+ expected = gssnames .Name (self .realm .host_princ .encode ('utf-8' ),
697
+ name_type = gb .NameType .kerberos_principal )
698
+ self .assertEqual (high_level_ctx .target_name , expected )
632
699
633
700
@exist_perms (lifetime = 30 , flags = [],
634
701
mech = gb .MechType .kerberos ,
@@ -688,7 +755,13 @@ def test_initiate_accept_steps(self):
688
755
self .assertTrue (server_ctx .complete )
689
756
690
757
self .assertLessEqual (client_ctx .lifetime , 400 )
691
- self .assertEqual (client_ctx .target_name , self .target_name )
758
+
759
+ expected = self .target_name
760
+ if self .realm .provider .lower () == "heimdal" :
761
+ expected = gssnames .Name (self .realm .host_princ .encode ('utf-8' ),
762
+ name_type = gb .NameType .kerberos_principal )
763
+ self .assertEqual (client_ctx .target_name , expected )
764
+
692
765
self .assertIsInstance (client_ctx .mech , gb .OID )
693
766
self .assertIsInstance (client_ctx .actual_flags , gb .IntEnumFlagSet )
694
767
self .assertTrue (client_ctx .locally_initiated )
@@ -714,6 +787,9 @@ def test_channel_bindings(self):
714
787
client_ctx .step (server_token )
715
788
716
789
def test_bad_channel_bindings_raises_error (self ):
790
+ if sys .platform == "darwin" :
791
+ self .skipTest ("macOS Heimdal doesn't fail as expected" )
792
+
717
793
bdgs = gb .ChannelBindings (application_data = b'abcxyz' ,
718
794
initiator_address_type = gb .AddressType .ip ,
719
795
initiator_address = b'127.0.0.1' ,
@@ -738,7 +814,13 @@ def test_export_create_from_token(self):
738
814
739
815
imported_ctx = gssctx .SecurityContext (token = token )
740
816
self .assertEqual (imported_ctx .usage , "initiate" )
741
- self .assertEqual (imported_ctx .target_name , self .target_name )
817
+
818
+ expected = self .target_name
819
+ if self .realm .provider .lower () == "heimdal" :
820
+ expected = gssnames .Name (self .realm .host_princ .encode ('utf-8' ),
821
+ name_type = gb .NameType .kerberos_principal )
822
+
823
+ self .assertEqual (imported_ctx .target_name , expected )
742
824
743
825
def test_pickle_unpickle (self ):
744
826
client_ctx , server_ctx = self ._create_completed_contexts ()
@@ -747,7 +829,12 @@ def test_pickle_unpickle(self):
747
829
unpickled_ctx = pickle .loads (pickled_ctx )
748
830
self .assertIsInstance (unpickled_ctx , gssctx .SecurityContext )
749
831
self .assertEqual (unpickled_ctx .usage , "initiate" )
750
- self .assertEqual (unpickled_ctx .target_name , self .target_name )
832
+
833
+ expected = self .target_name
834
+ if self .realm .provider .lower () == "heimdal" :
835
+ expected = gssnames .Name (self .realm .host_princ .encode ('utf-8' ),
836
+ name_type = gb .NameType .kerberos_principal )
837
+ self .assertEqual (unpickled_ctx .target_name , expected )
751
838
752
839
def test_encrypt_decrypt (self ):
753
840
client_ctx , server_ctx = self ._create_completed_contexts ()
@@ -810,7 +897,8 @@ def test_verify_signature_raise(self):
810
897
self .assertRaises (gb .GSSError , server_ctx .verify_signature ,
811
898
b"other message" , mic_token )
812
899
813
- @ktu .krb_minversion_test ("1.11" , "returning tokens" )
900
+ @ktu .krb_minversion_test ("1.11" , "returning tokens" , provider = "mit" )
901
+ @ktu .krb_provider_test (["mit" ], "returning tokens" )
814
902
def test_defer_step_error_on_method (self ):
815
903
gssctx .SecurityContext .__DEFER_STEP_ERRORS__ = True
816
904
bdgs = gb .ChannelBindings (application_data = b'abcxyz' )
@@ -827,7 +915,8 @@ def test_defer_step_error_on_method(self):
827
915
self .assertRaises (gb .BadChannelBindingsError , server_ctx .encrypt ,
828
916
b"test" )
829
917
830
- @ktu .krb_minversion_test ("1.11" , "returning tokens" )
918
+ @ktu .krb_minversion_test ("1.11" , "returning tokens" , provider = "mit" )
919
+ @ktu .krb_provider_test (["mit" ], "returning tokens" )
831
920
def test_defer_step_error_on_complete_property_access (self ):
832
921
gssctx .SecurityContext .__DEFER_STEP_ERRORS__ = True
833
922
bdgs = gb .ChannelBindings (application_data = b'abcxyz' )
0 commit comments