Skip to content

ark_dpa_vm_policies_service

ArkDPAVMPoliciesService

Bases: ArkService

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
class ArkDPAVMPoliciesService(ArkService):
    """
    Service for managing DPA VM policies through the ISP 'dpa' service client.

    Offers create, read, update, delete and status-change operations on VM policies,
    plus client-side filtering and statistics computed over the full policy list.
    """

    def __init__(self, isp_auth: ArkISPAuth) -> None:
        """
        Creates the service and builds its ISP client for the 'dpa' service.

        Args:
            isp_auth (ArkISPAuth): ISP authenticator the service client is created from
        """
        super().__init__(isp_auth)
        self.__isp_auth = isp_auth
        self.__client: ArkISPServiceClient = ArkISPServiceClient.from_isp_auth(self.__isp_auth, 'dpa')

    @property
    def isp_client(self) -> ArkISPServiceClient:
        """
        The ISP service client this service sends its requests with.
        """
        return self.__client

    def __policy_id_by_name(self, policy_name: str) -> str:
        """
        Resolves a policy name to its policy ID.

        A policy whose name is exactly `policy_name` is preferred. Only when no such
        policy exists is the name treated as an fnmatch pattern (the same matching
        list_policies_by applies), and the first matching policy is used. Without the
        exact-match preference, a name that is also a valid pattern could resolve to a
        different policy than the one that carries that name.

        Args:
            policy_name (str): Exact policy name, or an fnmatch pattern as a fallback

        Raises:
            ArkServiceException: When no policy matches the name

        Returns:
            str: ID of the resolved policy
        """
        policies = self.list_policies()
        matches = [p for p in policies if p.policy_name == policy_name]
        if not matches:
            # Backward-compatible fallback: wildcard lookup, first match wins
            matches = [p for p in policies if fnmatch(p.policy_name, policy_name)]
        if not matches:
            raise ArkServiceException(f'Failed to find vm policy id by name [{policy_name}]')
        return matches[0].policy_id

    @staticmethod
    def __serialize_providers_dict(providers_data: ArkDPAVMProvidersDict) -> Dict:
        """
        Builds a plain dict of the providers data, keyed by serialized workspace type.

        Args:
            providers_data (ArkDPAVMProvidersDict): Providers data keyed by workspace type

        Returns:
            Dict: Serialized workspace type mapped to the provider's by-alias dict
        """
        return {
            serialize_dpa_vm_policies_workspace_type(workspace_type): provider.dict(by_alias=True)
            for workspace_type, provider in providers_data.items()
        }

    @staticmethod
    def __serialize_authorization_rules_dict(authorization_rules: List[Dict]) -> None:
        """
        Re-keys, in place, the 'connectAs' mapping of every authorization rule.

        Outer keys are replaced with their serialized workspace type and inner keys
        with their serialized protocol type; the values are left untouched.

        Args:
            authorization_rules (List[Dict]): Rule dicts, each holding connectionInformation.connectAs
        """
        for rule in authorization_rules:
            connect_as = rule['connectionInformation']['connectAs']
            # Iterate over key snapshots, since the dicts are modified while being walked
            for workspace_key in list(connect_as.keys()):
                protocols = connect_as[workspace_key]
                for protocol_key in list(protocols.keys()):
                    protocols[serialize_dpa_vm_policies_protocol_type(protocol_key)] = protocols.pop(protocol_key)
                connect_as[serialize_dpa_vm_policies_workspace_type(workspace_key)] = connect_as.pop(workspace_key)

    def add_policy(self, add_policy: ArkDPAVMAddPolicy) -> ArkDPAVMPolicy:
        """
        Adds a new VM policy with the specified information.

        Args:
            add_policy (ArkDPAVMAddPolicy): Definition of the policy to create

        Raises:
            ArkServiceException: When the service does not respond with CREATED, or its response cannot be parsed

        Returns:
            ArkDPAVMPolicy: The created policy, retrieved by the ID returned from the service
        """
        self._logger.info(f'Adding new vm policy [{add_policy.policy_name}]')
        add_policy_dict = add_policy.dict(by_alias=True)
        add_policy_dict['providersData'] = self.__serialize_providers_dict(add_policy.providers_data)
        self.__serialize_authorization_rules_dict(add_policy_dict['userAccessRules'])
        resp: Response = self.__client.post(VM_POLICIES_API, json=add_policy_dict)
        if resp.status_code == HTTPStatus.CREATED:
            try:
                policy_id = resp.json()['policyId']
                return self.policy(ArkDPAGetPolicy(policy_id=policy_id))
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse add vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse add vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to add vm policy [{resp.text}] - [{resp.status_code}]')

    def delete_policy(self, delete_policy: ArkDPADeletePolicy) -> None:
        """
        Deletes the specified (ID or name) VM policy.

        When only a name is given, it is resolved to an ID first and the ID is stored
        back on `delete_policy`.

        Args:
            delete_policy (ArkDPADeletePolicy): Identifies the policy to delete by ID or name

        Raises:
            ArkServiceException: When the name cannot be resolved, or the service does not respond with NO_CONTENT
        """
        if delete_policy.policy_name and not delete_policy.policy_id:
            delete_policy.policy_id = self.__policy_id_by_name(delete_policy.policy_name)
        self._logger.info(f'Deleting vm policy [{delete_policy.policy_id}]')
        resp: Response = self.__client.delete(VM_POLICY_API.format(policy_id=delete_policy.policy_id))
        if resp.status_code != HTTPStatus.NO_CONTENT:
            raise ArkServiceException(f'Failed to delete vm policy [{resp.text}] - [{resp.status_code}]')

    def update_policy(self, update_policy: ArkDPAVMUpdatePolicy) -> ArkDPAVMPolicy:
        """
        Updates a VM policy, identified by ID or name.

        When only a name is given, it is resolved to an ID first and the ID is stored
        back on `update_policy`. The policy is renamed when `new_policy_name` is set.

        Args:
            update_policy (ArkDPAVMUpdatePolicy): Identifies the policy and carries the fields to update

        Raises:
            ArkServiceException: When the name cannot be resolved, the service does not respond with OK,
                or its response cannot be parsed

        Returns:
            ArkDPAVMPolicy: The policy as parsed from the update response
        """
        if update_policy.policy_name and not update_policy.policy_id:
            update_policy.policy_id = self.__policy_id_by_name(update_policy.policy_name)
        self._logger.info(f'Updating vm policy [{update_policy.policy_id}]')
        update_dict = json.loads(update_policy.json(by_alias=True, exclude_none=True, exclude={'new_policy_name', 'policy_name'}))
        # NOTE(review): when neither name is set this sends policyName=None even though
        # exclude_none was used above - confirm the API accepts a null policyName
        if update_policy.new_policy_name:
            update_dict['policyName'] = update_policy.new_policy_name
        else:
            update_dict['policyName'] = update_policy.policy_name
        if update_policy.providers_data:
            update_dict['providersData'] = self.__serialize_providers_dict(update_policy.providers_data)
        if 'userAccessRules' in update_dict:
            self.__serialize_authorization_rules_dict(update_dict['userAccessRules'])
        resp: Response = self.__client.put(VM_POLICY_API.format(policy_id=update_policy.policy_id), json=update_dict)
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkDPAVMPolicy.parse_obj(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse update vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse update vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to update vm policy [{resp.text}] - [{resp.status_code}]')

    def update_policy_status(self, update_policy_status: ArkDPAUpdatePolicyStatus) -> ArkDPAVMPolicy:
        """
        Updates the status of the specified (by ID or name) VM policy.

        When only a name is given, it is resolved to an ID first and the ID is stored
        back on `update_policy_status`.

        Args:
            update_policy_status (ArkDPAUpdatePolicyStatus): Identifies the policy and carries the status to set

        Raises:
            ArkServiceException: When the name cannot be resolved, or the service does not respond with OK

        Returns:
            ArkDPAVMPolicy: The policy, retrieved again by ID after the status update
        """
        if update_policy_status.policy_name and not update_policy_status.policy_id:
            update_policy_status.policy_id = self.__policy_id_by_name(update_policy_status.policy_name)
        self._logger.info(f'Updating vm policy status [{update_policy_status.policy_id}]')
        resp: Response = self.__client.put(
            VM_UPDATE_POLICY_STATUS_API.format(policy_id=update_policy_status.policy_id),
            json=update_policy_status.dict(exclude={'policy_id'}),
        )
        if resp.status_code == HTTPStatus.OK:
            return self.policy(ArkDPAGetPolicy(policy_id=update_policy_status.policy_id))
        raise ArkServiceException(f'Failed to update vm policy status [{resp.text}] - [{resp.status_code}]')

    def list_policies(self) -> List[ArkDPAVMPolicyListItem]:
        """
        Lists all of the tenant's VM policies.

        Raises:
            ArkServiceException: When the service does not respond with OK, or its response cannot be parsed

        Returns:
            List[ArkDPAVMPolicyListItem]: The entries of the response's 'items' field
        """
        self._logger.info('Retrieving all vm policies')
        resp: Response = self.__client.get(VM_POLICIES_API)
        if resp.status_code == HTTPStatus.OK:
            try:
                return parse_obj_as(List[ArkDPAVMPolicyListItem], resp.json()['items'])
            except (ValidationError, JSONDecodeError, KeyError) as ex:
                self._logger.exception(f'Failed to parse list vm policies response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse list vm policies response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to list vm policies [{resp.text}] - [{resp.status_code}]')

    def list_policies_by(self, policies_filter: ArkDPAVMPoliciesFilter) -> List[ArkDPAVMPolicyListItem]:
        """
        Lists VM policies that match the specified filters.

        All policies are retrieved and then filtered locally. Every filter that is set
        must match; filters that are unset or empty are ignored.

        Args:
            policies_filter (ArkDPAVMPoliciesFilter): Optional statuses, name pattern (fnmatch wildcards) and providers

        Returns:
            List[ArkDPAVMPolicyListItem]: The policies that pass every set filter
        """
        self._logger.info(f'Retrieving vm policies by filter [{policies_filter}]')
        policies = self.list_policies()

        # Filter by statuses
        if policies_filter.statuses:
            policies = [p for p in policies if p.status in policies_filter.statuses]

        # Filter by name wildcard
        if policies_filter.name:
            policies = [p for p in policies if fnmatch(p.policy_name, policies_filter.name)]

        # Filter by cloud providers - a policy must cover every requested provider
        if policies_filter.providers:
            policies = [p for p in policies if all(cp.value in p.platforms for cp in policies_filter.providers)]

        return policies

    def policy(self, get_policy: ArkDPAGetPolicy) -> ArkDPAVMPolicy:
        """
        Retrieves a VM policy by ID or name.

        When only a name is given, it is resolved to an ID first and the ID is stored
        back on `get_policy`.

        Args:
            get_policy (ArkDPAGetPolicy): Identifies the policy to retrieve by ID or name

        Raises:
            ArkServiceException: When the name cannot be resolved, the service does not respond with OK,
                or its response cannot be parsed

        Returns:
            ArkDPAVMPolicy: The policy as parsed from the response
        """
        if get_policy.policy_name and not get_policy.policy_id:
            get_policy.policy_id = self.__policy_id_by_name(get_policy.policy_name)
        self._logger.info(f'Retrieving vm policy [{get_policy.policy_id}]')
        resp: Response = self.__client.get(VM_POLICY_API.format(policy_id=get_policy.policy_id))
        if resp.status_code == HTTPStatus.OK:
            try:
                return ArkDPAVMPolicy.parse_obj(resp.json())
            except (ValidationError, JSONDecodeError) as ex:
                self._logger.exception(f'Failed to parse vm policy response [{str(ex)}] - [{resp.text}]')
                raise ArkServiceException(f'Failed to parse vm policy response [{str(ex)}]') from ex
        raise ArkServiceException(f'Failed to retrieve vm policy [{get_policy.policy_id}] [{resp.text}] - [{resp.status_code}]')

    def policies_stats(self) -> ArkDPAVMPoliciesStats:
        """
        Calculates VM policy statistics.

        The statistics are computed locally over the result of list_policies.

        Returns:
            ArkDPAVMPoliciesStats: Total policy count, count per status and count per platform
        """
        self._logger.info('Calculating vm policies stats')
        policies = self.list_policies()
        policies_stats = ArkDPAVMPoliciesStats.construct()
        policies_stats.policies_count = len(policies)

        # Count policies per status - policies with a falsy status are not counted
        status_types: Set[ArkDPARuleStatus] = {p.status for p in policies if p.status}
        policies_stats.policies_count_per_status = {st: len([p for p in policies if p.status and p.status == st]) for st in status_types}

        # Count policies per platforms - a policy counts once for each of its platforms
        policies_stats.policies_count_per_provider = {}
        for policy in policies:
            for platform in policy.platforms:
                if platform not in policies_stats.policies_count_per_provider:
                    policies_stats.policies_count_per_provider[platform] = 0
                policies_stats.policies_count_per_provider[platform] += 1

        return policies_stats

    @staticmethod
    @overrides
    def service_config() -> ArkServiceConfig:
        """
        Returns the module-level SERVICE_CONFIG of this service.
        """
        return SERVICE_CONFIG

add_policy(add_policy)

Adds a new VM policy with the specified information.

Parameters:

Name Type Description Default
add_policy ArkDPAVMAddPolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkDPAVMPolicy ArkDPAVMPolicy

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def add_policy(self, add_policy: ArkDPAVMAddPolicy) -> ArkDPAVMPolicy:
    """
    Creates a VM policy from the given definition.

    Args:
        add_policy (ArkDPAVMAddPolicy): Definition of the policy to create

    Raises:
        ArkServiceException: When the service does not respond with CREATED, or its response cannot be parsed

    Returns:
        ArkDPAVMPolicy: The created policy, retrieved by the ID returned from the service
    """
    self._logger.info(f'Adding new vm policy [{add_policy.policy_name}]')

    # Build the request body; providers and access rules need their keys serialized
    payload = add_policy.dict(by_alias=True)
    payload['providersData'] = self.__serialize_providers_dict(add_policy.providers_data)
    self.__serialize_authorization_rules_dict(payload['userAccessRules'])

    resp: Response = self.__client.post(VM_POLICIES_API, json=payload)
    if resp.status_code != HTTPStatus.CREATED:
        raise ArkServiceException(f'Failed to add vm policy [{resp.text}] - [{resp.status_code}]')

    try:
        created_id = resp.json()['policyId']
        return self.policy(ArkDPAGetPolicy(policy_id=created_id))
    except (ValidationError, JSONDecodeError, KeyError) as ex:
        self._logger.exception(f'Failed to parse add vm policy response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse add vm policy response [{str(ex)}]') from ex

delete_policy(delete_policy)

Deletes the specified (ID or name) VM policy.

Parameters:

Name Type Description Default
delete_policy ArkDPADeletePolicy

description

required

Raises:

Type Description
ArkServiceException

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def delete_policy(self, delete_policy: ArkDPADeletePolicy) -> None:
    """
    Removes a VM policy identified by ID or, when no ID is set, by name.

    A name is resolved to an ID first, and the ID is stored back on `delete_policy`.

    Args:
        delete_policy (ArkDPADeletePolicy): Identifies the policy to delete by ID or name

    Raises:
        ArkServiceException: When the service does not respond with NO_CONTENT
    """
    if not delete_policy.policy_id and delete_policy.policy_name:
        delete_policy.policy_id = self.__policy_id_by_name(delete_policy.policy_name)

    self._logger.info(f'Deleting vm policy [{delete_policy.policy_id}]')
    policy_url = VM_POLICY_API.format(policy_id=delete_policy.policy_id)
    resp: Response = self.__client.delete(policy_url)
    if resp.status_code == HTTPStatus.NO_CONTENT:
        return
    raise ArkServiceException(f'Failed to delete vm policy [{resp.text}] - [{resp.status_code}]')

list_policies()

Lists all of the tenant's VM policies.

Raises:

Type Description
ArkServiceException

description

Returns:

Type Description
List[ArkDPAVMPolicyListItem]

List[ArkDPAVMPolicyListItem]: description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def list_policies(self) -> List[ArkDPAVMPolicyListItem]:
    """
    Retrieves every VM policy of the tenant.

    Raises:
        ArkServiceException: When the service does not respond with OK, or its response cannot be parsed

    Returns:
        List[ArkDPAVMPolicyListItem]: The entries of the response's 'items' field
    """
    self._logger.info('Retrieving all vm policies')
    resp: Response = self.__client.get(VM_POLICIES_API)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to list vm policies [{resp.text}] - [{resp.status_code}]')

    try:
        raw_items = resp.json()['items']
        return parse_obj_as(List[ArkDPAVMPolicyListItem], raw_items)
    except (ValidationError, JSONDecodeError, KeyError) as ex:
        self._logger.exception(f'Failed to parse list vm policies response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse list vm policies response [{str(ex)}]') from ex

list_policies_by(policies_filter)

Lists VM policies that match the specified filters.

Parameters:

Name Type Description Default
policies_filter ArkDPAVMPoliciesFilter

description

required

Returns:

Type Description
List[ArkDPAVMPolicyListItem]

List[ArkDPAVMPolicyListItem]: description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def list_policies_by(self, policies_filter: ArkDPAVMPoliciesFilter) -> List[ArkDPAVMPolicyListItem]:
    """
    Returns the VM policies that satisfy every filter that is set.

    All policies are retrieved and filtered locally; unset or empty filters are ignored.

    Args:
        policies_filter (ArkDPAVMPoliciesFilter): Optional statuses, name pattern (fnmatch wildcards) and providers

    Returns:
        List[ArkDPAVMPolicyListItem]: The policies that pass every set filter
    """
    self._logger.info(f'Retrieving vm policies by filter [{policies_filter}]')
    candidates = self.list_policies()

    wanted_statuses = policies_filter.statuses
    name_pattern = policies_filter.name
    required_providers = policies_filter.providers

    def _passes(candidate) -> bool:
        # Status must be one of the requested statuses
        if wanted_statuses and candidate.status not in wanted_statuses:
            return False
        # Name must match the wildcard pattern
        if name_pattern and not fnmatch(candidate.policy_name, name_pattern):
            return False
        # Every requested cloud provider must be among the policy's platforms
        if required_providers:
            for provider in required_providers:
                if provider.value not in candidate.platforms:
                    return False
        return True

    return [candidate for candidate in candidates if _passes(candidate)]

policies_stats()

Calculates VM policy statistics.

Returns:

Name Type Description
ArkDPAVMPoliciesStats ArkDPAVMPoliciesStats

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def policies_stats(self) -> ArkDPAVMPoliciesStats:
    """
    Computes statistics over all VM policies.

    Returns:
        ArkDPAVMPoliciesStats: Total policy count, count per status and count per platform
    """
    self._logger.info('Calculating vm policies stats')
    all_policies = self.list_policies()
    stats = ArkDPAVMPoliciesStats.construct()
    stats.policies_count = len(all_policies)

    # Per-status tally; policies with a falsy status are left out
    seen_statuses: Set[ArkDPARuleStatus] = {item.status for item in all_policies if item.status}
    stats.policies_count_per_status = {
        status: sum(1 for item in all_policies if item.status and item.status == status) for status in seen_statuses
    }

    # Per-platform tally; a policy counts once for each platform it lists
    stats.policies_count_per_provider = {}
    for item in all_policies:
        for platform in item.platforms:
            if platform in stats.policies_count_per_provider:
                stats.policies_count_per_provider[platform] += 1
            else:
                stats.policies_count_per_provider[platform] = 1

    return stats

policy(get_policy)

Retrieves a VM policy by ID or, when only a name is given, by name.

Parameters:

Name Type Description Default
get_policy ArkDPAGetPolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkDPAVMPolicy ArkDPAVMPolicy

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def policy(self, get_policy: ArkDPAGetPolicy) -> ArkDPAVMPolicy:
    """
    Fetches a single VM policy, identified by ID or, when no ID is set, by name.

    A name is resolved to an ID first, and the ID is stored back on `get_policy`.

    Args:
        get_policy (ArkDPAGetPolicy): Identifies the policy to retrieve by ID or name

    Raises:
        ArkServiceException: When the service does not respond with OK, or its response cannot be parsed

    Returns:
        ArkDPAVMPolicy: The policy as parsed from the response
    """
    if not get_policy.policy_id and get_policy.policy_name:
        get_policy.policy_id = self.__policy_id_by_name(get_policy.policy_name)

    self._logger.info(f'Retrieving vm policy [{get_policy.policy_id}]')
    policy_url = VM_POLICY_API.format(policy_id=get_policy.policy_id)
    resp: Response = self.__client.get(policy_url)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to retrieve vm policy [{get_policy.policy_id}] [{resp.text}] - [{resp.status_code}]')

    try:
        body = resp.json()
        return ArkDPAVMPolicy.parse_obj(body)
    except (ValidationError, JSONDecodeError) as ex:
        self._logger.exception(f'Failed to parse vm policy response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse vm policy response [{str(ex)}]') from ex

update_policy(update_policy)

Updates a VM policy.

Parameters:

Name Type Description Default
update_policy ArkDPAVMUpdatePolicy

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkDPAVMPolicy ArkDPAVMPolicy

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def update_policy(self, update_policy: ArkDPAVMUpdatePolicy) -> ArkDPAVMPolicy:
    """
    Applies changes to a VM policy identified by ID or, when no ID is set, by name.

    A name is resolved to an ID first, and the ID is stored back on `update_policy`.
    The request carries `new_policy_name` as the policy name when it is set, and the
    current `policy_name` otherwise.

    Args:
        update_policy (ArkDPAVMUpdatePolicy): Identifies the policy and carries the fields to update

    Raises:
        ArkServiceException: When the service does not respond with OK, or its response cannot be parsed

    Returns:
        ArkDPAVMPolicy: The policy as parsed from the update response
    """
    if not update_policy.policy_id and update_policy.policy_name:
        update_policy.policy_id = self.__policy_id_by_name(update_policy.policy_name)
    self._logger.info(f'Updating vm policy [{update_policy.policy_id}]')

    # Start from the model's own JSON, without the two name fields, which are handled below
    serialized = update_policy.json(by_alias=True, exclude_none=True, exclude={'new_policy_name', 'policy_name'})
    body = json.loads(serialized)
    body['policyName'] = update_policy.new_policy_name or update_policy.policy_name
    if update_policy.providers_data:
        body['providersData'] = self.__serialize_providers_dict(update_policy.providers_data)
    if 'userAccessRules' in body:
        self.__serialize_authorization_rules_dict(body['userAccessRules'])

    policy_url = VM_POLICY_API.format(policy_id=update_policy.policy_id)
    resp: Response = self.__client.put(policy_url, json=body)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to update vm policy [{resp.text}] - [{resp.status_code}]')

    try:
        return ArkDPAVMPolicy.parse_obj(resp.json())
    except (ValidationError, JSONDecodeError) as ex:
        self._logger.exception(f'Failed to parse update vm policy response [{str(ex)}] - [{resp.text}]')
        raise ArkServiceException(f'Failed to parse update vm policy response [{str(ex)}]') from ex

update_policy_status(update_policy_status)

Updates the status of the specified (by ID or name) VM policy.

Parameters:

Name Type Description Default
update_policy_status ArkDPAUpdatePolicyStatus

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
ArkDPAVMPolicy ArkDPAVMPolicy

description

Source code in ark_sdk_python/services/dpa/policies/vm/ark_dpa_vm_policies_service.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def update_policy_status(self, update_policy_status: ArkDPAUpdatePolicyStatus) -> ArkDPAVMPolicy:
    """
    Changes the status of a VM policy identified by ID or, when no ID is set, by name.

    A name is resolved to an ID first, and the ID is stored back on `update_policy_status`.

    Args:
        update_policy_status (ArkDPAUpdatePolicyStatus): Identifies the policy and carries the status to set

    Raises:
        ArkServiceException: When the service does not respond with OK

    Returns:
        ArkDPAVMPolicy: The policy, retrieved again by ID after the status update
    """
    if not update_policy_status.policy_id and update_policy_status.policy_name:
        update_policy_status.policy_id = self.__policy_id_by_name(update_policy_status.policy_name)
    self._logger.info(f'Updating vm policy status [{update_policy_status.policy_id}]')

    status_url = VM_UPDATE_POLICY_STATUS_API.format(policy_id=update_policy_status.policy_id)
    # The ID travels in the URL, so it is left out of the request body
    status_body = update_policy_status.dict(exclude={'policy_id'})
    resp: Response = self.__client.put(status_url, json=status_body)
    if resp.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to update vm policy status [{resp.text}] - [{resp.status_code}]')
    return self.policy(ArkDPAGetPolicy(policy_id=update_policy_status.policy_id))