From 93ca7a4258e83996c0075ff5976658a12ffb6e02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20=27Necoro=27=20Neumann?= Date: Thu, 6 Jun 2013 17:35:45 +0200 Subject: i3: unify scripts into one --- .i3/config | 4 +- .i3/scripts/i3.py | 559 -------------- .i3/scripts/libs/i3.py | 559 ++++++++++++++ .i3/scripts/libs/sh.py | 1695 ++++++++++++++++++++++++++++++++++++++++++ .i3/scripts/new_workspace.py | 23 - .i3/scripts/workspaces.py | 74 ++ .i3/scripts/workspaces.sh | 24 - 7 files changed, 2330 insertions(+), 608 deletions(-) delete mode 100644 .i3/scripts/i3.py create mode 100644 .i3/scripts/libs/i3.py create mode 100644 .i3/scripts/libs/sh.py delete mode 100755 .i3/scripts/new_workspace.py create mode 100755 .i3/scripts/workspaces.py delete mode 100755 .i3/scripts/workspaces.sh (limited to '.i3') diff --git a/.i3/config b/.i3/config index f72bdc4..d0cc49a 100644 --- a/.i3/config +++ b/.i3/config @@ -149,14 +149,14 @@ bindsym $mod+Tab workspace back_and_forth bindsym $mod+Shift+Tab move container to workspace back_and_forth # goto a specific workspace, via i3-input «3 -bindsym $mod+g exec $nsi $script/workspaces.sh +bindsym $mod+g exec $nsi $script/workspaces.py switch bindsym $mod+Shift+g exec $nsi $script/workspaces.sh move # rename «3 bindsym $mod+Shift+n exec $nsi i3-input -F 'rename workspace to "%s"' -P 'Rename workspace: ' # new temp workspace «3 -bindsym $mod+n exec $nsi $script/new_workspace.py +bindsym $mod+n exec $nsi $script/workspaces.py new # Resizing «2 ############# diff --git a/.i3/scripts/i3.py b/.i3/scripts/i3.py deleted file mode 100644 index 343c709..0000000 --- a/.i3/scripts/i3.py +++ /dev/null @@ -1,559 +0,0 @@ -#====================================================================== -# i3 (Python module for communicating with i3 window manager) -# Copyright (C) 2012 Jure Ziberna -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -#====================================================================== - - -import sys -import subprocess -import json -import socket -import struct -import threading -import time - -ModuleType = type(sys) - - -__author__ = 'Jure Ziberna' -__version__ = '0.6.5' -__date__ = '2012-06-20' -__license__ = 'GNU GPL 3' - - -MSG_TYPES = [ - 'command', - 'get_workspaces', - 'subscribe', - 'get_outputs', - 'get_tree', - 'get_marks', - 'get_bar_config', -] - -EVENT_TYPES = [ - 'workspace', - 'output', -] - - -class i3Exception(Exception): - pass - -class MessageTypeError(i3Exception): - """ - Raised when message type isn't available. See i3.MSG_TYPES. - """ - def __init__(self, type): - msg = "Message type '%s' isn't available" % type - super(MessageTypeError, self).__init__(msg) - -class EventTypeError(i3Exception): - """ - Raised when even type isn't available. See i3.EVENT_TYPES. - """ - def __init__(self, type): - msg = "Event type '%s' isn't available" % type - super(EventTypeError, self).__init__(msg) - -class MessageError(i3Exception): - """ - Raised when a message to i3 is unsuccessful. - That is, when it contains 'success': false in its JSON formatted response. - """ - pass - -class ConnectionError(i3Exception): - """ - Raised when a socket couldn't connect to the window manager. - """ - def __init__(self, socket_path): - msg = "Could not connect to socket at '%s'" % socket_path - super(ConnectionError, self).__init__(msg) - - -def parse_msg_type(msg_type): - """ - Returns an i3-ipc code of the message type. Raises an exception if - the given message type isn't available. - """ - try: - index = int(msg_type) - except ValueError: - index = -1 - if index >= 0 and index < len(MSG_TYPES): - return index - msg_type = str(msg_type).lower() - if msg_type in MSG_TYPES: - return MSG_TYPES.index(msg_type) - else: - raise MessageTypeError(msg_type) - -def parse_event_type(event_type): - """ - Returns an i3-ipc string of the event_type. Raises an exception if - the given event type isn't available. - """ - try: - index = int(event_type) - except ValueError: - index = -1 - if index >= 0 and index < len(EVENT_TYPES): - return EVENT_TYPES[index] - event_type = str(event_type).lower() - if event_type in EVENT_TYPES: - return event_type - else: - raise EventTypeError(event_type) - - -class Socket(object): - """ - Socket for communicating with the i3 window manager. - Optional arguments: - - path of the i3 socket. Path is retrieved from i3-wm itself via - "i3.get_socket_path()" if not provided. - - timeout in seconds - - chunk_size in bytes - - magic_string as a safety string for i3-ipc. Set to 'i3-ipc' by default. - """ - magic_string = 'i3-ipc' # safety string for i3-ipc - chunk_size = 1024 # in bytes - timeout = 0.5 # in seconds - buffer = b'' # byte string - - def __init__(self, path=None, timeout=None, chunk_size=None, - magic_string=None): - if not path: - path = get_socket_path() - self.path = path - if timeout: - self.timeout = timeout - if chunk_size: - self.chunk_size = chunk_size - if magic_string: - self.magic_string = magic_string - # Socket initialization and connection - self.initialize() - self.connect() - # Struct format initialization, length of magic string is in bytes - self.struct_header = '<%dsII' % len(self.magic_string.encode('utf-8')) - self.struct_header_size = struct.calcsize(self.struct_header) - - def initialize(self): - """ - Initializes the socket. - """ - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.settimeout(self.timeout) - - def connect(self, path=None): - """ - Connects the socket to socket path if not already connected. - """ - if not self.connected: - self.initialize() - if not path: - path = self.path - try: - self.socket.connect(path) - except socket.error: - raise ConnectionError(path) - - def get(self, msg_type, payload=''): - """ - Convenience method, calls "socket.send(msg_type, payload)" and - returns data from "socket.receive()". - """ - self.send(msg_type, payload) - return self.receive() - - def subscribe(self, event_type, event=None): - """ - Subscribes to an event. Returns data on first occurrence. - """ - event_type = parse_event_type(event_type) - # Create JSON payload from given event type and event - payload = [event_type] - if event: - payload.append(event) - payload = json.dumps(payload) - return self.get('subscribe', payload) - - def send(self, msg_type, payload=''): - """ - Sends the given message type with given message by packing them - and continuously sending bytes from the packed message. - """ - message = self.pack(msg_type, payload) - # Continuously send the bytes from the message - self.socket.sendall(message) - - def receive(self): - """ - Tries to receive a data. Unpacks the received byte string if - successful. Returns None on failure. - """ - try: - data = self.socket.recv(self.chunk_size) - msg_magic, msg_length, msg_type = self.unpack_header(data) - msg_size = self.struct_header_size + msg_length - # Keep receiving data until the whole message gets through - while len(data) < msg_size: - data += self.socket.recv(msg_length) - data = self.buffer + data - return self.unpack(data) - except socket.timeout: - return None - - def pack(self, msg_type, payload): - """ - Packs the given message type and payload. Turns the resulting - message into a byte string. - """ - msg_magic = self.magic_string - # Get the byte count instead of number of characters - msg_length = len(payload.encode('utf-8')) - msg_type = parse_msg_type(msg_type) - # "struct.pack" returns byte string, decoding it for concatenation - msg_length = struct.pack('I', msg_length).decode('utf-8') - msg_type = struct.pack('I', msg_type).decode('utf-8') - message = '%s%s%s%s' % (msg_magic, msg_length, msg_type, payload) - # Encoding the message back to byte string - return message.encode('utf-8') - - def unpack(self, data): - """ - Unpacks the given byte string and parses the result from JSON. - Returns None on failure and saves data into "self.buffer". - """ - data_size = len(data) - msg_magic, msg_length, msg_type = self.unpack_header(data) - msg_size = self.struct_header_size + msg_length - # Message shouldn't be any longer than the data - if data_size >= msg_size: - payload = data[self.struct_header_size:msg_size].decode('utf-8') - payload = json.loads(payload) - self.buffer = data[msg_size:] - return payload - else: - self.buffer = data - return None - - def unpack_header(self, data): - """ - Unpacks the header of given byte string. - """ - return struct.unpack(self.struct_header, data[:self.struct_header_size]) - - @property - def connected(self): - """ - Returns True if connected and False if not. - """ - try: - self.get('command') - return True - except socket.error: - return False - - def close(self): - """ - Closes the socket connection. - """ - self.socket.close() - - -class Subscription(threading.Thread): - """ - Creates a new subscription and runs a listener loop. Calls the - callback on event. - Example parameters: - callback = lambda event, data, subscription: print(data) - event_type = 'workspace' - event = 'focus' - event_socket = - data_socket = - """ - subscribed = False - type_translation = { - 'workspace': 'get_workspaces', - 'output': 'get_outputs' - } - - def __init__(self, callback, event_type, event=None, event_socket=None, - data_socket=None): - # Variable initialization - if not callable(callback): - raise TypeError('Callback must be callable') - event_type = parse_event_type(event_type) - self.callback = callback - self.event_type = event_type - self.event = event - # Socket initialization - if not event_socket: - event_socket = Socket() - self.event_socket = event_socket - self.event_socket.subscribe(event_type, event) - if not data_socket: - data_socket = Socket() - self.data_socket = data_socket - # Thread initialization - threading.Thread.__init__(self) - self.start() - - def run(self): - """ - Wrapper method for the listen method -- handles exceptions. - The method is run by the underlying "threading.Thread" object. - """ - try: - self.listen() - except socket.error: - self.close() - - def listen(self): - """ - Runs a listener loop until self.subscribed is set to False. - Calls the given callback method with data and the object itself. - If event matches the given one, then matching data is retrieved. - Otherwise, the event itself is sent to the callback. - In that case 'change' key contains the thing that was changed. - """ - self.subscribed = True - while self.subscribed: - event = self.event_socket.receive() - if not event: # skip an iteration if event is None - continue - if not self.event or ('change' in event and event['change'] == self.event): - msg_type = self.type_translation[self.event_type] - data = self.data_socket.get(msg_type) - else: - data = None - self.callback(event, data, self) - self.close() - - def close(self): - """ - Ends subscription loop by setting self.subscribed to False and - closing both sockets. - """ - self.subscribed = False - self.event_socket.close() - if self.data_socket is not default_socket(): - self.data_socket.close() - - -def __call_cmd__(cmd): - """ - Returns output (stdout or stderr) of the given command args. - """ - try: - output = subprocess.check_output(cmd) - except subprocess.CalledProcessError as error: - output = error.output - output = output.decode('utf-8') # byte string decoding - return output.strip() - - -__socket__ = None -def default_socket(socket=None): - """ - Returns i3.Socket object, which was initiliazed once with default values - if no argument is given. - Otherwise sets the default socket to the given socket. - """ - global __socket__ - if socket and isinstance(socket, Socket): - __socket__ = socket - elif not __socket__: - __socket__ = Socket() - return __socket__ - - -def msg(type, message=''): - """ - Takes a message type and a message itself. - Talks to the i3 via socket and returns the response from the socket. - """ - response = default_socket().get(type, message) - return response - - -def __function__(type, message='', *args, **crit): - """ - Accepts a message type, a message. Takes optional args and keyword - args which are present in all future calls of the resulting function. - Returns a function, which takes arguments and container criteria. - If message type was 'command', the function returns success value. - """ - def function(*args2, **crit2): - msg_full = ' '.join([message] + list(args) + list(args2)) - criteria = dict(crit) - criteria.update(crit2) - if criteria: - msg_full = '%s %s' % (container(**criteria), msg_full) - response = msg(type, msg_full) - response = success(response) - if isinstance(response, i3Exception): - raise response - return response - function.__name__ = type - function.__doc__ = 'Message sender (type: %s, message: %s)' % (type, message) - return function - - -def subscribe(event_type, event=None, callback=None): - """ - Accepts an event_type and event itself. - Creates a new subscription, prints data on every event until - KeyboardInterrupt is raised. - """ - if not callback: - def callback(event, data, subscription): - print('changed:', event['change']) - if data: - print('data:\n', data) - - socket = default_socket() - subscription = Subscription(callback, event_type, event, data_socket=socket) - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - print('') # force newline - finally: - subscription.close() - - -def get_socket_path(): - """ - Gets the socket path via i3 command. - """ - cmd = ['i3', '--get-socketpath'] - output = __call_cmd__(cmd) - return output - - -def success(response): - """ - Convenience method for filtering success values of a response. - Each success dictionary is replaces with boolean value. - i3.MessageError is returned if error key is found in any of the - success dictionaries. - """ - if isinstance(response, dict) and 'success' in response: - if 'error' in response: - return MessageError(response['error']) - return response['success'] - elif isinstance(response, list): - for index, item in enumerate(response): - item = success(item) - if isinstance(item, i3Exception): - return item - response[index] = item - return response - - -def container(**criteria): - """ - Turns keyword arguments into a formatted container criteria. - """ - if 'klass' in criteria: - criteria['class'] = criteria['klass'] - del criteria['klass'] - - criteria = ['%s="%s"' % (key, val) for key, val in criteria.items()] - return '[%s]' % ' '.join(criteria) - - -def parent(con_id, tree=None): - """ - Searches for a parent of a node/container, given the container id. - Returns None if no container with given id exists (or if the - container is already a root node). - """ - def has_child(node): - for child in node['nodes']: - if child['id'] == con_id: - return True - return False - parents = filter(tree, has_child) - if not parents or len(parents) > 1: - return None - return parents[0] - - -def filter(tree=None, function=None, **conditions): - """ - Filters a tree based on given conditions. For example, to get a list of - unfocused windows (leaf nodes) in the current tree: - i3.filter(nodes=[], focused=False) - The return value is always a list of matched items, even if there's - only one item that matches. - The user function should take a single node. The function doesn't have - to do any dict key or index checking (this is handled by i3.filter - internally). - """ - if tree is None: - tree = msg('get_tree') - elif isinstance(tree, list): - tree = {'list': tree} - if function: - try: - if function(tree): - return [tree] - except (KeyError, IndexError): - pass - else: - for key, value in conditions.items(): - if key not in tree or tree[key] != value: - break - else: - return [tree] - matches = [] - for nodes in ['nodes', 'floating_nodes', 'list']: - if nodes in tree: - for node in tree[nodes]: - matches += filter(node, function, **conditions) - return matches - - -class i3(ModuleType): - """ - i3.py is a Python module for communicating with the i3 window manager. - """ - def __init__(self, module): - self.__module__ = module - self.__name__ = module.__name__ - - def __getattr__(self, name): - """ - Turns a nonexistent attribute into a function. - Returns the resulting function. - """ - try: - return getattr(self.__module__, name) - except AttributeError: - pass - if name.lower() in self.__module__.MSG_TYPES: - return self.__module__.__function__(type=name) - else: - return self.__module__.__function__(type='command', message=name) - - -# Turn the module into an i3 object -sys.modules[__name__] = i3(sys.modules[__name__]) diff --git a/.i3/scripts/libs/i3.py b/.i3/scripts/libs/i3.py new file mode 100644 index 0000000..343c709 --- /dev/null +++ b/.i3/scripts/libs/i3.py @@ -0,0 +1,559 @@ +#====================================================================== +# i3 (Python module for communicating with i3 window manager) +# Copyright (C) 2012 Jure Ziberna +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#====================================================================== + + +import sys +import subprocess +import json +import socket +import struct +import threading +import time + +ModuleType = type(sys) + + +__author__ = 'Jure Ziberna' +__version__ = '0.6.5' +__date__ = '2012-06-20' +__license__ = 'GNU GPL 3' + + +MSG_TYPES = [ + 'command', + 'get_workspaces', + 'subscribe', + 'get_outputs', + 'get_tree', + 'get_marks', + 'get_bar_config', +] + +EVENT_TYPES = [ + 'workspace', + 'output', +] + + +class i3Exception(Exception): + pass + +class MessageTypeError(i3Exception): + """ + Raised when message type isn't available. See i3.MSG_TYPES. + """ + def __init__(self, type): + msg = "Message type '%s' isn't available" % type + super(MessageTypeError, self).__init__(msg) + +class EventTypeError(i3Exception): + """ + Raised when even type isn't available. See i3.EVENT_TYPES. + """ + def __init__(self, type): + msg = "Event type '%s' isn't available" % type + super(EventTypeError, self).__init__(msg) + +class MessageError(i3Exception): + """ + Raised when a message to i3 is unsuccessful. + That is, when it contains 'success': false in its JSON formatted response. + """ + pass + +class ConnectionError(i3Exception): + """ + Raised when a socket couldn't connect to the window manager. + """ + def __init__(self, socket_path): + msg = "Could not connect to socket at '%s'" % socket_path + super(ConnectionError, self).__init__(msg) + + +def parse_msg_type(msg_type): + """ + Returns an i3-ipc code of the message type. Raises an exception if + the given message type isn't available. + """ + try: + index = int(msg_type) + except ValueError: + index = -1 + if index >= 0 and index < len(MSG_TYPES): + return index + msg_type = str(msg_type).lower() + if msg_type in MSG_TYPES: + return MSG_TYPES.index(msg_type) + else: + raise MessageTypeError(msg_type) + +def parse_event_type(event_type): + """ + Returns an i3-ipc string of the event_type. Raises an exception if + the given event type isn't available. + """ + try: + index = int(event_type) + except ValueError: + index = -1 + if index >= 0 and index < len(EVENT_TYPES): + return EVENT_TYPES[index] + event_type = str(event_type).lower() + if event_type in EVENT_TYPES: + return event_type + else: + raise EventTypeError(event_type) + + +class Socket(object): + """ + Socket for communicating with the i3 window manager. + Optional arguments: + - path of the i3 socket. Path is retrieved from i3-wm itself via + "i3.get_socket_path()" if not provided. + - timeout in seconds + - chunk_size in bytes + - magic_string as a safety string for i3-ipc. Set to 'i3-ipc' by default. + """ + magic_string = 'i3-ipc' # safety string for i3-ipc + chunk_size = 1024 # in bytes + timeout = 0.5 # in seconds + buffer = b'' # byte string + + def __init__(self, path=None, timeout=None, chunk_size=None, + magic_string=None): + if not path: + path = get_socket_path() + self.path = path + if timeout: + self.timeout = timeout + if chunk_size: + self.chunk_size = chunk_size + if magic_string: + self.magic_string = magic_string + # Socket initialization and connection + self.initialize() + self.connect() + # Struct format initialization, length of magic string is in bytes + self.struct_header = '<%dsII' % len(self.magic_string.encode('utf-8')) + self.struct_header_size = struct.calcsize(self.struct_header) + + def initialize(self): + """ + Initializes the socket. + """ + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout) + + def connect(self, path=None): + """ + Connects the socket to socket path if not already connected. + """ + if not self.connected: + self.initialize() + if not path: + path = self.path + try: + self.socket.connect(path) + except socket.error: + raise ConnectionError(path) + + def get(self, msg_type, payload=''): + """ + Convenience method, calls "socket.send(msg_type, payload)" and + returns data from "socket.receive()". + """ + self.send(msg_type, payload) + return self.receive() + + def subscribe(self, event_type, event=None): + """ + Subscribes to an event. Returns data on first occurrence. + """ + event_type = parse_event_type(event_type) + # Create JSON payload from given event type and event + payload = [event_type] + if event: + payload.append(event) + payload = json.dumps(payload) + return self.get('subscribe', payload) + + def send(self, msg_type, payload=''): + """ + Sends the given message type with given message by packing them + and continuously sending bytes from the packed message. + """ + message = self.pack(msg_type, payload) + # Continuously send the bytes from the message + self.socket.sendall(message) + + def receive(self): + """ + Tries to receive a data. Unpacks the received byte string if + successful. Returns None on failure. + """ + try: + data = self.socket.recv(self.chunk_size) + msg_magic, msg_length, msg_type = self.unpack_header(data) + msg_size = self.struct_header_size + msg_length + # Keep receiving data until the whole message gets through + while len(data) < msg_size: + data += self.socket.recv(msg_length) + data = self.buffer + data + return self.unpack(data) + except socket.timeout: + return None + + def pack(self, msg_type, payload): + """ + Packs the given message type and payload. Turns the resulting + message into a byte string. + """ + msg_magic = self.magic_string + # Get the byte count instead of number of characters + msg_length = len(payload.encode('utf-8')) + msg_type = parse_msg_type(msg_type) + # "struct.pack" returns byte string, decoding it for concatenation + msg_length = struct.pack('I', msg_length).decode('utf-8') + msg_type = struct.pack('I', msg_type).decode('utf-8') + message = '%s%s%s%s' % (msg_magic, msg_length, msg_type, payload) + # Encoding the message back to byte string + return message.encode('utf-8') + + def unpack(self, data): + """ + Unpacks the given byte string and parses the result from JSON. + Returns None on failure and saves data into "self.buffer". + """ + data_size = len(data) + msg_magic, msg_length, msg_type = self.unpack_header(data) + msg_size = self.struct_header_size + msg_length + # Message shouldn't be any longer than the data + if data_size >= msg_size: + payload = data[self.struct_header_size:msg_size].decode('utf-8') + payload = json.loads(payload) + self.buffer = data[msg_size:] + return payload + else: + self.buffer = data + return None + + def unpack_header(self, data): + """ + Unpacks the header of given byte string. + """ + return struct.unpack(self.struct_header, data[:self.struct_header_size]) + + @property + def connected(self): + """ + Returns True if connected and False if not. + """ + try: + self.get('command') + return True + except socket.error: + return False + + def close(self): + """ + Closes the socket connection. + """ + self.socket.close() + + +class Subscription(threading.Thread): + """ + Creates a new subscription and runs a listener loop. Calls the + callback on event. + Example parameters: + callback = lambda event, data, subscription: print(data) + event_type = 'workspace' + event = 'focus' + event_socket = + data_socket = + """ + subscribed = False + type_translation = { + 'workspace': 'get_workspaces', + 'output': 'get_outputs' + } + + def __init__(self, callback, event_type, event=None, event_socket=None, + data_socket=None): + # Variable initialization + if not callable(callback): + raise TypeError('Callback must be callable') + event_type = parse_event_type(event_type) + self.callback = callback + self.event_type = event_type + self.event = event + # Socket initialization + if not event_socket: + event_socket = Socket() + self.event_socket = event_socket + self.event_socket.subscribe(event_type, event) + if not data_socket: + data_socket = Socket() + self.data_socket = data_socket + # Thread initialization + threading.Thread.__init__(self) + self.start() + + def run(self): + """ + Wrapper method for the listen method -- handles exceptions. + The method is run by the underlying "threading.Thread" object. + """ + try: + self.listen() + except socket.error: + self.close() + + def listen(self): + """ + Runs a listener loop until self.subscribed is set to False. + Calls the given callback method with data and the object itself. + If event matches the given one, then matching data is retrieved. + Otherwise, the event itself is sent to the callback. + In that case 'change' key contains the thing that was changed. + """ + self.subscribed = True + while self.subscribed: + event = self.event_socket.receive() + if not event: # skip an iteration if event is None + continue + if not self.event or ('change' in event and event['change'] == self.event): + msg_type = self.type_translation[self.event_type] + data = self.data_socket.get(msg_type) + else: + data = None + self.callback(event, data, self) + self.close() + + def close(self): + """ + Ends subscription loop by setting self.subscribed to False and + closing both sockets. + """ + self.subscribed = False + self.event_socket.close() + if self.data_socket is not default_socket(): + self.data_socket.close() + + +def __call_cmd__(cmd): + """ + Returns output (stdout or stderr) of the given command args. + """ + try: + output = subprocess.check_output(cmd) + except subprocess.CalledProcessError as error: + output = error.output + output = output.decode('utf-8') # byte string decoding + return output.strip() + + +__socket__ = None +def default_socket(socket=None): + """ + Returns i3.Socket object, which was initiliazed once with default values + if no argument is given. + Otherwise sets the default socket to the given socket. + """ + global __socket__ + if socket and isinstance(socket, Socket): + __socket__ = socket + elif not __socket__: + __socket__ = Socket() + return __socket__ + + +def msg(type, message=''): + """ + Takes a message type and a message itself. + Talks to the i3 via socket and returns the response from the socket. + """ + response = default_socket().get(type, message) + return response + + +def __function__(type, message='', *args, **crit): + """ + Accepts a message type, a message. Takes optional args and keyword + args which are present in all future calls of the resulting function. + Returns a function, which takes arguments and container criteria. + If message type was 'command', the function returns success value. + """ + def function(*args2, **crit2): + msg_full = ' '.join([message] + list(args) + list(args2)) + criteria = dict(crit) + criteria.update(crit2) + if criteria: + msg_full = '%s %s' % (container(**criteria), msg_full) + response = msg(type, msg_full) + response = success(response) + if isinstance(response, i3Exception): + raise response + return response + function.__name__ = type + function.__doc__ = 'Message sender (type: %s, message: %s)' % (type, message) + return function + + +def subscribe(event_type, event=None, callback=None): + """ + Accepts an event_type and event itself. + Creates a new subscription, prints data on every event until + KeyboardInterrupt is raised. + """ + if not callback: + def callback(event, data, subscription): + print('changed:', event['change']) + if data: + print('data:\n', data) + + socket = default_socket() + subscription = Subscription(callback, event_type, event, data_socket=socket) + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print('') # force newline + finally: + subscription.close() + + +def get_socket_path(): + """ + Gets the socket path via i3 command. + """ + cmd = ['i3', '--get-socketpath'] + output = __call_cmd__(cmd) + return output + + +def success(response): + """ + Convenience method for filtering success values of a response. + Each success dictionary is replaces with boolean value. + i3.MessageError is returned if error key is found in any of the + success dictionaries. + """ + if isinstance(response, dict) and 'success' in response: + if 'error' in response: + return MessageError(response['error']) + return response['success'] + elif isinstance(response, list): + for index, item in enumerate(response): + item = success(item) + if isinstance(item, i3Exception): + return item + response[index] = item + return response + + +def container(**criteria): + """ + Turns keyword arguments into a formatted container criteria. + """ + if 'klass' in criteria: + criteria['class'] = criteria['klass'] + del criteria['klass'] + + criteria = ['%s="%s"' % (key, val) for key, val in criteria.items()] + return '[%s]' % ' '.join(criteria) + + +def parent(con_id, tree=None): + """ + Searches for a parent of a node/container, given the container id. + Returns None if no container with given id exists (or if the + container is already a root node). + """ + def has_child(node): + for child in node['nodes']: + if child['id'] == con_id: + return True + return False + parents = filter(tree, has_child) + if not parents or len(parents) > 1: + return None + return parents[0] + + +def filter(tree=None, function=None, **conditions): + """ + Filters a tree based on given conditions. For example, to get a list of + unfocused windows (leaf nodes) in the current tree: + i3.filter(nodes=[], focused=False) + The return value is always a list of matched items, even if there's + only one item that matches. + The user function should take a single node. The function doesn't have + to do any dict key or index checking (this is handled by i3.filter + internally). + """ + if tree is None: + tree = msg('get_tree') + elif isinstance(tree, list): + tree = {'list': tree} + if function: + try: + if function(tree): + return [tree] + except (KeyError, IndexError): + pass + else: + for key, value in conditions.items(): + if key not in tree or tree[key] != value: + break + else: + return [tree] + matches = [] + for nodes in ['nodes', 'floating_nodes', 'list']: + if nodes in tree: + for node in tree[nodes]: + matches += filter(node, function, **conditions) + return matches + + +class i3(ModuleType): + """ + i3.py is a Python module for communicating with the i3 window manager. + """ + def __init__(self, module): + self.__module__ = module + self.__name__ = module.__name__ + + def __getattr__(self, name): + """ + Turns a nonexistent attribute into a function. + Returns the resulting function. + """ + try: + return getattr(self.__module__, name) + except AttributeError: + pass + if name.lower() in self.__module__.MSG_TYPES: + return self.__module__.__function__(type=name) + else: + return self.__module__.__function__(type='command', message=name) + + +# Turn the module into an i3 object +sys.modules[__name__] = i3(sys.modules[__name__]) diff --git a/.i3/scripts/libs/sh.py b/.i3/scripts/libs/sh.py new file mode 100644 index 0000000..0e46f14 --- /dev/null +++ b/.i3/scripts/libs/sh.py @@ -0,0 +1,1695 @@ +#=============================================================================== +# Copyright (C) 2011-2012 by Andrew Moffat +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +#=============================================================================== + + +__version__ = "1.08" +__project_url__ = "https://github.com/amoffat/sh" + + + +import platform + +if "windows" in platform.system().lower(): + raise ImportError("sh %s is currently only supported on linux and osx. \ +please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \ +support." % __version__) + + + +import sys +IS_PY3 = sys.version_info[0] == 3 + +import traceback +import os +import re +from glob import glob as original_glob +from types import ModuleType +from functools import partial +import inspect +import time as _time + +from locale import getpreferredencoding +DEFAULT_ENCODING = getpreferredencoding() or "utf-8" + + +if IS_PY3: + from io import StringIO + from io import BytesIO as cStringIO + from queue import Queue, Empty +else: + from StringIO import StringIO + from cStringIO import OutputType as cStringIO + from Queue import Queue, Empty + +IS_OSX = platform.system() == "Darwin" +THIS_DIR = os.path.dirname(os.path.realpath(__file__)) + + +import errno +import warnings + +import pty +import termios +import signal +import gc +import select +import atexit +import threading +import tty +import fcntl +import struct +import resource +from collections import deque +import logging +import weakref + + +logging_enabled = False + + +if IS_PY3: + raw_input = input + unicode = str + basestring = str + + + + +class ErrorReturnCode(Exception): + truncate_cap = 750 + + def __init__(self, full_cmd, stdout, stderr): + self.full_cmd = full_cmd + self.stdout = stdout + self.stderr = stderr + + + if self.stdout is None: tstdout = "" + else: + tstdout = self.stdout[:self.truncate_cap] + out_delta = len(self.stdout) - len(tstdout) + if out_delta: + tstdout += ("... (%d more, please see e.stdout)" % out_delta).encode() + + if self.stderr is None: tstderr = "" + else: + tstderr = self.stderr[:self.truncate_cap] + err_delta = len(self.stderr) - len(tstderr) + if err_delta: + tstderr += ("... (%d more, please see e.stderr)" % err_delta).encode() + + msg = "\n\n RAN: %r\n\n STDOUT:\n%s\n\n STDERR:\n%s" %\ + (full_cmd, tstdout.decode(DEFAULT_ENCODING), tstderr.decode(DEFAULT_ENCODING)) + super(ErrorReturnCode, self).__init__(msg) + + +class SignalException(ErrorReturnCode): pass + +SIGNALS_THAT_SHOULD_THROW_EXCEPTION = ( + signal.SIGKILL, + signal.SIGSEGV, + signal.SIGTERM, + signal.SIGINT, + signal.SIGQUIT +) + + +# we subclass AttributeError because: +# https://github.com/ipython/ipython/issues/2577 +# https://github.com/amoffat/sh/issues/97#issuecomment-10610629 +class CommandNotFound(AttributeError): pass + +rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_(\d+)") +rc_exc_cache = {} + +def get_rc_exc(rc): + rc = int(rc) + try: return rc_exc_cache[rc] + except KeyError: pass + + if rc > 0: + name = "ErrorReturnCode_%d" % rc + exc = type(name, (ErrorReturnCode,), {}) + else: + name = "SignalException_%d" % abs(rc) + exc = type(name, (SignalException,), {}) + + rc_exc_cache[rc] = exc + return exc + + + + +def which(program): + def is_exe(fpath): + return os.path.exists(fpath) and os.access(fpath, os.X_OK) + + fpath, fname = os.path.split(program) + if fpath: + if is_exe(program): return program + else: + if "PATH" not in os.environ: return None + for path in os.environ["PATH"].split(os.pathsep): + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + + return None + +def resolve_program(program): + path = which(program) + if not path: + # our actual command might have a dash in it, but we can't call + # that from python (we have to use underscores), so we'll check + # if a dash version of our underscore command exists and use that + # if it does + if "_" in program: path = which(program.replace("_", "-")) + if not path: return None + return path + + +# we add this thin wrapper to glob.glob because of a specific edge case where +# glob does not expand to anything. for example, if you try to do +# glob.glob("*.py") and there are no *.py files in the directory, glob.glob +# returns an empty list. this empty list gets passed to the command, and +# then the command fails with a misleading error message. this thin wrapper +# ensures that if there is no expansion, we pass in the original argument, +# so that when the command fails, the error message is clearer +def glob(arg): + return original_glob(arg) or arg + + + +class Logger(object): + def __init__(self, name, context=None): + self.name = name + self.context = "%s" + if context: self.context = "%s: %%s" % context + self.log = logging.getLogger(name) + + def info(self, msg, *args): + if not logging_enabled: return + self.log.info(self.context, msg % args) + + def debug(self, msg, *args): + if not logging_enabled: return + self.log.debug(self.context, msg % args) + + def error(self, msg, *args): + if not logging_enabled: return + self.log.error(self.context, msg % args) + + def exception(self, msg, *args): + if not logging_enabled: return + self.log.exception(self.context, msg % args) + + + +class RunningCommand(object): + def __init__(self, cmd, call_args, stdin, stdout, stderr): + truncate = 20 + if len(cmd) > truncate: + logger_str = "command %r...(%d more) call_args %r" % \ + (cmd[:truncate], len(cmd) - truncate, call_args) + else: + logger_str = "command %r call_args %r" % (cmd, call_args) + + self.log = Logger("command", logger_str) + self.call_args = call_args + self.cmd = cmd + self.ran = " ".join(cmd) + self.process = None + + # this flag is for whether or not we've handled the exit code (like + # by raising an exception). this is necessary because .wait() is called + # from multiple places, and wait() triggers the exit code to be + # processed. but we don't want to raise multiple exceptions, only + # one (if any at all) + self._handled_exit_code = False + + self.should_wait = True + spawn_process = True + + + # with contexts shouldn't run at all yet, they prepend + # to every command in the context + if call_args["with"]: + spawn_process = False + Command._prepend_stack.append(self) + + + if callable(call_args["out"]) or callable(call_args["err"]): + self.should_wait = False + + if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]: + self.should_wait = False + + # we're running in the background, return self and let us lazily + # evaluate + if call_args["bg"]: self.should_wait = False + + # redirection + if call_args["err_to_out"]: stderr = STDOUT + + + # set up which stream should write to the pipe + # TODO, make pipe None by default and limit the size of the Queue + # in oproc.OProc + pipe = STDOUT + if call_args["iter"] == "out" or call_args["iter"] is True: pipe = STDOUT + elif call_args["iter"] == "err": pipe = STDERR + + if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True: pipe = STDOUT + elif call_args["iter_noblock"] == "err": pipe = STDERR + + + if spawn_process: + self.log.debug("starting process") + self.process = OProc(cmd, stdin, stdout, stderr, + self.call_args, pipe=pipe) + + if self.should_wait: + self.wait() + + + def wait(self): + self._handle_exit_code(self.process.wait()) + return self + + # here we determine if we had an exception, or an error code that we weren't + # expecting to see. if we did, we create and raise an exception + def _handle_exit_code(self, code): + if self._handled_exit_code: return + self._handled_exit_code = True + + if code not in self.call_args["ok_code"] and \ + (code > 0 or -code in SIGNALS_THAT_SHOULD_THROW_EXCEPTION): + raise get_rc_exc(code)( + " ".join(self.cmd), + self.process.stdout, + self.process.stderr + ) + + + + @property + def stdout(self): + self.wait() + return self.process.stdout + + @property + def stderr(self): + self.wait() + return self.process.stderr + + @property + def exit_code(self): + self.wait() + return self.process.exit_code + + @property + def pid(self): + return self.process.pid + + def __len__(self): + return len(str(self)) + + def __enter__(self): + # we don't actually do anything here because anything that should + # have been done would have been done in the Command.__call__ call. + # essentially all that has to happen is the comand be pushed on + # the prepend stack. + pass + + def __iter__(self): + return self + + def next(self): + # we do this because if get blocks, we can't catch a KeyboardInterrupt + # so the slight timeout allows for that. + while True: + try: chunk = self.process._pipe_queue.get(False, .001) + except Empty: + if self.call_args["iter_noblock"]: return errno.EWOULDBLOCK + else: + if chunk is None: + self.wait() + raise StopIteration() + try: return chunk.decode(self.call_args["encoding"], + self.call_args["decode_errors"]) + except UnicodeDecodeError: return chunk + + # python 3 + __next__ = next + + def __exit__(self, typ, value, traceback): + if self.call_args["with"] and Command._prepend_stack: + Command._prepend_stack.pop() + + def __str__(self): + if IS_PY3: return self.__unicode__() + else: return unicode(self).encode(self.call_args["encoding"]) + + def __unicode__(self): + if self.process and self.stdout: + return self.stdout.decode(self.call_args["encoding"], + self.call_args["decode_errors"]) + return "" + + def __eq__(self, other): + return unicode(self) == unicode(other) + + def __contains__(self, item): + return item in str(self) + + def __getattr__(self, p): + # let these three attributes pass through to the OProc object + if p in ("signal", "terminate", "kill"): + if self.process: return getattr(self.process, p) + else: raise AttributeError + return getattr(unicode(self), p) + + def __repr__(self): + try: return str(self) + except UnicodeDecodeError: + if self.process: + if self.stdout: return repr(self.stdout) + return repr("") + + def __long__(self): + return long(str(self).strip()) + + def __float__(self): + return float(str(self).strip()) + + def __int__(self): + return int(str(self).strip()) + + + + + +class Command(object): + _prepend_stack = [] + + _call_args = { + # currently unsupported + #"fg": False, # run command in foreground + + "bg": False, # run command in background + "with": False, # prepend the command to every command after it + "in": None, + "out": None, # redirect STDOUT + "err": None, # redirect STDERR + "err_to_out": None, # redirect STDERR to STDOUT + + # stdin buffer size + # 1 for line, 0 for unbuffered, any other number for that amount + "in_bufsize": 0, + # stdout buffer size, same values as above + "out_bufsize": 1, + "err_bufsize": 1, + + # this is how big the output buffers will be for stdout and stderr. + # this is essentially how much output they will store from the process. + # we use a deque, so if it overflows past this amount, the first items + # get pushed off as each new item gets added. + # + # NOTICE + # this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if + # you're buffering out/err at 1024 bytes, the internal buffer size will + # be "internal_bufsize" CHUNKS of 1024 bytes + "internal_bufsize": 3 * 1024**2, + + "env": None, + "piped": None, + "iter": None, + "iter_noblock": None, + "ok_code": 0, + "cwd": None, + "long_sep": "=", + + # this is for programs that expect their input to be from a terminal. + # ssh is one of those programs + "tty_in": False, + "tty_out": True, + + "encoding": DEFAULT_ENCODING, + "decode_errors": "strict", + + # how long the process should run before it is auto-killed + "timeout": 0, + + # these control whether or not stdout/err will get aggregated together + # as the process runs. this has memory usage implications, so sometimes + # with long-running processes with a lot of data, it makes sense to + # set these to true + "no_out": False, + "no_err": False, + "no_pipe": False, + + # if any redirection is used for stdout or stderr, internal buffering + # of that data is not stored. this forces it to be stored, as if + # the output is being T'd to both the redirected destination and our + # internal buffers + "tee": None, + } + + # these are arguments that cannot be called together, because they wouldn't + # make any sense + _incompatible_call_args = ( + #("fg", "bg", "Command can't be run in the foreground and background"), + ("err", "err_to_out", "Stderr is already being redirected"), + ("piped", "iter", "You cannot iterate when this command is being piped"), + ) + + + # this method exists because of the need to have some way of letting + # manual object instantiation not perform the underscore-to-dash command + # conversion that resolve_program uses. + # + # there are 2 ways to create a Command object. using sh.Command() + # or by using sh.. the method fed into sh.Command must be taken + # literally, and so no underscore-dash conversion is performed. the one + # for sh. must do the underscore-dash converesion, because we + # can't type dashes in method names + @classmethod + def _create(cls, program, **default_kwargs): + path = resolve_program(program) + if not path: raise CommandNotFound(program) + + cmd = cls(path) + if default_kwargs: cmd = cmd.bake(**default_kwargs) + + return cmd + + + def __init__(self, path): + path = which(path) + if not path: raise CommandNotFound(path) + self._path = path + + self._partial = False + self._partial_baked_args = [] + self._partial_call_args = {} + + # bugfix for functools.wraps. issue #121 + self.__name__ = repr(self) + + + def __getattribute__(self, name): + # convenience + getattr = partial(object.__getattribute__, self) + + if name.startswith("_"): return getattr(name) + if name == "bake": return getattr("bake") + if name.endswith("_"): name = name[:-1] + + return getattr("bake")(name) + + + @staticmethod + def _extract_call_args(kwargs, to_override={}): + kwargs = kwargs.copy() + call_args = {} + for parg, default in Command._call_args.items(): + key = "_" + parg + + if key in kwargs: + call_args[parg] = kwargs[key] + del kwargs[key] + elif parg in to_override: + call_args[parg] = to_override[parg] + + # test for incompatible call args + s1 = set(call_args.keys()) + for args in Command._incompatible_call_args: + args = list(args) + error = args.pop() + + if s1.issuperset(args): + raise TypeError("Invalid special arguments %r: %s" % (args, error)) + + return call_args, kwargs + + + # this helper method is for normalizing an argument into a string in the + # system's default encoding. we can feed it a number or a string or + # whatever + def _format_arg(self, arg): + if IS_PY3: arg = str(arg) + else: + # if the argument is already unicode, or a number or whatever, + # this first call will fail. + try: arg = unicode(arg, DEFAULT_ENCODING).encode(DEFAULT_ENCODING) + except TypeError: arg = unicode(arg).encode(DEFAULT_ENCODING) + return arg + + + def _aggregate_keywords(self, keywords, sep, raw=False): + processed = [] + for k, v in keywords.items(): + # we're passing a short arg as a kwarg, example: + # cut(d="\t") + if len(k) == 1: + if v is not False: + processed.append("-" + k) + if v is not True: + processed.append(self._format_arg(v)) + + # we're doing a long arg + else: + if not raw: k = k.replace("_", "-") + + if v is True: + processed.append("--" + k) + elif v is False: + pass + else: + processed.append("--%s%s%s" % (k, sep, self._format_arg(v))) + return processed + + + def _compile_args(self, args, kwargs, sep): + processed_args = [] + + # aggregate positional args + for arg in args: + if isinstance(arg, (list, tuple)): + if not arg: + warnings.warn("Empty list passed as an argument to %r. \ +If you're using glob.glob(), please use sh.glob() instead." % self.path, stacklevel=3) + for sub_arg in arg: processed_args.append(self._format_arg(sub_arg)) + elif isinstance(arg, dict): + processed_args += self._aggregate_keywords(arg, sep, raw=True) + else: + processed_args.append(self._format_arg(arg)) + + # aggregate the keyword arguments + processed_args += self._aggregate_keywords(kwargs, sep) + + return processed_args + + + # TODO needs documentation + def bake(self, *args, **kwargs): + fn = Command(self._path) + fn._partial = True + + call_args, kwargs = self._extract_call_args(kwargs) + + pruned_call_args = call_args + for k,v in Command._call_args.items(): + try: + if pruned_call_args[k] == v: + del pruned_call_args[k] + except KeyError: continue + + fn._partial_call_args.update(self._partial_call_args) + fn._partial_call_args.update(pruned_call_args) + fn._partial_baked_args.extend(self._partial_baked_args) + sep = pruned_call_args.get("long_sep", self._call_args["long_sep"]) + fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep)) + return fn + + def __str__(self): + if IS_PY3: return self.__unicode__() + else: return unicode(self).encode(DEFAULT_ENCODING) + + def __eq__(self, other): + try: return str(self) == str(other) + except: return False + + def __repr__(self): + return "" % str(self) + + def __unicode__(self): + baked_args = " ".join(self._partial_baked_args) + if baked_args: baked_args = " " + baked_args + return self._path + baked_args + + def __enter__(self): + self(_with=True) + + def __exit__(self, typ, value, traceback): + Command._prepend_stack.pop() + + + def __call__(self, *args, **kwargs): + kwargs = kwargs.copy() + args = list(args) + + cmd = [] + + # aggregate any 'with' contexts + call_args = Command._call_args.copy() + for prepend in self._prepend_stack: + # don't pass the 'with' call arg + pcall_args = prepend.call_args.copy() + try: del pcall_args["with"] + except: pass + + call_args.update(pcall_args) + cmd.extend(prepend.cmd) + + cmd.append(self._path) + + # here we extract the special kwargs and override any + # special kwargs from the possibly baked command + tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args) + call_args.update(tmp_call_args) + + if not isinstance(call_args["ok_code"], (tuple, list)): + call_args["ok_code"] = [call_args["ok_code"]] + + + # check if we're piping via composition + stdin = call_args["in"] + if args: + first_arg = args.pop(0) + if isinstance(first_arg, RunningCommand): + # it makes sense that if the input pipe of a command is running + # in the background, then this command should run in the + # background as well + if first_arg.call_args["bg"]: call_args["bg"] = True + stdin = first_arg.process._pipe_queue + + else: + args.insert(0, first_arg) + + processed_args = self._compile_args(args, kwargs, call_args["long_sep"]) + + # makes sure our arguments are broken up correctly + split_args = self._partial_baked_args + processed_args + + final_args = split_args + + cmd.extend(final_args) + + + # stdout redirection + stdout = call_args["out"] + if stdout \ + and not callable(stdout) \ + and not hasattr(stdout, "write") \ + and not isinstance(stdout, (cStringIO, StringIO)): + + stdout = open(str(stdout), "wb") + + + # stderr redirection + stderr = call_args["err"] + if stderr and not callable(stderr) and not hasattr(stderr, "write") \ + and not isinstance(stderr, (cStringIO, StringIO)): + stderr = open(str(stderr), "wb") + + + return RunningCommand(cmd, call_args, stdin, stdout, stderr) + + + + +# used in redirecting +STDOUT = -1 +STDERR = -2 + + + +# Process open = Popen +# Open Process = OProc +class OProc(object): + _procs_to_cleanup = set() + _registered_cleanup = False + _default_window_size = (24, 80) + + def __init__(self, cmd, stdin, stdout, stderr, call_args, + persist=False, pipe=STDOUT): + + self.call_args = call_args + + self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"] + + # this logic is a little convoluted, but basically this top-level + # if/else is for consolidating input and output TTYs into a single + # TTY. this is the only way some secure programs like ssh will + # output correctly (is if stdout and stdin are both the same TTY) + if self._single_tty: + self._stdin_fd, self._slave_stdin_fd = pty.openpty() + + self._stdout_fd = self._stdin_fd + self._slave_stdout_fd = self._slave_stdin_fd + + self._stderr_fd = self._stdin_fd + self._slave_stderr_fd = self._slave_stdin_fd + + # do not consolidate stdin and stdout + else: + if self.call_args["tty_in"]: + self._slave_stdin_fd, self._stdin_fd = pty.openpty() + else: + self._slave_stdin_fd, self._stdin_fd = os.pipe() + + # tty_out is usually the default + if self.call_args["tty_out"]: + self._stdout_fd, self._slave_stdout_fd = pty.openpty() + else: + self._stdout_fd, self._slave_stdout_fd = os.pipe() + + # unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe, + # and never a PTY. the reason for this is not totally clear to me, + # but it has to do with the fact that if STDERR isn't set as the + # CTTY (because STDOUT is), the STDERR buffer won't always flush + # by the time the process exits, and the data will be lost. + # i've only seen this on OSX. + if stderr is not STDOU