"""Helpers for authenticating to Google APIs with a service-account key file.

The key file is located through the ``GOOGLE_APPLICATION_CREDENTIALS``
environment variable. The module can build ``google.oauth2`` credentials,
or sign a JWT and exchange it at Google's token endpoint for an access token.
"""
# from oauth2client.service_account import ServiceAccountCredentials
import json
import os
import time
from typing import Optional

import requests
from google.auth import crypt
from google.auth import jwt
from google.oauth2 import service_account

# Snapshot of the environment variable taken at import time. The functions
# below re-read the environment on every call instead of using this value.
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')

_TOKEN_URI = "https://oauth2.googleapis.com/token"
_JWT_BEARER_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"
_TOKEN_LIFETIME_SECONDS = 3600
_REQUEST_TIMEOUT_SECONDS = 30


def get_credential_file_path() -> Optional[str]:
    """Return the path stored in ``GOOGLE_APPLICATION_CREDENTIALS``.

    Returns:
        The value of the environment variable, or ``None`` when it is unset.
    """
    return os.getenv('GOOGLE_APPLICATION_CREDENTIALS')


def get_jwt_signer() -> crypt.RSASigner:
    """Build an RSA signer from the service-account key file.

    ``crypt.Signer`` is an abstract base class and cannot be instantiated,
    so the concrete ``RSASigner`` is constructed from the key file instead.

    Returns:
        A signer created from the file at ``get_credential_file_path()``.
    """
    return crypt.RSASigner.from_service_account_file(get_credential_file_path())


def get_creds(scopes: list) -> service_account.Credentials:
    """Create service-account credentials limited to ``scopes``.

    Args:
        scopes: OAuth scopes passed through to
            ``Credentials.from_service_account_file``.

    Returns:
        Credentials loaded from the file at ``get_credential_file_path()``.
    """
    return service_account.Credentials.from_service_account_file(
        filename=get_credential_file_path(),
        scopes=scopes,
    )


def get_credential_file_asdict():
    """Load the service-account key file and return its parsed JSON content."""
    with open(get_credential_file_path(), encoding="utf-8") as f:
        return json.load(f)


def get_key_from_credential_file(key_to_get: str) -> Optional[str]:
    """Look up a single top-level entry of the service-account key file.

    Args:
        key_to_get: Name of the JSON key to read.

    Returns:
        The stored value, or ``None`` when the key is not present.
    """
    return get_credential_file_asdict().get(key_to_get)


def get_private_key() -> Optional[str]:
    """Return the ``private_key`` entry of the key file."""
    return get_key_from_credential_file('private_key')


def get_client_email() -> Optional[str]:
    """Return the ``client_email`` entry of the key file."""
    return get_key_from_credential_file('client_email')


def get_jwt_token(scopes: list):
    """Exchange a self-signed JWT for an access token.

    A JWT whose issuer and subject are the service account's
    ``client_email`` is signed with the account's key and posted to the
    token endpoint using the JWT-bearer grant type.

    NOTE: a new access token is requested on every call. In a long-running
    service, cache the token and check its expiration before requesting
    another one.

    Args:
        scopes: OAuth scopes; joined with single spaces into the ``scope``
            claim.

    Returns:
        The ``access_token`` field of the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error
            status.
        KeyError: If the response body has no ``access_token`` field.
    """
    client_email = get_client_email()
    iat = int(time.time())
    claim_set = {
        "iss": client_email,
        # The earlier revision experimented with a fixed user address for
        # "sub" and "email"; the service account's own address is used.
        "sub": client_email,
        "email": client_email,
        "scope": " ".join(scopes),
        "aud": _TOKEN_URI,
        "exp": iat + _TOKEN_LIFETIME_SECONDS,
        "iat": iat,
    }

    # Do not pass the private key as ``key_id``: that value is written into
    # the JWT header and would disclose the key. Leaving it out lets
    # ``jwt.encode`` fall back to the signer's own key id.
    assertion = jwt.encode(get_jwt_signer(), claim_set, header={'alg': 'RS256'})
    if isinstance(assertion, bytes):
        assertion = assertion.decode("ascii")

    # Send the grant in the form-encoded body rather than the query string,
    # so the signed assertion does not end up in the request URL.
    response = requests.post(
        _TOKEN_URI,
        data={
            "grant_type": _JWT_BEARER_GRANT_TYPE,
            "assertion": assertion,
        },
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    return response.json()['access_token']


def get_jwt_header(scopes: list):
    """Return an ``Authorization: Bearer …`` header dict for ``scopes``.

    Args:
        scopes: OAuth scopes forwarded to ``get_jwt_token``.

    Returns:
        A dict with the single key ``"Authorization"``.
    """
    token = get_jwt_token(scopes)
    return {
        "Authorization": f"Bearer {token}",
    }