Skip to content

common

ArkAsyncClient

Bases: ABC, ArkClient

Source code in ark_sdk_python/common/ark_async_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class ArkAsyncClient(ABC, ArkClient):
    """
    Abstract ArkClient that additionally carries settings for asynchronous requests.

    Subclasses supply the factory method for async requests and report which
    task / request types they work with.
    """

    def __init__(
        self,
        async_request_settings: Optional[ArkAsyncRequestSettings] = None,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        token_type: str = 'Bearer',
        cookies: Optional[List] = None,
        auth_header_name: str = 'Authorization',
        auth: Optional[Tuple[str, str]] = None,
        refresh_connection_callback: Optional[Callable[['ArkClient'], None]] = None,
    ) -> None:
        """
        Initializes the underlying ArkClient and stores the async request settings.

        Args:
            async_request_settings (Optional[ArkAsyncRequestSettings], optional): Settings exposed through
                `async_request_settings`; a default-constructed ArkAsyncRequestSettings is used when falsy.
            base_url (Optional[str], optional): Forwarded to ArkClient. Defaults to None.
            token (Optional[str], optional): Forwarded to ArkClient. Defaults to None.
            token_type (str, optional): Forwarded to ArkClient. Defaults to 'Bearer'.
            cookies (Optional[List], optional): Forwarded to ArkClient. Defaults to None.
            auth_header_name (str, optional): Forwarded to ArkClient. Defaults to 'Authorization'.
            auth (Optional[Tuple[str, str]], optional): Forwarded to ArkClient. Defaults to None.
            refresh_connection_callback (Optional[Callable[['ArkClient'], None]], optional): Forwarded to ArkClient.
        """
        super().__init__(
            base_url, token, token_type, cookies, auth_header_name, auth, refresh_connection_callback=refresh_connection_callback
        )
        # Any falsy value (including the None default) is replaced by default settings
        self.__async_request_settings = async_request_settings or ArkAsyncRequestSettings()

    @abstractmethod
    def async_request_for(self, poll_model: ArkPollableModel, async_task: ArkAsyncTask) -> ArkAsyncRequest:
        """
        Creates an async request for the specified model and task.
        The request polls for async operations as defined by the poll model's implementation.

        Args:
            poll_model (ArkPollableModel): The pollable model the request is created for
            async_task (ArkAsyncTask): The async task the request is created for

        Returns:
            ArkAsyncRequest: The async request for the given model and task
        """

    @staticmethod
    @abstractmethod
    def async_task_type() -> Type[ArkAsyncTask]:
        """
        Returns the client's async task type.

        Returns:
            Type[ArkAsyncTask]: The ArkAsyncTask class (not an instance) used by this client
        """

    @staticmethod
    @abstractmethod
    def async_request_type() -> Type[ArkAsyncRequest]:
        """
        Returns the client's async request type.

        Returns:
            Type[ArkAsyncRequest]: The ArkAsyncRequest class (not an instance) used by this client
        """

    @property
    def async_request_settings(self) -> ArkAsyncRequestSettings:
        """The async request settings stored at construction time."""
        return self.__async_request_settings

async_request_for(poll_model, async_task) abstractmethod

Creates an async request for the specified model and task. The request polls for async operations as defined by the poll model's implementation.

Parameters:

Name Type Description Default
poll_model ArkPollableModel

description

required
async_task ArkAsyncTask

description

required

Returns:

Name Type Description
ArkAsyncRequest ArkAsyncRequest

description

Source code in ark_sdk_python/common/ark_async_client.py
28
29
30
31
32
33
34
35
36
37
38
39
40
@abstractmethod
def async_request_for(self, poll_model: ArkPollableModel, async_task: ArkAsyncTask) -> ArkAsyncRequest:
    """
    Creates an async request for the specified model and task.
    The request polls for async operations as defined by the poll model's implementation.

    This is an abstract hook with no body here; concrete clients provide the implementation.

    Args:
        poll_model (ArkPollableModel): The pollable model the request is created for
        async_task (ArkAsyncTask): The async task the request is created for

    Returns:
        ArkAsyncRequest: The async request for the given model and task
    """

async_request_type() abstractmethod staticmethod

Returns the client's async request type.

Returns:

Type Description
Type[ArkAsyncRequest]

Type[ArkAsyncRequest]: description

Source code in ark_sdk_python/common/ark_async_client.py
52
53
54
55
56
57
58
59
60
@staticmethod
@abstractmethod
def async_request_type() -> Type[ArkAsyncRequest]:
    """
    Returns the client's async request type.

    This is an abstract hook with no body here; concrete clients provide the implementation.

    Returns:
        Type[ArkAsyncRequest]: The ArkAsyncRequest class (not an instance) used by this client
    """

async_task_type() abstractmethod staticmethod

Returns the client's async task type.

Returns:

Type Description
Type[ArkAsyncTask]

Type[ArkAsyncTask]: description

Source code in ark_sdk_python/common/ark_async_client.py
42
43
44
45
46
47
48
49
50
@staticmethod
@abstractmethod
def async_task_type() -> Type[ArkAsyncTask]:
    """
    Returns the client's async task type.

    This is an abstract hook with no body here; concrete clients provide the implementation.

    Returns:
        Type[ArkAsyncTask]: The ArkAsyncTask class (not an instance) used by this client
    """

ArkAsyncRequest

Bases: ABC

Source code in ark_sdk_python/common/ark_async_request.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class ArkAsyncRequest(ABC):
    """
    Abstract handle around a single async task issued through an ArkClient.

    Holds the client, the task and the async request settings; subclasses implement
    the status checks and the polling loop.
    """

    def __init__(self, client: ArkClient, async_task: ArkAsyncTask, async_request_settings: ArkAsyncRequestSettings):
        """
        Stores the collaborators and creates a logger named after the concrete class.

        Args:
            client (ArkClient): Client exposed through the `client` property
            async_task (ArkAsyncTask): Task exposed through the `async_task` property
            async_request_settings (ArkAsyncRequestSettings): Settings kept for use by subclasses
        """
        self._client, self._async_task = client, async_task
        self._async_request_settings = async_request_settings
        # Logger name follows the concrete subclass, not this base class
        self._logger = get_logger(self.__class__.__name__)

    @abstractmethod
    def is_finished(self) -> bool:
        """
        Checks whether or not the current async request has finished.

        Returns:
            bool: Whether the request has finished, as determined by the implementation
        """

    @abstractmethod
    def task_failed(self) -> bool:
        """
        Checks whether or not the current async request failed.

        Returns:
            bool: Whether the request failed, as determined by the implementation
        """

    @abstractmethod
    def task_timeout(self) -> bool:
        """
        Checks whether or not the current async request has timed out.

        Returns:
            bool: Whether the request timed out, as determined by the implementation
        """

    @abstractmethod
    def poll(self, timeout_seconds: int, progress_callback: Callable[[ArkAsyncTask, int, ArkAsyncStatus], None]) -> bool:
        """
        Polls the async request until it has completed.
        Progress callbacks can also be used to return the async request's status.

        Args:
            timeout_seconds (int): Timeout for the polling, in seconds
            progress_callback (Callable[[ArkAsyncTask, int, ArkAsyncStatus], None]): Callback taking the task,
                an int and a status; the meaning of the int is defined by the implementation

        Returns:
            bool: Outcome of the polling, as defined by the implementation
        """

    @property
    def async_task(self) -> ArkAsyncTask:
        """The async task given at construction time."""
        return self._async_task

    @property
    def client(self) -> ArkClient:
        """The client given at construction time."""
        return self._client

is_finished() abstractmethod

Checks whether or not the current async request has finished.

Returns:

Name Type Description
bool bool

description

Source code in ark_sdk_python/common/ark_async_request.py
16
17
18
19
20
21
22
23
@abstractmethod
def is_finished(self) -> bool:
    """
    Checks whether or not the current async request has finished.

    This is an abstract hook with no body here; concrete requests provide the implementation.

    Returns:
        bool: Whether the request has finished, as determined by the implementation
    """

poll(timeout_seconds, progress_callback) abstractmethod

Polls the async request until it has completed. Progress callbacks can also be used to return the async request's status.

Parameters:

Name Type Description Default
timeout_seconds int

description

required
progress_callback Callable[[ArkAsyncTask, int, ArkAsyncStatus], None]

description

required

Returns:

Name Type Description
bool bool

description

Source code in ark_sdk_python/common/ark_async_request.py
43
44
45
46
47
48
49
50
51
52
53
54
55
@abstractmethod
def poll(self, timeout_seconds: int, progress_callback: Callable[[ArkAsyncTask, int, ArkAsyncStatus], None]) -> bool:
    """
    Polls the async request until it has completed.
    Progress callbacks can also be used to return the async request's status.

    This is an abstract hook with no body here; concrete requests provide the implementation.

    Args:
        timeout_seconds (int): Timeout for the polling, in seconds
        progress_callback (Callable[[ArkAsyncTask, int, ArkAsyncStatus], None]): Callback taking the task,
            an int and a status; the meaning of the int is defined by the implementation

    Returns:
        bool: Outcome of the polling, as defined by the implementation
    """

task_failed() abstractmethod

Checks whether or not the current async request failed.

Returns:

Name Type Description
bool bool

description

Source code in ark_sdk_python/common/ark_async_request.py
25
26
27
28
29
30
31
32
@abstractmethod
def task_failed(self) -> bool:
    """
    Checks whether or not the current async request failed.

    This is an abstract hook with no body here; concrete requests provide the implementation.

    Returns:
        bool: Whether the request failed, as determined by the implementation
    """

task_timeout() abstractmethod

Checks whether or not the current async request has timed out.

Returns:

Name Type Description
bool bool

description

Source code in ark_sdk_python/common/ark_async_request.py
34
35
36
37
38
39
40
41
@abstractmethod
def task_timeout(self) -> bool:
    """
    Checks whether or not the current async request has timed out.

    This is an abstract hook with no body here; concrete requests provide the implementation.

    Returns:
        bool: Whether the request timed out, as determined by the implementation
    """

ArkClient

Source code in ark_sdk_python/common/ark_client.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class ArkClient:
    """
    HTTP client wrapper that owns a `Session` and applies a base URL, an auth token,
    cookies, certificate verification and a User-Agent header to it.
    """

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        token_type: str = 'Bearer',
        cookies: Optional[List] = None,
        auth_header_name: str = 'Authorization',
        auth: Optional[Tuple[str, str]] = None,
        cookie_jar: Optional[RequestsCookieJar] = None,
        verify: Optional[Union[str, bool]] = None,
        refresh_connection_callback: Optional[Callable[['ArkClient'], None]] = None,
    ) -> None:
        """
        Creates the session and configures it from the given arguments.

        Args:
            base_url (Optional[str], optional): Base URL joined with request routes; 'https://' is prepended
                when the value does not already start with it. Defaults to None.
            token (Optional[str], optional): Session token, applied through `update_token`. Defaults to None.
            token_type (str, optional): 'Basic' treats the token as base64 'user:password'; an empty string
                sends the bare token; any other value is used as the header prefix. Defaults to 'Bearer'.
            cookies (Optional[List], optional): Cookies added one by one to the session. Defaults to None.
            auth_header_name (str, optional): Header that carries the token. Defaults to 'Authorization'.
            auth (Optional[Tuple[str, str]], optional): Assigned to the session's `auth` when truthy.
            cookie_jar (Optional[RequestsCookieJar], optional): Jar merged into the session cookies.
            verify (Optional[Union[str, bool]], optional): Session `verify` value; when None it is taken from
                `ArkSystemConfig.trusted_certificate()` if set, else `ArkSystemConfig.is_verifiying_certificates()`.
            refresh_connection_callback (Optional[Callable[['ArkClient'], None]], optional): Stored and exposed
                through the `refresh_connection_callback` property; this class does not invoke it.
        """
        from fake_useragent import UserAgent

        self.__session = Session()
        self.__base_url = base_url
        self.__token = token
        self.__token_type = token_type
        self.__auth_header_name = auth_header_name
        self.__refresh_connection_callback = refresh_connection_callback
        if self.__base_url and not self.__base_url.startswith('https://'):
            self.__base_url = f'https://{self.__base_url}'
        if auth:
            self.__session.auth = auth
        self.update_token(token)
        self.update_cookies(cookies, cookie_jar)
        if verify is None:
            if ArkSystemConfig.trusted_certificate() is not None:
                verify = ArkSystemConfig.trusted_certificate()
            else:
                verify = ArkSystemConfig.is_verifiying_certificates()
        self.__session.verify = verify
        self.__session.headers['User-Agent'] = UserAgent(browsers=['chrome']).googlechrome

    @property
    def base_url(self) -> Optional[str]:
        """The base URL after the 'https://' normalization done in `__init__`."""
        return self.__base_url

    @property
    def session(self) -> Session:
        """The session object every request goes through."""
        return self.__session

    @property
    def session_token(self) -> Optional[str]:
        """The token most recently passed to `__init__` or `update_token`."""
        return self.__token

    @property
    def refresh_connection_callback(self) -> Optional[Callable[['ArkClient'], None]]:
        """The callback given at construction time, if any."""
        return self.__refresh_connection_callback

    def add_header(self, key: str, value: str) -> None:
        """
        Sets a single header on the session.

        Args:
            key (str): Header name
            value (str): Header value
        """
        self.__session.headers.update({key: value})

    def add_headers(self, headers: Dict[str, str]) -> None:
        """
        Merges the given headers into the session headers.

        Args:
            headers (Dict[str, str]): Header names mapped to values
        """
        self.__session.headers.update(headers)

    def add_cookie(self, key: str, value: str) -> None:
        """
        Sets a single cookie on the session.

        Args:
            key (str): Cookie name
            value (str): Cookie value
        """
        self.__session.cookies[key] = value

    def generic_http_method_request(self, method: str, route: str, **kwargs) -> Response:
        """
        Performs a request by calling the session method named `method` on the resolved URL.

        The URL is `route` itself when no base URL is configured, the base URL alone when
        `route` is empty, and otherwise the two joined with exactly one '/' between them.

        Args:
            method (str): Name of the session method to call (e.g. 'get', 'post')
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session method

        Returns:
            Response: The value returned by the session method
        """
        url = route
        if self.__base_url:
            url = self.__base_url
            if route:
                base_has_sep = self.__base_url.endswith('/')
                route_has_sep = route.startswith('/')
                if base_has_sep and route_has_sep:
                    # Both sides carry a separator: drop one so the URL has no '//'
                    url = f'{self.__base_url}{route[1:]}'
                elif base_has_sep or route_has_sep:
                    url = f'{self.__base_url}{route}'
                else:
                    url = f'{self.__base_url}/{route}'
        http_method = getattr(self.__session, method)
        response: Response = http_method(url, **kwargs)
        return response

    def get(self, route: str, **kwargs) -> Response:
        """
        Performs a GET request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `get`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('get', route, **kwargs)

    def post(self, route: str, **kwargs) -> Response:
        """
        Performs a POST request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `post`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('post', route, **kwargs)

    def put(self, route: str, **kwargs) -> Response:
        """
        Performs a PUT request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `put`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('put', route, **kwargs)

    def delete(self, route: str, **kwargs) -> Response:
        """
        Performs a DELETE request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `delete`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('delete', route, **kwargs)

    def patch(self, route: str, **kwargs) -> Response:
        """
        Performs a PATCH request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `patch`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('patch', route, **kwargs)

    def options(self, route: str, **kwargs) -> Response:
        """
        Performs an OPTIONS request with the session details and given headers and tokens.

        Args:
            route (str): Route joined with the base URL
            **kwargs: Passed unchanged to the session's `options`

        Returns:
            Response: The response of the request
        """
        return self.generic_http_method_request('options', route, **kwargs)

    def update_token(self, token: Optional[str] = None) -> None:
        """
        Updates a session token.

        The token is always stored; the session is only reconfigured when the token is truthy.
        A previously set header or auth is not cleared when a falsy token is given.

        Args:
            token (Optional[str], optional): New token. For token type 'Basic' this is the base64
                encoding of 'user:password'. Defaults to None.
        """
        self.__token = token
        if not token:
            return
        if self.__token_type == 'Basic':
            # Split on the first ':' only, so passwords containing ':' stay intact
            user, password = b64decode(token.encode('ascii')).decode('ascii').split(':', 1)
            self.__session.auth = (user, password)
        elif not self.__token_type:
            # No token type: send the bare token
            self.__session.headers.update({self.__auth_header_name: f'{self.__token}'})
        else:
            self.__session.headers.update({self.__auth_header_name: f'{self.__token_type} {self.__token}'})

    def update_cookies(self, cookies: Optional[List] = None, cookie_jar: Optional[RequestsCookieJar] = None) -> None:
        """
        Updates session cookies.

        Args:
            cookies (Optional[List], optional): Cookies added one by one via `set_cookie`. Defaults to None.
            cookie_jar (Optional[RequestsCookieJar], optional): Jar merged into the session cookies. Defaults to None.
        """
        if cookies:
            for c in cookies:
                self.__session.cookies.set_cookie(c)
        if cookie_jar:
            self.__session.cookies.update(cookie_jar)

delete(route, **kwargs)

Performs a DELETE request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
128
129
130
131
132
133
134
135
136
137
138
def delete(self, route: str, **kwargs) -> Response:
    """
    Issues a DELETE request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'delete'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

get(route, **kwargs)

Performs a GET request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get(self, route: str, **kwargs) -> Response:
    """
    Issues a GET request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'get'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

options(route, **kwargs)

Performs an OPTIONS request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
152
153
154
155
156
157
158
159
160
161
162
def options(self, route: str, **kwargs) -> Response:
    """
    Issues an OPTIONS request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'options'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

patch(route, **kwargs)

Performs a PATCH request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
140
141
142
143
144
145
146
147
148
149
150
def patch(self, route: str, **kwargs) -> Response:
    """
    Issues a PATCH request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'patch'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

post(route, **kwargs)

Performs a POST request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
104
105
106
107
108
109
110
111
112
113
114
def post(self, route: str, **kwargs) -> Response:
    """
    Issues a POST request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'post'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

put(route, **kwargs)

Performs a PUT request with the session details and given headers and tokens.

Parameters:

Name Type Description Default
route str

description

required

Returns:

Name Type Description
Response Response

description

Source code in ark_sdk_python/common/ark_client.py
116
117
118
119
120
121
122
123
124
125
126
def put(self, route: str, **kwargs) -> Response:
    """
    Issues a PUT request for `route` via `generic_http_method_request`.

    Args:
        route (str): Route handed to `generic_http_method_request`
        **kwargs: Forwarded unchanged to `generic_http_method_request`

    Returns:
        Response: Whatever `generic_http_method_request` returns
    """
    http_verb = 'put'
    result = self.generic_http_method_request(http_verb, route, **kwargs)
    return result

update_cookies(cookies=None, cookie_jar=None)

Updates session cookies.

Parameters:

Name Type Description Default
cookies Optional[List]

description. Defaults to None.

None
cookie_jar Optional[RequestsCookieJar]

description. Defaults to None.

None
Source code in ark_sdk_python/common/ark_client.py
182
183
184
185
186
187
188
189
190
191
192
193
194
def update_cookies(self, cookies: Optional[List] = None, cookie_jar: Optional[RequestsCookieJar] = None) -> None:
    """
    Adds cookies to the session's cookie store.

    Args:
        cookies (Optional[List], optional): Cookies passed one at a time to the store's `set_cookie`;
            skipped when falsy. Defaults to None.
        cookie_jar (Optional[RequestsCookieJar], optional): Jar merged into the store with `update`;
            skipped when falsy. Defaults to None.
    """
    # A falsy `cookies` iterates over nothing, matching the original guard
    for cookie in cookies or ():
        self.__session.cookies.set_cookie(cookie)
    if not cookie_jar:
        return
    self.__session.cookies.update(cookie_jar)

update_token(token=None)

Updates a session token.

Parameters:

Name Type Description Default
token Optional[str]

description. Defaults to None.

None
Source code in ark_sdk_python/common/ark_client.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def update_token(self, token: Optional[str] = None) -> None:
    """
    Updates a session token.

    The token is always stored; the session is only reconfigured when the token is truthy.
    A previously set header or auth is not cleared when a falsy token is given.

    Args:
        token (Optional[str], optional): New token. For token type 'Basic' this is the base64
            encoding of 'user:password'. Defaults to None.
    """
    self.__token = token
    if not token:
        return
    if self.__token_type == 'Basic':
        # Split on the first ':' only, so passwords containing ':' stay intact
        user, password = b64decode(token.encode('ascii')).decode('ascii').split(':', 1)
        self.__session.auth = (user, password)
    elif not self.__token_type:
        # No token type: send the bare token
        self.__session.headers.update({self.__auth_header_name: f'{self.__token}'})
    else:
        self.__session.headers.update({self.__auth_header_name: f'{self.__token_type} {self.__token}'})

ArkKeyring

Source code in ark_sdk_python/common/ark_keyring.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class ArkKeyring:
    """
    Stores and retrieves profile tokens in a keyring backend chosen per platform,
    falling back to BasicKeyring when the chosen backend fails.
    """

    def __init__(self, service_name: str) -> None:
        """
        Args:
            service_name (str): Prefix of the keyring entry name ('<service_name>-<postfix>')
        """
        self.__service_name = service_name
        self.__logger = get_logger(self.__class__.__name__)

    @staticmethod
    def __is_docker():
        """
        Reports whether the process appears to run inside Docker.

        Returns:
            bool: True when '/.dockerenv' exists, or when '/proc/self/cgroup' is a file
            with a line containing 'docker'
        """
        if os.path.exists('/.dockerenv'):
            return True
        path = '/proc/self/cgroup'
        if not os.path.isfile(path):
            return False
        # Context manager so the file handle is closed deterministically
        with open(path, encoding='utf-8') as cgroup_file:
            return any('docker' in line for line in cgroup_file)

    @staticmethod
    def get_keyring(enforce_basic_keyring: bool = False):
        """
        Picks the keyring backend for the current environment.

        BasicKeyring is returned when running in Docker, when 'Microsoft' is in the uname
        release, when ARK_BASIC_KEYRING_OVERRIDE_ENV_VAR is set, when `enforce_basic_keyring`
        is True, on darwin, when '/etc/redhat-release' exists, when DBUS_SESSION_ENV_VAR is
        not set, or when anything in the selection raises. On win32 a CryptFileKeyring keyed
        with the hostname is returned; otherwise a SecretService keyring.

        Args:
            enforce_basic_keyring (bool, optional): Force BasicKeyring. Defaults to False.

        Returns:
            The selected keyring backend instance
        """
        try:
            # Docker or WSL
            if (
                ArkKeyring.__is_docker()
                or 'Microsoft' in uname().release
                or ARK_BASIC_KEYRING_OVERRIDE_ENV_VAR in os.environ
                or enforce_basic_keyring
            ):
                return BasicKeyring()
            if sys.platform == 'win32':
                from keyrings.cryptfile.cryptfile import CryptFileKeyring  # pylint: disable=import-error

                kr = CryptFileKeyring()
                kr.keyring_key = socket.gethostname()
                return kr
            elif sys.platform == 'darwin' or os.path.exists('/etc/redhat-release'):
                return BasicKeyring()
            else:
                if DBUS_SESSION_ENV_VAR not in os.environ:
                    return BasicKeyring()
                from keyring.backends import SecretService  # pylint: disable=unused-import

                return SecretService.Keyring()
        except Exception:
            # Any failure while selecting a backend degrades to the basic keyring
            return BasicKeyring()

    def save_token(self, profile: ArkProfile, token: ArkToken, postfix: str, enforce_basic_keyring: bool = False) -> None:
        """
        Saves the specified token for a profile in the keyring.
        The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.
        Failures are logged as warnings and are not raised to the caller.

        Args:
            profile (ArkProfile): Profile whose `profile_name` is the keyring username
            token (ArkToken): Token stored as its `json()` serialization
            postfix (str): Suffix of the keyring entry name ('<service_name>-<postfix>')
            enforce_basic_keyring (bool): Use BasicKeyring directly. Defaults to False.
        """
        try:
            self.__logger.info(f'Trying to save token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
            kr = self.get_keyring(enforce_basic_keyring)
            kr.set_password(f'{self.__service_name}-{postfix}', profile.profile_name, token.json())
            self.__logger.info('Saved token successfully')
        except Exception as ex:
            # Last resort fallback to basic keyring
            if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
                self.__logger.warning(f'Falling back to basic keyring as we failed to save token with keyring [{str(kr)}]')
                return self.save_token(profile, token, postfix, True)
            self.__logger.warning(f'Failed to save token [{str(ex)}]')

    def load_token(self, profile: ArkProfile, postfix: str, enforce_basic_keyring: bool = False) -> Optional[ArkToken]:
        """
        Loads a token for a profile from the keyring.
        The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.
        When the token has expired and no refresh token exists, the token is deleted from the keyring and nothing is returned.
        When the token has expired but a refresh token exists, the token is only deleted once
        MAX_KEYRING_RECORD_TIME_HOURS have passed since its expiration.

        Args:
            profile (ArkProfile): Profile whose `profile_name` is the keyring username
            postfix (str): Suffix of the keyring entry name ('<service_name>-<postfix>')
            enforce_basic_keyring (bool): Use BasicKeyring directly. Defaults to False.

        Returns:
            Optional[ArkToken]: The stored token, or None when missing, expired or unreadable
        """
        try:
            kr = self.get_keyring(enforce_basic_keyring)
            self.__logger.info(f'Trying to load token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
            token_val = kr.get_password(f'{self.__service_name}-{postfix}', profile.profile_name)
            if not token_val:
                self.__logger.info('No token found')
                return None
            token = ArkToken.parse_raw(token_val)
            if token.expires_in:
                # Expiry is compared as a naive datetime against the local clock
                if (
                    not token.refresh_token
                    and token.token_type != ArkTokenType.Internal
                    and (token.expires_in.replace(tzinfo=None) - timedelta(seconds=DEFAULT_EXPIRATION_GRACE_DELTA_SECONDS)) < datetime.now()
                ):
                    self.__logger.info('Token is expired and no refresh token exists')
                    kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                    return None
                elif (
                    token.refresh_token
                    and (token.expires_in.replace(tzinfo=None) + timedelta(hours=MAX_KEYRING_RECORD_TIME_HOURS)) < datetime.now()
                ):
                    self.__logger.info('Token is expired and has been in the cache for too long before another usage')
                    kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                    return None
            self.__logger.info('Loaded token successfully')
            return token
        except Exception as ex:
            # Last resort fallback to basic keyring
            if not isinstance(kr, BasicKeyring) or not enforce_basic_keyring:
                self.__logger.warning(f'Falling back to basic keyring as we failed to load token with keyring [{str(kr)}]')
                return self.load_token(profile, postfix, True)
            self.__logger.warning(f'Failed to load cached token [{str(ex)}]')
            # Best effort: drop the entry that could not be loaded
            try:
                kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
            except Exception as ex_deletion:
                self.__logger.warning(f'Failed to delete failed loaded cached token [{str(ex_deletion)}]')
            return None

load_token(profile, postfix, enforce_basic_keyring=False)

Loads a token for a profile from the keyring. The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used. When the token has expired and no refresh token exists, the token is deleted from the keyring and nothing is returned. When the token has expired but a refresh token exists, the token is only deleted if the max token time has passed (48 hours).

Parameters:

Name Type Description Default
profile ArkProfile

description

required
postfix str

description

required
enforce_basic_keyring bool

description

False

Returns:

Type Description
Optional[ArkToken]

Optional[ArkToken]: description

Source code in ark_sdk_python/common/ark_keyring.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def load_token(self, profile: ArkProfile, postfix: str, enforce_basic_keyring: bool = False) -> Optional[ArkToken]:
    """
    Reads a profile's token back from the keyring.
    The backend comes from `get_keyring`; if loading fails on it, the load is retried once with
    the basic keyring enforced. An expired token without a refresh token is removed and None is
    returned (Internal tokens are exempt). An expired token with a refresh token is only removed
    once MAX_KEYRING_RECORD_TIME_HOURS have passed since its expiration.

    Args:
        profile (ArkProfile): Profile whose `profile_name` is the keyring username
        postfix (str): Suffix of the keyring entry name ('<service_name>-<postfix>')
        enforce_basic_keyring (bool): Passed to `get_keyring` to force the basic keyring

    Returns:
        Optional[ArkToken]: The stored token, or None when missing, expired or unreadable
    """
    try:
        kr = self.get_keyring(enforce_basic_keyring)
        self.__logger.info(f'Trying to load token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
        raw_token = kr.get_password(f'{self.__service_name}-{postfix}', profile.profile_name)
        if not raw_token:
            self.__logger.info('No token found')
            return None
        token = ArkToken.parse_raw(raw_token)
        if token.expires_in:
            if not token.refresh_token:
                # No refresh token: drop the entry once it is past expiry minus the grace delta
                if (
                    token.token_type != ArkTokenType.Internal
                    and (token.expires_in.replace(tzinfo=None) - timedelta(seconds=DEFAULT_EXPIRATION_GRACE_DELTA_SECONDS)) < datetime.now()
                ):
                    self.__logger.info('Token is expired and no refresh token exists')
                    kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                    return None
            elif (token.expires_in.replace(tzinfo=None) + timedelta(hours=MAX_KEYRING_RECORD_TIME_HOURS)) < datetime.now():
                # Refreshable token: only dropped after sitting unused past the max record time
                self.__logger.info('Token is expired and has been in the cache for too long before another usage')
                kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
                return None
        self.__logger.info('Loaded token successfully')
        return token
    except Exception as err:
        basic_already_enforced = isinstance(kr, BasicKeyring) and enforce_basic_keyring
        if not basic_already_enforced:
            # Last resort: retry once with the basic keyring enforced
            self.__logger.warning(f'Falling back to basic keyring as we failed to load token with keyring [{str(kr)}]')
            return self.load_token(profile, postfix, True)
        self.__logger.warning(f'Failed to load cached token [{str(err)}]')
        # Best effort: drop the entry that could not be loaded
        try:
            kr.delete_password(f'{self.__service_name}-{postfix}', profile.profile_name)
        except Exception as ex_deletion:
            self.__logger.warning(f'Failed to delete failed loaded cached token [{str(ex_deletion)}]')
        return None

save_token(profile, token, postfix, enforce_basic_keyring=False)

Saves the specified token for a profile in the keyring. The keyring is the OS-based implementation or, when unavailable, a fallback to BasicKeyring is used.

Parameters:

Name Type Description Default
profile ArkProfile

description

required
token ArkToken

description

required
postfix str

description

required
enforce_basic_keyring bool

description

False
Source code in ark_sdk_python/common/ark_keyring.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def save_token(self, profile: ArkProfile, token: ArkToken, postfix: str, enforce_basic_keyring: bool = False) -> None:
    """
    Writes a profile's token into the keyring.
    The backend comes from `get_keyring`; if saving fails on it, the save is retried once with
    the basic keyring enforced. Failures are logged as warnings and are not raised.

    Args:
        profile (ArkProfile): Profile whose `profile_name` is the keyring username
        token (ArkToken): Token stored as its `json()` serialization
        postfix (str): Suffix of the keyring entry name ('<service_name>-<postfix>')
        enforce_basic_keyring (bool): Passed to `get_keyring` to force the basic keyring
    """
    try:
        self.__logger.info(f'Trying to save token [{self.__service_name}-{postfix}] of profile [{profile.profile_name}]')
        kr = self.get_keyring(enforce_basic_keyring)
        kr.set_password(f'{self.__service_name}-{postfix}', profile.profile_name, token.json())
        self.__logger.info('Saved token successfully')
    except Exception as err:
        basic_already_enforced = isinstance(kr, BasicKeyring) and enforce_basic_keyring
        if basic_already_enforced:
            # The basic keyring itself failed: nothing left to fall back to
            self.__logger.warning(f'Failed to save token [{str(err)}]')
            return None
        # Last resort: retry once with the basic keyring enforced
        self.__logger.warning(f'Falling back to basic keyring as we failed to save token with keyring [{str(kr)}]')
        return self.save_token(profile, token, postfix, True)