Skip to content

ark_pty_ssh_win_connection

ArkPTYSSHWinConnection

Bases: ArkConnection

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class ArkPTYSSHWinConnection(ArkConnection):
    """
    SSH connection that drives the local `ssh` executable through a winpty pseudo terminal.

    Output is read from the pty one character at a time until a shell prompt is seen
    and the output has been quiet for a short period. Because everything goes through
    a single terminal stream, stdout and stderr cannot be told apart.
    """

    # Color sequences (ESC ... m) and the bracketed-paste mode toggles (ESC[?2004h / ESC[?2004l)
    __ANSI_COLOR_STRIPPER: Final[Any] = re.compile(r'\x1b[^m]*m|\x1b\[\?2004[hl]')
    # Generic CSI escape sequences
    __ANSI_ESCAPE_STRIPPER: Final[Any] = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
    # Any '#' or '$' in the output is treated as a shell prompt
    __PROMPT_REGEX: Final[Any] = re.compile(r"[#$]")
    __PASSWORD_PROMPT_REGEX: Final[Any] = re.compile(
        r"(?i)(?:password:)|(?:passphrase for key)|(?:^Please enter your password[\s\S]+.+ >:$)"
    )
    # Marker echoed together with `$?` so the exit code can be located in the output
    __EXIT_MAKRER: Final[str] = '__ARK_RC_MARK__'
    __DEFAULT_COL_SIZE: Final[int] = 80
    __DEFAULT_ROW_SIZE: Final[int] = 24
    # Seconds of silence required after a prompt was seen before the read is considered done
    __DEFAULT_NONEAGER_TIMEOEUT: Final[float] = 1.0
    # Seconds to wait for a prompt to show up at all
    __DEFAULT_PROMPT_OVERALL_TIMEOUT: Final[float] = 20.0
    # Sleep between polls when the pty has nothing to read
    __CHAR_SLEEP_TIME: Final[float] = 0.05

    def __init__(self):
        super().__init__()
        self.__is_connected: bool = False
        self.__is_suspended: bool = False
        self.__pty: Optional[Any] = None
        # Guards every access to __buffer
        self.__output_lock: threading.Lock = threading.Lock()
        self.__buffer: str = ''

    def __strip_ansi(self, ansi_input: str) -> str:
        """
        Removes ANSI color / escape sequences from the given text and trims surrounding whitespace

        Args:
            ansi_input (str): Raw terminal output

        Returns:
            str: The text without escape sequences and without leading / trailing whitespace
        """
        ansi_input = ArkPTYSSHWinConnection.__ANSI_COLOR_STRIPPER.sub('', ansi_input)
        ansi_input = ArkPTYSSHWinConnection.__ANSI_ESCAPE_STRIPPER.sub('', ansi_input)
        # Trim only after the escapes are gone, otherwise whitespace sitting next to a
        # removed escape sequence would survive and break prefix / anchored matching
        return ansi_input.strip()

    def __reset_buffer(self) -> None:
        """
        Clears the shared output buffer
        """
        with self.__output_lock:
            self.__buffer = ''

    def __read_until_latest_prompt(
        self,
        password: Optional[str] = None,
        noneager_timeout: float = __DEFAULT_NONEAGER_TIMEOEUT,
        overall_timeout: float = __DEFAULT_PROMPT_OVERALL_TIMEOUT,
        login_prompt: bool = False,
        expected_prompt: Any = __PROMPT_REGEX,
    ) -> None:
        """
        Reads the pty output until a prompt was seen and the output went quiet
        The collected output is stored in the shared buffer

        Args:
            password (Optional[str]): Password to send when a password prompt shows up during login
            noneager_timeout (float): Seconds of silence required after a prompt was seen
            overall_timeout (float): Seconds to wait for a prompt before giving up
            login_prompt (bool): Whether password prompts should be detected and answered
            expected_prompt (Any): Compiled regex that identifies the shell prompt

        Raises:
            RuntimeError: On a password prompt without a password, or when no prompt shows up in time
        """
        buffer = ''
        prompt_found_at = -1
        # Monotonic clock, so wall clock adjustments cannot shorten or extend the timeouts
        start_time = time.monotonic()
        noneager_start_time = start_time

        while True:
            char = self.__pty.read(1)
            if not char:
                # Nothing to read right now, decide whether to stop, fail or keep polling
                if ArkPTYSSHWinConnection.__PASSWORD_PROMPT_REGEX.search(self.__strip_ansi(buffer)) and not password and login_prompt:
                    raise RuntimeError(f'Password prompt with no password given [{buffer}]')
                now = time.monotonic()
                if (now - noneager_start_time > noneager_timeout) and prompt_found_at != -1:
                    break
                if (now - start_time) > overall_timeout and prompt_found_at == -1:
                    raise RuntimeError(f'Timeout while waiting for prompt [{buffer}]')
                time.sleep(ArkPTYSSHWinConnection.__CHAR_SLEEP_TIME)
                continue

            buffer += char

            with self.__output_lock:
                self.__buffer += char

            if ArkPTYSSHWinConnection.__PASSWORD_PROMPT_REGEX.search(self.__strip_ansi(buffer)) and login_prompt:
                if not password:
                    raise RuntimeError(f'Password prompt with no password given [{buffer}]')
                self.__pty.write(password + '\n')
                # Start over, whatever preceded the password prompt is not part of the session output
                buffer = ""
                prompt_found_at = -1
                continue

            if expected_prompt.search(buffer):
                prompt_found_at = len(buffer)

            # Data arrived, restart the quiet period
            noneager_start_time = time.monotonic()

        # The loop is only left once a prompt was found, publish what was collected
        with self.__output_lock:
            self.__buffer = buffer

    @overrides
    def connect(self, connection_details: ArkConnectionDetails) -> None:
        """
        Performs SSH connection with given details or keys
        Saves the ssh session to be used for command executions
        Done using windows pty

        Args:
            connection_details (ArkConnectionDetails): Address, port and credentials (user with password or private key path) of the target

        Raises:
            ArkException: When the ssh executable is missing or the session could not be established
        """
        # pylint: disable=import-error
        import winpty

        if self.__is_connected:
            return

        address = connection_details.address
        user = connection_details.credentials.user
        port = connection_details.port or SSH_PORT
        password = None
        key_path = None
        if connection_details.credentials.password:
            password = connection_details.credentials.password.get_secret_value()
        elif connection_details.credentials.private_key_filepath:
            key_path = connection_details.credentials.private_key_filepath

        ssh_args = [f'{user}@{address}', '-p', str(port), '-o', 'StrictHostKeyChecking=no']
        if key_path:
            ssh_args.extend(['-i', str(key_path)])
        try:
            ssh_full_cmd = which('ssh', path=os.environ.get('PATH', os.defpath))
            if not ssh_full_cmd:
                # which() returns None when nothing is found, fail with a clear reason
                raise FileNotFoundError('ssh executable was not found in PATH')
            self.__pty = winpty.PTY(
                cols=ArkPTYSSHWinConnection.__DEFAULT_COL_SIZE,
                rows=ArkPTYSSHWinConnection.__DEFAULT_ROW_SIZE,
                backend=winpty.enums.Backend.WinPTY,
            )
            self.__pty.spawn(ssh_full_cmd, cmdline=' ' + subprocess.list2cmdline(ssh_args))
            self.__read_until_latest_prompt(password, login_prompt=True)
            self.__is_connected = True
        except Exception as ex:
            # Do not keep a half-open pty around after a failed attempt
            self.__pty = None
            raise ArkException(f'Failed to ssh connect [{str(ex)}]') from ex

    @overrides
    def disconnect(self) -> None:
        """
        Disconnects the ssh session
        The connection state is reset even if sending the exit command fails
        """
        try:
            if self.__pty:
                self.__pty.write('exit\n')
        finally:
            self.__pty = None
            self.__is_connected = False
            self.__is_suspended = False

    @overrides
    def suspend_connection(self) -> None:
        """
        Suspends execution of ssh commands
        """
        self.__is_suspended = True

    @overrides
    def restore_connection(self) -> None:
        """
        Restores execution of ssh commands
        """
        self.__is_suspended = False

    @overrides
    def is_suspended(self) -> bool:
        """
        Checks whether ssh commands can be executed or not

        Returns:
            bool: True when command execution is currently suspended
        """
        return self.__is_suspended

    @overrides
    def is_connected(self) -> bool:
        """
        Checks whether there is an ssh session connected

        Returns:
            bool: True when an ssh session is currently connected
        """
        return self.__is_connected

    @overrides
    def run_command(self, command: ArkConnectionCommand) -> ArkConnectionResult:
        """
        Runs a command over ssh session via pty, returning the result accordingly
        stderr is not supported, only stdout is returned, and it'll contain everything including stderr

        Args:
            command (ArkConnectionCommand): The command to run, along with its expected return code and raise-on-error setting

        Raises:
            ArkException: When not connected or suspended, when the exit code cannot be parsed,
                or when the return code differs from the expected one and raise_on_error is set

        Returns:
            ArkConnectionResult: The captured terminal output and the command return code
        """
        if not self.__is_connected or self.__is_suspended:
            raise ArkException('Cannot run command while not connected or suspended')

        self._logger.debug(f'Running command [{command.command}]')

        self.__reset_buffer()
        self.__pty.write(command.command + "\n")
        self.__read_until_latest_prompt()

        with self.__output_lock:
            stdout = self.__buffer.strip()

        # Second round trip: echo the marker followed by `$?` to obtain the exit code
        self.__reset_buffer()
        self.__pty.write(f'echo {ArkPTYSSHWinConnection.__EXIT_MAKRER}_$?;\n')
        self.__read_until_latest_prompt()

        # Snapshot under the lock, so both the parsing and the error message use the same data
        with self.__output_lock:
            rc_output = self.__buffer
        lines = rc_output.strip().splitlines()

        rc = None
        for line in lines:
            line = self.__strip_ansi(line)
            if line.startswith(f'{ArkPTYSSHWinConnection.__EXIT_MAKRER}_'):
                try:
                    rc = int(line[len(ArkPTYSSHWinConnection.__EXIT_MAKRER) + 1 :])  # +1 for the underscore
                    break
                except ValueError:
                    # A marker line without a numeric suffix is not the result line, keep looking
                    continue
        if rc is None:
            raise ArkException(f"Failed to parse exit code from output - [{rc_output}]")
        if rc != command.expected_rc and command.raise_on_error:
            raise ArkException(f'Failed to execute command [{command.command}] - [{rc}] - [{stdout}]')
        self._logger.debug(f'Command rc: [{rc}]')
        self._logger.debug(f'Command stdout: [{stdout}]')
        return ArkConnectionResult(stdout=stdout, rc=rc)

connect(connection_details)

Performs an SSH connection with the given details or keys. Saves the SSH session to be used for command executions. Done using a Windows PTY.

Parameters:

Name Type Description Default
connection_details ArkConnectionDetails

Address, port and credentials (user with password or private key path) of the SSH target.

required

Raises:

Type Description
ArkException

Raised when the SSH connection could not be established.

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@overrides
def connect(self, connection_details: ArkConnectionDetails) -> None:
    """
    Performs SSH connection with given details or keys
    Saves the ssh session to be used for command executions
    Done using windows pty

    Args:
        connection_details (ArkConnectionDetails): Address, port and credentials (user with password or private key path) of the target

    Raises:
        ArkException: When the ssh executable is missing or the session could not be established
    """
    # pylint: disable=import-error
    import winpty

    if self.__is_connected:
        return

    address = connection_details.address
    user = connection_details.credentials.user
    port = connection_details.port or SSH_PORT
    password = None
    key_path = None
    if connection_details.credentials.password:
        password = connection_details.credentials.password.get_secret_value()
    elif connection_details.credentials.private_key_filepath:
        key_path = connection_details.credentials.private_key_filepath

    ssh_args = [f'{user}@{address}', '-p', str(port), '-o', 'StrictHostKeyChecking=no']
    if key_path:
        ssh_args.extend(['-i', str(key_path)])
    try:
        ssh_full_cmd = which('ssh', path=os.environ.get('PATH', os.defpath))
        if not ssh_full_cmd:
            # which() returns None when nothing is found, fail with a clear reason
            raise FileNotFoundError('ssh executable was not found in PATH')
        self.__pty = winpty.PTY(
            cols=ArkPTYSSHWinConnection.__DEFAULT_COL_SIZE,
            rows=ArkPTYSSHWinConnection.__DEFAULT_ROW_SIZE,
            backend=winpty.enums.Backend.WinPTY,
        )
        self.__pty.spawn(ssh_full_cmd, cmdline=' ' + subprocess.list2cmdline(ssh_args))
        self.__read_until_latest_prompt(password, login_prompt=True)
        self.__is_connected = True
    except Exception as ex:
        # Do not keep a half-open pty around after a failed attempt
        self.__pty = None
        raise ArkException(f'Failed to ssh connect [{str(ex)}]') from ex

disconnect()

Disconnects the ssh session

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
141
142
143
144
145
146
147
148
149
150
@overrides
def disconnect(self) -> None:
    """
    Disconnects the ssh session
    The connection state is reset even if sending the exit command fails
    """
    try:
        if self.__pty:
            self.__pty.write('exit\n')
    finally:
        self.__pty = None
        self.__is_connected = False
        self.__is_suspended = False

is_connected()

Checks whether there is an SSH session connected

Returns:

Name Type Description
bool bool

True when an SSH session is currently connected.

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
176
177
178
179
180
181
182
183
184
@overrides
def is_connected(self) -> bool:
    """
    Checks whether there is an ssh session connected

    Returns:
        bool: True when an ssh session is currently connected
    """
    return self.__is_connected

is_suspended()

Checks whether ssh commands can be executed or not

Returns:

Name Type Description
bool bool

True when command execution is currently suspended.

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
166
167
168
169
170
171
172
173
174
@overrides
def is_suspended(self) -> bool:
    """
    Checks whether ssh commands can be executed or not

    Returns:
        bool: True when command execution is currently suspended
    """
    return self.__is_suspended

restore_connection()

Restores execution of ssh commands

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
159
160
161
162
163
164
@overrides
def restore_connection(self) -> None:
    """
    Restores execution of ssh commands by clearing the suspended flag
    """
    self.__is_suspended = False

run_command(command)

Runs a command over the SSH session via the PTY, returning the result accordingly. stderr is not supported; only stdout is returned, and it will contain everything, including stderr.

Parameters:

Name Type Description Default
command ArkConnectionCommand

The command to run, along with its expected return code and raise-on-error setting.

required

Raises:

Type Description
ArkException

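Raised when not connected or suspended, when the exit code cannot be parsed, or when the return code differs from the expected one and raise_on_error is set.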

Returns:

Name Type Description
ArkConnectionResult ArkConnectionResult

The captured terminal output (stdout) and the command return code.

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
@overrides
def run_command(self, command: ArkConnectionCommand) -> ArkConnectionResult:
    """
    Runs a command over ssh session via pty, returning the result accordingly
    stderr is not supported, only stdout is returned, and it'll contain everything including stderr

    Args:
        command (ArkConnectionCommand): The command to run, along with its expected return code and raise-on-error setting

    Raises:
        ArkException: When not connected or suspended, when the exit code cannot be parsed,
            or when the return code differs from the expected one and raise_on_error is set

    Returns:
        ArkConnectionResult: The captured terminal output and the command return code
    """
    if not self.__is_connected or self.__is_suspended:
        raise ArkException('Cannot run command while not connected or suspended')

    self._logger.debug(f'Running command [{command.command}]')

    self.__reset_buffer()
    self.__pty.write(command.command + "\n")
    self.__read_until_latest_prompt()

    with self.__output_lock:
        stdout = self.__buffer.strip()

    # Second round trip: echo the marker followed by `$?` to obtain the exit code
    self.__reset_buffer()
    self.__pty.write(f'echo {ArkPTYSSHWinConnection.__EXIT_MAKRER}_$?;\n')
    self.__read_until_latest_prompt()

    # Snapshot under the lock, so both the parsing and the error message use the same data
    with self.__output_lock:
        rc_output = self.__buffer
    lines = rc_output.strip().splitlines()

    rc = None
    for line in lines:
        line = self.__strip_ansi(line)
        if line.startswith(f'{ArkPTYSSHWinConnection.__EXIT_MAKRER}_'):
            try:
                rc = int(line[len(ArkPTYSSHWinConnection.__EXIT_MAKRER) + 1 :])  # +1 for the underscore
                break
            except ValueError:
                # A marker line without a numeric suffix is not the result line, keep looking
                continue
    if rc is None:
        raise ArkException(f"Failed to parse exit code from output - [{rc_output}]")
    if rc != command.expected_rc and command.raise_on_error:
        raise ArkException(f'Failed to execute command [{command.command}] - [{rc}] - [{stdout}]')
    self._logger.debug(f'Command rc: [{rc}]')
    self._logger.debug(f'Command stdout: [{stdout}]')
    return ArkConnectionResult(stdout=stdout, rc=rc)

suspend_connection()

Suspends execution of ssh commands

Source code in ark_sdk_python/common/connections/ssh/ark_pty_ssh_win_connection.py
152
153
154
155
156
157
@overrides
def suspend_connection(self) -> None:
    """
    Suspends execution of ssh commands by setting the suspended flag
    """
    self.__is_suspended = True