Skip to content

actions

ArkAction

Bases: ABC

Source code in ark_sdk_python/actions/ark_action.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class ArkAction(ABC):
    """
    Abstract base class of the CLI actions.

    Supplies the command-line flags shared by the actions and the common
    pre-execution step that applies those flags to `ArkSystemConfig`.
    Concrete actions implement `define_action`, `run_action` and `can_run_action`.
    """

    def __init__(self):
        self._logger = get_logger(app=self.__class__.__name__)

    def _common_actions_configuration(self, parser: argparse.ArgumentParser) -> None:
        """
        Adds the flags shared by the actions to the given parser.

        Args:
            parser (argparse.ArgumentParser): Parser of the action that receives the common flags.
        """
        parser.add_argument('-r', '--raw', action='store_true', help='Whether to raw output')
        parser.add_argument('-s', '--silent', action='store_true', help='Silent execution, no interactiveness')
        parser.add_argument('-ao', '--allow-output', action='store_true', help='Allow stdout / stderr even when silent and not interactive')
        parser.add_argument('-v', '--verbose', action='store_true', help='Whether to verbose log')
        parser.add_argument('-ls', '--logger-style', choices=['default'], help='Which verbose logger style to use', default='default')
        parser.add_argument(
            '-ll',
            '--log-level',
            help='Log level to use while verbose',
            choices=['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'],
            default='INFO',
        )
        parser.add_argument(
            '-dcv', '--disable-cert-verification', action='store_true', help='Disables certificate verification on HTTPS calls, unsafe!'
        )
        parser.add_argument('-tc', '--trusted-cert', help='Certificate to use for HTTPS calls')

    def _common_actions_execution(self, args: argparse.Namespace) -> None:
        """
        Applies the common flags of the parsed arguments to the system configuration.

        The configuration is first put into a known baseline (color and interactivity on,
        verbose logging and output off, certificate verification on) and only then
        adjusted by the flags, so every invocation starts from the same state.

        Args:
            args (argparse.Namespace): Parsed arguments containing the flags added by
                `_common_actions_configuration`.
        """
        # Baseline configuration
        ArkSystemConfig.enable_color()
        ArkSystemConfig.enable_interactive()
        ArkSystemConfig.disable_verbose_logging()
        ArkSystemConfig.disallow_output()
        ArkSystemConfig.set_logger_style(args.logger_style)
        ArkSystemConfig.enable_certificate_verification()

        # Flag driven overrides
        if args.raw:
            ArkSystemConfig.disable_color()
        if args.silent:
            ArkSystemConfig.disable_interactive()
        if args.verbose:
            ArkSystemConfig.enable_verbose_logging(args.log_level)
        if args.allow_output:
            ArkSystemConfig.allow_output()
        # Disabling verification wins over a trusted certificate when both are given
        if args.disable_cert_verification:
            ArkSystemConfig.disable_certificate_verification()
        elif args.trusted_cert is not None:
            ArkSystemConfig.set_trusted_certificate(args.trusted_cert)

        # Recreate the logger now that the logging configuration was (re)applied
        self._logger = get_logger(app=self.__class__.__name__)

        # argparse stores `--profile-name` under the attribute `profile_name`, and
        # `in` on a Namespace tests attribute names, so the underscore form must be used
        if 'profile_name' in args:
            args.profile_name = ArkProfileLoader.deduce_profile_name(args.profile_name)

        # Last fallback on deploy env
        os.environ.setdefault('DEPLOY_ENV', 'prod')

    @abstractmethod
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the action as part of the specified subparsers group.

        Args:
            subparsers (argparse._SubParsersAction): Subparsers group the action's parser is added to.
        """

    @abstractmethod
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the actual action with the given arguments.

        Args:
            args (argparse.Namespace): Parsed command-line arguments of the action.
        """

    @abstractmethod
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Checks whether the given action can be run with the specified arguments.

        Args:
            action_name (str): Name of the action to check.
            args (argparse.Namespace): Parsed command-line arguments.

        Returns:
            bool: Whether this action can run the given action name.
        """

can_run_action(action_name, args) abstractmethod

Checks whether the given action can be run with the specified arguments.

Parameters:

Name Type Description Default
action_name str

Name of the action to check.

required
args Namespace

Parsed command-line arguments.

required

Returns:

Name Type Description
bool bool

Whether the action can be run.

Source code in ark_sdk_python/actions/ark_action.py
75
76
77
78
79
80
81
82
83
84
85
86
@abstractmethod
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Tells whether the action identified by `action_name` can be run with `args`.

    Args:
        action_name (str): Name of the action to check.
        args (argparse.Namespace): Parsed command-line arguments.

    Returns:
        bool: Whether the action can be run.
    """

define_action(subparsers) abstractmethod

Defines the action as part of the specified subparsers group.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

Subparsers group the action's parser is added to.

required
Source code in ark_sdk_python/actions/ark_action.py
57
58
59
60
61
62
63
64
@abstractmethod
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the action under the given subparsers group.

    Args:
        subparsers (argparse._SubParsersAction): Subparsers group the action's parser is added to.
    """

run_action(args) abstractmethod

Runs the actual action with the given arguments.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments of the action.

required
Source code in ark_sdk_python/actions/ark_action.py
66
67
68
69
70
71
72
73
@abstractmethod
def run_action(self, args: argparse.Namespace) -> None:
    """
    Executes the action using the given parsed arguments.

    Args:
        args (argparse.Namespace): Parsed command-line arguments of the action.
    """

ArkCacheAction

Bases: ArkAction

Source code in ark_sdk_python/actions/ark_cache_action.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class ArkCacheAction(ArkAction):
    """
    CLI `cache` action.

    Exposes a single `clear` sub-command which removes the files of the basic keyring cache.
    """

    @overrides
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the CLI `cache` action, and adds the clear cache function.

        Args:
            subparsers (argparse._SubParsersAction): Subparsers group the `cache` parser is added to.
        """
        cache_parser: argparse.ArgumentParser = subparsers.add_parser('cache')
        self._common_actions_configuration(cache_parser)
        cache_cmd_subparsers = cache_parser.add_subparsers(dest="cache_cmd")
        cache_cmd_subparsers.required = True
        cache_cmd_subparsers.add_parser('clear', help='Clears all profiles cache')

    def __run_clear_cache_action(self) -> None:
        """
        Deletes the `keyring` and `mac` files of the basic keyring cache folder.

        The folder is taken from the `ARK_BASIC_KEYRING_FOLDER_ENV_VAR` environment variable
        when set, otherwise from `DEFAULT_BASIC_KEYRING_FOLDER` under the user's home.
        For any other keyring implementation only a notice is printed.
        """
        if not isinstance(ArkKeyring.get_keyring(), BasicKeyring):
            ArkArgsFormatter.print_normal('Cache clear is only valid for basic keyring implementation at the moment')
            return
        cache_folder_path = os.path.join(os.path.expanduser('~'), DEFAULT_BASIC_KEYRING_FOLDER)
        if ARK_BASIC_KEYRING_FOLDER_ENV_VAR in os.environ:
            cache_folder_path = os.environ[ARK_BASIC_KEYRING_FOLDER_ENV_VAR]
        for cache_file_name in ('keyring', 'mac'):
            try:
                os.unlink(f'{cache_folder_path}{os.sep}{cache_file_name}')
            except FileNotFoundError:
                # Already absent - clearing an empty cache is a no-op, and the
                # remaining file must still be removed
                continue

    @overrides
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the cache action by dispatching on the parsed `cache_cmd` sub-command.

        Args:
            args (argparse.Namespace): Parsed arguments; `cache_cmd` selects the sub-command.

        Raises:
            ArkException: If `cache_cmd` is anything other than `clear`.
        """
        if args.cache_cmd == 'clear':
            self.__run_clear_cache_action()
        else:
            # `cache_cmd` is the dest registered in define_action; the cache parser
            # defines no `profile_cmd`, so reading it would raise AttributeError
            raise ArkException(f'Invalid command {args.cache_cmd} given')

    @overrides
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Asserts the action is `cache`.

        Args:
            action_name (str): Name of the action to check.
            args (argparse.Namespace): Parsed arguments; not inspected here.

        Returns:
            bool: True when `action_name` is `cache`.
        """
        return action_name == 'cache'

can_run_action(action_name, args)

Asserts the action is cache.

Parameters:

Name Type Description Default
action_name str

Name of the action to check.

required
args Namespace

Parsed command-line arguments (not inspected).

required

Returns:

Name Type Description
bool bool

True when the action name is `cache`.

Source code in ark_sdk_python/actions/ark_cache_action.py
53
54
55
56
57
58
59
60
61
62
63
64
65
@overrides
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Tells whether the requested action is the `cache` action.

    Args:
        action_name (str): Name of the action to check.
        args (argparse.Namespace): Parsed arguments; not inspected here.

    Returns:
        bool: True when `action_name` is `cache`.
    """
    is_cache_action = action_name == 'cache'
    return is_cache_action

define_action(subparsers)

Defines the CLI cache action, and adds the clear cache function.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

Subparsers group the `cache` parser is added to.

required
Source code in ark_sdk_python/actions/ark_cache_action.py
13
14
15
16
17
18
19
20
21
22
23
24
25
@overrides
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the `cache` CLI action together with its `clear` sub-command.

    Args:
        subparsers (argparse._SubParsersAction): Subparsers group the `cache` parser is added to.
    """
    parser: argparse.ArgumentParser = subparsers.add_parser('cache')
    self._common_actions_configuration(parser)

    # A cache sub-command must always be supplied
    commands = parser.add_subparsers(dest="cache_cmd")
    commands.required = True
    commands.add_parser('clear', help='Clears all profiles cache')

run_action(args)

Runs the cache action.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments; `cache_cmd` selects the sub-command.

required

Raises:

Type Description
ArkException

Raised when the cache sub-command is not `clear`.

Source code in ark_sdk_python/actions/ark_cache_action.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@overrides
def run_action(self, args: argparse.Namespace) -> None:
    """
    Runs the cache action by dispatching on the parsed `cache_cmd` sub-command.

    Args:
        args (argparse.Namespace): Parsed arguments; `cache_cmd` selects the sub-command.

    Raises:
        ArkException: If `cache_cmd` is anything other than `clear`.
    """
    if args.cache_cmd == 'clear':
        self.__run_clear_cache_action()
    else:
        # Report `cache_cmd`, the attribute actually read above; the cache arguments
        # carry no `profile_cmd`, so reading it would raise AttributeError instead
        raise ArkException(f'Invalid command {args.cache_cmd} given')

ArkConfigureAction

Bases: ArkAction

Source code in ark_sdk_python/actions/ark_configure_action.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
class ArkConfigureAction(ArkAction):
    """
    CLI `configure` action.

    Builds an `ArkProfile` either interactively (prompting the user) or silently
    (from the CLI arguments only), saves it and prints the result.
    """

    @overrides
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the CLI `configure` action.
        For each supported authenticator, sets whether it is used and adds the appropriate parameters.

        Args:
            subparsers (argparse._SubParsersAction): Subparsers group the `configure` parser is added to.
        """
        conf_parser: argparse.ArgumentParser = subparsers.add_parser('configure')
        self._common_actions_configuration(conf_parser)

        # Add the profile settings to the arguments
        ArkPydanticArgparse.schema_to_argparse(
            ArkProfile.schema(by_alias=False), conf_parser, ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS
        )

        # Add the supported authenticator settings and whether to work with them or not
        for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
            # Short flag is built from the first two letters of every `_` separated name part
            conf_parser.add_argument(
                f'-ww{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}',
                f'--work-with-{authenticator.authenticator_name().replace("_", "-")}',
                action='store_true',
                help=f'Whether to work with {authenticator.authenticator_human_readable_name()} services',
            )
            # The auth method flag is only offered when there is an actual choice
            if len(authenticator.supported_auth_methods()) > 1:
                conf_parser.add_argument(
                    f'-{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}am',
                    f'--{authenticator.authenticator_name().replace("_", "-")}-auth-method',
                    choices=[am.value for am in authenticator.supported_auth_methods()],
                    default=ArkAuthMethod.Default.value,
                )
            # Add the rest of the ark auth profile params
            ArkPydanticArgparse.schema_to_argparse(
                ArkAuthProfile.schema(by_alias=False),
                conf_parser,
                key_prefix=authenticator.authenticator_name().replace('_', '-'),
                ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS,
            )

            # Add the supported authentication methods settings of the authenticators
            for auth_method in authenticator.supported_auth_methods():
                auth_settings = ArkAuthMethodSettingsMap[auth_method]
                ArkPydanticArgparse.schema_to_argparse(
                    auth_settings.schema(by_alias=False),
                    conf_parser,
                    key_prefix=authenticator.authenticator_name().replace('_', '-'),
                    ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS
                    + CONFIGURATION_AUTHENTICATOR_IGNORED_DEFNITION_KEYS.get(authenticator.authenticator_name(), []),
                )

    def __run_interactive_action(self, args: argparse.Namespace) -> ArkProfile:
        """
        Performs an interactive configuration.
        The user is prompted for each Ark profile setting, with the default/defined CLI arguments.
        Selected authenticators are also configured with the user's auth methods and settings.

        Args:
            args (argparse.Namespace): Parsed arguments used as the prompts' initial values.

        Returns:
            ArkProfile: The configured profile; auth profiles of authenticators that were not selected are removed.
        """
        # Load the profile first with the given profile name from the user
        profile_name = ArkArgsFormatter.get_arg(
            args, 'profile_name', 'Profile Name', ArkProfileLoader.deduce_profile_name(), prioritize_existing_val=True
        )
        profile = ArkProfileLoader.load_profile(profile_name) or ArkProfile(profile_name=profile_name)

        # Fill the rest of the profile settings
        profile_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
            ArkProfile.schema(by_alias=False), args, ignored_keys=CONFIGURATION_IGNORED_INTERACTIVE_KEYS, existing_values=profile.dict()
        )
        profile = ArkPydanticArgparse.merge_by_model(ArkProfile, profile, profile_vals)

        # With a single supported authenticator there is nothing to choose from
        if len(SUPPORTED_AUTHENTICATORS_LIST) == 1:
            work_with_authenticators = [SUPPORTED_AUTHENTICATORS_LIST[0].authenticator_human_readable_name()]
        else:
            work_with_authenticators = ArkArgsFormatter.get_checkbox_args(
                args,
                [f'work_with_{a.authenticator_name().replace("-", "_")}' for a in SUPPORTED_AUTHENTICATORS_LIST],
                'Which authenticators would you like to connect to',
                [a.authenticator_human_readable_name() for a in SUPPORTED_AUTHENTICATORS_LIST],
                {
                    f'work_with_{a.authenticator_name().replace("-", "_")}': a.authenticator_human_readable_name()
                    for a in SUPPORTED_AUTHENTICATORS_LIST
                    if a.authenticator_name() in profile.auth_profiles
                },
                True,
            )
        for idx, authenticator in enumerate(SUPPORTED_AUTHENTICATORS_LIST):
            # Find out if we are working with the authenticator
            auth_profile = profile.auth_profiles.get(authenticator.authenticator_name(), None) or ArkAuthProfile()
            if authenticator.authenticator_human_readable_name() in work_with_authenticators:
                # Get the authenticator auth method
                ArkArgsFormatter.print_success_bright(
                    ('\n' if idx != 0 else '') + f'◉ Configuring {authenticator.authenticator_human_readable_name()}',
                )
                if len(authenticator.supported_auth_methods()) > 1:
                    auth_method = ArkArgsFormatter.get_switch_arg(
                        args,
                        f'{authenticator.authenticator_name().replace("-", "_")}_auth_method',
                        'Authentication Method',
                        [ArkAuthMethodsDescriptionMap[m] for m in authenticator.supported_auth_methods()],
                        (
                            ArkAuthMethodsDescriptionMap[auth_profile.auth_method]
                            if authenticator.authenticator_name() in profile.auth_profiles
                            else ArkAuthMethodsDescriptionMap[authenticator.default_auth_method()[0]]
                        ),
                        prioritize_existing_val=True,
                    )
                    # The prompt yields the description text, map it back to the auth method key
                    auth_method = next(
                        filter(lambda k: ArkAuthMethodsDescriptionMap[k] == auth_method, ArkAuthMethodsDescriptionMap.keys())
                    )
                    # If the default is chosen, override it by the authenticator's default
                    if auth_method == ArkAuthMethod.Default:
                        auth_method, _ = authenticator.default_auth_method()
                else:
                    auth_method, _ = authenticator.default_auth_method()
                ignored_keys = CONFIGURATION_IGNORED_INTERACTIVE_KEYS + CONFIGURATION_AUTHENTICATOR_IGNORED_INTERACTIVE_KEYS.get(
                    authenticator.authenticator_name(), []
                )
                # Get the auth profile general settings
                auth_profile_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
                    ArkAuthProfile.schema(by_alias=False),
                    args,
                    ignored_keys=ignored_keys,
                    existing_values=auth_profile.dict(),
                    key_prefix=authenticator.authenticator_name(),
                )
                auth_profile = ArkPydanticArgparse.merge_by_model(
                    ArkAuthProfile,
                    auth_profile,
                    auth_profile_vals,
                    key_prefix=authenticator.authenticator_name(),
                    ignore_keys=['auth_method_settings'],
                )

                # Get the auth method settings and fill them
                # A changed auth method starts from fresh settings, otherwise the stored ones are reused
                if auth_method != auth_profile.auth_method:
                    method_settings = ArkAuthMethodSettingsMap[auth_method]()
                else:
                    method_settings = ArkAuthMethodSettingsMap[auth_method].parse_obj(auth_profile.auth_method_settings)
                method_settings_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
                    method_settings.schema(by_alias=False),
                    args,
                    existing_values=method_settings.dict(),
                    override_aliases=CONFIGURATION_OVERRIDE_ALIASES,
                    key_prefix=authenticator.authenticator_name(),
                    ignored_keys=CONFIGURATION_IGNORED_INTERACTIVE_KEYS
                    + CONFIGURATION_AUTHENTICATOR_IGNORED_INTERACTIVE_KEYS.get(authenticator.authenticator_name(), []),
                    empty_allowed_keys=CONFIGURATION_ALLOWED_EMPTY_VALUES,
                    default_values=CONFIGURATION_AUTHENTICATORS_DEFAULTS,
                )

                # Finalize the auth profile
                auth_profile.auth_method = auth_method
                auth_profile.auth_method_settings = ArkPydanticArgparse.merge_by_model(
                    ArkAuthMethodSettingsMap[auth_method],
                    method_settings,
                    method_settings_vals,
                    key_prefix=authenticator.authenticator_name(),
                )
                profile.auth_profiles[authenticator.authenticator_name()] = auth_profile
            elif authenticator.authenticator_name() in profile.auth_profiles:
                # Authenticator was deselected, drop its previously stored auth profile
                del profile.auth_profiles[authenticator.authenticator_name()]
        return profile

    def __run_silent_action(self, args: argparse.Namespace) -> ArkProfile:
        """
        Runs the CLI configure action silently, without user interaction.

        Args:
            args (argparse.Namespace): Parsed arguments the profile is built from.

        Raises:
            ArkException: If an auth method that requires credentials is used without a username.

        Returns:
            ArkProfile: The configured profile; auth profiles of authenticators that were not selected are removed.
        """
        # Load the profile based on the cli params and merge the rest of the params
        profile = ArkPydanticArgparse.merge_by_model(
            ArkProfile,
            ArkProfileLoader.load_profile(args.profile_name) or ArkProfile(profile_name=args.profile_name),
            ArkPydanticArgparse.argparse_to_schema(ArkProfile.schema(by_alias=False), args),
        )

        # Load the authenticators
        for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
            auth_profile = profile.auth_profiles.get(authenticator.authenticator_name(), None) or ArkAuthProfile()
            if args.__dict__[f'work_with_{authenticator.authenticator_name()}']:
                if len(authenticator.supported_auth_methods()) > 1:
                    # Load the auth method
                    auth_method = ArkAuthMethod(args.__dict__[f'{authenticator.authenticator_name()}_auth_method'])

                    # If default, fallback to default auth method of the authenticator
                    if auth_method == ArkAuthMethod.Default:
                        auth_method, _ = authenticator.default_auth_method()
                else:
                    auth_method, _ = authenticator.default_auth_method()

                # Load the rest of the auth profile
                auth_profile = ArkPydanticArgparse.merge_by_model(
                    ArkAuthProfile,
                    auth_profile,
                    ArkPydanticArgparse.argparse_to_schema(
                        ArkAuthProfile.schema(by_alias=False), args, key_prefix=authenticator.authenticator_name()
                    ),
                    key_prefix=authenticator.authenticator_name(),
                )

                # Make sure if the type requires credentials, a username was supplied
                if auth_method in ArkAuthMethodsRequireCredentials and not auth_profile.username:
                    raise ArkException(f'Missing username for authenticator [{authenticator.authenticator_human_readable_name()}]')

                # Load the method settings
                # A changed auth method starts from fresh settings, otherwise the stored ones are reused
                method_settings = auth_profile.auth_method_settings
                if auth_method != auth_profile.auth_method:
                    method_settings = ArkAuthMethodSettingsMap[auth_method]()
                else:
                    method_settings = ArkAuthMethodSettingsMap[auth_method].parse_obj(method_settings)

                # Parse and merge the method settings from the cli
                method_settings_vals = ArkPydanticArgparse.argparse_to_schema(
                    method_settings.schema(by_alias=False), args, key_prefix=authenticator.authenticator_name()
                )

                # Remove the authenticator name prefix from the keys
                method_settings_vals = {k.replace(f'{authenticator.authenticator_name()}_', ''): v for k, v in method_settings_vals.items()}

                # Finalize the auth profile
                auth_profile.auth_method = auth_method
                auth_profile.auth_method_settings = ArkPydanticArgparse.merge_by_model(
                    ArkAuthMethodSettingsMap[auth_method],
                    method_settings,
                    method_settings_vals,
                    key_prefix=authenticator.authenticator_name(),
                    defaults={
                        k.replace(f'{authenticator.authenticator_name()}_', ''): v
                        for k, v in CONFIGURATION_AUTHENTICATORS_DEFAULTS.items()
                        if k.startswith(f'{authenticator.authenticator_name()}_')
                    },
                )
                profile.auth_profiles[authenticator.authenticator_name()] = auth_profile
            elif authenticator.authenticator_name() in profile.auth_profiles:
                # Authenticator was not requested, drop its previously stored auth profile
                del profile.auth_profiles[authenticator.authenticator_name()]
        return profile

    @overrides
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the configure action.
        Prompts the user when interactive mode is run, based on the associated authenticators,
        and saves the configured profile when completed.

        Args:
            args (argparse.Namespace): Parsed command-line arguments of the `configure` action.
        """
        # Parse the profile
        self._common_actions_execution(args)
        profile: Optional[ArkProfile] = None
        if ArkSystemConfig.is_interactive():
            profile = self.__run_interactive_action(args)
        else:
            profile = self.__run_silent_action(args)

        # Store it
        ArkProfileLoader.save_profile(profile)

        # Print out results
        ArkArgsFormatter.print_success(profile.json(indent=4))
        ArkArgsFormatter.print_success_bright(
            f"Profile has been saved to {ArkProfileLoader.profiles_folder()}",
        )

    @overrides
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Asserts the action is `configure`.

        Args:
            action_name (str): Name of the action to check.
            args (argparse.Namespace): Parsed arguments; not inspected here.

        Returns:
            bool: True when `action_name` is `configure`.
        """
        return action_name == 'configure'

__run_interactive_action(args)

Performs an interactive configuration. The user is prompted for each Ark profile setting, with the default/defined CLI arguments. Selected authenticators are also configured with the user's auth methods and settings.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments used as the prompts' initial values.

required

Returns:

Name Type Description
ArkProfile ArkProfile

The configured profile.

Source code in ark_sdk_python/actions/ark_configure_action.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def __run_interactive_action(self, args: argparse.Namespace) -> ArkProfile:
    """
    Performs an interactive configuration.
    The user is prompted for each Ark profile setting, with the default/defined CLI arguments.
    Selected authenticators are also configured with the user's auth methods and settings.

    Args:
        args (argparse.Namespace): Parsed CLI arguments of the `configure` action

    Returns:
        ArkProfile: The loaded or newly created profile, holding an auth profile for every selected
            authenticator and none for the authenticators that were not selected
    """
    # Load the profile first with the given profile name from the user
    profile_name = ArkArgsFormatter.get_arg(
        args, 'profile_name', 'Profile Name', ArkProfileLoader.deduce_profile_name(), prioritize_existing_val=True
    )
    profile = ArkProfileLoader.load_profile(profile_name) or ArkProfile(profile_name=profile_name)

    # Fill the rest of the profile settings
    profile_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
        ArkProfile.schema(by_alias=False), args, ignored_keys=CONFIGURATION_IGNORED_INTERACTIVE_KEYS, existing_values=profile.dict()
    )
    profile = ArkPydanticArgparse.merge_by_model(ArkProfile, profile, profile_vals)

    # Decide which authenticators to configure; a single supported authenticator is selected without prompting
    if len(SUPPORTED_AUTHENTICATORS_LIST) == 1:
        work_with_authenticators = [SUPPORTED_AUTHENTICATORS_LIST[0].authenticator_human_readable_name()]
    else:
        work_with_authenticators = ArkArgsFormatter.get_checkbox_args(
            args,
            [f'work_with_{a.authenticator_name().replace("-", "_")}' for a in SUPPORTED_AUTHENTICATORS_LIST],
            'Which authenticators would you like to connect to',
            [a.authenticator_human_readable_name() for a in SUPPORTED_AUTHENTICATORS_LIST],
            {
                f'work_with_{a.authenticator_name().replace("-", "_")}': a.authenticator_human_readable_name()
                for a in SUPPORTED_AUTHENTICATORS_LIST
                if a.authenticator_name() in profile.auth_profiles
            },
            True,
        )
    for idx, authenticator in enumerate(SUPPORTED_AUTHENTICATORS_LIST):
        auth_name = authenticator.authenticator_name()
        human_name = authenticator.authenticator_human_readable_name()
        # Start from the stored auth profile when the profile already has one, otherwise from an empty one
        auth_profile = profile.auth_profiles.get(auth_name, None) or ArkAuthProfile()
        if human_name in work_with_authenticators:
            # Announce the authenticator; a blank line separates it from the previous authenticator's prompts
            ArkArgsFormatter.print_success_bright(
                ('\n' if idx != 0 else '') + f'◉ Configuring {human_name}',
            )
            if len(authenticator.supported_auth_methods()) > 1:
                # The user picks among the human readable descriptions of the supported auth methods
                auth_method_description = ArkArgsFormatter.get_switch_arg(
                    args,
                    f'{auth_name.replace("-", "_")}_auth_method',
                    'Authentication Method',
                    [ArkAuthMethodsDescriptionMap[m] for m in authenticator.supported_auth_methods()],
                    (
                        ArkAuthMethodsDescriptionMap[auth_profile.auth_method]
                        if auth_name in profile.auth_profiles
                        else ArkAuthMethodsDescriptionMap[authenticator.default_auth_method()[0]]
                    ),
                    prioritize_existing_val=True,
                )
                # Translate the chosen description back into its auth method key
                auth_method = next(
                    filter(lambda k: ArkAuthMethodsDescriptionMap[k] == auth_method_description, ArkAuthMethodsDescriptionMap.keys())
                )
                # If the default is chosen, override it by the authenticator's default
                if auth_method == ArkAuthMethod.Default:
                    auth_method, _ = authenticator.default_auth_method()
            else:
                auth_method, _ = authenticator.default_auth_method()
            # Keys never prompted for: the global ones plus the ones specific to this authenticator
            ignored_keys = CONFIGURATION_IGNORED_INTERACTIVE_KEYS + CONFIGURATION_AUTHENTICATOR_IGNORED_INTERACTIVE_KEYS.get(
                auth_name, []
            )
            # Get the auth profile general settings
            auth_profile_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
                ArkAuthProfile.schema(by_alias=False),
                args,
                ignored_keys=ignored_keys,
                existing_values=auth_profile.dict(),
                key_prefix=auth_name,
            )
            auth_profile = ArkPydanticArgparse.merge_by_model(
                ArkAuthProfile,
                auth_profile,
                auth_profile_vals,
                key_prefix=auth_name,
                ignore_keys=['auth_method_settings'],
            )

            # Get the auth method settings and fill them.
            # Stored settings are only reused when the auth method did not change, otherwise start from fresh settings
            if auth_method != auth_profile.auth_method:
                method_settings = ArkAuthMethodSettingsMap[auth_method]()
            else:
                method_settings = ArkAuthMethodSettingsMap[auth_method].parse_obj(auth_profile.auth_method_settings)
            method_settings_vals = ArkPydanticArgparse.argparse_to_schema_interactive(
                method_settings.schema(by_alias=False),
                args,
                existing_values=method_settings.dict(),
                override_aliases=CONFIGURATION_OVERRIDE_ALIASES,
                key_prefix=auth_name,
                ignored_keys=ignored_keys,
                empty_allowed_keys=CONFIGURATION_ALLOWED_EMPTY_VALUES,
                default_values=CONFIGURATION_AUTHENTICATORS_DEFAULTS,
            )

            # Finalize the auth profile
            auth_profile.auth_method = auth_method
            auth_profile.auth_method_settings = ArkPydanticArgparse.merge_by_model(
                ArkAuthMethodSettingsMap[auth_method],
                method_settings,
                method_settings_vals,
                key_prefix=auth_name,
            )
            profile.auth_profiles[auth_name] = auth_profile
        elif auth_name in profile.auth_profiles:
            # The authenticator was not selected, so a previously stored auth profile for it is dropped
            del profile.auth_profiles[auth_name]
    return profile

__run_silent_action(args)

Runs the CLI configure action silently, without user interaction.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments of the configure action.

required

Raises:

Type Description
ArkException

Raised when the selected auth method requires credentials but no username was supplied for the authenticator.

Returns:

Name Type Description
ArkProfile ArkProfile

The stored or newly created profile, merged with the settings supplied on the command line.

Source code in ark_sdk_python/actions/ark_configure_action.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def __run_silent_action(self, args: argparse.Namespace) -> ArkProfile:
    """
    Runs the CLI configure action silently, without user interaction.
    The profile named by `args.profile_name` is loaded (or created) and merged with the CLI arguments,
    then an auth profile is built for every authenticator whose `work_with_<name>` argument is set;
    auth profiles of the other authenticators are removed from the profile.

    Args:
        args (argparse.Namespace): Parsed CLI arguments of the `configure` action

    Raises:
        ArkException: When the chosen auth method requires credentials and no username was supplied

    Returns:
        ArkProfile: The configured profile
    """
    # Load the profile based on the cli params and merge the rest of the params
    profile = ArkPydanticArgparse.merge_by_model(
        ArkProfile,
        ArkProfileLoader.load_profile(args.profile_name) or ArkProfile(profile_name=args.profile_name),
        ArkPydanticArgparse.argparse_to_schema(ArkProfile.schema(by_alias=False), args),
    )
    cli_values = vars(args)

    # Load the authenticators
    for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
        auth_name = authenticator.authenticator_name()
        # Authenticator specific keys are namespaced as '<authenticator name>_<key>'
        arg_prefix = f'{auth_name}_'
        auth_profile = profile.auth_profiles.get(auth_name, None) or ArkAuthProfile()
        if cli_values[f'work_with_{auth_name}']:
            if len(authenticator.supported_auth_methods()) > 1:
                # Load the auth method
                auth_method = ArkAuthMethod(cli_values[f'{auth_name}_auth_method'])

                # If default, fallback to default auth method of the authenticator
                if auth_method == ArkAuthMethod.Default:
                    auth_method, _ = authenticator.default_auth_method()
            else:
                auth_method, _ = authenticator.default_auth_method()

            # Load the rest of the auth profile
            auth_profile = ArkPydanticArgparse.merge_by_model(
                ArkAuthProfile,
                auth_profile,
                ArkPydanticArgparse.argparse_to_schema(ArkAuthProfile.schema(by_alias=False), args, key_prefix=auth_name),
                key_prefix=auth_name,
            )

            # Make sure if the type requires credentials, a username was supplied
            if auth_method in ArkAuthMethodsRequireCredentials and not auth_profile.username:
                raise ArkException(f'Missing username for authenticator [{authenticator.authenticator_human_readable_name()}]')

            # Load the method settings.
            # Stored settings are only reused when the auth method did not change, otherwise start from fresh settings
            method_settings = auth_profile.auth_method_settings
            if auth_method != auth_profile.auth_method:
                method_settings = ArkAuthMethodSettingsMap[auth_method]()
            else:
                method_settings = ArkAuthMethodSettingsMap[auth_method].parse_obj(method_settings)

            # Parse and merge the method settings from the cli
            method_settings_vals = ArkPydanticArgparse.argparse_to_schema(
                method_settings.schema(by_alias=False), args, key_prefix=auth_name
            )

            # Remove the authenticator prefix. Only a leading prefix is stripped, so a key that merely
            # contains the same text further in (e.g. 'isp_crisp_x' for 'isp') is not mangled
            method_settings_vals = {
                (k[len(arg_prefix) :] if k.startswith(arg_prefix) else k): v for k, v in method_settings_vals.items()
            }

            # Finalize the auth profile
            auth_profile.auth_method = auth_method
            auth_profile.auth_method_settings = ArkPydanticArgparse.merge_by_model(
                ArkAuthMethodSettingsMap[auth_method],
                method_settings,
                method_settings_vals,
                key_prefix=auth_name,
                # Only the defaults that belong to this authenticator, with the prefix stripped
                defaults={
                    k[len(arg_prefix) :]: v for k, v in CONFIGURATION_AUTHENTICATORS_DEFAULTS.items() if k.startswith(arg_prefix)
                },
            )
            profile.auth_profiles[auth_name] = auth_profile
        elif auth_name in profile.auth_profiles:
            # The authenticator was not requested, so a previously stored auth profile for it is dropped
            del profile.auth_profiles[auth_name]
    return profile

can_run_action(action_name, args)

Checks whether the action is configure.

Parameters:

Name Type Description Default
action_name str

Name of the requested CLI action.

required
args Namespace

Parsed command-line arguments (not used by this check).

required

Returns:

Name Type Description
bool bool

True if the action name is configure, otherwise False.

Source code in ark_sdk_python/actions/ark_configure_action.py
306
307
308
309
310
311
312
313
314
315
316
317
318
@overrides
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Reports whether this action handles the given action name.

    Args:
        action_name (str): Name of the requested CLI action
        args (argparse.Namespace): Parsed CLI arguments; not consulted by this check

    Returns:
        bool: True only when `action_name` equals 'configure'
    """
    handles_action = action_name == 'configure'
    return handles_action

define_action(subparsers)

Defines the CLI configure action. For each supported authenticator, sets whether it is used and adds the appropriate parameters.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

The argparse subparsers object to which the configure parser is added.

required
Source code in ark_sdk_python/actions/ark_configure_action.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@overrides
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the CLI `configure` action on the given subparsers.
    On top of the common and profile arguments, every supported authenticator contributes a
    `--work-with-<name>` switch, an auth method choice (only when it supports more than one method)
    and the arguments of its auth profile and auth method settings.

    Args:
        subparsers (argparse._SubParsersAction): Subparsers object that receives the `configure` parser
    """
    configure_parser: argparse.ArgumentParser = subparsers.add_parser('configure')
    self._common_actions_configuration(configure_parser)

    # Profile level settings
    profile_schema = ArkProfile.schema(by_alias=False)
    ArkPydanticArgparse.schema_to_argparse(profile_schema, configure_parser, ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS)

    # Per authenticator settings, and whether to work with the authenticator at all
    for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
        auth_name = authenticator.authenticator_name()
        dashed_name = auth_name.replace('_', '-')
        # Short flags use the first two characters of every underscore separated part of the name
        abbreviation = ''.join([part[:2] for part in auth_name.split('_')])

        configure_parser.add_argument(
            f'-ww{abbreviation}',
            f'--work-with-{dashed_name}',
            action='store_true',
            help=f'Whether to work with {authenticator.authenticator_human_readable_name()} services',
        )

        auth_methods = authenticator.supported_auth_methods()
        if len(auth_methods) > 1:
            configure_parser.add_argument(
                f'-{abbreviation}am',
                f'--{dashed_name}-auth-method',
                choices=[method.value for method in auth_methods],
                default=ArkAuthMethod.Default.value,
            )

        # General auth profile parameters of the authenticator
        ArkPydanticArgparse.schema_to_argparse(
            ArkAuthProfile.schema(by_alias=False),
            configure_parser,
            key_prefix=dashed_name,
            ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS,
        )

        # Settings of every auth method the authenticator supports
        for auth_method in auth_methods:
            settings_schema = ArkAuthMethodSettingsMap[auth_method].schema(by_alias=False)
            ArkPydanticArgparse.schema_to_argparse(
                settings_schema,
                configure_parser,
                key_prefix=dashed_name,
                ignore_keys=CONFIGURATION_IGNORED_DEFINITION_KEYS
                + CONFIGURATION_AUTHENTICATOR_IGNORED_DEFNITION_KEYS.get(auth_name, []),
            )

run_action(args)

Runs the configure action. Prompts the user when interactive mode is run, based on the associated authenticators, and saves the configured profile when completed.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments of the configure action.

required
Source code in ark_sdk_python/actions/ark_configure_action.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@overrides
def run_action(self, args: argparse.Namespace) -> None:
    """
    Executes the configure action end to end.
    The profile is built interactively when the system is in interactive mode and silently otherwise,
    then it is saved and printed together with the folder it was saved to.

    Args:
        args (argparse.Namespace): Parsed CLI arguments of the `configure` action
    """
    self._common_actions_execution(args)

    # Build the profile with the flow matching the current interactivity mode
    configured_profile: ArkProfile = (
        self.__run_interactive_action(args) if ArkSystemConfig.is_interactive() else self.__run_silent_action(args)
    )

    # Persist the result
    ArkProfileLoader.save_profile(configured_profile)

    # Report back to the user
    profile_json = configured_profile.json(indent=4)
    ArkArgsFormatter.print_success(profile_json)
    saved_message = f"Profile has been saved to {ArkProfileLoader.profiles_folder()}"
    ArkArgsFormatter.print_success_bright(
        saved_message,
    )

ArkExecAction

Bases: ArkAction

Source code in ark_sdk_python/actions/ark_exec_action.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class ArkExecAction(ArkAction):
    """
    Base class of the CLI `exec` actions.

    Defines the shared `exec` parser, loads the profile and its authenticators, and offers helpers for
    running service actions and serializing their output. Concrete actions implement
    `define_exec_action`, `run_exec_action` and `can_run_exec_action`.
    """

    def _serialize_output(self, output: Optional[Union[List, Dict, ArkModel, Generator, Tuple, Any]]) -> str:
        """
        Serializes the output of an action into a printable string.

        Args:
            output: The value returned by the executed action

        Returns:
            str: An empty string for None; indented JSON for lists, tuples and dicts of models, for a single
                model and for an async request (its `async_task`); generators are flattened through the
                `items` of every yielded element first; anything else is passed through `str()`
        """
        if output is None:
            return ''
        if isinstance(output, Generator):
            return self._serialize_output(list(itertools.chain.from_iterable([p.items for p in output])))
        if isinstance(output, (list, tuple)):
            # None elements are skipped; each model is round-tripped through its own JSON encoder
            return json.dumps(
                [json.loads(a.json(by_alias=False, exclude={'poll_progress_callback'})) for a in output if a is not None], indent=4
            )
        if isinstance(output, dict):
            return json.dumps(
                {
                    k: json.loads(v.json(indent=4, by_alias=False, exclude={'poll_progress_callback'}))
                    for k, v in output.items()
                    if k is not None and v is not None
                },
                indent=4,
            )
        if issubclass(type(output), ArkModel):
            return output.json(indent=4, by_alias=False, exclude={'poll_progress_callback'})
        if issubclass(type(output), ArkAsyncRequest):
            return output.async_task.json(indent=4, by_alias=False)
        return str(output)

    def _write_output_to_file(self, output_path: str, serialized_output: str) -> None:
        """
        Writes the serialized output to a UTF-8 file, creating missing parent folders.

        Args:
            output_path (str): Destination path, made absolute before use
            serialized_output (str): Text to write
        """
        output_path = os.path.abspath(output_path)
        # exist_ok makes a separate existence check unnecessary
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(serialized_output)

    def _raise_async_task_failure(self, async_req: Any) -> None:
        """
        Reports a failed async request to the user and raises.
        A timeout is printed as a warning, any other failure is printed as a failure.

        Args:
            async_req: The failed async request, exposing `task_timeout()` and `async_task`

        Raises:
            ArkException: Always, carrying the same message that was printed
        """
        timed_out = async_req.task_timeout()
        task_json = async_req.async_task.json(indent=4, by_alias=False, exclude={"poll_progress_callback"})
        if timed_out:
            message = f'Failed to execute async command due to timeout, error:\n{task_json}'
            ArkArgsFormatter.print_warning(
                message,
            )
        else:
            message = f'Failed to execute async command, error:\n{task_json}'
            ArkArgsFormatter.print_failure(
                message,
            )
        raise ArkException(
            message,
        )

    def _run_async_action(
        self, service: ArkService, schemas_map: Dict[str, Optional[Type[ArkModel]]], action: str, args: argparse.Namespace
    ) -> None:
        """
        Runs a pollable service action and reports its result.
        The action's model is built from the CLI arguments and given the default poller as progress callback.
        When the output is (or, for a tuple, contains) an async request, or is a list of async requests,
        the async task(s) are written to `args.output_path` when set and failures are raised.

        Args:
            service (ArkService): Service object holding the action method
            schemas_map (Dict[str, Optional[Type[ArkModel]]]): Maps dashed action names to their model types
            action (str): Action name; dashes and underscores are normalized as needed
            args (argparse.Namespace): Parsed CLI arguments

        Raises:
            ArkException: When an async task failed or timed out
            Exception: Any other error is logged, printed and re-raised
        """
        try:
            model_type: Type[ArkPollableModel] = schemas_map[action.replace('_', '-')]
            model: ArkPollableModel = model_type.parse_obj(ArkPydanticArgparse.argparse_to_schema(model_type.schema(), args))
            model.poll_progress_callback = ArkPollers.default_poller()
            output = getattr(service, action.replace('-', '_'))(model)
            async_req = None
            if issubclass(type(output), ArkAsyncRequest):
                async_req = output
            elif isinstance(output, tuple):
                # Only the first async request of a tuple output is inspected
                async_req = next((a for a in output if issubclass(type(a), ArkAsyncRequest)), None)
            if async_req is not None:
                if args.output_path:
                    self._write_output_to_file(
                        args.output_path, async_req.async_task.json(indent=4, by_alias=False, exclude={"poll_progress_callback"})
                    )
                if async_req.task_failed():
                    self._raise_async_task_failure(async_req)
                ArkArgsFormatter.print_success(self._serialize_output(output))
            elif isinstance(output, list) and all(issubclass(type(ar), ArkAsyncRequest) for ar in output):
                if args.output_path:
                    # Each task goes through its own JSON encoder; `dict()` accepts no `indent` argument,
                    # so the indentation is applied by json.dumps
                    self._write_output_to_file(
                        args.output_path,
                        json.dumps(
                            [json.loads(ar.async_task.json(by_alias=False, exclude={"poll_progress_callback"})) for ar in output],
                            indent=4,
                        ),
                    )
                for ar in output:
                    if ar.task_failed():
                        self._raise_async_task_failure(ar)
                    ArkArgsFormatter.print_success(self._serialize_output(ar))
            elif output is not None:
                ArkArgsFormatter.print_success(self._serialize_output(output))
        except Exception as ex:
            self._logger.exception(f'Failed running async command {action}')
            ArkArgsFormatter.print_failure(f'Failed to execute async command, error:\n{str(ex)}')
            self._logger.debug(traceback.format_exc())
            raise

    def _run_sync_action(
        self, service: ArkService, schemas_map: Dict[str, Optional[Type[ArkModel]]], action: str, args: argparse.Namespace
    ) -> None:
        """
        Runs a synchronous service action and prints its result.
        Actions mapped to a model type receive a model built from the CLI arguments, the others are
        called without arguments. A non-None output is serialized, written to `args.output_path` when
        set, and printed; otherwise a generic success message is printed.

        Args:
            service (ArkService): Service object holding the action method
            schemas_map (Dict[str, Optional[Type[ArkModel]]]): Maps dashed action names to their model types (or None)
            action (str): Action name; dashes and underscores are normalized as needed
            args (argparse.Namespace): Parsed CLI arguments

        Raises:
            Exception: Any error is logged, printed and re-raised
        """
        try:
            model_type: Optional[Type[ArkModel]] = schemas_map[action.replace('_', '-')]
            if model_type:
                model: ArkModel = model_type.parse_obj(ArkPydanticArgparse.argparse_to_schema(model_type.schema(), args))
                output = getattr(service, action.replace('-', '_'))(model)
            else:
                output = getattr(service, action.replace('-', '_'))()
            if output is not None:
                serialized_output: str = self._serialize_output(output)
                if args.output_path:
                    self._write_output_to_file(args.output_path, serialized_output)
                ArkArgsFormatter.print_success(serialized_output)
            else:
                ArkArgsFormatter.print_success(f'{action.replace("-", " ").title()} finished successfully')
        except Exception as ex:
            self._logger.exception(f'Failed running command {action}')
            ArkArgsFormatter.print_failure(f'Failed to execute command, error:\n{str(ex)}')
            self._logger.debug(traceback.format_exc())
            raise

    def _define_actions_by_schemas(
        self,
        subparsers: argparse._SubParsersAction,
        schemas_map: Dict[str, Optional[Type[ArkModel]]],
        defaults_map: Optional[Dict[str, Dict[str, Any]]] = None,
    ) -> None:
        """
        Adds one sub-parser per action, with arguments derived from the action's schema when it has one.

        Args:
            subparsers (argparse._SubParsersAction): Subparsers object that receives the action parsers
            schemas_map (Dict[str, Optional[Type[ArkModel]]]): Maps action names to their model types (or None)
            defaults_map (Optional[Dict[str, Dict[str, Any]]]): Optional per action default values
        """
        for action, schema in schemas_map.items():
            parser = subparsers.add_parser(action)
            if schema:
                ArkPydanticArgparse.schema_to_argparse(
                    schema.schema(), parser, defaults=defaults_map.get(action, None) if defaults_map else None
                )

    @overrides
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the CLI `exec` action, with its subparsers (args) for the service.
        The `exec` parser is created once; later calls reuse the existing parser and only add
        the service specific sub-commands through `define_exec_action`.

        Args:
            subparsers (argparse._SubParsersAction): Subparsers object that holds (or receives) the `exec` parser
        """
        exec_parser = None
        exec_subparsers = None
        # Check if the exec subparser already exists from previous definitions of a service
        if 'exec' in subparsers._name_parser_map:  # pylint: disable=protected-access
            # Retrieve the existing exec parser
            exec_parser = subparsers._name_parser_map['exec']  # pylint: disable=protected-access
            exec_subparsers = exec_parser._subparsers._group_actions[0]  # pylint: disable=protected-access
        else:
            # Create a new exec parser
            exec_parser = subparsers.add_parser('exec')
            self._common_actions_configuration(exec_parser)
            exec_parser.add_argument('-pn', '--profile-name', default=ArkProfileLoader.default_profile_name(), help='Profile name to load')
            exec_parser.add_argument('-op', '--output-path', help='Output file to write data to')
            exec_parser.add_argument('-rf', '--request-file', help='Request file containing the parameters for the exec action')
            exec_parser.add_argument('-rc', '--retry-count', type=int, help='Retry count for execution', default=1)
            exec_parser.add_argument(
                '-ra',
                '--refresh-auth',
                action='store_true',
                help='If possible, will try to refresh the active authentication before running the actual command',
            )
            exec_subparsers = exec_parser.add_subparsers(dest="command")
            exec_subparsers.required = True
        self.define_exec_action(exec_subparsers)

    @overrides
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the exec action.
        Loads the authenticators from the cache and connects to the API using the loaded authenticators.
        Each service is created from the API, based on the given authenticators, and then
        runs the exec action using the API.

        Args:
            args (argparse.Namespace): Parsed CLI arguments of the `exec` action

        Raises:
            ArkException: When the profile could not be loaded
            ArkException: When none of the profile's authenticators could load its authentication
        """
        self._common_actions_execution(args)
        profile = ArkProfileLoader.load_profile(ArkProfileLoader.deduce_profile_name(args.profile_name))
        if not profile:
            raise ArkException('Please configure a profile and login before trying to exec')

        # Load token from cache for each auth profile, skipping authenticators that fail to load
        authenticators: List[ArkAuth] = []
        for authenticator_name in profile.auth_profiles:
            authenticator = SUPPORTED_AUTHENTICATORS[authenticator_name]()
            if not authenticator.load_authentication(profile, args.refresh_auth):
                continue
            authenticators.append(authenticator)

        if not authenticators:
            raise ArkException(
                'Failed to load authenticators, tokens are either expired or authenticators are not logged in, please login first'
            )
        if len(authenticators) != len(profile.auth_profiles) and ArkSystemConfig.is_interactive():
            ArkArgsFormatter.print_colored('Not all authenticators are logged in, some of the functionality will be disabled')

        # Create the CLI API with the authenticators
        api = ArkCLIAPI(authenticators, profile)

        # Run the actual exec fitting action with the api
        # Run it with retries as per defined by user
        retry_call(
            self.run_exec_action,
            fargs=[api, args],
            tries=args.retry_count,
            delay=1,
            # Minimal logger stand-in: only `warning` is provided, and it prints the retry delay
            logger=namedtuple("logger", ("warning",))(
                warning=lambda _1, _2, delay: ArkArgsFormatter.print_failure(f"Retrying in {delay} seconds")
            ),
        )

    @overrides
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Checks that the action is `exec` and that this exec action can run the requested command.

        Args:
            action_name (str): Name of the requested CLI action
            args (argparse.Namespace): Parsed CLI arguments; `args.command` holds the exec command name

        Returns:
            bool: False when the action is not `exec`, otherwise the result of `can_run_exec_action`
        """
        return action_name == 'exec' and self.can_run_exec_action(args.command, args)

    @abstractmethod
    def define_exec_action(self, exec_subparsers: argparse._SubParsersAction) -> None:
        """
        Defines an exec action, and its specified configurations and args, for a service.

        Args:
            exec_subparsers (argparse._SubParsersAction): Subparsers object of the `exec` parser
        """

    @abstractmethod
    def run_exec_action(self, api: ArkCLIAPI, args: argparse.Namespace) -> None:
        """
        Runs the exec action for a service with the specified arguments and API.

        Args:
            api (ArkCLIAPI): CLI API created from the loaded authenticators and profile
            args (argparse.Namespace): Parsed CLI arguments of the `exec` action
        """

    @abstractmethod
    def can_run_exec_action(self, command_name: str, args: argparse.Namespace) -> bool:
        """
        Checks whether the specified exec service action can be run.

        Args:
            command_name (str): Name of the exec command (`args.command` when called from `can_run_action`)
            args (argparse.Namespace): Parsed CLI arguments of the `exec` action

        Returns:
            bool: Whether this action can run the command
        """

can_run_action(action_name, args)

Checks whether the action is exec and whether the requested exec command can be run.

Parameters:

Name Type Description Default
action_name str

Name of the requested CLI action.

required
args Namespace

Parsed command-line arguments; the command attribute holds the exec command name.

required

Returns:

Name Type Description
bool bool

False if the action name is not exec; otherwise the result of can_run_exec_action for the requested command.

Source code in ark_sdk_python/actions/ark_exec_action.py
250
251
252
253
254
255
256
257
258
259
260
261
262
@overrides
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Reports whether this action handles the given action name and exec command.

    Args:
        action_name (str): Name of the requested CLI action
        args (argparse.Namespace): Parsed CLI arguments; `args.command` holds the exec command name

    Returns:
        bool: False when the action is not `exec`, otherwise the result of `can_run_exec_action`
    """
    # Anything but `exec` is rejected without consulting the concrete exec action
    if not action_name == 'exec':
        return False
    requested_command = args.command
    return self.can_run_exec_action(requested_command, args)

can_run_exec_action(command_name, args) abstractmethod

Checks whether the specified exec service action can be run.

Parameters:

Name Type Description Default
command_name str

Name of the exec command to check.

required
args Namespace

Parsed command-line arguments of the exec action.

required

Returns:

Name Type Description
bool bool

Whether the command can be run; the criteria are defined by the implementing class.

Source code in ark_sdk_python/actions/ark_exec_action.py
283
284
285
286
287
288
289
290
291
292
293
294
@abstractmethod
def can_run_exec_action(self, command_name: str, args: argparse.Namespace) -> bool:
    """
    Checks whether the specified exec service action can be run.

    Implemented by each concrete exec action; this base declaration has no body.

    Args:
        command_name (str): Name of the exec sub-command that was requested
        args (argparse.Namespace): Parsed command line arguments of the exec action

    Returns:
        bool: True when the implementing service can run the given command, False otherwise
    """

define_action(subparsers)

Defines the CLI exec action, with its subparsers (args) for the service.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

Sub-parsers of the CLI parser; the exec parser is added to them, or reused when it already exists.

required
Source code in ark_sdk_python/actions/ark_exec_action.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@overrides
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the CLI `exec` action and lets the service add its own sub-commands to it.

    The `exec` parser is shared between services: the first caller creates it with the
    common options, later callers only look up the already existing sub-commands group.

    Args:
        subparsers (argparse._SubParsersAction): Sub-parsers the `exec` parser lives in
    """
    known_parsers = subparsers._name_parser_map  # pylint: disable=protected-access
    if 'exec' in known_parsers:
        # A previous service definition already created the exec parser, reuse its sub-commands group
        exec_subparsers = known_parsers['exec']._subparsers._group_actions[0]  # pylint: disable=protected-access
    else:
        # First definition, build the exec parser with every shared option
        exec_parser = subparsers.add_parser('exec')
        self._common_actions_configuration(exec_parser)
        exec_parser.add_argument('-pn', '--profile-name', default=ArkProfileLoader.default_profile_name(), help='Profile name to load')
        exec_parser.add_argument('-op', '--output-path', help='Output file to write data to')
        exec_parser.add_argument('-rf', '--request-file', help='Request file containing the parameters for the exec action')
        exec_parser.add_argument('-rc', '--retry-count', type=int, help='Retry count for execution', default=1)
        exec_parser.add_argument(
            '-ra',
            '--refresh-auth',
            action='store_true',
            help='If possible, will try to refresh the active authentication before running the actual command',
        )
        exec_subparsers = exec_parser.add_subparsers(dest="command")
        exec_subparsers.required = True

    # Either way, the service specific commands are attached to the exec sub-commands group
    self.define_exec_action(exec_subparsers)

define_exec_action(exec_subparsers) abstractmethod

Defines an exec action, and its specified configurations and args, for a service.

Parameters:

Name Type Description Default
exec_subparsers _SubParsersAction

Sub-parsers of the exec parser, to which the service adds its own commands.

required
Source code in ark_sdk_python/actions/ark_exec_action.py
264
265
266
267
268
269
270
271
@abstractmethod
def define_exec_action(self, exec_subparsers: argparse._SubParsersAction) -> None:
    """
    Defines an exec action, and its specified configurations and args, for a service.

    Implemented by each concrete exec action; this base declaration has no body.

    Args:
        exec_subparsers (argparse._SubParsersAction): Sub-parsers of the `exec` parser to add the service commands to
    """

run_action(args)

Runs the exec action. Loads the authenticators from the cache and connects to the API using the loaded authenticators. Each service is created from the API, based on the given authenticators, and then runs the exec action using the API.

Parameters:

Name Type Description Default
args Namespace

Parsed command line arguments of the exec action.

required

Raises:

Type Description
ArkException

Raised when no profile could be loaded.

ArkException

Raised when none of the profile's authenticators could load its authentication.

Source code in ark_sdk_python/actions/ark_exec_action.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
@overrides
def run_action(self, args: argparse.Namespace) -> None:
    """
    Executes the `exec` action.

    The profile is loaded first, then every authenticator listed in it tries to load its
    authentication. The authenticators that succeeded are handed to a new `ArkCLIAPI`,
    and `run_exec_action` is invoked with it, retried as many times as requested.

    Args:
        args (argparse.Namespace): Parsed arguments; `profile_name`, `refresh_auth` and `retry_count` are read

    Raises:
        ArkException: When no profile could be loaded
        ArkException: When none of the authenticators could load its authentication
    """
    self._common_actions_execution(args)
    profile_name = ArkProfileLoader.deduce_profile_name(args.profile_name)
    profile = ArkProfileLoader.load_profile(profile_name)
    if not profile:
        raise ArkException('Please configure a profile and login before trying to exec')

    # Keep only the authenticators that managed to load their authentication
    authenticators: List[ArkAuth] = []
    for auth_name in profile.auth_profiles:
        candidate = SUPPORTED_AUTHENTICATORS[auth_name]()
        if candidate.load_authentication(profile, args.refresh_auth):
            authenticators.append(candidate)

    if not authenticators:
        raise ArkException(
            'Failed to load authenticators, tokens are either expired or authenticators are not logged in, please login first'
        )
    if len(authenticators) != len(profile.auth_profiles) and ArkSystemConfig.is_interactive():
        ArkArgsFormatter.print_colored('Not all authenticators are logged in, some of the functionality will be disabled')

    # The CLI API wraps the loaded authenticators together with the profile
    api = ArkCLIAPI(authenticators, profile)

    def _announce_retry(_1, _2, delay):
        # Only the delay is reported, the other two arguments handed to the logger are ignored
        ArkArgsFormatter.print_failure(f"Retrying in {delay} seconds")

    # retry_call only needs an object exposing `warning`, so a tiny namedtuple stands in for a logger
    retry_logger = namedtuple("logger", ("warning"))(warning=_announce_retry)

    # Run the service exec action, retrying according to the user supplied count
    retry_call(
        self.run_exec_action,
        fargs=[api, args],
        tries=args.retry_count,
        delay=1,
        logger=retry_logger,
    )

run_exec_action(api, args) abstractmethod

Runs the exec action for a service with the specified arguments and API.

Parameters:

Name Type Description Default
api ArkCLIAPI

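CLI API instance the action should run against; run_action builds it from the loaded authenticators and the profile.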

required
args Namespace

Parsed command line arguments of the exec action.

required
Source code in ark_sdk_python/actions/ark_exec_action.py
273
274
275
276
277
278
279
280
281
@abstractmethod
def run_exec_action(self, api: ArkCLIAPI, args: argparse.Namespace) -> None:
    """
    Runs the exec action for a service with the specified arguments and API.

    Implemented by each concrete exec action; this base declaration has no body.

    Args:
        api (ArkCLIAPI): CLI API instance the action should run against
        args (argparse.Namespace): Parsed command line arguments of the exec action
    """

ArkLoginAction

Bases: ArkAction

Source code in ark_sdk_python/actions/ark_login_action.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class ArkLoginAction(ArkAction):
    """
    CLI `login` action: authenticates each authenticator configured in the loaded profile.
    """

    @overrides
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the CLI `login` action.
        For each supported authenticator, adds the username/secret params for logging in.

        Args:
            subparsers (argparse._SubParsersAction): Sub-parsers to which the `login` parser is added
        """
        login_parser: argparse.ArgumentParser = subparsers.add_parser('login')
        self._common_actions_configuration(login_parser)
        login_parser.add_argument('-pn', '--profile-name', default=ArkProfileLoader.default_profile_name(), help='Profile name to load')
        login_parser.add_argument('-f', '--force', action='store_true', help='Whether to force login even thou token has not expired yet')
        login_parser.add_argument(
            '-nss',
            '--no-shared-secrets',
            action='store_true',
            help='Do not share secrets between different authenticators with the same username',
        )
        login_parser.add_argument('-st', '--show-tokens', action='store_true', help='Print out tokens as well if not silent')
        login_parser.add_argument('-ra', '--refresh-auth', action='store_true', help='If a cache exists, will also try to refresh it')

        # Add username and secret for each authenticator for logging in
        # The short flag is built from the first two characters of every `_` separated part
        # of the authenticator name, followed by `u` (username) or `s` (secret)
        for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
            login_parser.add_argument(
                f'-{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}u',
                f'--{authenticator.authenticator_name().replace("_", "-")}-username',
                help=f'Username to authenticate with to {authenticator.authenticator_human_readable_name()}',
            )
            login_parser.add_argument(
                f'-{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}s',
                f'--{authenticator.authenticator_name().replace("_", "-")}-secret',
                help=f'Secret to authenticate with to {authenticator.authenticator_human_readable_name()}',
            )

    @overrides
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the login action for each authenticator.
        After a login completes, credentials are stored in the keyring for future use.

        Authenticators that are already authenticated are skipped (or only refreshed when
        `--refresh-auth` is given) unless `--force` is used. When the login finishes, the
        obtained tokens are printed, without their token values unless `--show-tokens` is given.

        Args:
            args (argparse.Namespace): Parsed command line arguments of the `login` action

        Raises:
            ArkException: When no profile could be loaded
            ArkException: When running non-interactively without a secret for an auth method that requires credentials
        """
        self._common_actions_execution(args)
        # Load up the profile
        profile = ArkProfileLoader.load_profile(ArkProfileLoader.deduce_profile_name(args.profile_name))
        if not profile:
            raise ArkException('Please configure a profile before trying to login')

        # Login for each auth profile
        # Share secrets between authenticators for allowed auth methods
        # Each sharable auth method maps to the (username, secret) pairs used so far in this run
        shared_secrets_map: Dict[ArkAuthMethod, List[Tuple[str, ArkSecret]]] = {k: [] for k in ArkAuthMethodSharableCredentials}
        # Save tokens for finalization output
        tokens_map: Dict[str, ArkToken] = dict()
        for authenticator_name, auth_profile in profile.auth_profiles.items():
            authenticator = SUPPORTED_AUTHENTICATORS[authenticator_name]()

            # Only perform the authentication if not already authenticated and not forced
            if authenticator.is_authenticated(profile) and not args.force:
                try:
                    if args.refresh_auth:
                        token = authenticator.load_authentication(profile, True)
                        if token:
                            ArkArgsFormatter.print_success(
                                f'{authenticator.authenticator_human_readable_name()} Authentication Refreshed',
                            )
                        else:
                            raise ArkException(f'{authenticator.authenticator_name()} authentication failed to be refreshed')
                    else:
                        ArkArgsFormatter.print_success(
                            f'{authenticator.authenticator_human_readable_name()} Already Authenticated',
                        )
                    # Nothing more to do for this authenticator, move on to the next one
                    continue
                except Exception as ex:
                    # Any failure above is only logged; execution falls through to a normal login below
                    self._logger.info(
                        f'{authenticator.authenticator_human_readable_name()} Failed to refresh token, performing normal login [{str(ex)}]'
                    )
            # Secret given on the command line for this authenticator, None when it was not supplied
            secret = (
                ArkSecret(secret=args.__dict__[f'{authenticator_name}_secret']) if args.__dict__[f'{authenticator_name}_secret'] else None
            )
            # A username given on the command line takes precedence over the one in the profile
            auth_profile.username = args.__dict__[f'{authenticator_name}_username'] or auth_profile.username

            # Ask the user for the user and secret if interactive
            if ArkSystemConfig.is_interactive() and auth_profile.auth_method in ArkAuthMethodsRequireCredentials:
                auth_profile.username = ArkArgsFormatter.get_arg(
                    args,
                    f'{authenticator_name}_username',
                    f'{authenticator.authenticator_human_readable_name()} Username',
                    auth_profile.username,
                )
                # Check if there is an existing secret who is sharable
                # It is reused only when no secret was given explicitly and sharing was not disabled
                if (
                    auth_profile.auth_method in ArkAuthMethodSharableCredentials
                    and auth_profile.auth_method in shared_secrets_map
                    and any(auth_profile.username == s[0] for s in shared_secrets_map[auth_profile.auth_method])
                    and not args.__dict__[f'{authenticator_name}_secret']
                    and not args.no_shared_secrets
                ):
                    # Take the secret of the first shared entry with the same username
                    secret = next(filter(lambda s: auth_profile.username == s[0], shared_secrets_map[auth_profile.auth_method]))[1]
                else:
                    if not args.force and (
                        (
                            auth_profile.auth_method == ArkAuthMethod.Identity
                            and ArkIdentity.has_cache_record(profile, auth_profile.username, args.refresh_auth)
                        )
                    ):
                        # Check if there is a secret already cached, if there is, no need to ask for password
                        secret = ArkSecret(secret='')
                    else:
                        # Check if we really need to ask for a password in specific use cases
                        # For `isp` with the Identity method, an empty secret is used when is_idp_user returns true
                        if (
                            authenticator_name == 'isp'
                            and auth_profile.auth_method == ArkAuthMethod.Identity
                            and ArkIdentity.is_idp_user(
                                auth_profile.username,
                                (
                                    auth_profile.auth_method_settings.identity_url
                                    if isinstance(auth_profile.auth_method_settings, IdentityArkAuthMethodSettings)
                                    else None
                                ),
                                (
                                    auth_profile.auth_method_settings.identity_tenant_subdomain
                                    if isinstance(auth_profile.auth_method_settings, IdentityArkAuthMethodSettings)
                                    else None
                                ),
                            )
                        ):
                            secret = ArkSecret(secret='')
                        else:
                            # Obtain the secret through get_arg, with hidden=True
                            secret = ArkSecret(
                                secret=ArkArgsFormatter.get_arg(
                                    args,
                                    f'{authenticator_name}_secret',
                                    f'{authenticator.authenticator_human_readable_name()} Secret',
                                    hidden=True,
                                )
                            )
            elif (
                not ArkSystemConfig.is_interactive()
                and auth_profile.auth_method in ArkAuthMethodsRequireCredentials
                and not args.__dict__[f'{authenticator_name}_secret']
            ):
                # Non-interactive runs cannot prompt, so the secret must come from the command line
                raise ArkException(
                    f'{authenticator_name}-secret argument is required if authenticating to {authenticator.authenticator_human_readable_name()}'
                )
            # Perform the authentication, will also cache the token
            token = authenticator.authenticate(profile=profile, secret=secret, force=args.force, refresh_auth=args.refresh_auth)
            # Remember the secret so later authenticators with the same username can reuse it
            if not args.no_shared_secrets and auth_profile.auth_method in ArkAuthMethodSharableCredentials:
                shared_secrets_map[auth_profile.auth_method].append((auth_profile.username, secret))
            tokens_map[authenticator.authenticator_human_readable_name()] = token
        if not args.show_tokens and tokens_map:
            ArkArgsFormatter.print_success('Login tokens are hidden')
        for k, v in tokens_map.items():
            # Cookies are removed from the token metadata before it is printed
            if 'cookies' in v.metadata:
                del v.metadata['cookies']
            # The token and refresh_token fields are excluded from the output unless --show-tokens was given
            ArkArgsFormatter.print_success(
                f'{k} Token\n{v.json(indent=4, exclude={} if args.show_tokens else {"token", "refresh_token"})}',
            )

    @overrides
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Asserts the action is `login`.

        Args:
            action_name (str): Name of the CLI action being dispatched
            args (argparse.Namespace): Parsed command line arguments, not used by this check

        Returns:
            bool: True when `action_name` equals `login`, False otherwise
        """
        return action_name == 'login'

can_run_action(action_name, args)

Asserts the action is login.

Parameters:

Name Type Description Default
action_name str

Name of the CLI action being dispatched.

required
args Namespace

Parsed command line arguments; not used by this check.

required

Returns:

Name Type Description
bool bool

True when action_name equals login, False otherwise.

Source code in ark_sdk_python/actions/ark_login_action.py
188
189
190
191
192
193
194
195
196
197
198
199
200
@overrides
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Tells whether this action is the one that should handle the request.

    Args:
        action_name (str): Name of the CLI action being dispatched
        args (argparse.Namespace): Parsed command line arguments, not used by this check

    Returns:
        bool: True when `action_name` equals `login`, False otherwise
    """
    is_login = action_name == 'login'
    return is_login

define_action(subparsers)

Defines the CLI login action. For each supported authenticator, adds the username/secret params for logging in.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

Sub-parsers of the CLI parser, to which the login parser is added.

required
Source code in ark_sdk_python/actions/ark_login_action.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@overrides
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the CLI `login` action with all of its options.

    Besides the common options and the profile name, a set of boolean switches is added,
    followed by a username and a secret option for every supported authenticator.

    Args:
        subparsers (argparse._SubParsersAction): Sub-parsers to which the `login` parser is added
    """
    login_parser: argparse.ArgumentParser = subparsers.add_parser('login')
    self._common_actions_configuration(login_parser)
    login_parser.add_argument('-pn', '--profile-name', default=ArkProfileLoader.default_profile_name(), help='Profile name to load')

    # Boolean switches of the login action, registered in the listed order
    switches = (
        ('-f', '--force', 'Whether to force login even thou token has not expired yet'),
        ('-nss', '--no-shared-secrets', 'Do not share secrets between different authenticators with the same username'),
        ('-st', '--show-tokens', 'Print out tokens as well if not silent'),
        ('-ra', '--refresh-auth', 'If a cache exists, will also try to refresh it'),
    )
    for short_flag, long_flag, help_text in switches:
        login_parser.add_argument(short_flag, long_flag, action='store_true', help=help_text)

    # Username and secret options per authenticator; the short flag is made of the first two
    # characters of every `_` separated part of the authenticator name plus `u` or `s`
    for authenticator in SUPPORTED_AUTHENTICATORS_LIST:
        login_parser.add_argument(
            f'-{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}u',
            f'--{authenticator.authenticator_name().replace("_", "-")}-username',
            help=f'Username to authenticate with to {authenticator.authenticator_human_readable_name()}',
        )
        login_parser.add_argument(
            f'-{"".join([s[:2] for s in authenticator.authenticator_name().split("_")])}s',
            f'--{authenticator.authenticator_name().replace("_", "-")}-secret',
            help=f'Secret to authenticate with to {authenticator.authenticator_human_readable_name()}',
        )

run_action(args)

Runs the login action for each authenticator. After a login completes, credentials are stored in the keyring for future use.

Parameters:

Name Type Description Default
args Namespace

Parsed command line arguments of the login action.

required

Raises:

Type Description
ArkException

Raised when no profile could be loaded.

ArkException

Raised when running non-interactively without a secret for an auth method that requires credentials.

Source code in ark_sdk_python/actions/ark_login_action.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@overrides
def run_action(self, args: argparse.Namespace) -> None:
    """
    Runs the login action for each authenticator.
    After a login completes, credentials are stored in the keyring for future use.

    Authenticators that are already authenticated are skipped (or only refreshed when
    `--refresh-auth` is given) unless `--force` is used. When the login finishes, the
    obtained tokens are printed, without their token values unless `--show-tokens` is given.

    Args:
        args (argparse.Namespace): Parsed command line arguments of the `login` action

    Raises:
        ArkException: When no profile could be loaded
        ArkException: When running non-interactively without a secret for an auth method that requires credentials
    """
    self._common_actions_execution(args)
    # Load up the profile
    profile = ArkProfileLoader.load_profile(ArkProfileLoader.deduce_profile_name(args.profile_name))
    if not profile:
        raise ArkException('Please configure a profile before trying to login')

    # Login for each auth profile
    # Share secrets between authenticators for allowed auth methods
    # Each sharable auth method maps to the (username, secret) pairs used so far in this run
    shared_secrets_map: Dict[ArkAuthMethod, List[Tuple[str, ArkSecret]]] = {k: [] for k in ArkAuthMethodSharableCredentials}
    # Save tokens for finalization output
    tokens_map: Dict[str, ArkToken] = dict()
    for authenticator_name, auth_profile in profile.auth_profiles.items():
        authenticator = SUPPORTED_AUTHENTICATORS[authenticator_name]()

        # Only perform the authentication if not already authenticated and not forced
        if authenticator.is_authenticated(profile) and not args.force:
            try:
                if args.refresh_auth:
                    token = authenticator.load_authentication(profile, True)
                    if token:
                        ArkArgsFormatter.print_success(
                            f'{authenticator.authenticator_human_readable_name()} Authentication Refreshed',
                        )
                    else:
                        raise ArkException(f'{authenticator.authenticator_name()} authentication failed to be refreshed')
                else:
                    ArkArgsFormatter.print_success(
                        f'{authenticator.authenticator_human_readable_name()} Already Authenticated',
                    )
                # Nothing more to do for this authenticator, move on to the next one
                continue
            except Exception as ex:
                # Any failure above is only logged; execution falls through to a normal login below
                self._logger.info(
                    f'{authenticator.authenticator_human_readable_name()} Failed to refresh token, performing normal login [{str(ex)}]'
                )
        # Secret given on the command line for this authenticator, None when it was not supplied
        secret = (
            ArkSecret(secret=args.__dict__[f'{authenticator_name}_secret']) if args.__dict__[f'{authenticator_name}_secret'] else None
        )
        # A username given on the command line takes precedence over the one in the profile
        auth_profile.username = args.__dict__[f'{authenticator_name}_username'] or auth_profile.username

        # Ask the user for the user and secret if interactive
        if ArkSystemConfig.is_interactive() and auth_profile.auth_method in ArkAuthMethodsRequireCredentials:
            auth_profile.username = ArkArgsFormatter.get_arg(
                args,
                f'{authenticator_name}_username',
                f'{authenticator.authenticator_human_readable_name()} Username',
                auth_profile.username,
            )
            # Check if there is an existing secret who is sharable
            # It is reused only when no secret was given explicitly and sharing was not disabled
            if (
                auth_profile.auth_method in ArkAuthMethodSharableCredentials
                and auth_profile.auth_method in shared_secrets_map
                and any(auth_profile.username == s[0] for s in shared_secrets_map[auth_profile.auth_method])
                and not args.__dict__[f'{authenticator_name}_secret']
                and not args.no_shared_secrets
            ):
                # Take the secret of the first shared entry with the same username
                secret = next(filter(lambda s: auth_profile.username == s[0], shared_secrets_map[auth_profile.auth_method]))[1]
            else:
                if not args.force and (
                    (
                        auth_profile.auth_method == ArkAuthMethod.Identity
                        and ArkIdentity.has_cache_record(profile, auth_profile.username, args.refresh_auth)
                    )
                ):
                    # Check if there is a secret already cached, if there is, no need to ask for password
                    secret = ArkSecret(secret='')
                else:
                    # Check if we really need to ask for a password in specific use cases
                    # For `isp` with the Identity method, an empty secret is used when is_idp_user returns true
                    if (
                        authenticator_name == 'isp'
                        and auth_profile.auth_method == ArkAuthMethod.Identity
                        and ArkIdentity.is_idp_user(
                            auth_profile.username,
                            (
                                auth_profile.auth_method_settings.identity_url
                                if isinstance(auth_profile.auth_method_settings, IdentityArkAuthMethodSettings)
                                else None
                            ),
                            (
                                auth_profile.auth_method_settings.identity_tenant_subdomain
                                if isinstance(auth_profile.auth_method_settings, IdentityArkAuthMethodSettings)
                                else None
                            ),
                        )
                    ):
                        secret = ArkSecret(secret='')
                    else:
                        # Obtain the secret through get_arg, with hidden=True
                        secret = ArkSecret(
                            secret=ArkArgsFormatter.get_arg(
                                args,
                                f'{authenticator_name}_secret',
                                f'{authenticator.authenticator_human_readable_name()} Secret',
                                hidden=True,
                            )
                        )
        elif (
            not ArkSystemConfig.is_interactive()
            and auth_profile.auth_method in ArkAuthMethodsRequireCredentials
            and not args.__dict__[f'{authenticator_name}_secret']
        ):
            # Non-interactive runs cannot prompt, so the secret must come from the command line
            raise ArkException(
                f'{authenticator_name}-secret argument is required if authenticating to {authenticator.authenticator_human_readable_name()}'
            )
        # Perform the authentication, will also cache the token
        token = authenticator.authenticate(profile=profile, secret=secret, force=args.force, refresh_auth=args.refresh_auth)
        # Remember the secret so later authenticators with the same username can reuse it
        if not args.no_shared_secrets and auth_profile.auth_method in ArkAuthMethodSharableCredentials:
            shared_secrets_map[auth_profile.auth_method].append((auth_profile.username, secret))
        tokens_map[authenticator.authenticator_human_readable_name()] = token
    if not args.show_tokens and tokens_map:
        ArkArgsFormatter.print_success('Login tokens are hidden')
    for k, v in tokens_map.items():
        # Cookies are removed from the token metadata before it is printed
        if 'cookies' in v.metadata:
            del v.metadata['cookies']
        # The token and refresh_token fields are excluded from the output unless --show-tokens was given
        ArkArgsFormatter.print_success(
            f'{k} Token\n{v.json(indent=4, exclude={} if args.show_tokens else {"token", "refresh_token"})}',
        )

ArkProfilesAction

Bases: ArkAction

Source code in ark_sdk_python/actions/ark_profiles_action.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class ArkProfilesAction(ArkAction):
    """
    CLI action behind the `profiles` command.

    Registers the `list`, `show`, `delete`, `clear`, `clone`, `add` and `edit`
    sub-commands and dispatches to the matching handler when run.
    """

    @overrides
    def define_action(self, subparsers: argparse._SubParsersAction) -> None:
        """
        Defines the CLI `profiles` action, and adds sub-commands for managing multiple profiles.

        Args:
            subparsers (argparse._SubParsersAction): Sub-parsers collection to which the `profiles` parser is added
        """
        profile_parser: argparse.ArgumentParser = subparsers.add_parser('profiles')
        self._common_actions_configuration(profile_parser)
        # The chosen sub-command name is stored in `profile_cmd` and is mandatory
        profile_cmd_subparsers = profile_parser.add_subparsers(dest="profile_cmd")
        profile_cmd_subparsers.required = True
        list_profiles_parser = profile_cmd_subparsers.add_parser('list', help='List all profiles')
        list_profiles_parser.add_argument('-n', '--name', help='Profile name to filter with by wildcard')
        list_profiles_parser.add_argument('-ap', '--auth-profile', help='Filter profiles by auth types')
        list_profiles_parser.add_argument(
            '-a', '--all', action='store_true', help='Whether to show all profiles data as well and not only their names'
        )
        show_profile_parser = profile_cmd_subparsers.add_parser('show', help='Show a profile')
        show_profile_parser.add_argument(
            '-pn', '--profile-name', required=False, help='Profile name to show, if not given, shows the current one'
        )
        delete_profile_parser = profile_cmd_subparsers.add_parser('delete', help='Delete a specific profile')
        delete_profile_parser.add_argument('-pn', '--profile-name', required=True, help='Profile name to delete')
        delete_profile_parser.add_argument('-y', '--yes', action='store_true', help='Whether to approve deletion non interactively')
        clear_profiles_parser = profile_cmd_subparsers.add_parser('clear', help='Clear all profiles')
        clear_profiles_parser.add_argument('-y', '--yes', action='store_true', help='Whether to approve clear non interactively')
        clone_profile_parser = profile_cmd_subparsers.add_parser('clone', help='Clones a profile')
        clone_profile_parser.add_argument('-pn', '--profile-name', required=True, help='Profile name to clone')
        clone_profile_parser.add_argument(
            '-npn', '--new-profile-name', help='New cloned profile name, if not given, will add _clone as part of the name'
        )
        clone_profile_parser.add_argument('-y', '--yes', action='store_true', help='Whether to override existing profile if exists')
        add_profile_parser = profile_cmd_subparsers.add_parser('add', help='Adds a profile to the profiles folder from a given path')
        add_profile_parser.add_argument('-pp', '--profile-path', required=True, help='Profile file path to be added')
        edit_profile_parser = profile_cmd_subparsers.add_parser('edit', help='Edits a profile interactively')
        edit_profile_parser.add_argument(
            '-pn', '--profile-name', required=False, help='Profile name to edit, if not given, edits the current one'
        )

    def __run_list_action(self, args: argparse.Namespace) -> None:
        """
        Prints the loaded profiles as JSON, optionally filtered by name wildcard and auth profile.

        Args:
            args (argparse.Namespace): Parsed arguments, reads `name`, `auth_profile` and `all`
        """
        # Start by loading all the profiles
        profiles: Optional[List[ArkProfile]] = ArkProfileLoader.load_all_profiles()
        if not profiles:
            ArkArgsFormatter.print_warning(
                'No profiles were found',
            )
            return
        # Filter profiles
        if args.name:
            profiles = [p for p in profiles if fnmatch(p.profile_name, args.name)]
        if args.auth_profile:
            profiles = [p for p in profiles if args.auth_profile in list(p.auth_profiles.keys())]
        # Print them based on request, either the full profile data or only the names
        if args.all:
            ArkArgsFormatter.print_success(json.dumps([p.dict() for p in profiles], indent=4))
        else:
            ArkArgsFormatter.print_success(json.dumps([p.profile_name for p in profiles], indent=4))

    def __run_show_action(self, args: argparse.Namespace) -> None:
        """
        Prints a single profile as JSON, warning when it cannot be loaded.

        Args:
            args (argparse.Namespace): Parsed arguments, reads the optional `profile_name`
        """
        # Fall back to the deduced profile name when none was given
        profile_name = args.profile_name or ArkProfileLoader.deduce_profile_name()
        profile: Optional[ArkProfile] = ArkProfileLoader.load_profile(profile_name)
        if not profile:
            ArkArgsFormatter.print_warning(
                f'No profile was found for the name {profile_name}',
            )
            return
        ArkArgsFormatter.print_success(profile.json(indent=4))

    def __run_delete_action(self, args: argparse.Namespace) -> None:
        """
        Deletes the named profile, asking for confirmation unless `--yes` was given.

        Args:
            args (argparse.Namespace): Parsed arguments, reads `profile_name` and `yes`
        """
        profile: Optional[ArkProfile] = ArkProfileLoader.load_profile(args.profile_name)
        if not profile:
            ArkArgsFormatter.print_warning(
                f'No profile was found for the name {args.profile_name}',
            )
            return
        if not args.yes:
            answer = inquirer.prompt(
                [inquirer.Confirm('answer', message=f'Are you sure you want to delete profile {args.profile_name}')],
                render=ArkInquirerRender(),
            )
            # A missing or negative answer leaves the profile untouched
            if not answer or not answer['answer']:
                return
        ArkProfileLoader.delete_profile(args.profile_name)

    def __run_clear_action(self, args: argparse.Namespace) -> None:
        """
        Clears all profiles, asking for confirmation unless `--yes` was given.

        Args:
            args (argparse.Namespace): Parsed arguments, reads `yes`
        """
        if not args.yes:
            answer = inquirer.prompt(
                [inquirer.Confirm('answer', message='Are you sure you want to clear all profiles')], render=ArkInquirerRender()
            )
            # A missing or negative answer leaves the profiles untouched
            if not answer or not answer['answer']:
                return
        ArkProfileLoader.clear_all_profiles()

    def __run_clone_action(self, args: argparse.Namespace) -> None:
        """
        Saves a copy of the named profile under a new name.

        The new name is `new_profile_name` when given, otherwise the source name suffixed with `_clone`.
        When a profile with the new name already exists, confirmation is requested unless `--yes` was given.

        Args:
            args (argparse.Namespace): Parsed arguments, reads `profile_name`, `new_profile_name` and `yes`
        """
        profile: Optional[ArkProfile] = ArkProfileLoader.load_profile(args.profile_name)
        if not profile:
            ArkArgsFormatter.print_warning(
                f'No profile was found for the name {args.profile_name}',
            )
            return
        cloned_profile: ArkProfile = profile.copy(update={'profile_name': args.new_profile_name or f'{profile.profile_name}_clone'})
        if ArkProfileLoader.profile_exists(cloned_profile.profile_name):
            if not args.yes:
                answer = inquirer.prompt(
                    [
                        inquirer.Confirm(
                            'answer', message=f'Are you sure you want to override existing profile {cloned_profile.profile_name}'
                        )
                    ],
                    render=ArkInquirerRender(),
                )
                # A missing or negative answer keeps the existing profile
                if not answer or not answer['answer']:
                    return
        ArkProfileLoader.save_profile(cloned_profile)

    def __run_add_action(self, args: argparse.Namespace) -> None:
        """
        Parses the profile file at the given path and saves it as a profile.

        A missing path only produces a warning; any exception raised while parsing or saving
        is logged and reported as a failure instead of being propagated.

        Args:
            args (argparse.Namespace): Parsed arguments, reads `profile_path`
        """
        if not os.path.exists(args.profile_path):
            ArkArgsFormatter.print_warning(
                f'Profile path [{args.profile_path}] does not exist, ignoring',
            )
            return
        try:
            profile: ArkProfile = ArkProfile.parse_file(args.profile_path)
            ArkProfileLoader.save_profile(profile)
        except Exception as ex:
            self._logger.exception(f'Failed to parser profile [{str(ex)}]')
            ArkArgsFormatter.print_failure(
                f'Profile path [{args.profile_path}] failed to be parsed, aborting',
            )
            return

    def __run_edit_action(self, args: argparse.Namespace) -> None:
        """
        Opens the profile JSON in an interactive editor prompt and saves the edited result.

        Nothing is saved when the prompt yields no answers.

        Args:
            args (argparse.Namespace): Parsed arguments, reads the optional `profile_name`
        """
        # Fall back to the deduced profile name when none was given
        profile_name = args.profile_name or ArkProfileLoader.deduce_profile_name()
        profile: Optional[ArkProfile] = ArkProfileLoader.load_profile(profile_name)
        if not profile:
            ArkArgsFormatter.print_warning(
                f'No profile was found for the name {profile_name}',
            )
            return
        answer = inquirer.prompt(
            [inquirer.Editor('profile_edit', message=f'Chosen profile [{profile_name}] is about to be edited')],
            render=ArkInquirerRender(),
            answers={'profile_edit': profile.json(indent=4)},
        )
        # Same guard as the confirmation prompts in this class: a prompt that yields
        # no answers is treated as a cancellation rather than indexed blindly
        if not answer:
            return
        edited_profile = ArkProfile.parse_raw(answer['profile_edit'])
        ArkProfileLoader.save_profile(edited_profile)

    @overrides
    def run_action(self, args: argparse.Namespace) -> None:
        """
        Runs the `profiles` sub-command selected by `args.profile_cmd`.

        Args:
            args (argparse.Namespace): Parsed arguments, `profile_cmd` selects the handler and the rest is passed to it

        Raises:
            ArkException: When `args.profile_cmd` is not one of the supported sub-commands
        """
        if args.profile_cmd == 'list':
            self.__run_list_action(args)
        elif args.profile_cmd == 'show':
            self.__run_show_action(args)
        elif args.profile_cmd == 'delete':
            self.__run_delete_action(args)
        elif args.profile_cmd == 'clear':
            self.__run_clear_action(args)
        elif args.profile_cmd == 'clone':
            self.__run_clone_action(args)
        elif args.profile_cmd == 'add':
            self.__run_add_action(args)
        elif args.profile_cmd == 'edit':
            self.__run_edit_action(args)
        else:
            raise ArkException(f'Invalid command {args.profile_cmd} given')

    @overrides
    def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
        """
        Checks that the action is `profiles`.

        Args:
            action_name (str): Name of the action selected on the command line
            args (argparse.Namespace): Parsed arguments, not used by this check

        Returns:
            bool: True when `action_name` equals `profiles`, otherwise False
        """
        return action_name == 'profiles'

can_run_action(action_name, args)

Checks that the action is `profiles`.

Parameters:

Name Type Description Default
action_name str

Name of the action selected on the command line.

required
args Namespace

Parsed command-line arguments; not used by this check.

required

Returns:

Name Type Description
bool bool

True when action_name equals `profiles`, otherwise False.

Source code in ark_sdk_python/actions/ark_profiles_action.py
193
194
195
196
197
198
199
200
201
202
203
204
205
@overrides
def can_run_action(self, action_name: str, args: argparse.Namespace) -> bool:
    """
    Tells whether this action is the one responsible for the given action name.

    Args:
        action_name (str): Name of the action selected on the command line
        args (argparse.Namespace): Parsed arguments, not consulted by this check

    Returns:
        bool: True only when the action name is `profiles`
    """
    handled_action = 'profiles'
    return action_name == handled_action

define_action(subparsers)

Defines the CLI `profiles` action, and adds sub-commands (list, show, delete, clear, clone, add, edit) for managing multiple profiles.

Parameters:

Name Type Description Default
subparsers _SubParsersAction

Sub-parsers collection to which the `profiles` parser is added.

required
Source code in ark_sdk_python/actions/ark_profiles_action.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@overrides
def define_action(self, subparsers: argparse._SubParsersAction) -> None:
    """
    Registers the `profiles` command together with all of its sub-commands.

    The sub-command picked by the user ends up in the `profile_cmd` destination and is mandatory.

    Args:
        subparsers (argparse._SubParsersAction): Sub-parsers collection receiving the `profiles` parser
    """
    profiles_parser: argparse.ArgumentParser = subparsers.add_parser('profiles')
    self._common_actions_configuration(profiles_parser)
    commands = profiles_parser.add_subparsers(dest='profile_cmd')
    commands.required = True

    # profiles list
    list_cmd = commands.add_parser('list', help='List all profiles')
    list_cmd.add_argument('-n', '--name', help='Profile name to filter with by wildcard')
    list_cmd.add_argument('-ap', '--auth-profile', help='Filter profiles by auth types')
    list_cmd.add_argument(
        '-a',
        '--all',
        action='store_true',
        help='Whether to show all profiles data as well and not only their names',
    )

    # profiles show
    show_cmd = commands.add_parser('show', help='Show a profile')
    show_cmd.add_argument(
        '-pn',
        '--profile-name',
        required=False,
        help='Profile name to show, if not given, shows the current one',
    )

    # profiles delete
    delete_cmd = commands.add_parser('delete', help='Delete a specific profile')
    delete_cmd.add_argument('-pn', '--profile-name', required=True, help='Profile name to delete')
    delete_cmd.add_argument('-y', '--yes', action='store_true', help='Whether to approve deletion non interactively')

    # profiles clear
    clear_cmd = commands.add_parser('clear', help='Clear all profiles')
    clear_cmd.add_argument('-y', '--yes', action='store_true', help='Whether to approve clear non interactively')

    # profiles clone
    clone_cmd = commands.add_parser('clone', help='Clones a profile')
    clone_cmd.add_argument('-pn', '--profile-name', required=True, help='Profile name to clone')
    clone_cmd.add_argument(
        '-npn',
        '--new-profile-name',
        help='New cloned profile name, if not given, will add _clone as part of the name',
    )
    clone_cmd.add_argument('-y', '--yes', action='store_true', help='Whether to override existing profile if exists')

    # profiles add
    add_cmd = commands.add_parser('add', help='Adds a profile to the profiles folder from a given path')
    add_cmd.add_argument('-pp', '--profile-path', required=True, help='Profile file path to be added')

    # profiles edit
    edit_cmd = commands.add_parser('edit', help='Edits a profile interactively')
    edit_cmd.add_argument(
        '-pn',
        '--profile-name',
        required=False,
        help='Profile name to edit, if not given, edits the current one',
    )

run_action(args)

Runs the `profiles` sub-command selected by `args.profile_cmd`.

Parameters:

Name Type Description Default
args Namespace

Parsed command-line arguments; `profile_cmd` selects the sub-command to run.

required

Raises:

Type Description
ArkException

Raised when `args.profile_cmd` is not one of the supported sub-commands.

ArkException

description

Source code in ark_sdk_python/actions/ark_profiles_action.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
@overrides
def run_action(self, args: argparse.Namespace) -> None:
    """
    Dispatches to the handler of the requested `profiles` sub-command.

    Args:
        args (argparse.Namespace): Parsed arguments, `profile_cmd` names the sub-command and the rest is handed to its handler

    Raises:
        ArkException: When `profile_cmd` matches none of the supported sub-commands
    """
    requested_cmd = args.profile_cmd
    # Each branch runs its handler and leaves; falling through means the command is unknown
    if requested_cmd == 'list':
        self.__run_list_action(args)
        return
    if requested_cmd == 'show':
        self.__run_show_action(args)
        return
    if requested_cmd == 'delete':
        self.__run_delete_action(args)
        return
    if requested_cmd == 'clear':
        self.__run_clear_action(args)
        return
    if requested_cmd == 'clone':
        self.__run_clone_action(args)
        return
    if requested_cmd == 'add':
        self.__run_add_action(args)
        return
    if requested_cmd == 'edit':
        self.__run_edit_action(args)
        return
    raise ArkException(f'Invalid command {args.profile_cmd} given')

ArkServiceExecAction

Bases: ArkExecAction

Source code in ark_sdk_python/actions/ark_service_exec_action.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class ArkServiceExecAction(ArkExecAction):
    """
    Exec action that exposes the entries of `SUPPORTED_SERVICE_ACTIONS` as nested CLI commands
    and runs the selected one against the matching service of the CLI API.
    """

    def __define_service_exec_action(
        self,
        action_def: ArkServiceActionDefinition,
        subparsers: argparse._SubParsersAction,
        parent_actions_def: Optional[List[ArkServiceActionDefinition]] = None,
    ) -> argparse._SubParsersAction:
        """
        Adds a parser for a single action definition and a mandatory sub-parsers group under it.

        The group's destination is the action names of the parents and of this definition joined
        by underscores, followed by `_action`.

        Args:
            action_def (ArkServiceActionDefinition): Definition to add a parser for
            subparsers (argparse._SubParsersAction): Sub-parsers collection receiving the new parser
            parent_actions_def (Optional[List[ArkServiceActionDefinition]]): Chain of parent definitions, outermost first

        Returns:
            argparse._SubParsersAction: Sub-parsers group created under the new parser
        """
        action_parser = subparsers.add_parser(action_def.action_name)
        action_dest = action_def.action_name
        if parent_actions_def:
            action_dest = '_'.join([p.action_name for p in parent_actions_def]) + f'_{action_def.action_name}'
        action_subparsers = action_parser.add_subparsers(dest=f"{action_dest}_action")
        action_subparsers.required = True
        if action_def.schemas:
            self._define_actions_by_schemas(action_subparsers, action_def.schemas, action_def.defaults)
        return action_subparsers

    def __define_service_exec_actions(
        self,
        action_def: ArkServiceActionDefinition,
        subparsers: argparse._SubParsersAction,
        parent_actions_def: Optional[List[ArkServiceActionDefinition]] = None,
    ) -> None:
        """
        Defines the parser of an action definition and, recursively, of all of its subactions.

        Args:
            action_def (ArkServiceActionDefinition): Definition to add parsers for
            subparsers (argparse._SubParsersAction): Sub-parsers collection receiving the definition's parser
            parent_actions_def (Optional[List[ArkServiceActionDefinition]]): Chain of parent definitions, outermost first
        """
        action_subparsers = self.__define_service_exec_action(action_def, subparsers, parent_actions_def)
        if action_def.subactions:
            # Subactions see the current definition as the last element of their parents chain
            subaction_parents = (parent_actions_def or []) + [action_def]
            for subaction in action_def.subactions:
                self.__define_service_exec_actions(subaction, action_subparsers, subaction_parents)

    def __deduce_action_def(
        self,
        args: argparse.Namespace,
        action_def: ArkServiceActionDefinition,
        parent_actions_def: Optional[List[ArkServiceActionDefinition]] = None,
    ) -> Optional[Tuple[ArkServiceActionDefinition, str]]:
        """
        Resolves the innermost action definition selected by the parsed arguments.

        Args:
            args (argparse.Namespace): Parsed arguments holding the `*_action` destinations
            action_def (ArkServiceActionDefinition): Definition to start resolving from
            parent_actions_def (Optional[List[ArkServiceActionDefinition]]): Chain of parent definitions, outermost first

        Returns:
            Optional[Tuple[ArkServiceActionDefinition, str]]: The resolved definition and its destination name,
            or None when the destination of a definition on the path is not present in the arguments
        """
        action_dest = f'{action_def.action_name}_action'
        if parent_actions_def:
            action_dest = '_'.join([p.action_name for p in parent_actions_def]) + f'_{action_def.action_name}_action'
        if action_dest in args.__dict__:
            action_value = args.__dict__[action_dest]
            if action_def.subactions:
                for subaction in action_def.subactions:
                    # The chosen value names a subaction, keep descending into it
                    if subaction.action_name == action_value:
                        return self.__deduce_action_def(args, subaction, (parent_actions_def or []) + [action_def])
            return action_def, action_dest
        return None

    def __deduce_action_command_def(
        self, command_name: str, args: argparse.Namespace
    ) -> Optional[Tuple[ArkServiceActionDefinition, str]]:
        """
        Finds the supported service action named after the command and resolves the selected definition under it.

        Args:
            command_name (str): Command name to look up in `SUPPORTED_SERVICE_ACTIONS`
            args (argparse.Namespace): Parsed arguments holding the `*_action` destinations

        Returns:
            Optional[Tuple[ArkServiceActionDefinition, str]]: The resolved definition and its destination name,
            or None when no supported action matches or it cannot be resolved from the arguments
        """
        for action_def in SUPPORTED_SERVICE_ACTIONS:
            if action_def.action_name == command_name:
                # Find the fitting action
                return self.__deduce_action_def(args, action_def)
        return None

    @overrides
    def define_exec_action(self, exec_subparsers: argparse._SubParsersAction) -> None:
        """
        Defines all the supported service actions as CLI actions, with its associated arguments and schemas.

        Args:
            exec_subparsers (argparse._SubParsersAction): Sub-parsers collection to which every supported service action is added
        """
        for actions in SUPPORTED_SERVICE_ACTIONS:
            self.__define_service_exec_actions(actions, exec_subparsers)

    @overrides
    def run_exec_action(self, api: ArkCLIAPI, args: argparse.Namespace) -> None:
        """
        Deduces from the arguments the appropriate service definition and action.
        Finds the appropriate service using the definition and executes the sync or async service action.

        Args:
            api (ArkCLIAPI): API object whose attributes are looked up to find the service
            args (argparse.Namespace): Parsed arguments, `command` and the `*_action` destinations select what to run
        """
        # NOTE(review): unpacking fails when no definition is resolved; presumably callers
        # check can_run_exec_action first - confirm
        action_def, action_dest = self.__deduce_action_command_def(args.command, args)
        action_value = args.__dict__[action_dest]
        # Drop only the trailing `_action` marker of the destination; replacing every
        # occurrence would also mangle action names that contain `_action` themselves
        dest_suffix = '_action'
        api_name = action_dest[: -len(dest_suffix)] if action_dest.endswith(dest_suffix) else action_dest
        # Strip trailing name segments until the API object exposes an attribute with that name
        while '_' in api_name and not hasattr(api, api_name):
            api_name = api_name.rsplit('_', 1)[0]
        service = getattr(api, api_name)
        if action_def.async_actions and action_value in action_def.async_actions:
            self._run_async_action(service, action_def.schemas, action_value, args)
        else:
            self._run_sync_action(service, action_def.schemas, action_value, args)

    @overrides
    def can_run_exec_action(self, command_name: str, args: argparse.Namespace) -> bool:
        """
        Checks whether there is a service definition for the command and its actions.

        Args:
            command_name (str): Command name to look up among the supported service actions
            args (argparse.Namespace): Parsed arguments used to resolve the nested action

        Returns:
            bool: True when a matching definition is resolved, otherwise False
        """
        return self.__deduce_action_command_def(command_name, args) is not None

can_run_exec_action(command_name, args)

Checks whether there is a service definition for the command and its actions.

Parameters:

Name Type Description Default
command_name str

Command name to look up among the supported service actions.

required
args Namespace

Parsed command-line arguments used to resolve the nested action.

required

Returns:

Name Type Description
bool bool

True when a matching service action definition is resolved, otherwise False.

Source code in ark_sdk_python/actions/ark_service_exec_action.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
@overrides
def can_run_exec_action(self, command_name: str, args: argparse.Namespace) -> bool:
    """
    Tells whether the command and its nested actions resolve to a service definition.

    Args:
        command_name (str): Command name to look up among the supported service actions
        args (argparse.Namespace): Parsed arguments used to resolve the nested action

    Returns:
        bool: True when a definition is resolved, False when the lookup yields None
    """
    resolved_def = self.__deduce_action_command_def(command_name, args)
    return resolved_def is not None

define_exec_action(exec_subparsers)

Defines all the supported service actions as CLI actions, with its associated arguments and schemas.

Parameters:

Name Type Description Default
exec_subparsers _SubParsersAction

Sub-parsers collection to which every supported service action is added.

required
Source code in ark_sdk_python/actions/ark_service_exec_action.py
67
68
69
70
71
72
73
74
75
76
@overrides
def define_exec_action(self, exec_subparsers: argparse._SubParsersAction) -> None:
    """
    Registers every supported service action, along with its arguments and schemas, as a CLI action.

    Args:
        exec_subparsers (argparse._SubParsersAction): Sub-parsers collection receiving the service actions
    """
    # Each top-level definition is expanded, the helper takes care of its subactions
    for service_action_def in SUPPORTED_SERVICE_ACTIONS:
        self.__define_service_exec_actions(service_action_def, exec_subparsers)

run_exec_action(api, args)

Deduces from the arguments the appropriate service definition and action. Finds the appropriate service using the definition and executes the sync or async service action.

Parameters:

Name Type Description Default
api ArkCLIAPI

API object whose attributes are looked up to find the service to run.

required
args Namespace

Parsed command-line arguments; `command` and the `*_action` destinations select the service action.

required
Source code in ark_sdk_python/actions/ark_service_exec_action.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@overrides
def run_exec_action(self, api: ArkCLIAPI, args: argparse.Namespace) -> None:
    """
    Deduces from the arguments the appropriate service definition and action.
    Finds the appropriate service using the definition and executes the sync or async service action.

    Args:
        api (ArkCLIAPI): API object whose attributes are looked up to find the service
        args (argparse.Namespace): Parsed arguments, `command` and the `*_action` destinations select what to run
    """
    action_def, action_dest = self.__deduce_action_command_def(args.command, args)
    action_value = args.__dict__[action_dest]
    # Drop only the trailing `_action` marker of the destination; replacing every
    # occurrence would also mangle action names that contain `_action` themselves
    dest_suffix = '_action'
    api_name = action_dest[: -len(dest_suffix)] if action_dest.endswith(dest_suffix) else action_dest
    # Strip trailing name segments until the API object exposes an attribute with that name
    while '_' in api_name and not hasattr(api, api_name):
        api_name = api_name.rsplit('_', 1)[0]
    service = getattr(api, api_name)
    if action_def.async_actions and action_value in action_def.async_actions:
        self._run_async_action(service, action_def.schemas, action_value, args)
    else:
        self._run_sync_action(service, action_def.schemas, action_value, args)