@@ -146,6 +146,13 @@ def test_get_cred_info(self):
146
146
"principal" : "impersonated@project.iam.gserviceaccount.com" ,
147
147
}
148
148
149
+ def test_universe_domain_matching_source (self ):
150
+ source_credentials = service_account .Credentials (
151
+ SIGNER , "some@email.com" , TOKEN_URI , universe_domain = "foo.bar"
152
+ )
153
+ credentials = self .make_credentials (source_credentials = source_credentials )
154
+ assert credentials .universe_domain == "foo.bar"
155
+
149
156
def test__make_copy_get_cred_info (self ):
150
157
credentials = self .make_credentials ()
151
158
credentials ._cred_file_path = "/path/to/file"
@@ -231,6 +238,38 @@ def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
231
238
== ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
232
239
)
233
240
241
+ @pytest .mark .parametrize ("use_data_bytes" , [True , False ])
242
+ def test_refresh_success_nonGdu (self , use_data_bytes , mock_donor_credentials ):
243
+ source_credentials = service_account .Credentials (
244
+ SIGNER , "some@email.com" , TOKEN_URI , universe_domain = "foo.bar"
245
+ )
246
+ credentials = self .make_credentials (
247
+ lifetime = None , source_credentials = source_credentials
248
+ )
249
+ token = "token"
250
+
251
+ expire_time = (
252
+ _helpers .utcnow ().replace (microsecond = 0 ) + datetime .timedelta (seconds = 500 )
253
+ ).isoformat ("T" ) + "Z"
254
+ response_body = {"accessToken" : token , "expireTime" : expire_time }
255
+
256
+ request = self .make_request (
257
+ data = json .dumps (response_body ),
258
+ status = http_client .OK ,
259
+ use_data_bytes = use_data_bytes ,
260
+ )
261
+
262
+ credentials .refresh (request )
263
+
264
+ assert credentials .valid
265
+ assert not credentials .expired
266
+ # Confirm override endpoint used.
267
+ request_kwargs = request .call_args [1 ]
268
+ assert (
269
+ request_kwargs ["url" ]
270
+ == "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateAccessToken"
271
+ )
272
+
234
273
@pytest .mark .parametrize ("use_data_bytes" , [True , False ])
235
274
def test_refresh_success_iam_endpoint_override (
236
275
self , use_data_bytes , mock_donor_credentials
@@ -397,6 +436,38 @@ def test_service_account_email(self):
397
436
398
437
def test_sign_bytes (self , mock_donor_credentials , mock_authorizedsession_sign ):
399
438
credentials = self .make_credentials (lifetime = None )
439
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
440
+ self ._sign_bytes_helper (
441
+ credentials ,
442
+ mock_donor_credentials ,
443
+ mock_authorizedsession_sign ,
444
+ expected_url ,
445
+ )
446
+
447
+ def test_sign_bytes_nonGdu (
448
+ self , mock_donor_credentials , mock_authorizedsession_sign
449
+ ):
450
+ source_credentials = service_account .Credentials (
451
+ SIGNER , "some@email.com" , TOKEN_URI , universe_domain = "foo.bar"
452
+ )
453
+ credentials = self .make_credentials (
454
+ lifetime = None , source_credentials = source_credentials
455
+ )
456
+ expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"
457
+ self ._sign_bytes_helper (
458
+ credentials ,
459
+ mock_donor_credentials ,
460
+ mock_authorizedsession_sign ,
461
+ expected_url ,
462
+ )
463
+
464
+ def _sign_bytes_helper (
465
+ self ,
466
+ credentials ,
467
+ mock_donor_credentials ,
468
+ mock_authorizedsession_sign ,
469
+ expected_url ,
470
+ ):
400
471
token = "token"
401
472
402
473
expire_time = (
@@ -412,11 +483,19 @@ def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
412
483
request .return_value = response
413
484
414
485
credentials .refresh (request )
415
-
416
486
assert credentials .valid
417
487
assert not credentials .expired
418
488
419
489
signature = credentials .sign_bytes (b"signed bytes" )
490
+ mock_authorizedsession_sign .assert_called_with (
491
+ mock .ANY ,
492
+ "POST" ,
493
+ expected_url ,
494
+ None ,
495
+ json = {"payload" : "c2lnbmVkIGJ5dGVz" , "delegates" : []},
496
+ headers = {"Content-Type" : "application/json" },
497
+ )
498
+
420
499
assert signature == b"signature"
421
500
422
501
def test_sign_bytes_failure (self ):
@@ -563,6 +642,45 @@ def test_id_token_from_credential(
563
642
self , mock_donor_credentials , mock_authorizedsession_idtoken
564
643
):
565
644
credentials = self .make_credentials (lifetime = None )
645
+ target_credentials = self .make_credentials (lifetime = None )
646
+ expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
647
+ self ._test_id_token_helper (
648
+ credentials ,
649
+ target_credentials ,
650
+ mock_donor_credentials ,
651
+ mock_authorizedsession_idtoken ,
652
+ expected_url ,
653
+ )
654
+
655
+ def test_id_token_from_credential_nonGdu (
656
+ self , mock_donor_credentials , mock_authorizedsession_idtoken
657
+ ):
658
+ source_credentials = service_account .Credentials (
659
+ SIGNER , "some@email.com" , TOKEN_URI , universe_domain = "foo.bar"
660
+ )
661
+ credentials = self .make_credentials (
662
+ lifetime = None , source_credentials = source_credentials
663
+ )
664
+ target_credentials = self .make_credentials (
665
+ lifetime = None , source_credentials = source_credentials
666
+ )
667
+ expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateIdToken"
668
+ self ._test_id_token_helper (
669
+ credentials ,
670
+ target_credentials ,
671
+ mock_donor_credentials ,
672
+ mock_authorizedsession_idtoken ,
673
+ expected_url ,
674
+ )
675
+
676
+ def _test_id_token_helper (
677
+ self ,
678
+ credentials ,
679
+ target_credentials ,
680
+ mock_donor_credentials ,
681
+ mock_authorizedsession_idtoken ,
682
+ expected_url ,
683
+ ):
566
684
token = "token"
567
685
target_audience = "https://foo.bar"
568
686
@@ -580,17 +698,19 @@ def test_id_token_from_credential(
580
698
assert credentials .valid
581
699
assert not credentials .expired
582
700
583
- new_credentials = self .make_credentials (lifetime = None )
584
-
585
701
id_creds = impersonated_credentials .IDTokenCredentials (
586
702
credentials , target_audience = target_audience , include_email = True
587
703
)
588
- id_creds = id_creds .from_credentials (target_credentials = new_credentials )
704
+ id_creds = id_creds .from_credentials (target_credentials = target_credentials )
589
705
id_creds .refresh (request )
590
706
707
+ args = mock_authorizedsession_idtoken .call_args .args
708
+
709
+ assert args [2 ] == expected_url
710
+
591
711
assert id_creds .token == ID_TOKEN_DATA
592
712
assert id_creds ._include_email is True
593
- assert id_creds ._target_credentials is new_credentials
713
+ assert id_creds ._target_credentials is target_credentials
594
714
595
715
def test_id_token_with_target_audience (
596
716
self , mock_donor_credentials , mock_authorizedsession_idtoken
0 commit comments