Skip to content

db

ArkUAPSIADBAccessPolicy

Bases: ArkUAPSIACommonAccessPolicy

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_access_policy.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ArkUAPSIADBAccessPolicy(ArkUAPSIACommonAccessPolicy):
    # SIA DB access policy; `targets` maps a workspace type to the DB targets it covers.
    targets: Annotated[Dict[ArkWorkspaceType, ArkUAPSIADBTargets], Field(description='The targets of the db access policy')]

    @field_validator('targets', mode='before')
    @classmethod
    def validate_workspace_type(cls, val: Dict[str, Any]):
        """
        Validates the keys of the raw `targets` mapping before field parsing.

        Every key must convert to ArkWorkspaceType.FQDN_IP, otherwise a ValueError
        ('Invalid Workspace Type') is raised. A None value is returned untouched.
        """
        if val is not None:
            for key in val.keys():
                # NOTE(review): ArkWorkspaceType(key) presumably raises on an unknown value
                # by itself (enum lookup) - confirm against the enum definition.
                if ArkWorkspaceType(key) not in [ArkWorkspaceType.FQDN_IP]:
                    raise ValueError('Invalid Workspace Type')
        return val

    def serialize_model(self, *args, **kwargs) -> Dict[str, Any]:
        """
        Serializes the model to a dictionary, including the profiles of each instance in the targets.
        Done to customize serialization of each instance's profile.
        This ensures that each instance's profile is serialized according to its authentication method,
        and included in the output under the 'profile' key.

        All positional and keyword arguments are forwarded unchanged to every `model_dump` call.

        Raises:
            ValueError: If an instance has no profile set for its authentication method.
        """
        data = super().model_dump(*args, **kwargs)

        for workspace_type, target in self.targets.items():
            for index, instance in enumerate(target.instances):
                profile = instance.profile_by_authentication_method()
                if profile is None:
                    raise ValueError(
                        f'No profile found for the given authentication method, instance: [{instance.instance_name}], authentication method: [{instance.authentication_method}]'
                    )

                # The instance was already serialized as part of `data`; only its profile
                # entry is rewritten here, so no second dump of the instance is needed.
                dumped_instance = data['targets'][workspace_type]['instances'][index]
                dumped_instance['profile'] = profile.model_dump(*args, **kwargs)

                # Drop the method-specific profile entry now that it lives under 'profile'.
                # NOTE(review): the looked-up name is the camelCase field name, so this assumes
                # the dump above is keyed by alias - confirm for callers passing by_alias=False.
                del dumped_instance[instance.auth_method_to_profile_field_name()[instance.authentication_method]]

        return data

serialize_model(*args, **kwargs)

Serializes the model to a dictionary, including the profiles of each instance in the targets. Done to customize serialization of each instance's profile. This ensures that each instance's profile is serialized according to its authentication method, and included in the output under the 'profile' key.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_access_policy.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def serialize_model(self, *args, **kwargs) -> Dict[str, Any]:
    """
    Serializes the model to a dictionary, including the profiles of each instance in the targets.
    Done to customize serialization of each instance's profile.
    This ensures that each instance's profile is serialized according to its authentication method,
    and included in the output under the 'profile' key.

    All positional and keyword arguments are forwarded unchanged to every `model_dump` call.

    Raises:
        ValueError: If an instance has no profile set for its authentication method.
    """
    data = super().model_dump(*args, **kwargs)

    for workspace_type, target in self.targets.items():
        for index, instance in enumerate(target.instances):
            profile = instance.profile_by_authentication_method()
            if profile is None:
                raise ValueError(
                    f'No profile found for the given authentication method, instance: [{instance.instance_name}], authentication method: [{instance.authentication_method}]'
                )

            # The instance was already serialized as part of `data`; only its profile
            # entry is rewritten here, so no second dump of the instance is needed.
            dumped_instance = data['targets'][workspace_type]['instances'][index]
            dumped_instance['profile'] = profile.model_dump(*args, **kwargs)

            # Drop the method-specific profile entry now that it lives under 'profile'.
            # NOTE(review): the looked-up name is the camelCase field name, so this assumes
            # the dump above is keyed by alias - confirm for callers passing by_alias=False.
            del dumped_instance[instance.auth_method_to_profile_field_name()[instance.authentication_method]]

    return data

ArkUAPSIADBFilters

Bases: ArkUAPFilters

This class defines filters specific to the SIA DB policies within the UAP (Unified Access Policies) service.

You can set the following fields:

  • policy_type: Optional[List[ArkUAPPolicyType]] A list of policy types to filter the policies by.

  • policy_tags: Optional[List[str]] A list of policy tags to filter the policies by.

  • identities: Optional[List[str]] A list of identities to filter the policies by.

  • status: Optional[List[ArkUAPStatusType]] A list of policy statuses to filter the policies by.

  • text_search: Optional[str] A text value to apply as a search filter across policies.

  • show_editable_policies: Optional[bool] Whether to show only policies that are editable by the current user.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_filters.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ArkUAPSIADBFilters(ArkUAPFilters):
    """
    Filters specific to the SIA DB policies within the UAP (Unified Access Policies) service.

    The target category is pinned to the DB category; assigning any other value
    to `target_category` raises a ValueError.

    You can set the following fields:

    - policy_type: Optional[List[ArkUAPPolicyType]]
        A list of policy types to filter the policies by.

    - policy_tags: Optional[List[str]]
        A list of policy tags to filter the policies by.

    - identities: Optional[List[str]]
        A list of identities to filter the policies by.

    - status: Optional[List[ArkUAPStatusType]]
        A list of policy statuses to filter the policies by.

    - text_search: Optional[str]
        A text value to apply as a search filter across policies.

    - show_editable_policies: Optional[bool]
        Whether to show only policies that are editable by the current user.
    """

    target_category: List[ArkCategoryType] = Field(
        description="Target category is fixed to SIA DB",
        default=[ArkCategoryType.DB],
    )

    def __setattr__(self, name: str, value: List[ArkCategoryType]) -> None:
        # Only the pinned field is guarded; every other attribute is assigned normally.
        if name == 'target_category':
            fixed_category = [ArkCategoryType.DB]
            if value != fixed_category:
                raise ValueError('target_category is final and cannot be modified.')
        super().__setattr__(name, value)

ArkUAPSIADBInstanceTarget

Bases: ArkCamelizedModel

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_instance_target.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ArkUAPSIADBInstanceTarget(ArkCamelizedModel):
    # A single database instance targeted by a SIA DB policy, together with the
    # authentication method used for it and the profile matching that method.
    instance_name: Annotated[
        str, Field(strict=True, min_length=1, max_length=UAP_SIA_DB_INSTANCE_NAME_LENGTH, description='The name of the database instance')
    ]
    instance_type: Annotated[ArkSIADBDatabaseFamilyType, Field(description='The database type of the database instance')]
    instance_id: Annotated[
        str, Field(strict=True, min_length=1, max_length=UAP_SIA_DB_INSTANCE_ID_LENGTH, description='The id of the database instance')
    ]
    authentication_method: Annotated[
        ArkUAPSIADBAuthenticationMethod, Field(description='The authentication method corresponding to this profile')
    ]

    # Profiles, only one of these will be set based on the authentication method.
    # Note that the API has a single profiles field, but we separate them here for clarity and easier usage.
    ldap_auth_profile: Annotated[
        Optional[ArkUAPSIADBLDAPAuthProfile], Field(description='The LDAP authentication profile for this database instance')
    ] = None
    db_auth_profile: Annotated[
        Optional[ArkUAPSIADBLocalDBAuthProfile],
        Field(description='The local database authentication profile for this database instance'),
    ] = None
    # Declared Optional like its siblings: the default is None, so an explicit None must validate too.
    oracle_db_auth_profile: Annotated[
        Optional[ArkUAPSIADBOracleDBAuthProfile],
        Field(description='The Oracle database authentication profile for this database instance'),
    ] = None
    mongo_auth_profile: Annotated[
        Optional[ArkUAPSIADBMongoAuthProfile],
        Field(description='The MongoDB authentication profile for this database instance'),
    ] = None
    sql_server_auth_profile: Annotated[
        Optional[ArkUAPSIADBSqlServerAuthProfile],
        Field(description='The SQL Server authentication profile for this database instance'),
    ] = None
    rds_iam_user_auth_profile: Annotated[
        Optional[ArkUAPSIADBRDSIAMUserAuthProfile],
        Field(description='The RDS IAM User authentication profile for this database instance'),
    ] = None

    def databases_count(self) -> int:
        """
        Returns the databases count reported by the profile matching the authentication method.

        Raises:
            ValueError: If no profile is set for the authentication method, or the method is unsupported.
        """
        profile = self.profile_by_authentication_method()

        if profile is None:
            raise ValueError(
                f'No profile found for the given authentication method, instance: [{self.instance_name}], authentication method: [{self.authentication_method}]'
            )

        return profile.databases_count()

    def profile_by_authentication_method(self) -> Optional[ArkUAPSIADBProfile]:
        """
        Returns the profile corresponding to the authentication method.

        The result is None when the field matching the authentication method was not set.

        Raises:
            ValueError: If the authentication method is not one of the supported methods.
        """
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.LDAP_AUTH:
            return self.ldap_auth_profile
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.DB_AUTH:
            return self.db_auth_profile
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.ORACLE_AUTH:
            return self.oracle_db_auth_profile
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.MONGO_AUTH:
            return self.mongo_auth_profile
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.SQLSERVER_AUTH:
            return self.sql_server_auth_profile
        if self.authentication_method == ArkUAPSIADBAuthenticationMethod.RDS_IAM_USER_AUTH:
            return self.rds_iam_user_auth_profile

        raise ValueError(f'Unsupported authentication method: {self.authentication_method}')

    @model_validator(mode='before')
    @classmethod
    def move_profile_to_specific_field(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Moves a generic 'profile' entry of the raw input to the field matching the authentication method.

        Input that is not a dict, has no 'profile' key, or has no authentication method
        is returned unchanged. The caller's dict is never modified; a shallow copy is returned.

        Raises:
            ValueError: If the authentication method has no matching profile field.
        """
        # A 'before' validator may receive non-dict input (e.g. an existing model instance).
        if not isinstance(data, dict) or 'profile' not in data:
            return data

        auth_method = data.get('authentication_method') or data.get('authenticationMethod')
        if not auth_method:
            # This case will throw an error later since authentication method is defined as required.
            return data

        if isinstance(auth_method, str):
            auth_method = ArkUAPSIADBAuthenticationMethod(auth_method)

        profile_field_name = cls.auth_method_to_profile_field_name().get(auth_method)
        if not profile_field_name:
            raise ValueError(f'Unsupported authentication method for profile: {auth_method}')

        # Move 'profile' to the specific field, working on a copy so the caller's input stays intact.
        moved = dict(data)
        moved[profile_field_name] = moved.pop('profile')
        return moved

    @classmethod
    def auth_method_to_profile_field_name(cls) -> Dict[ArkUAPSIADBAuthenticationMethod, str]:
        """
        Returns the mapping from each authentication method to the camelCase name of its profile field.
        """
        return {
            ArkUAPSIADBAuthenticationMethod.LDAP_AUTH: 'ldapAuthProfile',
            ArkUAPSIADBAuthenticationMethod.DB_AUTH: 'dbAuthProfile',
            ArkUAPSIADBAuthenticationMethod.ORACLE_AUTH: 'oracleDbAuthProfile',
            ArkUAPSIADBAuthenticationMethod.MONGO_AUTH: 'mongoAuthProfile',
            ArkUAPSIADBAuthenticationMethod.SQLSERVER_AUTH: 'sqlServerAuthProfile',
            ArkUAPSIADBAuthenticationMethod.RDS_IAM_USER_AUTH: 'rdsIamUserAuthProfile',
        }

profile_by_authentication_method()

Returns the profile corresponding to the authentication method.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_instance_target.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def profile_by_authentication_method(self) -> Optional[ArkUAPSIADBProfile]:
    """
    Returns the profile corresponding to the authentication method.

    The result is None when the field matching the authentication method was not set,
    which is why callers check the returned value before using it.

    Raises:
        ValueError: If the authentication method is not one of the supported methods.
    """
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.LDAP_AUTH:
        return self.ldap_auth_profile
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.DB_AUTH:
        return self.db_auth_profile
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.ORACLE_AUTH:
        return self.oracle_db_auth_profile
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.MONGO_AUTH:
        return self.mongo_auth_profile
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.SQLSERVER_AUTH:
        return self.sql_server_auth_profile
    if self.authentication_method == ArkUAPSIADBAuthenticationMethod.RDS_IAM_USER_AUTH:
        return self.rds_iam_user_auth_profile

    raise ValueError(f'Unsupported authentication method: {self.authentication_method}')

ArkUAPSIADBMongoAuthProfile

Bases: ArkUAPSIADBProfile

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class ArkUAPSIADBMongoAuthProfile(ArkUAPSIADBProfile):
    # MongoDB profile: global builtin roles plus per-database builtin and custom roles.
    global_builtin_roles: Annotated[
        List[ArkSIADBMongoGlobalBuiltinRole],
        Field(description='The list of global builtin roles to assign to the user', max_length=DB_PROFILE_MAXIMUM_ENTITIES),
    ]
    database_builtin_roles: Annotated[
        Dict[
            DatabaseName,
            Annotated[
                List[ArkSIADBMongoDatabaseBuiltinRole],
                Field(
                    description='The list of database specific builtin roles to assign to the user upon connecting to the specified database',
                    max_length=DB_PROFILE_MAXIMUM_ENTITIES,
                ),
            ],
        ],
        Field(
            description='The list of database builtin roles to assign to the user, for each database',
            max_length=UAP_SIA_DB_TARGETS_MAX_ITEMS_COUNT,
        ),
    ]
    database_custom_roles: Annotated[
        Dict[
            DatabaseName,
            Annotated[
                List[Annotated[str, StringConstraints(strict=True, min_length=1, max_length=DB_ROLE_MAX_LENGTH)]],
                Field(
                    description='The list of database specific custom roles to assign to the user upon connecting to the specified database',
                    max_length=DB_PROFILE_MAXIMUM_ENTITIES,
                ),
            ],
        ],
        Field(
            description='The list of database custom roles to assign to the user, for each database',
            max_length=UAP_SIA_DB_TARGETS_MAX_ITEMS_COUNT,
        ),
    ]

    @overrides
    def databases_count(self) -> int:
        """
        Returns the number of databases defined by the profile.
        The count covers every distinct database named in the custom or builtin
        role mappings, plus one for the instance itself.
        """
        # A database appearing in both mappings is counted once.
        database_names = set(self.database_custom_roles)
        database_names.update(self.database_builtin_roles)
        return len(database_names) + 1

    @model_validator(mode='after')
    def validate_global_roles(self) -> Self:
        # This profile has no global custom roles, so None is passed for them.
        _validate_global_roles(
            database_custom_roles=self.database_custom_roles,
            database_builtin_roles=self.database_builtin_roles,
            global_custom_roles=None,
            global_builtin_roles=self.global_builtin_roles,
        )

        return self

    @model_validator(mode='after')
    def validate_databases_roles(self) -> Self:
        _validate_databases_roles_logic(
            database_custom_roles=self.database_custom_roles,
            database_builtin_roles=self.database_builtin_roles,
        )

        return self

databases_count()

Returns the number of databases defined by the profile. The count includes all the databases defined by the profile, plus the instance itself.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
167
168
169
170
171
172
173
@overrides
def databases_count(self) -> int:
    """
    Returns the number of databases defined by the profile.
    The count covers every distinct database named in the custom or builtin
    role mappings, plus one for the instance itself.
    """
    # A database appearing in both mappings is counted once.
    database_names = set(self.database_custom_roles)
    database_names.update(self.database_builtin_roles)
    return len(database_names) + 1

ArkUAPSIADBProfile

Bases: ArkCamelizedModel

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
89
90
91
92
93
94
95
class ArkUAPSIADBProfile(ArkCamelizedModel):
    # pylint: disable=no-self-use
    def databases_count(self) -> int:
        """
        Returns the number of databases defined by the profile.
        This base implementation always reports a single database.
        """
        single_database = 1
        return single_database

databases_count()

Returns the number of databases defined by the profile.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
91
92
93
94
95
def databases_count(self) -> int:
    """
    Returns the number of databases defined by the profile.
    This base implementation always reports a single database.
    """
    single_database = 1
    return single_database

ArkUAPSIADBSqlServerAuthProfile

Bases: ArkUAPSIADBProfile

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
class ArkUAPSIADBSqlServerAuthProfile(ArkUAPSIADBProfile):
    # SQL Server profile: global builtin/custom roles plus per-database builtin and custom roles.
    # Every role collection defaults to an empty list or dict.
    global_builtin_roles: Annotated[
        List[ArkSIADBSqlServerGlobalBuiltinRole],
        Field(
            default_factory=list,
            description='The list of global builtin roles to assign to the user',
        ),
    ]
    global_custom_roles: Annotated[
        List[SqlServerRole],
        Field(
            default_factory=list,
            description='The list of global custom roles to assign to the user',
            max_length=DB_PROFILE_MAXIMUM_ENTITIES,
        ),
    ]
    database_builtin_roles: Annotated[
        Dict[
            DatabaseName,
            Annotated[
                List[ArkSIADBSqlServerDatabaseBuiltinRole],
                Field(
                    description='The list of database specific builtin roles to assign to the user upon connecting to the specified database',
                    max_length=DB_PROFILE_MAXIMUM_ENTITIES,
                ),
            ],
        ],
        Field(
            default_factory=dict,
            description='The list of database builtin roles to assign to the user, for each database',
            max_length=UAP_SIA_DB_TARGETS_MAX_ITEMS_COUNT,
        ),
    ]
    database_custom_roles: Annotated[
        Dict[
            DatabaseName,
            Annotated[
                List[SqlServerRole],
                Field(
                    description='The list of per database specific custom roles to assign to the user upon connecting to the specified database',
                    max_length=DB_PROFILE_MAXIMUM_ENTITIES,
                ),
            ],
        ],
        Field(
            default_factory=dict,
            description='The list of database custom roles to assign to the user, for each database',
            max_length=UAP_SIA_DB_TARGETS_MAX_ITEMS_COUNT,
        ),
    ]

    @overrides
    def databases_count(self) -> int:
        """
        Returns the number of databases defined by the profile.
        The count covers every distinct database named in the custom or builtin
        role mappings, plus one for the instance itself.
        """
        # A database appearing in both mappings is counted once.
        database_names = set(self.database_custom_roles)
        database_names.update(self.database_builtin_roles)
        return len(database_names) + 1

    @model_validator(mode='after')
    def validate_global_roles(self) -> Self:
        _validate_global_roles(
            database_custom_roles=self.database_custom_roles,
            database_builtin_roles=self.database_builtin_roles,
            global_custom_roles=self.global_custom_roles,
            global_builtin_roles=self.global_builtin_roles,
        )

        return self

    @model_validator(mode='after')
    def validate_databases_roles(self) -> Self:
        _validate_databases_roles_logic(
            database_custom_roles=self.database_custom_roles,
            database_builtin_roles=self.database_builtin_roles,
        )

        return self

databases_count()

Returns the number of databases defined by the profile. The count includes all the databases defined by the profile, plus the instance itself.

Source code in ark_sdk_python/models/services/uap/sia/db/ark_uap_sia_db_profiles.py
247
248
249
250
251
252
253
@overrides
def databases_count(self) -> int:
    """
    Returns the number of databases defined by the profile.
    The count covers every distinct database named in the custom or builtin
    role mappings, plus one for the instance itself.
    """
    # A database appearing in both mappings is counted once.
    database_names = set(self.database_custom_roles)
    database_names.update(self.database_builtin_roles)
    return len(database_names) + 1