summaryrefslogtreecommitdiffstats
path: root/lib/ansible/utils/display.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:55:42 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:55:42 +0000
commit62d9962ec7d01c95bf5732169320d3857a41446e (patch)
treef60d8fc63ff738e5f5afec48a84cf41480ee1315 /lib/ansible/utils/display.py
parentReleasing progress-linux version 2.14.13-1~progress7.99u1. (diff)
downloadansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.tar.xz
ansible-core-62d9962ec7d01c95bf5732169320d3857a41446e.zip
Merging upstream version 2.16.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/ansible/utils/display.py')
-rw-r--r--lib/ansible/utils/display.py374
1 files changed, 322 insertions, 52 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py
index 7d98ad4..3f331ad 100644
--- a/lib/ansible/utils/display.py
+++ b/lib/ansible/utils/display.py
@@ -15,34 +15,49 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
+from __future__ import annotations
+
+try:
+ import curses
+except ImportError:
+ HAS_CURSES = False
+else:
+ # this will be set to False if curses.setupterm() fails
+ HAS_CURSES = True
+
+import collections.abc as c
+import codecs
import ctypes.util
import fcntl
import getpass
+import io
import logging
import os
import random
import subprocess
import sys
+import termios
import textwrap
import threading
import time
+import tty
+import typing as t
+from functools import wraps
from struct import unpack, pack
-from termios import TIOCGWINSZ
from ansible import constants as C
-from ansible.errors import AnsibleError, AnsibleAssertionError
-from ansible.module_utils._text import to_bytes, to_text
+from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive
+from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.six import text_type
from ansible.utils.color import stringc
from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible.utils.singleton import Singleton
from ansible.utils.unsafe_proxy import wrap_var
-from functools import wraps
+if t.TYPE_CHECKING:
+ # avoid circular import at runtime
+ from ansible.executor.task_queue_manager import FinalQueue
_LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
# Set argtypes, to avoid segfault if the wrong type is provided,
@@ -52,8 +67,11 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int)
# Max for c_int
_MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1
+MOVE_TO_BOL = b'\r'
+CLEAR_TO_EOL = b'\x1b[K'
+
-def get_text_width(text):
+def get_text_width(text: str) -> int:
"""Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
number of columns used to display a text string.
@@ -104,6 +122,20 @@ def get_text_width(text):
return width if width >= 0 else 0
+def proxy_display(method):
+
+ def proxyit(self, *args, **kwargs):
+ if self._final_q:
+ # If _final_q is set, that means we are in a WorkerProcess
+ # and instead of displaying messages directly from the fork
+ # we will proxy them through the queue
+ return self._final_q.send_display(method.__name__, *args, **kwargs)
+ else:
+ return method(self, *args, **kwargs)
+
+ return proxyit
+
+
class FilterBlackList(logging.Filter):
def __init__(self, blacklist):
self.blacklist = [logging.Filter(name) for name in blacklist]
@@ -164,7 +196,7 @@ b_COW_PATHS = (
)
-def _synchronize_textiowrapper(tio, lock):
+def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock):
# Ensure that a background thread can't hold the internal buffer lock on a file object
# during a fork, which causes forked children to hang. We're using display's existing lock for
# convenience (and entering the lock before a fork).
@@ -179,15 +211,70 @@ def _synchronize_textiowrapper(tio, lock):
buffer = tio.buffer
# monkeypatching the underlying file-like object isn't great, but likely safer than subclassing
- buffer.write = _wrap_with_lock(buffer.write, lock)
- buffer.flush = _wrap_with_lock(buffer.flush, lock)
+ buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign]
+ buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign]
+
+
+def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None:
+ """Put terminal into a raw mode.
+
+ Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG
+
+ OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display
+ is proxied via the queue from forks. The problem is a race condition, in that we proxy the display
+ over the fork, but before it can be displayed, this plugin will have continued executing, potentially
+ setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF
+ """
+ mode = termios.tcgetattr(fd)
+ mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON)
+ mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST)
+ mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB)
+ mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8
+ mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+ mode[tty.CC][termios.VMIN] = 1
+ mode[tty.CC][termios.VTIME] = 0
+ termios.tcsetattr(fd, when, mode)
+
+
+def clear_line(stdout: t.BinaryIO) -> None:
+ stdout.write(b'\x1b[%s' % MOVE_TO_BOL)
+ stdout.write(b'\x1b[%s' % CLEAR_TO_EOL)
+
+
+def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None:
+ setraw(stdin_fd)
+
+ # Only set stdout to raw mode if it is a TTY. This is needed when redirecting
+ # stdout to a file since a file cannot be set to raw mode.
+ if os.isatty(stdout_fd):
+ setraw(stdout_fd)
+
+ if echo:
+ new_settings = termios.tcgetattr(stdin_fd)
+ new_settings[3] = new_settings[3] | termios.ECHO
+ termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings)
+
+
+def setupterm() -> None:
+ # Nest the try except since curses.error is not available if curses did not import
+ try:
+ curses.setupterm()
+ except (curses.error, TypeError, io.UnsupportedOperation):
+ global HAS_CURSES
+ HAS_CURSES = False
+ else:
+ global MOVE_TO_BOL
+ global CLEAR_TO_EOL
+ # curses.tigetstr() returns None in some circumstances
+ MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL
+ CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL
class Display(metaclass=Singleton):
- def __init__(self, verbosity=0):
+ def __init__(self, verbosity: int = 0) -> None:
- self._final_q = None
+ self._final_q: FinalQueue | None = None
# NB: this lock is used to both prevent intermingled output between threads and to block writes during forks.
# Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock).
@@ -197,11 +284,11 @@ class Display(metaclass=Singleton):
self.verbosity = verbosity
# list of all deprecation messages to prevent duplicate display
- self._deprecations = {}
- self._warns = {}
- self._errors = {}
+ self._deprecations: dict[str, int] = {}
+ self._warns: dict[str, int] = {}
+ self._errors: dict[str, int] = {}
- self.b_cowsay = None
+ self.b_cowsay: bytes | None = None
self.noncow = C.ANSIBLE_COW_SELECTION
self.set_cowsay_info()
@@ -212,12 +299,12 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
if cmd.returncode:
raise Exception
- self.cows_available = {to_text(c) for c in out.split()} # set comprehension
+ self.cows_available: set[str] = {to_text(c) for c in out.split()}
if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST):
self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available)
except Exception:
# could not execute cowsay for some reason
- self.b_cowsay = False
+ self.b_cowsay = None
self._set_column_width()
@@ -228,13 +315,25 @@ class Display(metaclass=Singleton):
except Exception as ex:
self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}")
+ codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler)
try:
- sys.stdout.reconfigure(errors='replace')
- sys.stderr.reconfigure(errors='replace')
+ sys.stdout.reconfigure(errors='_replacing_warning_handler')
+ sys.stderr.reconfigure(errors='_replacing_warning_handler')
except Exception as ex:
- self.warning(f"failed to reconfigure stdout/stderr with the replace error handler: {ex}")
+ self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}")
- def set_queue(self, queue):
+ self.setup_curses = False
+
+ def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]:
+ # TODO: This should probably be deferred until after the current display is completed
+ # this will require some amount of new functionality
+ self.deprecated(
+ 'Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr, this is temporary and will become an error',
+ version='2.18',
+ )
+ return '?', exception.end
+
+ def set_queue(self, queue: FinalQueue) -> None:
"""Set the _final_q on Display, so that we know to proxy display over the queue
instead of directly writing to stdout/stderr from forks
@@ -244,7 +343,7 @@ class Display(metaclass=Singleton):
raise RuntimeError('queue cannot be set in parent process')
self._final_q = queue
- def set_cowsay_info(self):
+ def set_cowsay_info(self) -> None:
if C.ANSIBLE_NOCOWS:
return
@@ -255,18 +354,23 @@ class Display(metaclass=Singleton):
if os.path.exists(b_cow_path):
self.b_cowsay = b_cow_path
- def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True):
+ @proxy_display
+ def display(
+ self,
+ msg: str,
+ color: str | None = None,
+ stderr: bool = False,
+ screen_only: bool = False,
+ log_only: bool = False,
+ newline: bool = True,
+ ) -> None:
""" Display a message to the user
Note: msg *must* be a unicode string to prevent UnicodeError tracebacks.
"""
- if self._final_q:
- # If _final_q is set, that means we are in a WorkerProcess
- # and instead of displaying messages directly from the fork
- # we will proxy them through the queue
- return self._final_q.send_display(msg, color=color, stderr=stderr,
- screen_only=screen_only, log_only=log_only, newline=newline)
+ if not isinstance(msg, str):
+ raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}')
nocolor = msg
@@ -321,32 +425,32 @@ class Display(metaclass=Singleton):
# actually log
logger.log(lvl, msg2)
- def v(self, msg, host=None):
+ def v(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=0)
- def vv(self, msg, host=None):
+ def vv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=1)
- def vvv(self, msg, host=None):
+ def vvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=2)
- def vvvv(self, msg, host=None):
+ def vvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=3)
- def vvvvv(self, msg, host=None):
+ def vvvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=4)
- def vvvvvv(self, msg, host=None):
+ def vvvvvv(self, msg: str, host: str | None = None) -> None:
return self.verbose(msg, host=host, caplevel=5)
- def debug(self, msg, host=None):
+ def debug(self, msg: str, host: str | None = None) -> None:
if C.DEFAULT_DEBUG:
if host is None:
self.display("%6d %0.5f: %s" % (os.getpid(), time.time(), msg), color=C.COLOR_DEBUG)
else:
self.display("%6d %0.5f [%s]: %s" % (os.getpid(), time.time(), host, msg), color=C.COLOR_DEBUG)
- def verbose(self, msg, host=None, caplevel=2):
+ def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
to_stderr = C.VERBOSE_TO_STDERR
if self.verbosity > caplevel:
@@ -355,7 +459,14 @@ class Display(metaclass=Singleton):
else:
self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr)
- def get_deprecation_message(self, msg, version=None, removed=False, date=None, collection_name=None):
+ def get_deprecation_message(
+ self,
+ msg: str,
+ version: str | None = None,
+ removed: bool = False,
+ date: str | None = None,
+ collection_name: str | None = None,
+ ) -> str:
''' used to print out a deprecation message.'''
msg = msg.strip()
if msg and msg[-1] not in ['!', '?', '.']:
@@ -390,7 +501,15 @@ class Display(metaclass=Singleton):
return message_text
- def deprecated(self, msg, version=None, removed=False, date=None, collection_name=None):
+ @proxy_display
+ def deprecated(
+ self,
+ msg: str,
+ version: str | None = None,
+ removed: bool = False,
+ date: str | None = None,
+ collection_name: str | None = None,
+ ) -> None:
if not removed and not C.DEPRECATION_WARNINGS:
return
@@ -406,7 +525,8 @@ class Display(metaclass=Singleton):
self.display(message_text.strip(), color=C.COLOR_DEPRECATE, stderr=True)
self._deprecations[message_text] = 1
- def warning(self, msg, formatted=False):
+ @proxy_display
+ def warning(self, msg: str, formatted: bool = False) -> None:
if not formatted:
new_msg = "[WARNING]: %s" % msg
@@ -419,11 +539,11 @@ class Display(metaclass=Singleton):
self.display(new_msg, color=C.COLOR_WARN, stderr=True)
self._warns[new_msg] = 1
- def system_warning(self, msg):
+ def system_warning(self, msg: str) -> None:
if C.SYSTEM_WARNINGS:
self.warning(msg)
- def banner(self, msg, color=None, cows=True):
+ def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None:
'''
Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum)
'''
@@ -446,7 +566,7 @@ class Display(metaclass=Singleton):
stars = u"*" * star_len
self.display(u"\n%s %s" % (msg, stars), color=color)
- def banner_cowsay(self, msg, color=None):
+ def banner_cowsay(self, msg: str, color: str | None = None) -> None:
if u": [" in msg:
msg = msg.replace(u"[", u"")
if msg.endswith(u"]"):
@@ -463,7 +583,7 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
self.display(u"%s\n" % to_text(out), color=color)
- def error(self, msg, wrap_text=True):
+ def error(self, msg: str, wrap_text: bool = True) -> None:
if wrap_text:
new_msg = u"\n[ERROR]: %s" % msg
wrapped = textwrap.wrap(new_msg, self.columns)
@@ -475,14 +595,24 @@ class Display(metaclass=Singleton):
self._errors[new_msg] = 1
@staticmethod
- def prompt(msg, private=False):
+ def prompt(msg: str, private: bool = False) -> str:
if private:
return getpass.getpass(msg)
else:
return input(msg)
- def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None, unsafe=None):
-
+ def do_var_prompt(
+ self,
+ varname: str,
+ private: bool = True,
+ prompt: str | None = None,
+ encrypt: str | None = None,
+ confirm: bool = False,
+ salt_size: int | None = None,
+ salt: str | None = None,
+ default: str | None = None,
+ unsafe: bool = False,
+ ) -> str:
result = None
if sys.__stdin__.isatty():
@@ -515,7 +645,7 @@ class Display(metaclass=Singleton):
if encrypt:
# Circular import because encrypt needs a display class
from ansible.utils.encrypt import do_encrypt
- result = do_encrypt(result, encrypt, salt_size, salt)
+ result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt)
# handle utf-8 chars
result = to_text(result, errors='surrogate_or_strict')
@@ -524,9 +654,149 @@ class Display(metaclass=Singleton):
result = wrap_var(result)
return result
- def _set_column_width(self):
+ def _set_column_width(self) -> None:
if os.isatty(1):
- tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
+ tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
else:
tty_size = 0
self.columns = max(79, tty_size - 1)
+
+ def prompt_until(
+ self,
+ msg: str,
+ private: bool = False,
+ seconds: int | None = None,
+ interrupt_input: c.Container[bytes] | None = None,
+ complete_input: c.Container[bytes] | None = None,
+ ) -> bytes:
+ if self._final_q:
+ from ansible.executor.process.worker import current_worker
+ self._final_q.send_prompt(
+ worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds,
+ interrupt_input=interrupt_input, complete_input=complete_input
+ )
+ return current_worker.worker_queue.get()
+
+ if HAS_CURSES and not self.setup_curses:
+ setupterm()
+ self.setup_curses = True
+
+ if (
+ self._stdin_fd is None
+ or not os.isatty(self._stdin_fd)
+ # Compare the current process group to the process group associated
+ # with terminal of the given file descriptor to determine if the process
+ # is running in the background.
+ or os.getpgrp() != os.tcgetpgrp(self._stdin_fd)
+ ):
+ raise AnsiblePromptNoninteractive('stdin is not interactive')
+
+ # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass,
+ # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread.
+ # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids.
+ # if seconds is None and interrupt_input is None and complete_input is None:
+ # try:
+ # return self.prompt(msg, private=private)
+ # except KeyboardInterrupt:
+ # # can't catch in the results_thread_main daemon thread
+ # raise AnsiblePromptInterrupt('user interrupt')
+
+ self.display(msg)
+ result = b''
+ with self._lock:
+ original_stdin_settings = termios.tcgetattr(self._stdin_fd)
+ try:
+ setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private)
+
+ # flush the buffer to make sure no previous key presses
+ # are read in below
+ termios.tcflush(self._stdin, termios.TCIFLUSH)
+
+ # read input 1 char at a time until the optional timeout or complete/interrupt condition is met
+ return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input)
+ finally:
+ # restore the old settings for the duped stdin stdin_fd
+ termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings)
+
+ def _read_non_blocking_stdin(
+ self,
+ echo: bool = False,
+ seconds: int | None = None,
+ interrupt_input: c.Container[bytes] | None = None,
+ complete_input: c.Container[bytes] | None = None,
+ ) -> bytes:
+ if self._final_q:
+ raise NotImplementedError
+
+ if seconds is not None:
+ start = time.time()
+ if interrupt_input is None:
+ try:
+ interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR]
+ except Exception:
+ interrupt = b'\x03' # value for Ctrl+C
+
+ try:
+ backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]]
+ except Exception:
+ # unsupported/not present, use default
+ backspace_sequences = [b'\x7f', b'\x08']
+
+ result_string = b''
+ while seconds is None or (time.time() - start < seconds):
+ key_pressed = None
+ try:
+ os.set_blocking(self._stdin_fd, False)
+ while key_pressed is None and (seconds is None or (time.time() - start < seconds)):
+ key_pressed = self._stdin.read(1)
+ # throttle to prevent excess CPU consumption
+ time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
+ finally:
+ os.set_blocking(self._stdin_fd, True)
+ if key_pressed is None:
+ key_pressed = b''
+
+ if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input):
+ clear_line(self._stdout)
+ raise AnsiblePromptInterrupt('user interrupt')
+ if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input):
+ clear_line(self._stdout)
+ break
+ elif key_pressed in backspace_sequences:
+ clear_line(self._stdout)
+ result_string = result_string[:-1]
+ if echo:
+ self._stdout.write(result_string)
+ self._stdout.flush()
+ else:
+ result_string += key_pressed
+ return result_string
+
+ @property
+ def _stdin(self) -> t.BinaryIO | None:
+ if self._final_q:
+ raise NotImplementedError
+ try:
+ return sys.stdin.buffer
+ except AttributeError:
+ return None
+
+ @property
+ def _stdin_fd(self) -> int | None:
+ try:
+ return self._stdin.fileno()
+ except (ValueError, AttributeError):
+ return None
+
+ @property
+ def _stdout(self) -> t.BinaryIO:
+ if self._final_q:
+ raise NotImplementedError
+ return sys.stdout.buffer
+
+ @property
+ def _stdout_fd(self) -> int | None:
+ try:
+ return self._stdout.fileno()
+ except (ValueError, AttributeError):
+ return None