Skip to content

ark_sia_vm_policies_service

ArkSIAVMPoliciesService

Bases: ArkService

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
class ArkSIAVMPoliciesService(ArkService):
    def __init__(self, isp_auth: ArkISPAuth) -> None:
        super().__init__(isp_auth)
        self.__isp_auth = isp_auth
        self.__client: ArkISPServiceClient = ArkISPServiceClient.from_isp_auth(
            isp_auth=self.__isp_auth,
            service_name='dpa',
            refresh_connection_callback=self.__refresh_sia_auth,
        )

    def __refresh_sia_auth(self, client: ArkISPServiceClient) -> None:
        ArkISPServiceClient.refresh_client(client, self.__isp_auth)

    @property
    def isp_client(self) -> ArkISPServiceClient:
        return self.__client

    def __policy_id_by_name(self, policy_name: str) -> str:
        policies = list(
            itertools.chain.from_iterable([p.items for p in list(self.list_policies_by(ArkSIAVMPoliciesFilter(name=policy_name)))])
        )

        if not policies:
            raise ArkServiceException(f'Failed to find vm policy id by name [{policy_name}]')
        return policies[0].policy_id

    @staticmethod
    def __serialize_providers_dict(providers_data: ArkSIAVMProvidersDict) -> Dict:
        serialized_providers_data = {}
        for k in list(providers_data.keys()):
            serialized_providers_data[serialize_sia_vm_policies_workspace_type(k)] = providers_data[k].model_dump(by_alias=True)
        return serialized_providers_data

    @staticmethod
    def __serialize_authorization_rules_dict(authorization_rules: List[Dict]) -> None:
        for rule in authorization_rules:
            for k in list(rule['connectionInformation']['connectAs'].keys()):
                for pk in list(rule['connectionInformation']['connectAs'][k].keys()):
                    item = rule['connectionInformation']['connectAs'][k][pk]
                    del rule['connectionInformation']['connectAs'][k][pk]
                    rule['connectionInformation']['connectAs'][k][serialize_sia_vm_policies_protocol_type(pk)] = item
                item = rule['connectionInformation']['connectAs'][k]
                del rule['connectionInformation']['connectAs'][k]
                rule['connectionInformation']['connectAs'][serialize_sia_vm_policies_workspace_type(k)] = item

    def add_policy(self, add_policy: ArkSIAVMAddPolicy) -> ArkSIAVMPolicy:
        """
        Adds a new VM policy with the specified information.

        Args:
            add_policy (ArkDPVMAAddPolicy): _description_

        Raises:
            ArkServiceException: _description_

        Returns:
            ArkSIAVMPolicy: _description_
        """
        self._logger.info(f'Adding new vm policy [{add_policy.policy_name}]')
        add_policy_dict = add_policy.model_dump(by_alias=True)
        add_policy_dict['providersData'] = self.__serialize_providers_dict(add_policy.providers_data)
        self.__serialize_authorization_rules_dict(add_policy_dict['userAccessRules'])
        resp: Response = self.__client.post(VM_POLICIES_API, json=add_policy_dict)
        if resp.status_code == HTTPStatus.CREATED:
            try:
                policy_id = resp.json()['policyId']
                return self.policy(ArkSIAGetPolicy(policy_id=policy_id))
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse add vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse add vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to add vm policy [{resp.text}] - [{resp.status_code}]')

    def delete_policy(self, delete_policy: ArkSIADeletePolicy) -> None:
        """
        Deletes the specified (ID or name) VM policy.

        Args:
            delete_policy (ArkSIADeletePolicy): _description_

        Raises:
            ArkServiceException: _description_
        """
        if delete_policy.policy_name and not delete_policy.policy_id:
            delete_policy.policy_id = self.__policy_id_by_name(delete_policy.policy_name)
        self._logger.info(f'Deleting vm policy [{delete_policy.policy_id}]')
        resp: Response = self.__client.delete(VM_POLICY_API.format(policy_id=delete_policy.policy_id))
        if resp.status_code != HTTPStatus.NO_CONTENT:
            raise ArkServiceException(f'Failed to delete vm policy [{resp.text}] - [{resp.status_code}]')

    def update_policy(self, update_policy: ArkSIAVMUpdatePolicy) -> ArkSIAVMPolicy:
        """
        Updates a VM policy.

        Args:
            update_policy (ArkSIAVMUpdatePolicy): _description_

        Raises:
            ArkServiceException: _description_

        Returns:
            ArkSIAVMPolicy: _description_
        """
        if update_policy.policy_name and not update_policy.policy_id:
            update_policy.policy_id = self.__policy_id_by_name(update_policy.policy_name)
        self._logger.info(f'Updating vm policy [{update_policy.policy_id}]')
        update_dict = json.loads(
            update_policy.model_dump_json(by_alias=True, exclude_none=True, exclude={'new_policy_name', 'policy_name'})
        )
        if update_policy.new_policy_name:
            update_dict['policyName'] = update_policy.new_policy_name
        else:
            update_dict['policyName'] = update_policy.policy_name
        if update_policy.providers_data:
            update_dict['providersData'] = self.__serialize_providers_dict(update_policy.providers_data)
        if 'userAccessRules' in update_dict:
            self.__serialize_authorization_rules_dict(update_dict['userAccessRules'])
        resp: Response = self.__client.put(VM_POLICY_API.format(policy_id=update_policy.policy_id), json=update_dict)
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkSIAVMPolicy.model_validate(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse update vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse update vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to update vm policy [{resp.text}] - [{resp.status_code}]')

    def update_policy_status(self, update_policy_status: ArkSIAUpdatePolicyStatus) -> ArkSIAVMPolicy:
        """
        Updates the status of the specified (by ID) VM policy.

        Args:
            update_policy_status (ArkSIAUpdatePolicyStatus): _description_

        Raises:
            ArkServiceException: _description_

        Returns:
            ArkSIAVMPolicy: _description_
        """
        if update_policy_status.policy_name and not update_policy_status.policy_id:
            update_policy_status.policy_id = self.__policy_id_by_name(update_policy_status.policy_name)
        self._logger.info(f'Updating vm policy status [{update_policy_status.policy_id}]')
        resp: Response = self.__client.put(
            VM_UPDATE_POLICY_STATUS_API.format(policy_id=update_policy_status.policy_id),
            json=update_policy_status.model_dump(exclude={'policy_id'}),
        )
        if resp.status_code == HTTPStatus.OK:
            return self.policy(ArkSIAGetPolicy(policy_id=update_policy_status.policy_id))
        raise ArkServiceException(f'Failed to update vm policy status [{resp.text}] - [{resp.status_code}]')

    def __get_policies(self, params: dict) -> Tuple[List[Union[ArkSIABasePolicyListItemExtended, ArkSIAVMPolicyListItem]], int]:
        resp: Response = self.__client.get(VM_POLICIES_API, params=params)
        if resp.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to list vm policies [{resp.text}] - [{resp.status_code}]')
        try:
            return self.__parse_policies(resp, params.get('extended', False)), json.loads(resp.text)['totalCount']
        except (ValidationError, JSONDecodeError, KeyError) as ex:
            self._logger.exception(f'Failed to parse list vm policies response [{str(ex)}] - [{resp.text}]')
            raise ArkServiceException(f'Failed to parse list vm policies response [{str(ex)}]') from ex

    def query_policies(self, policies_filter: ArkSIAVMQueryPolicies = None) -> Iterator[ArkPolicyListItemPage]:
        """
        Lists VM policies that match the specified query.

        Raises:
            ArkServiceException: _description_

        Returns:
            Iterator[ArkPolicyListItemPage]: _description_
        """
        if policies_filter is not None and not isinstance(policies_filter, ArkSIAVMQueryPolicies):
            raise TypeError('policies_filter must be an instance of ArkSIAVMPoliciesFilterByQuery')

        self._logger.info(f'Retrieving all vm policies that comply to the filter: {policies_filter=}')
        params = self.__build_url_params(policies_filter=policies_filter)
        offset = params.get('offset', 0)

        iteration_count = 0
        while True:
            params['offset'] = offset
            parsed_policies, total_policies = self.__get_policies(params=params)
            yield ArkPolicyListItemPage(items=parsed_policies)

            offset += len(parsed_policies)
            if offset >= total_policies or (policies_filter and policies_filter.limit):
                break

            iteration_count += 1
            if iteration_count >= MAX_ITERATIONS:
                raise ArkException('Reached maximum number of iterations for listing policies.')

    def __build_url_params(self, policies_filter: ArkSIAVMQueryPolicies = None) -> Dict:
        if not policies_filter:
            return {}

        query_params = {}
        if policies_filter.filter_string:
            query_params['filter'] = policies_filter.filter_string
        if policies_filter.limit:
            query_params['limit'] = policies_filter.limit
        else:
            query_params['limit'] = POLICIES_QUERY_MAX_LIMIT
        if policies_filter.extended:
            query_params['extended'] = policies_filter.extended
        if policies_filter.sort:
            query_params['sort'] = policies_filter.sort
        if policies_filter.offset:
            query_params['offset'] = policies_filter.offset
        return query_params

    def __parse_policies(
        self, response: Response, is_extended: bool
    ) -> List[Union[ArkSIABasePolicyListItemExtended, ArkSIAVMPolicyListItem]]:
        response_items = response.json()['items']
        if is_extended:
            for item in response_items:
                # Inserting the provider name into the provider data for parsing reasons
                if 'providersData' in item:
                    for provider, provider_data in item['providersData'].items():
                        provider_data['provider_name'] = provider
        return TypeAdapter(List[ArkSIABasePolicyListItemExtended if is_extended else ArkSIAVMPolicyListItem]).validate_python(
            response_items
        )

    def list_policies(self) -> Iterator[ArkPolicyListItemPage]:
        """
        Lists all the tenants' VM policies.

        Raises:
            ArkServiceException: _description_

        Returns:
            List[ArkSIAVMPolicyListItem]: _description_
        """
        return self.query_policies()

    def list_policies_by(self, policies_filter: ArkSIAVMPoliciesFilter) -> Iterator[ArkPolicyListItemPage]:
        """
        Lists VM policies that match the specified filters.

        Args:
            policies_filter (ArkSIAVMPoliciesFilter): _description_

        Returns:
            Iterator[ArkPolicyListItemPage]: _description_
        """
        self._logger.info(f'Retrieving vm policies by filter [{policies_filter}]')
        filter_pairs = []

        # Filter by statuses
        if policies_filter.statuses:
            filter_pairs = [('status', status.value, 'eq') for status in policies_filter.statuses]

        # Filter by name wildcard
        if policies_filter.name:
            filter_pairs.append(('policyName', policies_filter.name, 'eq'))

        # Filter by cloud providers
        if policies_filter.providers:
            platform_pairs = [('platforms', platform.upper(), 'contains') for platform in policies_filter.providers]
            filter_pairs.extend(platform_pairs)

        final_filter = self.__build_filter_string(filter_pairs)
        return self.query_policies(ArkSIAVMQueryPolicies(filter_string=final_filter))

    def __build_filter_string(self, filter_pairs):
        final_filter = ""
        for index in range(len(filter_pairs)):
            if index == 0:
                final_filter = f"({filter_pairs[index][0]} {filter_pairs[index][2]} '{filter_pairs[index][1]}')"
            else:
                if filter_pairs[index][0] == 'status':
                    final_filter = f"({final_filter} or ({filter_pairs[index][0]} {filter_pairs[index][2]} '{filter_pairs[index][1]}'))"
                else:
                    final_filter = f"({final_filter} and ({filter_pairs[index][0]} {filter_pairs[index][2]} '{filter_pairs[index][1]}'))"
        return final_filter

    def policy(self, get_policy: ArkSIAGetPolicy) -> ArkSIAVMPolicy:
        """
        Retrieves a VM policy by ID.

        Args:
            get_policy (ArkSIAGetPolicy): _description_

        Raises:
            ArkServiceException: _description_

        Returns:
            ArkSIAVMPolicy: _description_
        """
        if get_policy.policy_name and not get_policy.policy_id:
            get_policy.policy_id = self.__policy_id_by_name(get_policy.policy_name)
        self._logger.info(f'Retrieving vm policy [{get_policy.policy_id}]')
        resp: Response = self.__client.get(VM_POLICY_API.format(policy_id=get_policy.policy_id))
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkSIAVMPolicy.model_validate(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to retrieve vm policy [{get_policy.policy_id}] [{resp.text}] - [{resp.status_code}]')

    def policies_stats(self) -> ArkSIAVMPoliciesStats:
        """
        Calculates VM policy statistics.

        Returns:
            ArkSIAVMPoliciesStats: _description_
        """
        self._logger.info('Calculating vm policies stats')
        policies = list(itertools.chain.from_iterable([p.items for p in list(self.list_policies())]))
        policies_stats = ArkSIAVMPoliciesStats.model_construct()
        policies_stats.policies_count = len(policies)

        # Count policies per status
        status_types: Set[ArkSIARuleStatus] = {p.status for p in policies if p.status}
        policies_stats.policies_count_per_status = {st: len([p for p in policies if p.status and p.status == st]) for st in status_types}

        # Count policies per platforms
        policies_stats.policies_count_per_provider = {}
        for policy in policies:
            for platform in policy.platforms:
                if platform not in policies_stats.policies_count_per_provider:
                    policies_stats.policies_count_per_provider[platform] = 0
                policies_stats.policies_count_per_provider[platform] += 1

        return policies_stats

    @staticmethod
    @overrides
    def service_config() -> ArkServiceConfig:
        return SERVICE_CONFIG

add_policy(add_policy)

Adds a new VM policy with the specified information.

Parameters:

Name Type Description Default
add_policy ArkDPVMAAddPolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkSIAVMPolicy ArkSIAVMPolicy

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def add_policy(self, add_policy: ArkSIAVMAddPolicy) -> ArkSIAVMPolicy:
    """
    Adds a new VM policy with the specified information.

    Args:
        add_policy (ArkDPVMAAddPolicy): _description_

    Raises:
        ArkServiceException: _description_

    Returns:
        ArkSIAVMPolicy: _description_
    """
    self._logger.info(f'Adding new vm policy [{add_policy.policy_name}]')
    add_policy_dict = add_policy.model_dump(by_alias=True)
    add_policy_dict['providersData'] = self.__serialize_providers_dict(add_policy.providers_data)
    self.__serialize_authorization_rules_dict(add_policy_dict['userAccessRules'])
    resp: Response = self.__client.post(VM_POLICIES_API, json=add_policy_dict)
    if resp.status_code == HTTPStatus.CREATED:
        try:
            policy_id = resp.json()['policyId']
            return self.policy(ArkSIAGetPolicy(policy_id=policy_id))
        except (ValidationError, JSONDecodeError, KeyError) as ex:
            self._logger.exception(f'Failed to parse add vm policy response [{str(ex)}] - [{resp.text}]')
            raise ArkServiceException(f'Failed to parse add vm policy response [{str(ex)}]') from ex
    raise ArkServiceException(f'Failed to add vm policy [{resp.text}] - [{resp.status_code}]')

delete_policy(delete_policy)

Deletes the specified (ID or name) VM policy.

Parameters:

Name Type Description Default
delete_policy ArkSIADeletePolicy

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def delete_policy(self, delete_policy: ArkSIADeletePolicy) -> None:
    """
    Deletes the specified (ID or name) VM policy.

    Args:
        delete_policy (ArkSIADeletePolicy): _description_

    Raises:
        ArkServiceException: _description_
    """
    if delete_policy.policy_name and not delete_policy.policy_id:
        delete_policy.policy_id = self.__policy_id_by_name(delete_policy.policy_name)
    self._logger.info(f'Deleting vm policy [{delete_policy.policy_id}]')
    resp: Response = self.__client.delete(VM_POLICY_API.format(policy_id=delete_policy.policy_id))
    if resp.status_code != HTTPStatus.NO_CONTENT:
        raise ArkServiceException(f'Failed to delete vm policy [{resp.text}] - [{resp.status_code}]')

list_policies()

Lists all the tenants' VM policies.

Raises:

Type Description
ArkServiceException

description

Returns:

Type Description
Iterator[ArkPolicyListItemPage]

List[ArkSIAVMPolicyListItem]: description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
276
277
278
279
280
281
282
283
284
285
286
def list_policies(self) -> Iterator[ArkPolicyListItemPage]:
    """
    Lists all the tenants' VM policies.

    Raises:
        ArkServiceException: _description_

    Returns:
        List[ArkSIAVMPolicyListItem]: _description_
    """
    return self.query_policies()

list_policies_by(policies_filter)

Lists VM policies that match the specified filters.

Parameters:

Name Type Description Default
policies_filter ArkSIAVMPoliciesFilter

description

required

Returns:

Type Description
Iterator[ArkPolicyListItemPage]

Iterator[ArkPolicyListItemPage]: description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def list_policies_by(self, policies_filter: ArkSIAVMPoliciesFilter) -> Iterator[ArkPolicyListItemPage]:
    """
    Lists VM policies that match the specified filters.

    Args:
        policies_filter (ArkSIAVMPoliciesFilter): _description_

    Returns:
        Iterator[ArkPolicyListItemPage]: _description_
    """
    self._logger.info(f'Retrieving vm policies by filter [{policies_filter}]')
    filter_pairs = []

    # Filter by statuses
    if policies_filter.statuses:
        filter_pairs = [('status', status.value, 'eq') for status in policies_filter.statuses]

    # Filter by name wildcard
    if policies_filter.name:
        filter_pairs.append(('policyName', policies_filter.name, 'eq'))

    # Filter by cloud providers
    if policies_filter.providers:
        platform_pairs = [('platforms', platform.upper(), 'contains') for platform in policies_filter.providers]
        filter_pairs.extend(platform_pairs)

    final_filter = self.__build_filter_string(filter_pairs)
    return self.query_policies(ArkSIAVMQueryPolicies(filter_string=final_filter))

policies_stats()

Calculates VM policy statistics.

Returns:

Name Type Description
ArkSIAVMPoliciesStats ArkSIAVMPoliciesStats

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def policies_stats(self) -> ArkSIAVMPoliciesStats:
    """
    Calculates VM policy statistics.

    Returns:
        ArkSIAVMPoliciesStats: _description_
    """
    self._logger.info('Calculating vm policies stats')
    policies = list(itertools.chain.from_iterable([p.items for p in list(self.list_policies())]))
    policies_stats = ArkSIAVMPoliciesStats.model_construct()
    policies_stats.policies_count = len(policies)

    # Count policies per status
    status_types: Set[ArkSIARuleStatus] = {p.status for p in policies if p.status}
    policies_stats.policies_count_per_status = {st: len([p for p in policies if p.status and p.status == st]) for st in status_types}

    # Count policies per platforms
    policies_stats.policies_count_per_provider = {}
    for policy in policies:
        for platform in policy.platforms:
            if platform not in policies_stats.policies_count_per_provider:
                policies_stats.policies_count_per_provider[platform] = 0
            policies_stats.policies_count_per_provider[platform] += 1

    return policies_stats

policy(get_policy)

Retrieves a VM policy by ID.

Parameters:

Name Type Description Default
get_policy ArkSIAGetPolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkSIAVMPolicy ArkSIAVMPolicy

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def policy(self, get_policy: ArkSIAGetPolicy) -> ArkSIAVMPolicy:
    """
    Retrieves a VM policy by ID.

    Args:
        get_policy (ArkSIAGetPolicy): _description_

    Raises:
        ArkServiceException: _description_

    Returns:
        ArkSIAVMPolicy: _description_
    """
    if get_policy.policy_name and not get_policy.policy_id:
        get_policy.policy_id = self.__policy_id_by_name(get_policy.policy_name)
    self._logger.info(f'Retrieving vm policy [{get_policy.policy_id}]')
    resp: Response = self.__client.get(VM_POLICY_API.format(policy_id=get_policy.policy_id))
    if resp.status_code == HTTPStatus.OK:
        try:
            return ArkSIAVMPolicy.model_validate(resp.json())
        except (ValidationError, JSONDecodeError) as ex:
            self._logger.exception(f'Failed to parse vm policy response [{str(ex)}] - [{resp.text}]')
            raise ArkServiceException(f'Failed to parse vm policy response [{str(ex)}]') from ex
    raise ArkServiceException(f'Failed to retrieve vm policy [{get_policy.policy_id}] [{resp.text}] - [{resp.status_code}]')

query_policies(policies_filter=None)

Lists VM policies that match the specified query.

Raises:

Type Description
ArkServiceException

description

Returns:

Type Description
Iterator[ArkPolicyListItemPage]

Iterator[ArkPolicyListItemPage]: description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def query_policies(self, policies_filter: ArkSIAVMQueryPolicies = None) -> Iterator[ArkPolicyListItemPage]:
    """
    Lists VM policies that match the specified query.

    Raises:
        ArkServiceException: _description_

    Returns:
        Iterator[ArkPolicyListItemPage]: _description_
    """
    if policies_filter is not None and not isinstance(policies_filter, ArkSIAVMQueryPolicies):
        raise TypeError('policies_filter must be an instance of ArkSIAVMPoliciesFilterByQuery')

    self._logger.info(f'Retrieving all vm policies that comply to the filter: {policies_filter=}')
    params = self.__build_url_params(policies_filter=policies_filter)
    offset = params.get('offset', 0)

    iteration_count = 0
    while True:
        params['offset'] = offset
        parsed_policies, total_policies = self.__get_policies(params=params)
        yield ArkPolicyListItemPage(items=parsed_policies)

        offset += len(parsed_policies)
        if offset >= total_policies or (policies_filter and policies_filter.limit):
            break

        iteration_count += 1
        if iteration_count >= MAX_ITERATIONS:
            raise ArkException('Reached maximum number of iterations for listing policies.')

update_policy(update_policy)

Updates a VM policy.

Parameters:

Name Type Description Default
update_policy ArkSIAVMUpdatePolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkSIAVMPolicy ArkSIAVMPolicy

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def update_policy(self, update_policy: ArkSIAVMUpdatePolicy) -> ArkSIAVMPolicy:
    """
    Updates a VM policy.

    Args:
        update_policy (ArkSIAVMUpdatePolicy): _description_

    Raises:
        ArkServiceException: _description_

    Returns:
        ArkSIAVMPolicy: _description_
    """
    if update_policy.policy_name and not update_policy.policy_id:
        update_policy.policy_id = self.__policy_id_by_name(update_policy.policy_name)
    self._logger.info(f'Updating vm policy [{update_policy.policy_id}]')
    update_dict = json.loads(
        update_policy.model_dump_json(by_alias=True, exclude_none=True, exclude={'new_policy_name', 'policy_name'})
    )
    if update_policy.new_policy_name:
        update_dict['policyName'] = update_policy.new_policy_name
    else:
        update_dict['policyName'] = update_policy.policy_name
    if update_policy.providers_data:
        update_dict['providersData'] = self.__serialize_providers_dict(update_policy.providers_data)
    if 'userAccessRules' in update_dict:
        self.__serialize_authorization_rules_dict(update_dict['userAccessRules'])
    resp: Response = self.__client.put(VM_POLICY_API.format(policy_id=update_policy.policy_id), json=update_dict)
    if resp.status_code == HTTPStatus.OK:
        try:
            return ArkSIAVMPolicy.model_validate(resp.json())
        except (ValidationError, JSONDecodeError) as ex:
            self._logger.exception(f'Failed to parse update vm policy response [{str(ex)}] - [{resp.text}]')
            raise ArkServiceException(f'Failed to parse update vm policy response [{str(ex)}]') from ex
    raise ArkServiceException(f'Failed to update vm policy [{resp.text}] - [{resp.status_code}]')

update_policy_status(update_policy_status)

Updates the status of the specified (by ID) VM policy.

Parameters:

Name Type Description Default
update_policy_status ArkSIAUpdatePolicyStatus

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkSIAVMPolicy ArkSIAVMPolicy

description

Source code in ark_sdk_python/services/sia/policies/vm/ark_sia_vm_policies_service.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def update_policy_status(self, update_policy_status: ArkSIAUpdatePolicyStatus) -> ArkSIAVMPolicy:
    """
    Updates the status of the specified (by ID) VM policy.

    Args:
        update_policy_status (ArkSIAUpdatePolicyStatus): _description_

    Raises:
        ArkServiceException: _description_

    Returns:
        ArkSIAVMPolicy: _description_
    """
    if update_policy_status.policy_name and not update_policy_status.policy_id:
        update_policy_status.policy_id = self.__policy_id_by_name(update_policy_status.policy_name)
    self._logger.info(f'Updating vm policy status [{update_policy_status.policy_id}]')
    resp: Response = self.__client.put(
        VM_UPDATE_POLICY_STATUS_API.format(policy_id=update_policy_status.policy_id),
        json=update_policy_status.model_dump(exclude={'policy_id'}),
    )
    if resp.status_code == HTTPStatus.OK:
        return self.policy(ArkSIAGetPolicy(policy_id=update_policy_status.policy_id))
    raise ArkServiceException(f'Failed to update vm policy status [{resp.text}] - [{resp.status_code}]')