diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:55:42 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:55:42 +0000 |
commit | 62d9962ec7d01c95bf5732169320d3857a41446e (patch) | |
tree | f60d8fc63ff738e5f5afec48a84cf41480ee1315 /lib/ansible/utils/display.py | |
parent | Releasing progress-linux version 2.14.13-1~progress7.99u1. (diff) | |
download | ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.tar.xz ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.zip |
Merging upstream version 2.16.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/utils/display.py')
-rw-r--r-- | lib/ansible/utils/display.py | 374 |
1 files changed, 322 insertions, 52 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py index 7d98ad4..3f331ad 100644 --- a/lib/ansible/utils/display.py +++ b/lib/ansible/utils/display.py @@ -15,34 +15,49 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - +from __future__ import annotations + +try: + import curses +except ImportError: + HAS_CURSES = False +else: + # this will be set to False if curses.setupterm() fails + HAS_CURSES = True + +import collections.abc as c +import codecs import ctypes.util import fcntl import getpass +import io import logging import os import random import subprocess import sys +import termios import textwrap import threading import time +import tty +import typing as t +from functools import wraps from struct import unpack, pack -from termios import TIOCGWINSZ from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleAssertionError -from ansible.module_utils._text import to_bytes, to_text +from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive +from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible.module_utils.six import text_type from ansible.utils.color import stringc from ansible.utils.multiprocessing import context as multiprocessing_context from ansible.utils.singleton import Singleton from ansible.utils.unsafe_proxy import wrap_var -from functools import wraps +if t.TYPE_CHECKING: + # avoid circular import at runtime + from ansible.executor.task_queue_manager import FinalQueue _LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c')) # Set argtypes, to avoid segfault if the wrong type is provided, @@ -52,8 +67,11 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int) # Max for c_int _MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1 +MOVE_TO_BOL = b'\r' +CLEAR_TO_EOL = b'\x1b[K' + -def get_text_width(text): +def get_text_width(text: str) -> int: """Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the number of columns used to display a text string. @@ -104,6 +122,20 @@ def get_text_width(text): return width if width >= 0 else 0 +def proxy_display(method): + + def proxyit(self, *args, **kwargs): + if self._final_q: + # If _final_q is set, that means we are in a WorkerProcess + # and instead of displaying messages directly from the fork + # we will proxy them through the queue + return self._final_q.send_display(method.__name__, *args, **kwargs) + else: + return method(self, *args, **kwargs) + + return proxyit + + class FilterBlackList(logging.Filter): def __init__(self, blacklist): self.blacklist = [logging.Filter(name) for name in blacklist] @@ -164,7 +196,7 @@ b_COW_PATHS = ( ) -def _synchronize_textiowrapper(tio, lock): +def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock): # Ensure that a background thread can't hold the internal buffer lock on a file object # during a fork, which causes forked children to hang. We're using display's existing lock for # convenience (and entering the lock before a fork). @@ -179,15 +211,70 @@ def _synchronize_textiowrapper(tio, lock): buffer = tio.buffer # monkeypatching the underlying file-like object isn't great, but likely safer than subclassing - buffer.write = _wrap_with_lock(buffer.write, lock) - buffer.flush = _wrap_with_lock(buffer.flush, lock) + buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign] + buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign] + + +def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None: + """Put terminal into a raw mode. + + Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG + + OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display + is proxied via the queue from forks. The problem is a race condition, in that we proxy the display + over the fork, but before it can be displayed, this plugin will have continued executing, potentially + setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF + """ + mode = termios.tcgetattr(fd) + mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON) + mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST) + mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB) + mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8 + mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + mode[tty.CC][termios.VMIN] = 1 + mode[tty.CC][termios.VTIME] = 0 + termios.tcsetattr(fd, when, mode) + + +def clear_line(stdout: t.BinaryIO) -> None: + stdout.write(b'\x1b[%s' % MOVE_TO_BOL) + stdout.write(b'\x1b[%s' % CLEAR_TO_EOL) + + +def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None: + setraw(stdin_fd) + + # Only set stdout to raw mode if it is a TTY. This is needed when redirecting + # stdout to a file since a file cannot be set to raw mode. + if os.isatty(stdout_fd): + setraw(stdout_fd) + + if echo: + new_settings = termios.tcgetattr(stdin_fd) + new_settings[3] = new_settings[3] | termios.ECHO + termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings) + + +def setupterm() -> None: + # Nest the try except since curses.error is not available if curses did not import + try: + curses.setupterm() + except (curses.error, TypeError, io.UnsupportedOperation): + global HAS_CURSES + HAS_CURSES = False + else: + global MOVE_TO_BOL + global CLEAR_TO_EOL + # curses.tigetstr() returns None in some circumstances + MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL + CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL class Display(metaclass=Singleton): - def __init__(self, verbosity=0): + def __init__(self, verbosity: int = 0) -> None: - self._final_q = None + self._final_q: FinalQueue | None = None # NB: this lock is used to both prevent intermingled output between threads and to block writes during forks. # Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock). @@ -197,11 +284,11 @@ class Display(metaclass=Singleton): self.verbosity = verbosity # list of all deprecation messages to prevent duplicate display - self._deprecations = {} - self._warns = {} - self._errors = {} + self._deprecations: dict[str, int] = {} + self._warns: dict[str, int] = {} + self._errors: dict[str, int] = {} - self.b_cowsay = None + self.b_cowsay: bytes | None = None self.noncow = C.ANSIBLE_COW_SELECTION self.set_cowsay_info() @@ -212,12 +299,12 @@ class Display(metaclass=Singleton): (out, err) = cmd.communicate() if cmd.returncode: raise Exception - self.cows_available = {to_text(c) for c in out.split()} # set comprehension + self.cows_available: set[str] = {to_text(c) for c in out.split()} if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST): self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available) except Exception: # could not execute cowsay for some reason - self.b_cowsay = False + self.b_cowsay = None self._set_column_width() @@ -228,13 +315,25 @@ class Display(metaclass=Singleton): except Exception as ex: self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}") + codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler) try: - sys.stdout.reconfigure(errors='replace') - sys.stderr.reconfigure(errors='replace') + sys.stdout.reconfigure(errors='_replacing_warning_handler') + sys.stderr.reconfigure(errors='_replacing_warning_handler') except Exception as ex: - self.warning(f"failed to reconfigure stdout/stderr with the replace error handler: {ex}") + self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}") - def set_queue(self, queue): + self.setup_curses = False + + def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]: + # TODO: This should probably be deferred until after the current display is completed + # this will require some amount of new functionality + self.deprecated( + 'Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr, this is temporary and will become an error', + version='2.18', + ) + return '?', exception.end + + def set_queue(self, queue: FinalQueue) -> None: """Set the _final_q on Display, so that we know to proxy display over the queue instead of directly writing to stdout/stderr from forks @@ -244,7 +343,7 @@ class Display(metaclass=Singleton): raise RuntimeError('queue cannot be set in parent process') self._final_q = queue - def set_cowsay_info(self): + def set_cowsay_info(self) -> None: if C.ANSIBLE_NOCOWS: return @@ -255,18 +354,23 @@ class Display(metaclass=Singleton): if os.path.exists(b_cow_path): self.b_cowsay = b_cow_path - def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True): + @proxy_display + def display( + self, + msg: str, + color: str | None = None, + stderr: bool = False, + screen_only: bool = False, + log_only: bool = False, + newline: bool = True, + ) -> None: """ Display a message to the user Note: msg *must* be a unicode string to prevent UnicodeError tracebacks. """ - if self._final_q: - # If _final_q is set, that means we are in a WorkerProcess - # and instead of displaying messages directly from the fork - # we will proxy them through the queue - return self._final_q.send_display(msg, color=color, stderr=stderr, - screen_only=screen_only, log_only=log_only, newline=newline) + if not isinstance(msg, str): + raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}') nocolor = msg @@ -321,32 +425,32 @@ class Display(metaclass=Singleton): # actually log logger.log(lvl, msg2) - def v(self, msg, host=None): + def v(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=0) - def vv(self, msg, host=None): + def vv(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=1) - def vvv(self, msg, host=None): + def vvv(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=2) - def vvvv(self, msg, host=None): + def vvvv(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=3) - def vvvvv(self, msg, host=None): + def vvvvv(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=4) - def vvvvvv(self, msg, host=None): + def vvvvvv(self, msg: str, host: str | None = None) -> None: return self.verbose(msg, host=host, caplevel=5) - def debug(self, msg, host=None): + def debug(self, msg: str, host: str | None = None) -> None: if C.DEFAULT_DEBUG: if host is None: self.display("%6d %0.5f: %s" % (os.getpid(), time.time(), msg), color=C.COLOR_DEBUG) else: self.display("%6d %0.5f [%s]: %s" % (os.getpid(), time.time(), host, msg), color=C.COLOR_DEBUG) - def verbose(self, msg, host=None, caplevel=2): + def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None: to_stderr = C.VERBOSE_TO_STDERR if self.verbosity > caplevel: @@ -355,7 +459,14 @@ class Display(metaclass=Singleton): else: self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr) - def get_deprecation_message(self, msg, version=None, removed=False, date=None, collection_name=None): + def get_deprecation_message( + self, + msg: str, + version: str | None = None, + removed: bool = False, + date: str | None = None, + collection_name: str | None = None, + ) -> str: ''' used to print out a deprecation message.''' msg = msg.strip() if msg and msg[-1] not in ['!', '?', '.']: @@ -390,7 +501,15 @@ class Display(metaclass=Singleton): return message_text - def deprecated(self, msg, version=None, removed=False, date=None, collection_name=None): + @proxy_display + def deprecated( + self, + msg: str, + version: str | None = None, + removed: bool = False, + date: str | None = None, + collection_name: str | None = None, + ) -> None: if not removed and not C.DEPRECATION_WARNINGS: return @@ -406,7 +525,8 @@ class Display(metaclass=Singleton): self.display(message_text.strip(), color=C.COLOR_DEPRECATE, stderr=True) self._deprecations[message_text] = 1 - def warning(self, msg, formatted=False): + @proxy_display + def warning(self, msg: str, formatted: bool = False) -> None: if not formatted: new_msg = "[WARNING]: %s" % msg @@ -419,11 +539,11 @@ class Display(metaclass=Singleton): self.display(new_msg, color=C.COLOR_WARN, stderr=True) self._warns[new_msg] = 1 - def system_warning(self, msg): + def system_warning(self, msg: str) -> None: if C.SYSTEM_WARNINGS: self.warning(msg) - def banner(self, msg, color=None, cows=True): + def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None: ''' Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum) ''' @@ -446,7 +566,7 @@ class Display(metaclass=Singleton): stars = u"*" * star_len self.display(u"\n%s %s" % (msg, stars), color=color) - def banner_cowsay(self, msg, color=None): + def banner_cowsay(self, msg: str, color: str | None = None) -> None: if u": [" in msg: msg = msg.replace(u"[", u"") if msg.endswith(u"]"): @@ -463,7 +583,7 @@ class Display(metaclass=Singleton): (out, err) = cmd.communicate() self.display(u"%s\n" % to_text(out), color=color) - def error(self, msg, wrap_text=True): + def error(self, msg: str, wrap_text: bool = True) -> None: if wrap_text: new_msg = u"\n[ERROR]: %s" % msg wrapped = textwrap.wrap(new_msg, self.columns) @@ -475,14 +595,24 @@ class Display(metaclass=Singleton): self._errors[new_msg] = 1 @staticmethod - def prompt(msg, private=False): + def prompt(msg: str, private: bool = False) -> str: if private: return getpass.getpass(msg) else: return input(msg) - def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None, unsafe=None): - + def do_var_prompt( + self, + varname: str, + private: bool = True, + prompt: str | None = None, + encrypt: str | None = None, + confirm: bool = False, + salt_size: int | None = None, + salt: str | None = None, + default: str | None = None, + unsafe: bool = False, + ) -> str: result = None if sys.__stdin__.isatty(): @@ -515,7 +645,7 @@ class Display(metaclass=Singleton): if encrypt: # Circular import because encrypt needs a display class from ansible.utils.encrypt import do_encrypt - result = do_encrypt(result, encrypt, salt_size, salt) + result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt) # handle utf-8 chars result = to_text(result, errors='surrogate_or_strict') @@ -524,9 +654,149 @@ class Display(metaclass=Singleton): result = wrap_var(result) return result - def _set_column_width(self): + def _set_column_width(self) -> None: if os.isatty(1): - tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] + tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] else: tty_size = 0 self.columns = max(79, tty_size - 1) + + def prompt_until( + self, + msg: str, + private: bool = False, + seconds: int | None = None, + interrupt_input: c.Container[bytes] | None = None, + complete_input: c.Container[bytes] | None = None, + ) -> bytes: + if self._final_q: + from ansible.executor.process.worker import current_worker + self._final_q.send_prompt( + worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds, + interrupt_input=interrupt_input, complete_input=complete_input + ) + return current_worker.worker_queue.get() + + if HAS_CURSES and not self.setup_curses: + setupterm() + self.setup_curses = True + + if ( + self._stdin_fd is None + or not os.isatty(self._stdin_fd) + # Compare the current process group to the process group associated + # with terminal of the given file descriptor to determine if the process + # is running in the background. + or os.getpgrp() != os.tcgetpgrp(self._stdin_fd) + ): + raise AnsiblePromptNoninteractive('stdin is not interactive') + + # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass, + # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread. + # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids. + # if seconds is None and interrupt_input is None and complete_input is None: + # try: + # return self.prompt(msg, private=private) + # except KeyboardInterrupt: + # # can't catch in the results_thread_main daemon thread + # raise AnsiblePromptInterrupt('user interrupt') + + self.display(msg) + result = b'' + with self._lock: + original_stdin_settings = termios.tcgetattr(self._stdin_fd) + try: + setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private) + + # flush the buffer to make sure no previous key presses + # are read in below + termios.tcflush(self._stdin, termios.TCIFLUSH) + + # read input 1 char at a time until the optional timeout or complete/interrupt condition is met + return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input) + finally: + # restore the old settings for the duped stdin stdin_fd + termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings) + + def _read_non_blocking_stdin( + self, + echo: bool = False, + seconds: int | None = None, + interrupt_input: c.Container[bytes] | None = None, + complete_input: c.Container[bytes] | None = None, + ) -> bytes: + if self._final_q: + raise NotImplementedError + + if seconds is not None: + start = time.time() + if interrupt_input is None: + try: + interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR] + except Exception: + interrupt = b'\x03' # value for Ctrl+C + + try: + backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]] + except Exception: + # unsupported/not present, use default + backspace_sequences = [b'\x7f', b'\x08'] + + result_string = b'' + while seconds is None or (time.time() - start < seconds): + key_pressed = None + try: + os.set_blocking(self._stdin_fd, False) + while key_pressed is None and (seconds is None or (time.time() - start < seconds)): + key_pressed = self._stdin.read(1) + # throttle to prevent excess CPU consumption + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + finally: + os.set_blocking(self._stdin_fd, True) + if key_pressed is None: + key_pressed = b'' + + if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input): + clear_line(self._stdout) + raise AnsiblePromptInterrupt('user interrupt') + if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input): + clear_line(self._stdout) + break + elif key_pressed in backspace_sequences: + clear_line(self._stdout) + result_string = result_string[:-1] + if echo: + self._stdout.write(result_string) + self._stdout.flush() + else: + result_string += key_pressed + return result_string + + @property + def _stdin(self) -> t.BinaryIO | None: + if self._final_q: + raise NotImplementedError + try: + return sys.stdin.buffer + except AttributeError: + return None + + @property + def _stdin_fd(self) -> int | None: + try: + return self._stdin.fileno() + except (ValueError, AttributeError): + return None + + @property + def _stdout(self) -> t.BinaryIO: + if self._final_q: + raise NotImplementedError + return sys.stdout.buffer + + @property + def _stdout_fd(self) -> int | None: + try: + return self._stdout.fileno() + except (ValueError, AttributeError): + return None |