Skip to content

ark_keyring

ArkKeyring

Source code in ark_sdk_python/common/ark_keyring.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class ArkKeyring:
    def __init__(self, service_name: str) -> None:
        self.__service_name = service_name
        self.__logger = get_logger(self.__class__.__name__)

    @staticmethod
    def __is_docker():
        path = '/proc/self/cgroup'
        return os.path.exists('/.dockerenv') or os.path.isfile(path) and any('docker' in line for line in open(path, encoding='utf-8'))

    @staticmethod
    def get_keyring(enforce_basic_keyring: bool = False):
        try:
            # Docker or WSL
            if (
                ArkKeyring.__is_docker()
                or 'Microsoft' in uname().release
                or ARK_BASIC_KEYRING_OVERRIDE_ENV_VAR in os.environ
                or enforce_basic_keyring
            ):
                return BasicKeyring()
            if sys.platform == 'win32':
                from keyrings.cryptfile.cryptfile import CryptFileKeyring  # pylint: disable=import-error

                kr = CryptFileKeyring()
                kr.keyring_key = socket.gethostname()
                return kr
            elif sys.platform == 'darwin' or os.path.exists('/etc/redhat-release'):
                return BasicKeyring()
            else:
                if DBUS_SESSION_ENV_VAR not in os.environ:
                    return BasicKeyring()
                from keyring.backends import SecretService  # pylint: disable=unused-import

                return SecretService.Keyring()
        except Exception:
            return BasicKeyring()

    def save_token(self, profile: ArkProfile, token: ArkToken, postfix: str, enforce_basic_keyring: bool = False) -> None:
        """
        Saves the specified token for a profile in the keyring.
        The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.

        Args:
            profile (ArkProfile): _description_
            token (ArkToken): _description_
            postfix (str): _description_
            enforce_basic_keyring (bool): _description_
        """
        try:
            self.__logger.info(f'Trying to save token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
            kr = self.get_keyring(enforce_basic_keyring)
            kr.set_password(f'{self.__service_name}-{postfix}', profile.profile_name, token.json())
            self.__logger.info('Saved token successfully')
        except Exception as ex:
            # Last resort fallback to basic keyring
            if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
                self.__logger.warning(f'Falling back to basic keyring as we failed to save token with keyring [{str(kr)}]')
                return self.save_token(profile, token, postfix, True)
            self.__logger.warning(f'Failed to save token [{str(ex)}]')

    def load_token(self, profile: ArkProfile, postfix: str, enforce_basic_keyring: bool = False) -> Optional[ArkToken]:
        """
        Loads a token for a profile from the keyring.
        The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.
        When the token has expired and no refresh token exists, the token is deleted from the keyring and nothing is returned.
        When the token has expired but a refresh token exists, the token is only deleted if the max token time has passed (48 hours).

        Args:
            profile (ArkProfile): _description_
            postfix (str): _description_
            enforce_basic_keyring (bool): _description_

        Returns:
            Optional[ArkToken]: _description_
        """
        try:
            kr = self.get_keyring(enforce_basic_keyring)
            self.__logger.info(f'Trying to load token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
            token_val = kr.get_password(f'{self.__service_name}-{postfix}', profile.profile_name)
            if not token_val:
                self.__logger.info('No token found')
                return None
            token = ArkToken.parse_raw(token_val)
            if token.expires_in:
                if (
                    not token.refresh_token
                    and token.token_type != ArkTokenType.Internal
                    and (token.expires_in.replace(tzinfo=None) - timedelta(seconds=DEFAULT_EXPIRATION_GRACE_DELTA_SECONDS)) < datetime.now()
                ):
                    self.__logger.info('Token is expired and no refresh token exists')
                    kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                    return None
                elif (
                    token.refresh_token
                    and (token.expires_in.replace(tzinfo=None) + timedelta(hours=MAX_KEYRING_RECORD_TIME_HOURS)) < datetime.now()
                ):
                    self.__logger.info('Token is expired and has been in the cache for too long before another usage')
                    kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                    return None
            self.__logger.info('Loaded token successfully')
            return token
        except Exception as ex:
            # Last resort fallback to basic keyring
            if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
                self.__logger.warning(f'Falling back to basic keyring as we failed to load token with keyring [{str(kr)}]')
                return self.load_token(profile, postfix, True)
            self.__logger.warning(f'Failed to load cached token [{str(ex)}]')
            try:
                kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
            except Exception as ex_deletion:
                self.__logger.warning(f'Failed to delete failed loaded cached token [{str(ex_deletion)}]')
            return None

load_token(profile, postfix, enforce_basic_keyring=False)

Loads a token for a profile from the keyring. The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used. When the token has expired and no refresh token exists, the token is deleted from the keyring and nothing is returned. When the token has expired but a refresh token exists, the token is only deleted if the max token time has passed (48 hours).

Parameters:

Name Type Description Default
profile ArkProfile

description

required
postfix str

description

required
enforce_basic_keyring bool

description

False

Returns:

Type Description
Optional[ArkToken]

Optional[ArkToken]: description

Source code in ark_sdk_python/common/ark_keyring.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def load_token(self, profile: ArkProfile, postfix: str, enforce_basic_keyring: bool = False) -> Optional[ArkToken]:
    """
    Loads a token for a profile from the keyring.
    The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.
    When the token has expired and no refresh token exists, the token is deleted from the keyring and nothing is returned.
    When the token has expired but a refresh token exists, the token is only deleted if the max token time has passed (48 hours).

    Args:
        profile (ArkProfile): _description_
        postfix (str): _description_
        enforce_basic_keyring (bool): _description_

    Returns:
        Optional[ArkToken]: _description_
    """
    try:
        kr = self.get_keyring(enforce_basic_keyring)
        self.__logger.info(f'Trying to load token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
        token_val = kr.get_password(f'{self.__service_name}-{postfix}', profile.profile_name)
        if not token_val:
            self.__logger.info('No token found')
            return None
        token = ArkToken.parse_raw(token_val)
        if token.expires_in:
            if (
                not token.refresh_token
                and token.token_type != ArkTokenType.Internal
                and (token.expires_in.replace(tzinfo=None) - timedelta(seconds=DEFAULT_EXPIRATION_GRACE_DELTA_SECONDS)) < datetime.now()
            ):
                self.__logger.info('Token is expired and no refresh token exists')
                kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                return None
            elif (
                token.refresh_token
                and (token.expires_in.replace(tzinfo=None) + timedelta(hours=MAX_KEYRING_RECORD_TIME_HOURS)) < datetime.now()
            ):
                self.__logger.info('Token is expired and has been in the cache for too long before another usage')
                kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                return None
        self.__logger.info('Loaded token successfully')
        return token
    except Exception as ex:
        # Last resort fallback to basic keyring
        if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
            self.__logger.warning(f'Falling back to basic keyring as we failed to load token with keyring [{str(kr)}]')
            return self.load_token(profile, postfix, True)
        self.__logger.warning(f'Failed to load cached token [{str(ex)}]')
        try:
            kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
        except Exception as ex_deletion:
            self.__logger.warning(f'Failed to delete failed loaded cached token [{str(ex_deletion)}]')
        return None

save_token(profile, token, postfix, enforce_basic_keyring=False)

Saves the specified token for a profile in the keyring. The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.

Parameters:

Name Type Description Default
profile ArkProfile

description

required
token ArkToken

description

required
postfix str

description

required
enforce_basic_keyring bool

description

False
Source code in ark_sdk_python/common/ark_keyring.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def save_token(self, profile: ArkProfile, token: ArkToken, postfix: str, enforce_basic_keyring: bool = False) -> None:
    """
    Saves the specified token for a profile in the keyring.
    The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.

    Args:
        profile (ArkProfile): _description_
        token (ArkToken): _description_
        postfix (str): _description_
        enforce_basic_keyring (bool): _description_
    """
    try:
        self.__logger.info(f'Trying to save token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
        kr = self.get_keyring(enforce_basic_keyring)
        kr.set_password(f'{self.__service_name}-{postfix}', profile.profile_name, token.json())
        self.__logger.info('Saved token successfully')
    except Exception as ex:
        # Last resort fallback to basic keyring
        if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
            self.__logger.warning(f'Falling back to basic keyring as we failed to save token with keyring [{str(kr)}]')
            return self.save_token(profile, token, postfix, True)
        self.__logger.warning(f'Failed to save token [{str(ex)}]')