Skip to content

ark_identity_service_user

ArkIdentityServiceUser

Source code in ark_sdk_python/auth/identity/ark_identity_service_user.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class ArkIdentityServiceUser:
    def __init__(
        self,
        username: str,
        token: str,
        app_name: str,
        identity_url: Optional[str] = None,
        env: Optional[AwsEnv] = None,
        logger: Optional[logging.Logger] = None,
        cache_authentication: bool = True,
        verify: Optional[Union[str, bool]] = None,
        load_cache: bool = False,
        cache_profile: Optional[ArkProfile] = None,
    ) -> None:
        self.__username = username
        self.__token = token
        self.__app_name = app_name
        self.__env = env or AwsEnv(os.getenv(DEPLOY_ENV, AwsEnv.PROD.value))
        self.__identity_url = identity_url or self.__resolve_fqdn_from_username()
        self.__logger = logger or get_logger(app=self.__class__.__name__)
        self.__keyring = ArkKeyring(self.__class__.__name__.lower()) if cache_authentication else None
        self.__cache_authentication = cache_authentication

        self.__session = Session()
        self.__session_token = None
        self.__session_exp = None
        self.__session.headers.update(ArkIdentityFQDNResolver.default_system_headers())
        if verify is None:
            if ArkSystemConfig.trusted_certificate() is not None:
                verify = ArkSystemConfig.trusted_certificate()
            else:
                verify = ArkSystemConfig.is_verifiying_certificates()
        self.__session.verify = verify
        if load_cache and cache_authentication and cache_profile:
            self.__load_cache(cache_profile)

    def __load_cache(self, profile: Optional[ArkProfile] = None) -> bool:
        if self.__keyring and profile:
            token = self.__keyring.load_token(profile, f'{self.__username}_identity_service_user')
            if token and token.username == self.__username:
                self.__session_token = token.token.get_secret_value()
                self.__session_exp = token.expires_in
                self.__session.headers.update({'Authorization': f'Bearer {self.__session_token}'})
                return True
        return False

    def __save_cache(self, profile: Optional[ArkProfile] = None) -> None:
        if self.__keyring and profile and self.__session_token:
            self.__session_exp = datetime.now() + timedelta(hours=4)
            self.__keyring.save_token(
                profile,
                ArkToken(
                    token=self.__session_token,
                    username=self.__username,
                    endpoint=self.__identity_url,
                    token_type=ArkTokenType.Internal,
                    auth_method=ArkAuthMethod.Other,
                    expires_in=self.__session_exp,
                ),
                f'{self.__username}_identity_service_user',
            )

    def __resolve_fqdn_from_username(self) -> str:
        tenant_suffix = self.__username[self.__username.index('@') :]
        return ArkIdentityFQDNResolver.resolve_tenant_fqdn_from_tenant_suffix(
            tenant_suffix=tenant_suffix, identity_env_url=IDENTITY_ENV_URLS[self.__env]
        )

    def auth_identity(self, profile: Optional[ArkProfile] = None, force: bool = False) -> None:
        """
        Authenticates to Identity with a service user.
        This method creates an auth token and authorizes to the service.

        Args:
            profile (Optional[ArkProfile]): Profile to be used to load from caching, if available
            force (bool): Determines whether to discard existing cache, defaults to `False`

        Raises:
            ArkAuthException: _description_
        """
        # Login to identity with the service service user
        self.__logger.info(f'Authenticating to service user via endpoint [{self.__identity_url}]')
        if self.__cache_authentication and not force and self.__load_cache(profile):
            # Check if expired
            if self.__session_exp.replace(tzinfo=None) > datetime.now():
                self.__logger.info('Loaded identity service user details from cache')
                return

        token_response: Response = self.__session.post(
            url=f'{self.__identity_url}/Oauth2/Token/{self.__app_name}',
            auth=HTTPBasicAuth(self.__username, self.__token),
            verify=True,
            data={'grant_type': 'client_credentials', 'scope': 'api'},
        )
        if token_response.status_code != HTTPStatus.OK:
            raise ArkAuthException('Failed logging in to identity service user')
        auth_result = json.loads(token_response.text)
        if 'access_token' not in auth_result.keys():
            raise ArkAuthException('Failed logging in to identity service user, access token not found')
        access_token = auth_result['access_token']

        # Authorize to the application with the service user
        params = {
            'client_id': self.__app_name,
            'response_type': 'id_token',
            'scope': 'openid profile api',
            'redirect_uri': 'https://cyberark.cloud/redirect',
        }
        self.__logger.info(f'Trying to request a platform authorization with params [{params}]')
        authorize_response = self.__session.get(
            url=f'{self.__identity_url}/OAuth2/Authorize/{self.__app_name}',
            headers={'Authorization': f'Bearer {access_token}'},
            params=params,
            allow_redirects=False,
        )
        if authorize_response.status_code != HTTPStatus.FOUND or 'Location' not in authorize_response.headers:
            raise ArkAuthException('Failed to authorize to application')
        # Parse the authorized token and return the session with it
        location_header_splitted = authorize_response.headers['Location'].split('#', 1)
        if len(location_header_splitted) != 2:
            raise ArkAuthException('Failed to parse location header to retrieve token from')
        parsed_query = parse_qs(location_header_splitted[1])
        if 'id_token' not in parsed_query or len(parsed_query['id_token']) != 1:
            raise ArkAuthException('Failed to parse id token from location header')
        self.__session_token = parsed_query['id_token'][0]
        self.__session.headers.update({'Authorization': f'Bearer {self.__session_token}', **ArkIdentityFQDNResolver.default_headers()})
        self.__session_exp = datetime.now() + timedelta(hours=4)
        self.__logger.info(
            f'Created a service user session via endpoint [{self.__identity_url}] ' f'with user [{self.__username}] to platform'
        )
        if self.__cache_authentication:
            self.__save_cache(profile)

    @property
    def session(self) -> Session:
        return self.__session

    @property
    def session_token(self) -> Optional[str]:
        return self.__session_token

    @property
    def identity_url(self) -> str:
        return self.__identity_url

auth_identity(profile=None, force=False)

Authenticates to Identity with a service user. This method creates an auth token and authorizes to the service.

Parameters:

Name Type Description Default
profile Optional[ArkProfile]

Profile to be used to load from caching, if available

None
force bool

Determines whether to discard existing cache, defaults to False

False

Raises:

Type Description
ArkAuthException

description

Source code in ark_sdk_python/auth/identity/ark_identity_service_user.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def auth_identity(self, profile: Optional[ArkProfile] = None, force: bool = False) -> None:
    """
    Authenticates to Identity with a service user.
    This method creates an auth token and authorizes to the service.

    Args:
        profile (Optional[ArkProfile]): Profile to be used to load from caching, if available
        force (bool): Determines whether to discard existing cache, defaults to `False`

    Raises:
        ArkAuthException: _description_
    """
    # Login to identity with the service service user
    self.__logger.info(f'Authenticating to service user via endpoint [{self.__identity_url}]')
    if self.__cache_authentication and not force and self.__load_cache(profile):
        # Check if expired
        if self.__session_exp.replace(tzinfo=None) > datetime.now():
            self.__logger.info('Loaded identity service user details from cache')
            return

    token_response: Response = self.__session.post(
        url=f'{self.__identity_url}/Oauth2/Token/{self.__app_name}',
        auth=HTTPBasicAuth(self.__username, self.__token),
        verify=True,
        data={'grant_type': 'client_credentials', 'scope': 'api'},
    )
    if token_response.status_code != HTTPStatus.OK:
        raise ArkAuthException('Failed logging in to identity service user')
    auth_result = json.loads(token_response.text)
    if 'access_token' not in auth_result.keys():
        raise ArkAuthException('Failed logging in to identity service user, access token not found')
    access_token = auth_result['access_token']

    # Authorize to the application with the service user
    params = {
        'client_id': self.__app_name,
        'response_type': 'id_token',
        'scope': 'openid profile api',
        'redirect_uri': 'https://cyberark.cloud/redirect',
    }
    self.__logger.info(f'Trying to request a platform authorization with params [{params}]')
    authorize_response = self.__session.get(
        url=f'{self.__identity_url}/OAuth2/Authorize/{self.__app_name}',
        headers={'Authorization': f'Bearer {access_token}'},
        params=params,
        allow_redirects=False,
    )
    if authorize_response.status_code != HTTPStatus.FOUND or 'Location' not in authorize_response.headers:
        raise ArkAuthException('Failed to authorize to application')
    # Parse the authorized token and return the session with it
    location_header_splitted = authorize_response.headers['Location'].split('#', 1)
    if len(location_header_splitted) != 2:
        raise ArkAuthException('Failed to parse location header to retrieve token from')
    parsed_query = parse_qs(location_header_splitted[1])
    if 'id_token' not in parsed_query or len(parsed_query['id_token']) != 1:
        raise ArkAuthException('Failed to parse id token from location header')
    self.__session_token = parsed_query['id_token'][0]
    self.__session.headers.update({'Authorization': f'Bearer {self.__session_token}', **ArkIdentityFQDNResolver.default_headers()})
    self.__session_exp = datetime.now() + timedelta(hours=4)
    self.__logger.info(
        f'Created a service user session via endpoint [{self.__identity_url}] ' f'with user [{self.__username}] to platform'
    )
    if self.__cache_authentication:
        self.__save_cache(profile)