Skip to content

ark_dpa_k8s_service

ArkDPAK8SService

Bases: ArkService

Source code in ark_sdk_python/services/dpa/k8s/ark_dpa_k8s_service.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class ArkDPAK8SService(ArkService):
    def __init__(self, isp_auth: ArkISPAuth) -> None:
        super().__init__(isp_auth)
        self.__isp_auth = isp_auth
        self.__client: ArkISPServiceClient = ArkISPServiceClient.from_isp_auth(self.__isp_auth, 'dpa')

    @staticmethod
    def _save_kube_config_file(folder: str, result: str) -> None:
        if not os.path.exists(folder):
            os.makedirs(folder)
        with open(f'{folder}{os.path.sep}config', 'w', encoding='utf-8') as file_handle:
            file_handle.write(result)

    def generate_kubeconfig(self, generate_kubeconfig: ArkDPAK8SGenerateKubeConfig) -> Optional[str]:
        """
        Builds a Kube config file used to connect to a K8s cluster.

        Args:
            generate_kubeconfig (ArkDPAK8SGenerateKubeConfig): _description_

        Raises:
            ArkServiceException: _description_

        Returns:
            str: __description__
        """
        self._logger.info('Building kube config file')
        response: Response = self.__client.get(BUILD_KUBE_CONFIG_PATH)
        if response.status_code != HTTPStatus.OK:
            raise ArkServiceException(f'Failed to build kube config file - [{response.status_code}] - [{response.text}]')
        result = response.text
        if generate_kubeconfig.folder:
            ArkDPAK8SService._save_kube_config_file(generate_kubeconfig.folder, result)
            return None
        return result

    @staticmethod
    @overrides
    def service_config() -> ArkServiceConfig:
        return SERVICE_CONFIG

generate_kubeconfig(generate_kubeconfig)

Builds a Kube config file used to connect to a K8s cluster.

Parameters:

Name Type Description Default
generate_kubeconfig ArkDPAK8SGenerateKubeConfig

description

required

Raises:

Type Description
ArkServiceException

description

Returns:

Name Type Description
str Optional[str]

description

Source code in ark_sdk_python/services/dpa/k8s/ark_dpa_k8s_service.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def generate_kubeconfig(self, generate_kubeconfig: ArkDPAK8SGenerateKubeConfig) -> Optional[str]:
    """
    Builds a Kube config file used to connect to a K8s cluster.

    Args:
        generate_kubeconfig (ArkDPAK8SGenerateKubeConfig): _description_

    Raises:
        ArkServiceException: _description_

    Returns:
        str: __description__
    """
    self._logger.info('Building kube config file')
    response: Response = self.__client.get(BUILD_KUBE_CONFIG_PATH)
    if response.status_code != HTTPStatus.OK:
        raise ArkServiceException(f'Failed to build kube config file - [{response.status_code}] - [{response.text}]')
    result = response.text
    if generate_kubeconfig.folder:
        ArkDPAK8SService._save_kube_config_file(generate_kubeconfig.folder, result)
        return None
    return result