Skip to content

ark_pcloud_accounts_service

ArkPCloudAccountsService

Bases: ArkPCloudBaseService

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
class ArkPCloudAccountsService(ArkPCloudBaseService):
    """
    Service exposing the accounts REST endpoints (listing, retrieval, creation, update,
    deletion, linking and credentials operations).

    Every method sends its request through `self._client` and raises `ArkServiceException`
    when the response status differs from the expected one, or when a response body
    that should be parsed cannot be parsed.
    """

    def __list_accounts_with_filters(
        self,
        search: Optional[str] = None,
        search_type: Optional[str] = None,
        sort: Optional[str] = None,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        safe_name: Optional[str] = None,
    ) -> Iterator[ArkPCloudAccountsPage]:
        """
        Yields pages of accounts matching the given filters, following `nextLink` until it is absent

        Only truthy arguments are sent as query parameters, so an `offset` or `limit`
        of 0 is treated the same as not passing it at all.

        Args:
            search (Optional[str]): Sent as the `search` query parameter
            search_type (Optional[str]): Sent as the `searchType` query parameter
            sort (Optional[str]): Sent as the `sort` query parameter
            offset (Optional[int]): Sent as the `offset` query parameter
            limit (Optional[int]): Sent as the `limit` query parameter
            safe_name (Optional[str]): Sent as the filter expression `safeName eq <safe_name>`

        Raises:
            ArkServiceException: When a response status is not 200 OK, or a response body cannot be parsed

        Yields:
            Iterator[ArkPCloudAccountsPage]: One page per successful response
        """
        query = {}
        if search:
            query['search'] = search
        if search_type:
            query['searchType'] = search_type
        if sort:
            query['sort'] = sort
        if offset:
            query['offset'] = offset
        if limit:
            query['limit'] = limit
        if safe_name:
            query['filter'] = f'safeName eq {safe_name}'
        while True:
            resp: Response = self._client.get(ACCOUNTS_URL, params=query)
            if resp.status_code == HTTPStatus.OK:
                try:
                    result = resp.json()
                    accounts = parse_obj_as(List[ArkPCloudAccount], result['value'])
                    yield ArkPCloudAccountsPage(items=accounts)
                    if 'nextLink' in result:
                        # The next page is described entirely by the query string of nextLink,
                        # so it replaces the query that was built from the arguments
                        query = parse_qs(urlparse(result['nextLink']).query)
                    else:
                        break
                except (ValidationError, JSONDecodeError, KeyError) as ex:
                    self._logger.exception(f'Failed to parse list accounts response [{str(ex)}] - [{resp.text}]')
                    raise ArkServiceException(f'Failed to parse list accounts response [{str(ex)}]') from ex
            else:
                raise ArkServiceException(f'Failed to list accounts [{resp.text}] - [{resp.status_code}]')

    def list_accounts(self) -> Iterator[ArkPCloudAccountsPage]:
        """
        Yields all visible accounts to the logged in user as pages of accounts
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

        Raises:
            ArkServiceException: When a response status is not 200 OK, or a response body cannot be parsed

        Yields:
            Iterator[ArkPCloudAccountsPage]: Pages of accounts, listed without any filter
        """
        self._logger.info('Listing all accounts')
        yield from self.__list_accounts_with_filters()

    def list_accounts_by(self, accounts_filter: ArkPCloudAccountsFilter) -> Iterator[ArkPCloudAccountsPage]:
        """
        Yields visible accounts to the logged in user by filters as pages of accounts
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

        Args:
            accounts_filter (ArkPCloudAccountsFilter): Supplies the `search`, `search_type`, `sort`,
                `offset`, `limit` and `safe_name` values used to filter the listing

        Raises:
            ArkServiceException: When a response status is not 200 OK, or a response body cannot be parsed

        Yields:
            Iterator[ArkPCloudAccountsPage]: Pages of accounts matching the filter
        """
        self._logger.info(f'Listing accounts by filters [{accounts_filter}]')
        yield from self.__list_accounts_with_filters(
            accounts_filter.search,
            accounts_filter.search_type,
            accounts_filter.sort,
            accounts_filter.offset,
            accounts_filter.limit,
            accounts_filter.safe_name,
        )

    def list_account_secret_versions(
        self, list_account_secret_versions: ArkPCloudListAccountSecretVersions
    ) -> List[ArkPCloudAccountSecretVersion]:
        """
        Lists the given account secret versions
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Get-versions.htm

        Args:
            list_account_secret_versions (ArkPCloudListAccountSecretVersions): Carries the `account_id`
                of the account whose secret versions are listed

        Raises:
            ArkServiceException: When the response status is not 200 OK, or the response body cannot be parsed

        Returns:
            List[ArkPCloudAccountSecretVersion]: Parsed from the `versions` field of the response
        """
        self._logger.info(f'Listing account [{list_account_secret_versions.account_id}] secret versions')
        resp: Response = self._client.get(ACCOUNT_SECRET_VERSIONS.format(account_id=list_account_secret_versions.account_id))
        if resp.status_code == HTTPStatus.OK:
            try:
                return parse_obj_as(List[ArkPCloudAccountSecretVersion], resp.json()['versions'])
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse list account secret versions response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse list account secret versions response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to list account secret versions [{resp.text}] - [{resp.status_code}]')

    def generate_account_credentials(
        self, generate_account_credentials: ArkPCloudGenerateAccountCredentials
    ) -> ArkPCloudAccountCredentials:
        """
        Generate a new random password for an existing account with policy restrictions
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Generate-Password.htm

        Args:
            generate_account_credentials (ArkPCloudGenerateAccountCredentials): Carries the `account_id`
                of the account to generate a password for

        Raises:
            ArkServiceException: When the response status is not 200 OK, or the response body cannot be parsed

        Returns:
            ArkPCloudAccountCredentials: The given account id with the `password` field of the response
        """
        self._logger.info(f'Generating new password for account [{generate_account_credentials.account_id}]')
        resp: Response = self._client.post(GENERATE_ACCOUNT_CREDENTIALS.format(account_id=generate_account_credentials.account_id))
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkPCloudAccountCredentials(account_id=generate_account_credentials.account_id, password=resp.json()['password'])
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse genereate account credentials response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse genereate account credentials response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to generate password for account [{resp.text}] - [{resp.status_code}]')

    def verify_account_credentials(self, verify_account_credentials: ArkPCloudVerifyAccountCredentials) -> None:
        """
        Marks the account for password verification by CPM
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Verify-credentials-v9-10.htm

        Args:
            verify_account_credentials (ArkPCloudVerifyAccountCredentials): Carries the `account_id`
                of the account to mark for verification

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Marking account [{verify_account_credentials.account_id}] for verification')
        resp: Response = self._client.post(VERIFY_ACCOUNT_CREDENTIALS.format(account_id=verify_account_credentials.account_id))
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to mark account for password verification [{resp.text}] - [{resp.status_code}]')

    def change_account_credentials(self, change_account_credentials: ArkPCloudChangeAccountCredentials) -> None:
        """
        Marks the account for password changing immediately by CPM
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Change-credentials-immediately.htm

        Args:
            change_account_credentials (ArkPCloudChangeAccountCredentials): Carries the `account_id`
                of the account to mark for an immediate credentials change

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Marking account [{change_account_credentials.account_id}] for changing credentials immediately')
        resp: Response = self._client.post(CHANGE_ACCOUNT_CREDENTIALS.format(account_id=change_account_credentials.account_id))
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to mark account for changing credentials immediately [{resp.text}] - [{resp.status_code}]')

    def set_account_next_credentials(self, set_account_next_credentials: ArkPCloudSetAccountNextCredentials) -> None:
        """
        Marks the account to have its password changed to the given one via CPM
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/SetNextPassword.htm

        Args:
            set_account_next_credentials (ArkPCloudSetAccountNextCredentials): Carries the `account_id`;
                all other fields are sent (by alias) as the JSON request body

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Marking account [{set_account_next_credentials.account_id}] for changing credentials for the given password')
        resp: Response = self._client.post(
            SET_ACCOUNT_NEXT_CREDENTIALS.format(account_id=set_account_next_credentials.account_id),
            json=set_account_next_credentials.dict(exclude={'account_id'}, by_alias=True),
        )
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to mark account for changing credentials next password [{resp.text}] - [{resp.status_code}]')

    def update_account_credentials_in_vault(self, update_account_credentials_in_vault: ArkPCloudUpdateAccountCredentialsInVault) -> None:
        """
        Updates the account credentials only in the vault without changing it on the machine itself
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/ChangeCredentialsInVault.htm

        Args:
            update_account_credentials_in_vault (ArkPCloudUpdateAccountCredentialsInVault): Carries the
                `account_id`; all other fields are sent (by alias) as the JSON request body

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Updates account [{update_account_credentials_in_vault.account_id}] vault credentials')
        resp: Response = self._client.post(
            UPDATE_ACCOUNT_CREDENTIALS_IN_VAULT.format(account_id=update_account_credentials_in_vault.account_id),
            json=update_account_credentials_in_vault.dict(exclude={'account_id'}, by_alias=True),
        )
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to update credentials in vault for account [{resp.text}] - [{resp.status_code}]')

    def reconcile_account_credentials(self, reconcile_account_credentials: ArkPCloudReconcileAccountCredentials) -> None:
        """
        Marks the account for reconciliation
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Reconcile-account.htm

        Args:
            reconcile_account_credentials (ArkPCloudReconcileAccountCredentials): Carries the `account_id`
                of the account to mark for reconciliation

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Marking account [{reconcile_account_credentials.account_id}] for reconcilation')
        resp: Response = self._client.post(RECONCILE_ACCOUNT_CREDENTIALS.format(account_id=reconcile_account_credentials.account_id))
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to reconcile account credentials [{resp.text}] - [{resp.status_code}]')

    def account(self, get_account: ArkPCloudGetAccount) -> ArkPCloudAccount:
        """
        Retrieves the account by id
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Get%20Account%20Details.htm?

        Args:
            get_account (ArkPCloudGetAccount): Carries the `account_id` of the account to retrieve

        Raises:
            ArkServiceException: When the response status is not 200 OK, or the response body cannot be parsed

        Returns:
            ArkPCloudAccount: The account parsed from the response body
        """
        self._logger.info(f'Retrieving account by id [{get_account.account_id}]')
        resp: Response = self._client.get(ACCOUNT_URL.format(account_id=get_account.account_id))
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkPCloudAccount.parse_obj(resp.json())
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse account response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse account response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to retrieve account [{resp.text}] - [{resp.status_code}]')

    def account_credentials(self, get_account_credentials: ArkPCloudGetAccountCredentials) -> ArkPCloudAccountCredentials:
        """
        Retrieves the account credentials
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/GetPasswordValueV10.htm?

        Args:
            get_account_credentials (ArkPCloudGetAccountCredentials): Carries the `account_id` and an
                optional `reason`; every other non-None field is sent in the request body

        Raises:
            ArkServiceException: When the response status is not 200 OK

        Returns:
            ArkPCloudAccountCredentials: The given account id with the password taken from the response text
        """
        self._logger.info(f'Retrieving account password for details [{get_account_credentials}]')
        # Body keys are the field names with underscores removed and only the first letter
        # capitalized (e.g. 'some_field' -> 'Somefield'); 'reason' is sent under its own lowercase key
        body = {
            k.replace('_', '').title(): v
            for k, v in get_account_credentials.dict(exclude={'account_id', 'reason'}, exclude_none=True).items()
        }
        if get_account_credentials.reason:
            body['reason'] = get_account_credentials.reason
        resp: Response = self._client.post(RETRIEVE_ACCOUNT_CREDENTIALS.format(account_id=get_account_credentials.account_id), json=body)
        if resp.status_code == HTTPStatus.OK:
            # NOTE(review): slicing assumes the body is exactly a quoted string; if the server
            # JSON-escapes characters inside it, the escapes are kept as-is - confirm
            return ArkPCloudAccountCredentials(
                account_id=get_account_credentials.account_id, password=resp.text[1:-1]  # Remove leading and trailing quotes
            )
        raise ArkServiceException(f'Failed to retrieve account credentials [{resp.text}] - [{resp.status_code}]')

    def add_account(self, add_account: ArkPCloudAddAccount) -> ArkPCloudAccount:
        """
        Adds a new account with given details
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Add%20Account%20v10.htm?

        Note that the given model is normalized in place before it is sent:
        its `remote_machines_access` may be cleared, or its `remote_machines` list
        replaced by a comma separated string.

        Args:
            add_account (ArkPCloudAddAccount): Details of the account to add

        Raises:
            ArkServiceException: When the response status is not 201 Created, or the response body cannot be parsed

        Returns:
            ArkPCloudAccount: The added account parsed from the response body
        """
        self._logger.info('Adding new account')
        if add_account.remote_machines_access:
            if (
                add_account.remote_machines_access.access_restricted_to_remote_machines
                and not add_account.remote_machines_access.remote_machines
            ):
                # Restricted access without any machine listed - drop the whole section
                add_account.remote_machines_access = None
            elif not add_account.remote_machines_access.access_restricted_to_remote_machines:
                # Access is not restricted - the machines list is not sent
                add_account.remote_machines_access.remote_machines = None

            elif add_account.remote_machines_access.remote_machines:
                # Machines are sent as a single comma separated string
                add_account.remote_machines_access.remote_machines = ','.join(add_account.remote_machines_access.remote_machines)

        resp: Response = self._client.post(ACCOUNTS_URL, data=add_account.json(by_alias=True, exclude_none=True, exclude_defaults=True))
        if resp.status_code == HTTPStatus.CREATED:
            try:
                return ArkPCloudAccount.parse_obj(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse add account response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse add account response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to add account [{resp.text}] - [{resp.status_code}]')

    def update_account(self, update_account: ArkPCloudUpdateAccount) -> ArkPCloudAccount:
        """
        Updates an existing account with new details
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/UpdateAccount%20v10.htm

        Each field that is set (not None, not default) becomes one `replace` operation
        whose path is the field alias.

        Args:
            update_account (ArkPCloudUpdateAccount): Carries the `account_id` and the fields to replace;
                its `remote_machines_access` is cleared in place when it lists no remote machines

        Raises:
            ArkServiceException: When the response status is not 200 OK, or the response body cannot be parsed

        Returns:
            ArkPCloudAccount: The updated account parsed from the response body
        """
        self._logger.info(f'Updating account [{update_account.account_id}]')
        if update_account.remote_machines_access and not update_account.remote_machines_access.remote_machines:
            update_account.remote_machines_access = None
        # .items() is required here: iterating the dict itself yields only the keys,
        # which cannot be unpacked into (key, val) pairs
        operations = [
            {'op': 'replace', 'path': f'/{key}', 'value': val}
            for key, val in update_account.dict(
                exclude={'account_id'}, exclude_none=True, by_alias=True, exclude_defaults=True
            ).items()
        ]
        # NOTE(review): the operations list is patch-style while the request is a PUT - confirm
        # against the linked API documentation whether PATCH is the expected verb
        resp: Response = self._client.put(ACCOUNT_URL.format(account_id=update_account.account_id), json=operations)
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkPCloudAccount.parse_obj(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse update account response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse update account response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to update account [{resp.text}] - [{resp.status_code}]')

    def delete_account(self, delete_account: ArkPCloudDeleteAccount) -> None:
        """
        Deletes an account by given id
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Delete%20Account.htm

        Args:
            delete_account (ArkPCloudDeleteAccount): Carries the `account_id` of the account to delete

        Raises:
            ArkServiceException: When the response status is not 204 No Content
        """
        self._logger.info(f'Deleting account [{delete_account.account_id}]')
        resp: Response = self._client.delete(ACCOUNT_URL.format(account_id=delete_account.account_id))
        if resp.status_code != HTTPStatus.NO_CONTENT:
            raise ArkServiceException(f'Failed to delete account [{resp.text}] - [{resp.status_code}]')

    def link_account(self, link_account: ArkPCloudLinkAccount) -> None:
        """
        Link an account by given info
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account.htm

        Args:
            link_account (ArkPCloudLinkAccount): Carries the `account_id`; all other fields
                (such as `name`, `safe`, `folder` and `extra_password_index`) are sent by alias as the JSON body

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(
            f'Linking account [{link_account.account_id}] '
            f'to name [{link_account.name}] in safe [{link_account.safe}] in folder [{link_account.folder}] '
            f'by idx [{link_account.extra_password_index}]'
        )
        resp: Response = self._client.post(
            LINK_ACCOUNT.format(account_id=link_account.account_id), json=link_account.dict(exclude={'account_id'}, by_alias=True)
        )
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to link account [{resp.text}] - [{resp.status_code}]')

    def unlink_account(self, unlink_account: ArkPCloudUnlinkAccount) -> None:
        """
        Unlink an account by given info
        https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account-unlink.htm

        Args:
            unlink_account (ArkPCloudUnlinkAccount): Carries the `account_id` and the
                `extra_password_index` that together form the request URL

        Raises:
            ArkServiceException: When the response status is not 200 OK
        """
        self._logger.info(f'Unlinking account [{unlink_account.account_id}] by idx [{unlink_account.extra_password_index}]')
        resp: Response = self._client.delete(
            UNLINK_ACCOUNT.format(account_id=unlink_account.account_id, extra_password_index=unlink_account.extra_password_index)
        )
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to unlink account [{resp.text}] - [{resp.status_code}]')

    def accounts_stats(self) -> ArkPCloudAccountsStats:
        """
        Calculates accounts stats for all visible accounts

        All pages of `list_accounts` are fetched and flattened, then the accounts are
        counted in total, per platform id and per safe name.

        Raises:
            ArkServiceException: Propagated from `list_accounts` when listing fails

        Returns:
            ArkPCloudAccountsStats: Holds `accounts_count`, `accounts_count_by_platform_id`
                and `accounts_count_by_safe_name`
        """
        # Imported locally so the method does not depend on the module level import block
        from collections import Counter  # pylint: disable=import-outside-toplevel

        self._logger.info('Calculating accounts statistics')
        accounts = list(itertools.chain.from_iterable(page.items for page in self.list_accounts()))
        accounts_stats = ArkPCloudAccountsStats.construct()
        accounts_stats.accounts_count = len(accounts)

        # A single counting pass per grouping, instead of rescanning all accounts for every distinct key
        accounts_stats.accounts_count_by_platform_id = dict(Counter(a.platform_id for a in accounts))
        accounts_stats.accounts_count_by_safe_name = dict(Counter(a.safe_name for a in accounts))

        return accounts_stats

    @staticmethod
    @overrides
    def service_config() -> ArkServiceConfig:
        """
        Returns the module level `SERVICE_CONFIG` describing this service

        Returns:
            ArkServiceConfig: The shared service configuration object
        """
        return SERVICE_CONFIG

account(get_account)

Retrieves the account by id. See: https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Get%20Account%20Details.htm?

Parameters:

Name Type Description Default
get_account ArkPCloudGetAccount

Request model carrying the `account_id` of the account to retrieve.

required

Raises:

Type Description
ArkServiceException

Raised when the response status is not 200 OK, or when the response body cannot be parsed into an account.

Returns:

Name Type Description
ArkPCloudAccount ArkPCloudAccount

The account parsed from the response body.

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def account(self, get_account: ArkPCloudGetAccount) -> ArkPCloudAccount:
    """
    Fetches a single account, identified by its id
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Get%20Account%20Details.htm?

    Args:
        get_account (ArkPCloudGetAccount): Carries the `account_id` of the account to fetch

    Raises:
        ArkServiceException: When the response status is not 200 OK, or the response body cannot be parsed

    Returns:
        ArkPCloudAccount: The account parsed from the response body
    """
    account_id = get_account.account_id
    self._logger.info(f'Retrieving account by id [{account_id}]')
    resp: Response = self._client.get(ACCOUNT_URL.format(account_id=account_id))

    # Guard clause: anything but 200 OK is a failure
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to retrieve account [{resp.text}] - [{resp.status_code}]')

    try:
        return ArkPCloudAccount.parse_obj(resp.json())
    except (ValidationError, JSONDecodeError, KeyError) as ex:
        self._logger.exception(f'Failed to parse account response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse account response [{str(ex)}]') from ex

account_credentials(get_account_credentials)

Retrieves the account credentials. See: https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/GetPasswordValueV10.htm?

Parameters:

Name Type Description Default
get_account_credentials ArkPCloudGetAccountCredentials

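Request model carrying the `account_id`, an optional `reason`, and any additional fields to send in the request body.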

required

Raises:

Type Description
ArkServiceException

Raised when the response status is not 200 OK.

Returns:

Name Type Description
ArkPCloudAccountCredentials ArkPCloudAccountCredentials

The given `account_id` together with the password taken from the response text (surrounding quotes removed).

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def account_credentials(self, get_account_credentials: ArkPCloudGetAccountCredentials) -> ArkPCloudAccountCredentials:
    """
    Fetches the credentials (password) of an account
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/GetPasswordValueV10.htm?

    Args:
        get_account_credentials (ArkPCloudGetAccountCredentials): Carries the `account_id` and an
            optional `reason`; every other non-None field is sent in the request body

    Raises:
        ArkServiceException: When the response status is not 200 OK

    Returns:
        ArkPCloudAccountCredentials: The given account id with the password taken from the response text
    """
    self._logger.info(f'Retrieving account password for details [{get_account_credentials}]')

    # Build the body: each remaining field name loses its underscores and is title-cased
    extra_fields = get_account_credentials.dict(exclude={'account_id', 'reason'}, exclude_none=True)
    body = {}
    for field_name, field_value in extra_fields.items():
        body[field_name.replace('_', '').title()] = field_value
    # The reason keeps its own lowercase key and is only sent when provided
    if get_account_credentials.reason:
        body['reason'] = get_account_credentials.reason

    url = RETRIEVE_ACCOUNT_CREDENTIALS.format(account_id=get_account_credentials.account_id)
    resp: Response = self._client.post(url, json=body)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to retrieve account credentials [{resp.text}] - [{resp.status_code}]')

    # The first and last characters of the response text are the surrounding quotes
    password = resp.text[1:-1]
    return ArkPCloudAccountCredentials(account_id=get_account_credentials.account_id, password=password)

accounts_stats()

Calculates accounts stats for all visible accounts

Returns:

Name Type Description
ArkPCloudAccountsStats ArkPCloudAccountsStats

The total account count, plus account counts grouped by platform id and by safe name.

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def accounts_stats(self) -> ArkPCloudAccountsStats:
    """
    Calculates accounts stats for all visible accounts

    All pages of `list_accounts` are fetched and flattened, then the accounts are
    counted in total, per platform id and per safe name.

    Returns:
        ArkPCloudAccountsStats: Holds `accounts_count`, `accounts_count_by_platform_id`
            and `accounts_count_by_safe_name`
    """
    # Imported locally so the function does not depend on the module level import block
    from collections import Counter  # pylint: disable=import-outside-toplevel

    self._logger.info('Calculating accounts statistics')
    accounts = list(itertools.chain.from_iterable(page.items for page in self.list_accounts()))
    accounts_stats = ArkPCloudAccountsStats.construct()
    accounts_stats.accounts_count = len(accounts)

    # A single counting pass per grouping, instead of rescanning all accounts for every distinct key
    accounts_stats.accounts_count_by_platform_id = dict(Counter(a.platform_id for a in accounts))
    accounts_stats.accounts_count_by_safe_name = dict(Counter(a.safe_name for a in accounts))

    return accounts_stats

add_account(add_account)

Adds a new account with given details. See: https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Add%20Account%20v10.htm?

Parameters:

Name Type Description Default
add_account ArkPCloudAddAccount

Details of the account to add, including optional remote machines access settings.

required

Raises:

Type Description
ArkServiceException

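Raised when the response status is not 201 Created, or when the response body cannot be parsed into an account.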

Returns:

Name Type Description
ArkPCloudAccount ArkPCloudAccount

The added account parsed from the response body.

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def add_account(self, add_account: ArkPCloudAddAccount) -> ArkPCloudAccount:
    """
    Creates a new account from the given details
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Add%20Account%20v10.htm?

    The given model is normalized in place before it is sent: its `remote_machines_access`
    may be cleared, or its `remote_machines` list replaced by a comma separated string.

    Args:
        add_account (ArkPCloudAddAccount): Details of the account to add

    Raises:
        ArkServiceException: When the response status is not 201 Created, or the response body cannot be parsed

    Returns:
        ArkPCloudAccount: The added account parsed from the response body
    """
    self._logger.info('Adding new account')
    access = add_account.remote_machines_access
    if access:
        restricted = access.access_restricted_to_remote_machines
        if restricted and not access.remote_machines:
            # Restricted access without any machine listed - drop the whole section
            add_account.remote_machines_access = None
        elif not restricted:
            # Access is not restricted - the machines list is not sent
            access.remote_machines = None
        elif access.remote_machines:
            # Machines are sent as a single comma separated string
            access.remote_machines = ','.join(access.remote_machines)

    payload = add_account.json(by_alias=True, exclude_none=True, exclude_defaults=True)
    resp: Response = self._client.post(ACCOUNTS_URL, data=payload)
    if resp.status_code != HTTPStatus.CREATED:
        raise ArkServiceException(f'Failed to add account [{resp.text}] - [{resp.status_code}]')

    try:
        return ArkPCloudAccount.parse_obj(resp.json())
    except (ValidationError, JSONDecodeError) as ex:
        self._logger.exception(f'Failed to parse add account response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse add account response [{str(ex)}]') from ex

change_account_credentials(change_account_credentials)

Marks the account for password changing immediately by CPM. See: https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Change-credentials-immediately.htm

Parameters:

Name Type Description Default
change_account_credentials ArkPCloudChangeAccountCredentials

Request model carrying the `account_id` of the account to mark for an immediate credentials change.

required

Raises:

Type Description
ArkServiceException

Raised when the response status is not 200 OK.

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def change_account_credentials(self, change_account_credentials: ArkPCloudChangeAccountCredentials) -> None:
    """
    Flags the account so that CPM changes its password immediately
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Change-credentials-immediately.htm

    Args:
        change_account_credentials (ArkPCloudChangeAccountCredentials): Carries the `account_id`
            of the account to mark for an immediate credentials change

    Raises:
        ArkServiceException: When the response status is not 200 OK
    """
    account_id = change_account_credentials.account_id
    self._logger.info(f'Marking account [{account_id}] for changing credentials immediately')
    resp: Response = self._client.post(CHANGE_ACCOUNT_CREDENTIALS.format(account_id=account_id))
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to mark account for changing credentials immediately [{resp.text}] - [{resp.status_code}]')

delete_account(delete_account)

Deletes an account by given id. See: https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Delete%20Account.htm

Parameters:

Name Type Description Default
delete_account ArkPCloudDeleteAccount

Request model carrying the `account_id` of the account to delete.

required

Raises:

Type Description
ArkServiceException

Raised when the response status is not 204 No Content.

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
def delete_account(self, delete_account: ArkPCloudDeleteAccount) -> None:
    """
    Removes the account identified by the given id
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Delete%20Account.htm

    Args:
        delete_account (ArkPCloudDeleteAccount): Holds the id of the account to delete

    Raises:
        ArkServiceException: When the response status code is anything other than NO_CONTENT
    """
    account_id = delete_account.account_id
    self._logger.info(f'Deleting account [{account_id}]')
    account_url = ACCOUNT_URL.format(account_id=account_id)
    resp: Response = self._client.delete(account_url)
    # A successful deletion is signalled by an empty (NO_CONTENT) response
    if resp.status_code == HTTPStatus.NO_CONTENT:
        return
    raise ArkServiceException(f'Failed to delete account [{resp.text}] - [{resp.status_code}]')

generate_account_credentials(generate_account_credentials)

Generate a new random password for an existing account with policy restrictions https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Generate-Password.htm

Parameters:

Name Type Description Default
generate_account_credentials ArkPCloudGenerateAccountCredentials

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkPCloudAccountCredentials ArkPCloudAccountCredentials

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def generate_account_credentials(
    self, generate_account_credentials: ArkPCloudGenerateAccountCredentials
) -> ArkPCloudAccountCredentials:
    """
    Asks the service to generate a new random password for an existing account, subject to policy restrictions
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Generate-Password.htm

    Args:
        generate_account_credentials (ArkPCloudGenerateAccountCredentials): Holds the id of the account to generate for

    Raises:
        ArkServiceException: When the response status code is not OK, or when the response body
            cannot be decoded, lacks the 'password' key, or fails model validation

    Returns:
        ArkPCloudAccountCredentials: The account id paired with the 'password' value taken from the response
    """
    account_id = generate_account_credentials.account_id
    self._logger.info(f'Generating new password for account [{account_id}]')
    generate_url = GENERATE_ACCOUNT_CREDENTIALS.format(account_id=account_id)
    resp: Response = self._client.post(generate_url)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to generate password for account [{resp.text}] - [{resp.status_code}]')
    try:
        generated_password = resp.json()['password']
        return ArkPCloudAccountCredentials(account_id=account_id, password=generated_password)
    except (ValidationError, JSONDecodeError, KeyError) as ex:
        # NOTE(review): the raw response body is written to the log here; it may contain the generated password - confirm this is acceptable
        self._logger.exception(f'Failed to parse genereate account credentials response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse genereate account credentials response [{str(ex)}]') from ex

Link an account by given info https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account.htm

Parameters:

Name Type Description Default
link_account ArkPCloudLinkAccount

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def link_account(self, link_account: ArkPCloudLinkAccount) -> None:
    """
    Links an account according to the given link details
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account.htm

    Args:
        link_account (ArkPCloudLinkAccount): Holds the account id used in the URL; every other field
            is sent, keyed by alias, as the JSON body of the request

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = link_account.account_id
    self._logger.info(
        f'Linking account [{account_id}] '
        f'to name [{link_account.name}] in safe [{link_account.safe}] in folder [{link_account.folder}] '
        f'by idx [{link_account.extra_password_index}]'
    )
    link_url = LINK_ACCOUNT.format(account_id=account_id)
    # The account id travels in the URL, so it is left out of the body
    link_body = link_account.dict(exclude={'account_id'}, by_alias=True)
    resp: Response = self._client.post(link_url, json=link_body)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to link account [{resp.text}] - [{resp.status_code}]')

list_account_secret_versions(list_account_secret_versions)

Lists the given account secret versions https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Get-versions.htm

Parameters:

Name Type Description Default
list_account_secret_versions ArkPCloudListAccountSecretVersions

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Type Description
List[ArkPCloudAccountSecretVersion]

List[ArkPCloudAccountSecretVersion]: description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def list_account_secret_versions(
    self, list_account_secret_versions: ArkPCloudListAccountSecretVersions
) -> List[ArkPCloudAccountSecretVersion]:
    """
    Retrieves the secret versions of the given account
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/Secrets-Get-versions.htm

    Args:
        list_account_secret_versions (ArkPCloudListAccountSecretVersions): Holds the id of the account to query

    Raises:
        ArkServiceException: When the response status code is not OK, or when the response body
            cannot be decoded, lacks the 'versions' key, or fails model validation

    Returns:
        List[ArkPCloudAccountSecretVersion]: The entries under the response's 'versions' key, parsed into models
    """
    account_id = list_account_secret_versions.account_id
    self._logger.info(f'Listing account [{account_id}] secret versions')
    versions_url = ACCOUNT_SECRET_VERSIONS.format(account_id=account_id)
    resp: Response = self._client.get(versions_url)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to list account secret versions [{resp.text}] - [{resp.status_code}]')
    try:
        raw_versions = resp.json()['versions']
        return parse_obj_as(List[ArkPCloudAccountSecretVersion], raw_versions)
    except (ValidationError, JSONDecodeError, KeyError) as ex:
        self._logger.exception(f'Failed to parse list account secret versions response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse list account secret versions response [{str(ex)}]') from ex

list_accounts()

Yields all visible accounts to the logged in user as pages of accounts https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

Yields:

Type Description
ArkPCloudAccountsPage

Iterator[ArkPCloudAccountsPage]: description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
 97
 98
 99
100
101
102
103
104
105
106
def list_accounts(self) -> Iterator[ArkPCloudAccountsPage]:
    """
    Yields every account visible to the logged in user, page by page, with no filters applied
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

    Yields:
        Iterator[ArkPCloudAccountsPage]: Pages produced by the unfiltered accounts listing
    """
    self._logger.info('Listing all accounts')
    # Calling the filtered listing with its defaults means no filter is supplied
    unfiltered_pages = self.__list_accounts_with_filters()
    yield from unfiltered_pages

list_accounts_by(accounts_filter)

Yields visible accounts to the logged in user by filters as pages of accounts https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

Parameters:

Name Type Description Default
accounts_filter ArkPCloudAccountsFilter

description

required

Yields:

Type Description
ArkPCloudAccountsPage

Iterator[ArkPCloudAccountsPage]: description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def list_accounts_by(self, accounts_filter: ArkPCloudAccountsFilter) -> Iterator[ArkPCloudAccountsPage]:
    """
    Yields the accounts visible to the logged in user that match the given filter, page by page
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/GetAccounts.htm

    Args:
        accounts_filter (ArkPCloudAccountsFilter): Supplies the search, search_type, sort, offset,
            limit and safe_name values forwarded to the filtered listing

    Yields:
        Iterator[ArkPCloudAccountsPage]: Pages produced by the filtered accounts listing
    """
    self._logger.info(f'Listing accounts by filters [{accounts_filter}]')
    # Forwarded positionally, in the same order as the original call
    filter_values = (
        accounts_filter.search,
        accounts_filter.search_type,
        accounts_filter.sort,
        accounts_filter.offset,
        accounts_filter.limit,
        accounts_filter.safe_name,
    )
    yield from self.__list_accounts_with_filters(*filter_values)

reconcile_account_credentials(reconcile_account_credentials)

Marks the account for reconciliation https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Reconcile-account.htm

Parameters:

Name Type Description Default
reconcile_account_credentials ArkPCloudReconcileAccountCredentials

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def reconcile_account_credentials(self, reconcile_account_credentials: ArkPCloudReconcileAccountCredentials) -> None:
    """
    Marks the given account for reconciliation
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Reconcile-account.htm

    Args:
        reconcile_account_credentials (ArkPCloudReconcileAccountCredentials): Holds the id of the account to mark

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = reconcile_account_credentials.account_id
    self._logger.info(f'Marking account [{account_id}] for reconcilation')
    reconcile_url = RECONCILE_ACCOUNT_CREDENTIALS.format(account_id=account_id)
    resp: Response = self._client.post(reconcile_url)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to reconcile account credentials [{resp.text}] - [{resp.status_code}]')

set_account_next_credentials(set_account_next_credentials)

Marks the account to have its password changed to the given one via CPM https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/SetNextPassword.htm

Parameters:

Name Type Description Default
set_account_next_credentials ArkPCloudSetAccountNextCredentials

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def set_account_next_credentials(self, set_account_next_credentials: ArkPCloudSetAccountNextCredentials) -> None:
    """
    Marks the account so that CPM changes its password to the one supplied
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/SetNextPassword.htm

    Args:
        set_account_next_credentials (ArkPCloudSetAccountNextCredentials): Holds the account id used in the URL;
            every other field is sent, keyed by alias, as the JSON body of the request

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = set_account_next_credentials.account_id
    self._logger.info(f'Marking account [{account_id}] for changing credentials for the given password')
    next_credentials_url = SET_ACCOUNT_NEXT_CREDENTIALS.format(account_id=account_id)
    # The account id travels in the URL, so it is left out of the body
    next_credentials_body = set_account_next_credentials.dict(exclude={'account_id'}, by_alias=True)
    resp: Response = self._client.post(next_credentials_url, json=next_credentials_body)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to mark account for changing credentials next password [{resp.text}] - [{resp.status_code}]')

Unlinks an account by given info https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account-unlink.htm

Parameters:

Name Type Description Default
unlink_account ArkPCloudUnlinkAccount

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def unlink_account(self, unlink_account: ArkPCloudUnlinkAccount) -> None:
    """
    Unlinks a linked account, selected by the given account id and extra password index
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PrivCloud-SS/Latest/en/Content/WebServices/Link-account-unlink.htm

    Args:
        unlink_account (ArkPCloudUnlinkAccount): Holds the account id and the extra password index,
            both of which are placed in the request URL

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = unlink_account.account_id
    password_index = unlink_account.extra_password_index
    self._logger.info(f'Unlinking account [{account_id}] by idx [{password_index}]')
    unlink_url = UNLINK_ACCOUNT.format(account_id=account_id, extra_password_index=password_index)
    resp: Response = self._client.delete(unlink_url)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to unlink account [{resp.text}] - [{resp.status_code}]')

update_account(update_account)

Updates an existing account with new details https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/UpdateAccount%20v10.htm

Parameters:

Name Type Description Default
update_account ArkPCloudUpdateAccount

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkPCloudAccount ArkPCloudAccount

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def update_account(self, update_account: ArkPCloudUpdateAccount) -> ArkPCloudAccount:
    """
    Updates an existing account with new details
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/SDK/UpdateAccount%20v10.htm

    The request body is a list of 'replace' operations, one per field of the given model that is
    set, is not None and differs from its default (the account id is excluded and goes in the URL).

    Args:
        update_account (ArkPCloudUpdateAccount): Holds the id of the account to update and the fields to replace.
            Note that its remote_machines_access is reset to None in place when it lists no remote machines

    Raises:
        ArkServiceException: When the response status code is not OK, or when the response body
            cannot be decoded or fails model validation

    Returns:
        ArkPCloudAccount: The account parsed from the response body
    """
    self._logger.info(f'Updating account [{update_account.account_id}]')
    # A remote machines access object without any machines is cleared so exclude_none drops it from the payload
    if update_account.remote_machines_access and not update_account.remote_machines_access.remote_machines:
        update_account.remote_machines_access = None
    changed_fields = update_account.dict(exclude={'account_id'}, exclude_none=True, by_alias=True, exclude_defaults=True)
    # Iterate over .items(): iterating the dict directly yields only its keys, which cannot be
    # unpacked into a (key, value) pair
    operations = [{'op': 'replace', 'path': f'/{key}', 'value': val} for key, val in changed_fields.items()]
    # NOTE(review): the body is a list of patch-style operations but is sent with PUT - confirm against the
    # linked API documentation whether PATCH is the expected method
    resp: Response = self._client.put(ACCOUNT_URL.format(account_id=update_account.account_id), json=operations)
    if resp.status_code == HTTPStatus.OK:
        try:
            return ArkPCloudAccount.parse_obj(resp.json())
        except (ValidationError, JSONDecodeError) as ex:
            self._logger.exception(f'Failed to parse update account response [{str(ex)}] - [{resp.text}]')
            raise ArkServiceException(f'Failed to parse update account response [{str(ex)}]') from ex
    raise ArkServiceException(f'Failed to update account [{resp.text}] - [{resp.status_code}]')

update_account_credentials_in_vault(update_account_credentials_in_vault)

Updates the account credentials only in the vault without changing it on the machine itself https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/ChangeCredentialsInVault.htm

Parameters:

Name Type Description Default
update_account_credentials_in_vault ArkPCloudUpdateAccountCredentialsInVault

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def update_account_credentials_in_vault(self, update_account_credentials_in_vault: ArkPCloudUpdateAccountCredentialsInVault) -> None:
    """
    Replaces the account credentials stored in the vault only, leaving the machine itself untouched
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/ChangeCredentialsInVault.htm

    Args:
        update_account_credentials_in_vault (ArkPCloudUpdateAccountCredentialsInVault): Holds the account id used
            in the URL; every other field is sent, keyed by alias, as the JSON body of the request

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = update_account_credentials_in_vault.account_id
    self._logger.info(f'Updates account [{account_id}] vault credentials')
    vault_update_url = UPDATE_ACCOUNT_CREDENTIALS_IN_VAULT.format(account_id=account_id)
    # The account id travels in the URL, so it is left out of the body
    vault_update_body = update_account_credentials_in_vault.dict(exclude={'account_id'}, by_alias=True)
    resp: Response = self._client.post(vault_update_url, json=vault_update_body)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to update credentials in vault for account [{resp.text}] - [{resp.status_code}]')

verify_account_credentials(verify_account_credentials)

Marks the account for password verification by CPM https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Verify-credentials-v9-10.htm

Parameters:

Name Type Description Default
verify_account_credentials ArkPCloudVerifyAccountCredentials

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/pcloud/accounts/ark_pcloud_accounts_service.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def verify_account_credentials(self, verify_account_credentials: ArkPCloudVerifyAccountCredentials) -> None:
    """
    Marks the given account so that CPM verifies its password
    https://docs.cyberark.com/Product-Doc/OnlineHelp/PAS/Latest/en/Content/WebServices/Verify-credentials-v9-10.htm

    Args:
        verify_account_credentials (ArkPCloudVerifyAccountCredentials): Holds the id of the account to mark

    Raises:
        ArkServiceException: When the response status code is anything other than OK
    """
    account_id = verify_account_credentials.account_id
    self._logger.info(f'Marking account [{account_id}] for verification')
    verify_url = VERIFY_ACCOUNT_CREDENTIALS.format(account_id=account_id)
    resp: Response = self._client.post(verify_url)
    if resp.status_code == HTTPStatus.OK:
        return
    raise ArkServiceException(f'Failed to mark account for password verification [{resp.text}] - [{resp.status_code}]')