# Copyright (c) 2008, Aldo Cortesi. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import division
try:
import tracemalloc
except ImportError:
tracemalloc = None
from libqtile.dgroups import DGroups
from xcffib.xproto import EventMask, WindowError, AccessError, DrawableError
import io
import logging
import os
import pickle
import shlex
import signal
import sys
import traceback
import xcffib
import xcffib.xinerama
import xcffib.xproto
import six
import warnings
from . import asyncio
from .config import Drag, Click, Screen, Match, Rule
from .group import _Group
from .log_utils import logger
from .state import QtileState
from .utils import QtileError, get_cache_dir
from .widget.base import _Widget
from . import command
from . import hook
from . import utils
from . import window
from . import xcbq
if sys.version_info >= (3, 3):
    def _import_module(module_name, dir_path):
        """Load ``<dir_path>/<module_name>.py`` and return the module.

        Parameters
        ==========
        module_name :
            Name of the module (file name without the ``.py`` suffix).
        dir_path :
            Directory that contains the module file.
        """
        # ``import importlib`` alone does not guarantee that the
        # ``machinery`` submodule is loaded; import it explicitly so the
        # attribute lookup below cannot fail with AttributeError.
        import importlib.machinery
        file_name = os.path.join(dir_path, module_name) + '.py'
        loader = importlib.machinery.SourceFileLoader(module_name, file_name)
        return loader.load_module()
else:
    def _import_module(module_name, dir_path):
        """Load ``module_name`` from ``dir_path`` using the ``imp`` module.

        Parameters
        ==========
        module_name :
            Name of the module to look up.
        dir_path :
            The only directory searched for the module.
        """
        import imp
        fp = None
        try:
            fp, pathname, description = imp.find_module(module_name, [dir_path])
            module = imp.load_module(module_name, fp, pathname, description)
        finally:
            # find_module may hand back an open file; always release it.
            if fp:
                fp.close()
        return module
[docs]class Qtile(command.CommandObject):
"""This object is the `root` of the command graph"""
def __init__(self, config, displayName=None, fname=None, no_spawn=False, state=None):
    """Connect to the X server and bring the window manager up.

    Parameters
    ==========
    config :
        Configuration object. ``main``, ``groups``, ``keys`` and ``mouse``
        are read directly; ``wmname``, ``dgroups_key_binder`` and
        ``widget_defaults`` are optional.
    displayName :
        X display to connect to. Falls back to the ``DISPLAY`` environment
        variable; ``QtileError`` is raised when neither is set.
    fname :
        Command socket file. When omitted it is derived from the display
        name with ``command.find_sockfile``.
    no_spawn :
        When true the ``startup_once`` hook is not fired.
    state :
        Optional pickled state string; it is unpickled and its ``apply``
        method is called with this instance.
    """
    self.no_spawn = no_spawn

    self._eventloop = None
    self._finalize = False

    if not displayName:
        displayName = os.environ.get("DISPLAY")
        if not displayName:
            raise QtileError("No DISPLAY set.")

    if not fname:
        # Dots might appear in the host part of the display name
        # during remote X sessions. Let's strip the host part first.
        displayNum = displayName.partition(":")[2]
        if "." not in displayNum:
            displayName += ".0"
        fname = command.find_sockfile(displayName)

    self.conn = xcbq.Connection(displayName)
    self.config = config
    self.fname = fname
    hook.init(self)

    self.windowMap = {}
    self.widgetMap = {}
    self.groupMap = {}
    self.groups = []
    self.keyMap = {}

    # Find the modifier mask for the numlock key, if there is one:
    nc = self.conn.keysym_to_keycode(xcbq.keysyms["Num_Lock"])
    self.numlockMask = xcbq.ModMasks.get(self.conn.get_modifier(nc), 0)
    # Every modifier bit except numlock and lock.
    self.validMask = ~(self.numlockMask | xcbq.ModMasks["lock"])

    # Because we only do Xinerama multi-screening,
    # we can assume that the first
    # screen's root is _the_ root.
    self.root = self.conn.default_screen.root
    self.root.set_attribute(
        eventmask=(
            EventMask.StructureNotify |
            EventMask.SubstructureNotify |
            EventMask.SubstructureRedirect |
            EventMask.EnterWindow |
            EventMask.LeaveWindow
        )
    )

    self.root.set_property(
        '_NET_SUPPORTED',
        [self.conn.atoms[x] for x in xcbq.SUPPORTED_ATOMS]
    )

    self.supporting_wm_check_window = self.conn.create_window(-1, -1, 1, 1)
    self.root.set_property(
        '_NET_SUPPORTING_WM_CHECK',
        self.supporting_wm_check_window.wid
    )

    # setup the default cursor
    self.root.set_cursor('left_ptr')

    wmname = getattr(self.config, "wmname", "qtile")
    self.supporting_wm_check_window.set_property('_NET_WM_NAME', wmname)
    self.supporting_wm_check_window.set_property(
        '_NET_SUPPORTING_WM_CHECK',
        self.supporting_wm_check_window.wid
    )

    if config.main:
        config.main(self)

    self.dgroups = None
    if self.config.groups:
        key_binder = None
        if hasattr(self.config, 'dgroups_key_binder'):
            key_binder = self.config.dgroups_key_binder
        self.dgroups = DGroups(self, self.config.groups, key_binder)

    if hasattr(config, "widget_defaults") and config.widget_defaults:
        _Widget.global_defaults = config.widget_defaults
    else:
        _Widget.global_defaults = {}

    for i in self.groups:
        self.groupMap[i.name] = i

    self.setup_eventloop()
    self.server = command._Server(self.fname, self, config, self._eventloop)

    self.currentScreen = None
    self.screens = []
    self._process_screens()
    self.currentScreen = self.screens[0]
    self._drag = None

    # Event classes that _xpoll drops without dispatching.
    self.ignoreEvents = set([
        xcffib.xproto.KeyReleaseEvent,
        xcffib.xproto.ReparentNotifyEvent,
        xcffib.xproto.CreateNotifyEvent,
        # DWM handles this to help "broken focusing windows".
        xcffib.xproto.MapNotifyEvent,
        xcffib.xproto.LeaveNotifyEvent,
        xcffib.xproto.FocusOutEvent,
        xcffib.xproto.FocusInEvent,
        xcffib.xproto.NoExposureEvent
    ])

    self.conn.flush()
    self.conn.xsync()
    self._xpoll()

    # Map and Grab keys
    for key in self.config.keys:
        self.mapKey(key)

    # It fixes problems with focus when clicking windows of some specific clients like xterm
    def noop(qtile):
        pass
    self.config.mouse += (Click([], "Button1", command.lazy.function(noop), focus="after"),)

    # Bindings are grouped per button code.
    self.mouseMap = {}
    for i in self.config.mouse:
        if self.mouseMap.get(i.button_code) is None:
            self.mouseMap[i.button_code] = []
        self.mouseMap[i.button_code].append(i)

    self.grabMouse()

    # no_spawn is set when we are restarting; we only want to run the
    # startup hook once.
    if not no_spawn:
        hook.fire("startup_once")
    hook.fire("startup")

    if state:
        # NOTE(review): this unpickles ``state``; it must only ever come
        # from a trusted source. The load itself is outside the try block,
        # so a corrupt state string propagates to the caller.
        st = pickle.load(io.BytesIO(state.encode()))
        try:
            st.apply(self)
        except:
            logger.exception("failed restoring state")

    self.scan()
    self.update_net_desktops()
    hook.subscribe.setgroup(self.update_net_desktops)

    self.selection = {
        "PRIMARY": {"owner": None, "selection": ""},
        "CLIPBOARD": {"owner": None, "selection": ""}
    }
    self.setup_selection()
    hook.fire("startup_complete")
def setup_selection(self):
    """Create the selection window and start tracking PRIMARY and CLIPBOARD.

    Records the current owner of both selections in ``self.selection`` and
    requests their content via ``convert_selection``.
    """
    primary_atom = self.conn.atoms["PRIMARY"]
    clipboard_atom = self.conn.atoms["CLIPBOARD"]
    tracked = (("PRIMARY", primary_atom), ("CLIPBOARD", clipboard_atom))

    self.selection_window = self.conn.create_window(-1, -1, 1, 1)
    self.selection_window.set_attribute(eventmask=EventMask.PropertyChange)

    for sel_name, _ in tracked:
        self.conn.xfixes.select_selection_input(self.selection_window, sel_name)

    for sel_name, atom in tracked:
        reply = self.conn.conn.core.GetSelectionOwner(atom).reply()
        self.selection[sel_name]["owner"] = reply.owner

    # ask for selection on startup
    for _, atom in tracked:
        self.convert_selection(atom)
def setup_eventloop(self):
    """Create the event loop and hook signals, the X socket and dbus into it."""
    loop = asyncio.new_event_loop()
    self._eventloop = loop

    # SIGINT and SIGTERM both stop the loop.
    for signum in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(signum, self.stop)

    loop.set_exception_handler(
        lambda x, y: logger.exception("Got an exception in poll loop")
    )

    logger.info('Adding io watch')
    # _xpoll runs whenever the X connection's file descriptor is readable.
    loop.add_reader(self.conn.conn.get_file_descriptor(), self._xpoll)

    self.setup_python_dbus()
def setup_python_dbus(self):
    """Run a GLib main-context iteration loop in the executor, if dbus exists.

    ``self._glib_loop`` is set to the executor future, or to None when dbus
    or GLib cannot be imported.
    """
    # This is a little strange. python-dbus internally depends on gobject,
    # so gobject's threads need to be running, and a gobject "main loop
    # thread" needs to be spawned, but we try to let it only interact with
    # us via calls to asyncio's call_soon_threadsafe.
    try:
        # We import dbus here to throw an ImportError if it isn't
        # available. Since the only reason we're running this thread is
        # because of dbus, if dbus isn't around there's no need to run
        # this thread.
        import dbus  # noqa
        from gi.repository import GLib

        def iterate_glib_context():
            context = GLib.main_context_default()
            # finalize() flips self._finalize to end this loop.
            while not self._finalize:
                try:
                    context.iteration(True)
                except Exception:
                    logger.exception("got exception from gobject")

        self._glib_loop = self.run_in_executor(iterate_glib_context)
    except ImportError:
        logger.warning("importing dbus/gobject failed, dbus will not work.")
        self._glib_loop = None
def finalize(self):
    """Shut down the GLib thread, widgets, layouts, bars, X connection and loop."""
    self._finalize = True
    loop = self._eventloop

    for signum in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(signum)
    loop.set_exception_handler(None)

    if self._glib_loop:
        try:
            from gi.repository import GLib
            # Queue a no-op idle callback, then wait for the GLib future.
            GLib.idle_add(lambda: None)
            loop.run_until_complete(self._glib_loop)
        except ImportError:
            pass

    try:
        for widget in self.widgetMap.values():
            widget.finalize()

        for layout in self.config.layouts:
            layout.finalize()

        for screen in self.screens:
            for bar in (screen.top, screen.bottom, screen.left, screen.right):
                if bar is not None:
                    bar.finalize()

        logger.info('Removing io watch')
        loop.remove_reader(self.conn.conn.get_file_descriptor())
        self.conn.finalize()
        self.server.close()
    except:
        # Anything raised during teardown is logged, never propagated.
        logger.exception('exception during finalize')
    finally:
        loop.close()
        self._eventloop = None
def _process_fake_screens(self):
    """
    Configure the screens listed in ``config.fake_screens``.

    Since Xephyr and Xnest don't really support offset screens, we'll fake
    it here for testing, (or if you want to partition a physical monitor
    into separate screens)
    """
    for index, fake in enumerate(self.config.fake_screens):
        # should have x,y, width and height set
        group = self.groups[index]
        fake._configure(
            self, index, fake.x, fake.y, fake.width, fake.height, group
        )
        # The first configured screen becomes the current one.
        if not self.currentScreen:
            self.currentScreen = fake
        self.screens.append(fake)
def _process_screens(self):
    """Configure one Screen per distinct pseudo-screen origin.

    Delegates to ``_process_fake_screens`` when the config defines
    ``fake_screens``. If no pseudo-screen is reported, a single screen
    covering the default X screen is configured instead.
    """
    if hasattr(self.config, 'fake_screens'):
        self._process_fake_screens()
        return

    # What's going on here is a little funny. What we really want is only
    # screens that don't overlap here; overlapping screens should see the
    # same parts of the root window (i.e. for people doing xrandr
    # --same-as). However, the order that X gives us pseudo screens in is
    # important, because it indicates what people have chosen via xrandr
    # --primary or whatever. So we need to alias screens that should be
    # aliased, but preserve order as well. See #383.
    sizes = {}
    ordered_origins = []
    for pseudo in self.conn.pseudoscreens:
        origin = (pseudo.x, pseudo.y)
        known_w, known_h = sizes.get(origin, (0, 0))
        if origin not in sizes:
            ordered_origins.append(origin)
        # Screens sharing an origin are merged into the largest extent.
        sizes[origin] = (max(known_w, pseudo.width), max(known_h, pseudo.height))

    for index, (x, y) in enumerate(ordered_origins):
        width, height = sizes[(x, y)]
        if index < len(self.config.screens):
            scr = self.config.screens[index]
        else:
            # More physical screens than configured ones: use a default.
            scr = Screen()
        if not self.currentScreen:
            self.currentScreen = scr
        scr._configure(self, index, x, y, width, height, self.groups[index])
        self.screens.append(scr)

    if self.screens:
        return

    fallback = self.config.screens[0] if self.config.screens else Screen()
    self.currentScreen = fallback
    fallback._configure(
        self,
        0, 0, 0,
        self.conn.default_screen.width_in_pixels,
        self.conn.default_screen.height_in_pixels,
        self.groups[0],
    )
    self.screens.append(fallback)
def mapKey(self, key):
    """Register ``key`` in ``keyMap`` and grab it on the root window.

    When a numlock mask is known the key is additionally grabbed with
    numlock, and with numlock plus lock, set.
    """
    self.keyMap[(key.keysym, key.modmask & self.validMask)] = key
    code = self.conn.keysym_to_keycode(key.keysym)

    modmasks = [key.modmask]
    if self.numlockMask:
        with_numlock = key.modmask | self.numlockMask
        modmasks.append(with_numlock)
        modmasks.append(with_numlock | xcbq.ModMasks["lock"])

    for modmask in modmasks:
        self.root.grab_key(
            code,
            modmask,
            True,
            xcffib.xproto.GrabMode.Async,
            xcffib.xproto.GrabMode.Async,
        )
def unmapKey(self, key):
    """Release the grabs for ``key`` and drop it from ``keyMap``.

    Does nothing when the key is not currently mapped.
    """
    key_index = (key.keysym, key.modmask & self.validMask)
    if key_index not in self.keyMap:
        return

    code = self.conn.keysym_to_keycode(key.keysym)
    modmasks = [key.modmask]
    if self.numlockMask:
        with_numlock = key.modmask | self.numlockMask
        modmasks.append(with_numlock)
        modmasks.append(with_numlock | xcbq.ModMasks["lock"])

    for modmask in modmasks:
        self.root.ungrab_key(code, modmask)
    del self.keyMap[key_index]
def update_net_desktops(self):
    """Publish desktop count, names and current index on the root window."""
    try:
        current_index = self.groups.index(self.currentGroup)
    # TODO: we should really only except ValueError here, AttributeError is
    # an annoying chicken and egg because we're accessing currentScreen
    # (via currentGroup), and when we set up the initial groups, there
    # aren't any screens yet. This can probably be changed when #475 is
    # fixed.
    except (ValueError, AttributeError):
        current_index = 0

    group_count = len(self.groups)
    self.root.set_property("_NET_NUMBER_OF_DESKTOPS", group_count)

    # Names are published as one NUL-separated string.
    desktop_names = "\0".join([group.name for group in self.groups])
    self.root.set_property("_NET_DESKTOP_NAMES", desktop_names)

    self.root.set_property("_NET_CURRENT_DESKTOP", current_index)
def addGroup(self, name, layout=None, layouts=None):
    """Create and register a group called ``name``.

    Returns True when the group was created and False when a group with
    that name already exists. ``layouts`` defaults to ``config.layouts``.
    """
    if name in self.groupMap:
        return False

    new_group = _Group(name, layout)
    self.groups.append(new_group)
    if not layouts:
        layouts = self.config.layouts
    new_group._configure(layouts, self.config.floating_layout, self)
    self.groupMap[name] = new_group

    hook.fire("addgroup", self, name)
    hook.fire("changegroup")
    self.update_net_desktops()
    return True
def delGroup(self, name):
    """Delete the group ``name``, moving its windows to another group.

    Raises ValueError when there are exactly as many groups as screens.
    Unknown names are ignored.
    """
    # one group per screen is needed
    if len(self.groups) == len(self.screens):
        raise ValueError("Can't delete all groups.")
    if name not in self.groupMap:
        return

    doomed = self.groupMap[name]
    if doomed.screen and doomed.screen.previous_group:
        target = doomed.screen.previous_group
    else:
        target = doomed.prevGroup()

    # Find a group that's not currently on a screen to bring to the
    # front. This will terminate because of our check above.
    while target.screen:
        target = target.prevGroup()

    # Iterate over a copy of the window list.
    for win in list(doomed.windows):
        win.togroup(target.name)

    if self.currentGroup.name == name:
        self.currentScreen.setGroup(target, save_prev=False)

    self.groups.remove(doomed)
    del self.groupMap[name]
    hook.fire("delgroup", self, name)
    hook.fire("changegroup")
    self.update_net_desktops()
def registerWidget(self, w):
    """Register a bar widget

    If a widget with the same name already exists, this will silently
    ignore that widget. However, this is not necessarily a bug. By default
    a widget's name is just ``self.__class__.__name__.lower()``, so putting
    multiple widgets of the same class will alias and one will be
    inaccessible. Since more than one groupbox widget is useful when you
    have more than one screen, this is a not uncommon occurrence. If you
    want to use the debug info for widgets with the same name, set the name
    yourself.
    """
    widget_name = w.name
    # Unnamed widgets are not registered; the first widget of a name wins.
    if widget_name and widget_name not in self.widgetMap:
        self.widgetMap[widget_name] = w
@utils.lru_cache()
def colorPixel(self, name):
    """Allocate colour ``name`` in the first screen's default colormap.

    Returns the ``pixel`` attribute of the allocation result.
    """
    colormap = self.conn.screens[0].default_colormap
    allocated = colormap.alloc_color(name)
    return allocated.pixel
@property
def currentLayout(self):
    """Layout of the current group."""
    group = self.currentGroup
    return group.layout
@property
def currentGroup(self):
    """Group shown on the current screen."""
    screen = self.currentScreen
    return screen.group
@property
def currentWindow(self):
    """Current window of the group shown on the current screen."""
    group = self.currentScreen.group
    return group.currentWindow
def scan(self):
    """Manage every child of the root window that is mapped and not withdrawn."""
    children = self.root.query_tree()[2]
    for child in children:
        try:
            attrs = child.get_attributes()
            wm_state = child.get_wm_state()
        except (xcffib.xproto.WindowError, xcffib.xproto.AccessError):
            # The window could not be queried; skip it.
            continue

        is_unmapped = attrs and attrs.map_state == xcffib.xproto.MapState.Unmapped
        if is_unmapped or (wm_state and wm_state[0] == window.WithdrawnState):
            continue
        self.manage(child)
def unmanage(self, win):
    """Stop managing the client registered under window id ``win``.

    Fires ``client_killed``, resets gaps reserved by the client's strut,
    removes it from its group and refreshes the client list. Unknown ids
    are ignored.
    """
    client = self.windowMap.get(win)
    if not client:
        return

    hook.fire("client_killed", client)
    self.reset_gaps(client)
    # Not every client object has a group attribute.
    if getattr(client, "group", None):
        client.group.remove(client)
    del self.windowMap[win]
    self.update_client_list()
def reset_gaps(self, c):
    """Clear the gaps reserved by client ``c``'s strut, if it has one."""
    strut = c.strut
    if not strut:
        return
    # An all-zero strut with the old strut removes the matching gap.
    self.update_gaps((0, 0, 0, 0), strut)
def update_gaps(self, strut, old_strut=None):
    """Apply a strut to the current screen's gaps and resize the screen.

    Parameters
    ==========
    strut :
        Sequence whose first four items are (left, right, top, bottom).
    old_strut :
        Previous strut in the same order. Only the first side that was set
        before and is unset now gets cleared.

    Only the first non-zero side, checked in the order top, bottom, left,
    right, receives a new ``Gap``.
    """
    from libqtile.bar import Gap

    screen = self.currentScreen
    left, right, top, bottom = strut[:4]

    if old_strut:
        old_left, old_right, old_top, old_bottom = old_strut[:4]
        removal_order = (
            ("left", left, old_left),
            ("right", right, old_right),
            ("top", top, old_top),
            ("bottom", bottom, old_bottom),
        )
        for side, new_size, old_size in removal_order:
            if not new_size and old_size:
                setattr(screen, side, None)
                break

    creation_order = (
        ("top", top),
        ("bottom", bottom),
        ("left", left),
        ("right", right),
    )
    for side, size in creation_order:
        if size:
            setattr(screen, side, Gap(size))
            break

    self.currentScreen.resize()
def manage(self, w):
    """Start managing X window ``w`` and return its client object.

    Returns None when the window cannot be queried or wrapped, when it has
    ``override_redirect`` set, or when it became defunct while the
    ``client_new`` hook ran. A window that is already managed returns the
    existing client.
    """
    try:
        attrs = w.get_attributes()
        internal = w.get_property("QTILE_INTERNAL")
    except (xcffib.xproto.WindowError, xcffib.xproto.AccessError):
        return
    if attrs and attrs.override_redirect:
        return

    if w.wid not in self.windowMap:
        if internal:
            # Windows carrying QTILE_INTERNAL are wrapped as Internal and
            # only recorded; no hooks fire and no group is assigned here.
            try:
                c = window.Internal(w, self)
            except (xcffib.xproto.WindowError, xcffib.xproto.AccessError):
                return
            self.windowMap[w.wid] = c
        else:
            try:
                c = window.Window(w, self)
            except (xcffib.xproto.WindowError, xcffib.xproto.AccessError):
                return

            # Docks and windows with a strut are made static on the
            # current screen instead of going through the client_new hook.
            if w.get_wm_type() == "dock" or c.strut:
                c.static(self.currentScreen.index)
            else:
                hook.fire("client_new", c)

            # Window may be defunct because
            # it's been declared static in hook.
            if c.defunct:
                return
            self.windowMap[w.wid] = c
            # Window may have been bound to a group in the hook.
            if not c.group:
                self.currentScreen.group.add(c, focus=c.can_steal_focus())
            self.update_client_list()
            hook.fire("client_managed", c)
        return c
    else:
        return self.windowMap[w.wid]
def update_client_list(self):
    """Updates the client stack list

    This is needed for third party tasklists and drag and drop of tabs in
    chrome
    """
    # Only clients that belong to a group are published.
    grouped_wids = []
    for wid, client in self.windowMap.items():
        if client.group:
            grouped_wids.append(wid)

    self.root.set_property("_NET_CLIENT_LIST", grouped_wids)
    # TODO: check stack order
    self.root.set_property("_NET_CLIENT_LIST_STACKING", grouped_wids)
def grabMouse(self):
    """Drop all button grabs on the root window and re-grab ``config.mouse``.

    When a numlock mask is known each binding is additionally grabbed with
    numlock, and with numlock plus lock, set.
    """
    self.root.ungrab_button(None, None)
    for binding in self.config.mouse:
        if isinstance(binding, Click) and binding.focus:
            # Make a freezing grab on mouse button to gain focus
            # Event will propagate to target window
            grabmode = xcffib.xproto.GrabMode.Sync
        else:
            grabmode = xcffib.xproto.GrabMode.Async

        eventmask = EventMask.ButtonPress
        if isinstance(binding, Drag):
            eventmask |= EventMask.ButtonRelease

        modmasks = [binding.modmask]
        if self.numlockMask:
            with_numlock = binding.modmask | self.numlockMask
            modmasks.append(with_numlock)
            modmasks.append(with_numlock | xcbq.ModMasks["lock"])

        for modmask in modmasks:
            self.root.grab_button(
                binding.button_code,
                modmask,
                True,
                eventmask,
                grabmode,
                xcffib.xproto.GrabMode.Async,
            )
def grabKeys(self):
    """Drop all key grabs on the root window and re-map every known key."""
    self.root.ungrab_key(None, None)
    remap = self.mapKey
    for bound_key in self.keyMap.values():
        remap(bound_key)
def get_target_chain(self, ename, e):
    """Returns a chain of targets that can handle this event

    Finds functions named `handle_X`, either on the window object itself or
    on the Qtile instance, where X is the event name (e.g. EnterNotify,
    ConfigureNotify, etc).

    The event will be passed to each target in turn for handling, until one
    of the handlers returns False or None, or the end of the chain is
    reached.
    """
    handler_name = "handle_%s" % ename

    # Certain events expose the affected window id as an "event" attribute.
    events_with_event_attr = (
        "EnterNotify",
        "ButtonPress",
        "ButtonRelease",
        "KeyPress",
    )

    if hasattr(e, "window"):
        client = self.windowMap.get(e.window)
    elif hasattr(e, "drawable"):
        client = self.windowMap.get(e.drawable)
    elif ename in events_with_event_attr:
        client = self.windowMap.get(e.event)
    else:
        client = None

    # The client's handler, when present, runs before our own.
    chain = [
        getattr(target, handler_name)
        for target in (client, self)
        if target is not None and hasattr(target, handler_name)
    ]
    if not chain:
        logger.info("Unhandled event: %r" % ename)
    return chain
def _xpoll(self):
    """Drain pending X events and dispatch each one to its handler chain.

    Runs until ``poll_for_event`` returns nothing, then flushes the
    connection. If an unexpected exception coincides with an X connection
    error, the event loop is stopped and the method returns without
    flushing.
    """
    while True:
        try:
            e = self.conn.conn.poll_for_event()
            if not e:
                break

            # Handlers are named after the event class minus its "Event"
            # suffix, e.g. KeyPressEvent -> KeyPress.
            ename = e.__class__.__name__

            if ename.endswith("Event"):
                ename = ename[:-5]
            if e.__class__ not in self.ignoreEvents:
                logger.debug(ename)
                for h in self.get_target_chain(ename, e):
                    logger.info("Handling: %s" % ename)
                    r = h(e)
                    # A falsy return value ends the chain.
                    if not r:
                        break
        # Catch some bad X exceptions. Since X is event based, race
        # conditions can occur almost anywhere in the code. For
        # example, if a window is created and then immediately
        # destroyed (before the event handler is evoked), when the
        # event handler tries to examine the window properties, it
        # will throw a WindowError exception. We can essentially
        # ignore it, since the window is already dead and we've got
        # another event in the queue notifying us to clean it up.
        except (WindowError, AccessError, DrawableError):
            pass
        except Exception as e:
            error_code = self.conn.conn.has_error()
            if error_code:
                error_string = xcbq.XCB_CONN_ERRORS[error_code]
                logger.exception("Shutting down due to X connection error %s (%s)" %
                                 (error_string, error_code))
                self.stop()
                return

            # Any other exception is logged and polling continues.
            logger.exception("Got an exception in poll loop")
    self.conn.flush()
def stop(self):
    """Log the shutdown and stop the event loop."""
    logger.info('Stopping eventloop')
    loop = self._eventloop
    loop.stop()
def loop(self):
    """Start the command server and run the event loop until it stops.

    ``finalize`` is always called afterwards, even if the loop raises.
    """
    self.server.start()
    event_loop = self._eventloop
    try:
        event_loop.run_forever()
    finally:
        self.finalize()
def find_screen(self, x, y):
    """Find a screen based on the x and y offset

    Returns the screen only when exactly one screen contains the point
    (edges included); otherwise None.
    """
    containing = [
        scr for scr in self.screens
        if scr.x <= x <= scr.x + scr.width and scr.y <= y <= scr.y + scr.height
    ]
    if len(containing) != 1:
        return None
    return containing[0]
def find_closest_screen(self, x, y):
    """
    If find_screen returns None, then this basically extends a
    screen vertically and horizontally and see if x,y lies in the
    band.

    Only works if it can find a SINGLE closest screen, else we
    revert to _find_closest_closest.

    Useful when dragging a window out of a screen onto another but
    having leftmost corner above viewport.
    """
    exact = self.find_screen(x, y)
    if exact is not None:
        return exact

    # Screens whose horizontal / vertical band contains the point.
    in_column = [s for s in self.screens if s.x <= x <= s.x + s.width]
    in_row = [s for s in self.screens if s.y <= y <= s.y + s.height]

    # A unique column match wins over a unique row match.
    for band in (in_column, in_row):
        if len(band) == 1:
            return band[0]
    return self._find_closest_closest(x, y, in_column + in_row)
def _find_closest_closest(self, x, y, candidate_screens):
    """
    if find_closest_screen can't determine one, we've got multiple
    screens, so figure out who is closer. We'll calculate using
    the square of the distance from the center of a screen.

    Note that this could return None if x, y is right/below all
    screens (shouldn't happen but we don't do anything about it
    here other than returning None)
    """
    # try all screens when no candidates were supplied
    pool = candidate_screens if candidate_screens else self.screens

    # if left corner is below and right of screen
    # it can't really be a candidate
    eligible = [s for s in pool if x < s.x + s.width and y < s.y + s.height]
    if not eligible:
        return None

    def squared_distance_to_center(scr):
        center_x = scr.x + scr.width / 2
        center_y = scr.y + scr.height / 2
        return (x - center_x) ** 2 + (y - center_y) ** 2

    # min() keeps the first screen on ties.
    return min(eligible, key=squared_distance_to_center)
def handle_SelectionNotify(self, e):
    """Record a new selection owner and request the selection's content.

    Events without a truthy ``owner`` attribute are ignored.
    """
    if not getattr(e, "owner", None):
        return

    name = self.conn.atoms.get_name(e.selection)
    entry = self.selection[name]
    entry["owner"] = e.owner
    # The text is cleared until the conversion result arrives.
    entry["selection"] = ""
    self.convert_selection(e.selection)

    hook.fire("selection_notify", name, self.selection[name])
def convert_selection(self, selection, _type="UTF8_STRING"):
    """Ask for ``selection`` to be converted to ``_type``.

    The request targets the selection window and uses the selection atom
    itself as the destination property.
    """
    target_atom = self.conn.atoms[_type]
    requestor = self.selection_window.wid
    self.conn.conn.core.ConvertSelection(
        requestor, selection, target_atom, selection, xcffib.CurrentTime
    )
def handle_PropertyNotify(self, e):
    """Store converted selection text and fire ``selection_change``.

    Only the PRIMARY and CLIPBOARD properties are handled.
    """
    name = self.conn.atoms.get_name(e.atom)
    # it's the selection property
    if name not in ("PRIMARY", "CLIPBOARD"):
        return

    assert e.window == self.selection_window.wid
    prop = self.selection_window.get_property(e.atom, "UTF8_STRING")

    # If the selection property is None, it is unset, which means the
    # clipboard is empty.
    text = (prop.value.to_utf8() if prop else None) or u""

    self.selection[name]["selection"] = text
    hook.fire("selection_change", name, self.selection[name])
def handle_EnterNotify(self, e):
    """Switch to the screen under the pointer when it enters no managed window.

    Returns True when the entered window is a managed one.
    """
    if e.event in self.windowMap:
        return True

    entered = self.find_screen(e.root_x, e.root_y)
    if not entered:
        return
    self.toScreen(entered.index, warp=False)
def handle_ClientMessage(self, event):
    """Switch the current screen's group on a ``_NET_CURRENT_DESKTOP`` message.

    Other message types are ignored; an out-of-range index is logged.
    """
    # handle change of desktop
    if self.conn.atoms["_NET_CURRENT_DESKTOP"] != event.type:
        return

    index = event.data.data32[0]
    try:
        self.currentScreen.setGroup(self.groups[index])
    except IndexError:
        logger.info("Invalid Desktop Index: %s" % index)
def handle_KeyPress(self, e):
    """Run the commands bound to the pressed key.

    The binding is looked up in ``keyMap`` by the first keysym of the
    pressed keycode and the modifier state masked with ``validMask``.
    Unknown keys are logged and ignored.
    """
    keysym = self.conn.code_to_syms[e.detail][0]
    state = e.state
    if self.numlockMask:
        state = e.state | self.numlockMask
    k = self.keyMap.get((keysym, state & self.validMask))
    if not k:
        logger.info("Ignoring unknown keysym: %s" % keysym)
        return
    for i in k.commands:
        # Commands whose check fails are skipped.
        if i.check(self):
            status, val = self.server.call(
                (i.selectors, i.name, i.args, i.kwargs)
            )
            if status in (command.ERROR, command.EXCEPTION):
                logger.error("KB command error %s: %s" % (i.name, val))
    # NOTE(review): indentation was reconstructed; this ``else`` is taken to
    # belong to the ``for`` loop (no ``break`` exists, so it always runs).
    # Confirm against upstream that it is not the ``else`` of ``if i.check``.
    else:
        return
def cmd_focus_by_click(self, e):
    """Bring a window to the front

    Parameters
    ==========
    e : xcb event
        Click event used to determine window to focus
    """
    clicked_wid = e.child or e.root

    # Additional option for config.py
    # Brings clicked window to front
    if self.config.bring_front_click:
        self.conn.conn.core.ConfigureWindow(
            clicked_wid,
            xcffib.xproto.ConfigWindow.StackMode,
            [xcffib.xproto.StackMode.Above]
        )

    client = self.windowMap.get(clicked_wid)
    # Internal windows are never focused by a click.
    if client and not client.window.get_property('QTILE_INTERNAL'):
        self.currentGroup.focus(self.windowMap.get(clicked_wid), False)
        self.windowMap.get(clicked_wid).focus(False)

    core = self.conn.conn.core
    core.AllowEvents(xcffib.xproto.Allow.ReplayPointer, e.time)
    self.conn.conn.flush()
def handle_ButtonPress(self, e):
    """Dispatch a button press to the matching ``Click``/``Drag`` bindings.

    Bindings are looked up in ``mouseMap`` by button code and must match
    the modifier state (masked with ``validMask``). A ``Click`` runs its
    commands; a ``Drag`` runs its optional start command, records the drag
    origin in ``self._drag`` and grabs the pointer.

    Parameters
    ==========
    e : xcb event
        The button press event.
    """
    button_code = e.detail
    state = e.state
    if self.numlockMask:
        state = e.state | self.numlockMask

    # A button without any binding has no entry in mouseMap. Fall back to
    # an empty tuple so such a press is ignored instead of raising
    # TypeError when iterating over None.
    bindings = self.mouseMap.get(button_code) or ()
    for m in bindings:
        if not m or m.modmask & self.validMask != state & self.validMask:
            logger.info("Ignoring unknown button: %s" % button_code)
            continue
        if isinstance(m, Click):
            for i in m.commands:
                if i.check(self):
                    if m.focus == "before":
                        self.cmd_focus_by_click(e)
                    status, val = self.server.call(
                        (i.selectors, i.name, i.args, i.kwargs))
                    if m.focus == "after":
                        self.cmd_focus_by_click(e)
                    if status in (command.ERROR, command.EXCEPTION):
                        logger.error(
                            "Mouse command error %s: %s" % (i.name, val)
                        )
        elif isinstance(m, Drag):
            x = e.event_x
            y = e.event_y
            if m.start:
                i = m.start
                if m.focus == "before":
                    self.cmd_focus_by_click(e)
                status, val = self.server.call(
                    (i.selectors, i.name, i.args, i.kwargs))
                if status in (command.ERROR, command.EXCEPTION):
                    logger.error(
                        "Mouse command error %s: %s" % (i.name, val)
                    )
                    # A failed start command aborts this drag.
                    continue
            else:
                val = (0, 0)
            if m.focus == "after":
                self.cmd_focus_by_click(e)
            # Press position, the start command's result pair, and the
            # commands that handle_MotionNotify runs while dragging.
            self._drag = (x, y, val[0], val[1], m.commands)
            self.root.grab_pointer(
                True,
                xcbq.ButtonMotionMask |
                xcbq.AllButtonsMask |
                xcbq.ButtonReleaseMask,
                xcffib.xproto.GrabMode.Async,
                xcffib.xproto.GrabMode.Async,
            )
def handle_ButtonRelease(self, e):
    """End a drag when a button bound to a ``Drag`` is released.

    Clears ``self._drag`` and releases the pointer grab for each ``Drag``
    binding registered for the released button.

    Parameters
    ==========
    e : xcb event
        The button release event.
    """
    button_code = e.detail

    # A button without any binding has no entry in mouseMap. Fall back to
    # an empty tuple so the release is ignored instead of raising
    # TypeError when iterating over None. The modifier state is not
    # needed here: bindings are matched by button code only.
    bindings = self.mouseMap.get(button_code) or ()
    for m in bindings:
        if not m:
            logger.info(
                "Ignoring unknown button release: %s" % button_code
            )
            continue
        if isinstance(m, Drag):
            self._drag = None
            self.root.ungrab_pointer()
def handle_MotionNotify(self, e):
    """Run the active drag's commands with the pointer's new position.

    Does nothing when no drag is in progress or the pointer has not moved
    away from the position where the drag started.
    """
    if self._drag is None:
        return

    origin_x, origin_y, base_x, base_y, drag_commands = self._drag
    dx = e.event_x - origin_x
    dy = e.event_y - origin_y
    if not (dx or dy):
        return

    for cmd in drag_commands:
        if not cmd.check(self):
            continue
        # Offset base values and the raw pointer position are appended to
        # the command's own positional arguments.
        call_args = cmd.args + (base_x + dx, base_y + dy, e.event_x, e.event_y)
        status, val = self.server.call(
            (cmd.selectors, cmd.name, call_args, cmd.kwargs)
        )
        if status in (command.ERROR, command.EXCEPTION):
            logger.error(
                "Mouse command error %s: %s" % (cmd.name, val)
            )
def handle_ConfigureNotify(self, e):
    """Handle xrandr events

    The current screen is resized to the event's size only when the event
    is for the root window and both width and height differ from the
    screen's.
    """
    screen = self.currentScreen
    size_changed = (
        e.window == self.root.wid
        and e.width != screen.width
        and e.height != screen.height
    )
    if size_changed:
        screen.resize(0, 0, e.width, e.height)
def handle_ConfigureRequest(self, e):
    """Apply the geometry a window requested, clamping values at zero."""
    # It's not managed, or not mapped, so we just obey it.
    cw = xcffib.xproto.ConfigWindow
    # (value-mask flag, configure() keyword, event attribute)
    fields = (
        (cw.X, "x", "x"),
        (cw.Y, "y", "y"),
        (cw.Height, "height", "height"),
        (cw.Width, "width", "width"),
        (cw.BorderWidth, "borderwidth", "border_width"),
    )

    requested = {}
    for flag, keyword, attr in fields:
        # Only fields present in the value mask are forwarded.
        if e.value_mask & flag:
            requested[keyword] = max(getattr(e, attr), 0)

    target = xcbq.Window(self.conn, e.window)
    target.configure(**requested)
def handle_MappingNotify(self, e):
    """Refresh the keymap; re-grab keys when the keyboard mapping changed."""
    self.conn.refresh_keymap()
    keyboard_changed = e.request == xcffib.xproto.Mapping.Keyboard
    if keyboard_changed:
        self.grabKeys()
def handle_MapRequest(self, e):
    """Manage the requesting window and map it.

    A managed client is not mapped here when it has no group or its group
    is not on a screen.
    """
    xwin = xcbq.Window(self.conn, e.window)
    client = self.manage(xwin)
    # Map when the window is unmanaged, or when its group is on a screen.
    if not client or (client.group and client.group.screen):
        xwin.map()
def handle_DestroyNotify(self, e):
    """Stop managing the window that was destroyed."""
    destroyed_wid = e.window
    self.unmanage(destroyed_wid)
def handle_UnmapNotify(self, e):
    """Unmap, mark withdrawn and stop managing a window that was unmapped.

    Events reported on the root window are ignored.
    """
    if e.event == self.root.wid:
        return

    client = self.windowMap.get(e.window)
    if client and getattr(client, "group", None):
        try:
            client.window.unmap()
            client.state = window.WithdrawnState
        except xcffib.xproto.WindowError:
            # This means that the window has probably been destroyed,
            # but we haven't yet seen the DestroyNotify (it is likely
            # next in the queue). So, we just let these errors pass
            # since the window is dead.
            pass
    self.unmanage(e.window)
def handle_ScreenChangeNotify(self, e):
    """Forward the event to subscribers of the ``screen_change`` hook."""
    hook_name = "screen_change"
    hook.fire(hook_name, self, e)
def toScreen(self, n, warp=True):
    """Have Qtile move to screen and put focus there

    An index at or beyond the number of screens is ignored. The hook,
    re-layout and focus only happen when the current screen changed.
    """
    if n >= len(self.screens):
        return

    previous = self.currentScreen
    self.currentScreen = self.screens[n]
    if previous == self.currentScreen:
        return

    hook.fire("current_screen_change")
    previous.group.layoutAll()
    self.currentGroup.focus(self.currentWindow, warp)
def moveToGroup(self, group):
    """Create a group if it doesn't exist and move a windows there"""
    if not (self.currentWindow and group):
        return
    # addGroup is a no-op (returns False) for an existing group.
    self.addGroup(group)
    self.currentWindow.togroup(group)
def _items(self, name):
    """List the selectable items of kind ``name``.

    Returns a ``(flag, items)`` pair; the flag is True for every kind except
    ``widget`` and ``bar``. Unknown kinds yield None.
    """
    # Values are callables so only the requested listing is computed.
    listings = {
        "group": lambda: (True, list(self.groupMap.keys())),
        "layout": lambda: (True, list(range(len(self.currentGroup.layouts)))),
        "widget": lambda: (False, list(self.widgetMap.keys())),
        "bar": lambda: (False, [gap.position for gap in self.currentScreen.gaps]),
        "window": lambda: (True, self.listWID()),
        "screen": lambda: (True, list(range(len(self.screens)))),
    }
    build = listings.get(name)
    if build is None:
        return None
    return build()
def _select(self, name, sel):
    """Resolve selector ``sel`` of kind ``name`` to an object.

    For ``group``, ``layout``, ``window`` and ``screen`` a ``sel`` of None
    picks the current one. Unknown kinds yield None.
    """
    if name == "group":
        return self.currentGroup if sel is None else self.groupMap.get(sel)
    if name == "layout":
        if sel is None:
            return self.currentGroup.layout
        return utils.lget(self.currentGroup.layouts, sel)
    if name == "widget":
        return self.widgetMap.get(sel)
    if name == "bar":
        # ``sel`` is used as an attribute name on the current screen.
        return getattr(self.currentScreen, sel)
    if name == "window":
        return self.currentWindow if sel is None else self.clientFromWID(sel)
    if name == "screen":
        if sel is None:
            return self.currentScreen
        return utils.lget(self.screens, sel)
    return None
def listWID(self):
    """Return the X window ids of all entries in ``windowMap``."""
    wids = []
    for client in self.windowMap.values():
        wids.append(client.window.wid)
    return wids
def clientFromWID(self, wid):
    """Return the client whose window id equals ``wid``, or None."""
    matches = (
        client for client in self.windowMap.values()
        if client.window.wid == wid
    )
    return next(matches, None)
def call_soon(self, func, *args):
    """ A wrapper for the event loop's call_soon which also flushes the X
    event queue to the server after func is called. """
    def run_then_flush():
        func(*args)
        self.conn.flush()

    loop = self._eventloop
    return loop.call_soon(run_then_flush)
def call_soon_threadsafe(self, func, *args):
    """ Another event loop proxy, see `call_soon`. """
    def run_then_flush():
        func(*args)
        self.conn.flush()

    loop = self._eventloop
    return loop.call_soon_threadsafe(run_then_flush)
def call_later(self, delay, func, *args):
    """ Another event loop proxy, see `call_soon`. """
    def run_then_flush():
        func(*args)
        self.conn.flush()

    loop = self._eventloop
    return loop.call_later(delay, run_then_flush)
def run_in_executor(self, func, *args):
    """ A wrapper for running a function in the event loop's default
    executor. """
    loop = self._eventloop
    # Passing None as the executor selects the loop's default one.
    return loop.run_in_executor(None, func, *args)
def cmd_debug(self):
    """Set log level to DEBUG"""
    level = logging.DEBUG
    logger.setLevel(level)
    logger.debug('Switching to DEBUG threshold')
def cmd_info(self):
    """Set log level to INFO"""
    level = logging.INFO
    logger.setLevel(level)
    logger.info('Switching to INFO threshold')
def cmd_warning(self):
    """Set log level to WARNING"""
    level = logging.WARNING
    logger.setLevel(level)
    logger.warning('Switching to WARNING threshold')
def cmd_error(self):
    """Set log level to ERROR"""
    level = logging.ERROR
    logger.setLevel(level)
    logger.error('Switching to ERROR threshold')
def cmd_critical(self):
    """Set log level to CRITICAL"""
    level = logging.CRITICAL
    logger.setLevel(level)
    logger.critical('Switching to CRITICAL threshold')
def cmd_pause(self):
    """Drops into pdb"""
    # Imported here so pdb is only loaded when this command is used.
    import pdb as debugger
    debugger.set_trace()
def cmd_groups(self):
    """Return a dictionary containing information for all groups

    Examples
    ========

    groups()
    """
    info_by_name = {}
    for group in self.groups:
        info_by_name[group.name] = group.info()
    return info_by_name
def cmd_get_info(self):
    """Prints info for all groups

    Deprecated: emits a DeprecationWarning and returns the result of
    ``cmd_groups``.
    """
    message = "The `get_info` command is deprecated, use `groups`"
    warnings.warn(message, DeprecationWarning)
    return self.cmd_groups()
def cmd_display_kb(self, *args):
    """Display table of key bindings"""

    class FormatTable(object):
        """Plain-text table whose columns fit their widest cell."""

        def __init__(self):
            self.max_col_size = []
            self.rows = []

        def add(self, row):
            # Grow the width list when this row has more columns.
            extra_cols = len(row) - len(self.max_col_size)
            if extra_cols > 0:
                self.max_col_size += [0] * extra_cols
            for col, cell in enumerate(row):
                if len(cell) > self.max_col_size[col]:
                    self.max_col_size[col] = len(cell)
            self.rows.append(row)

        def getformat(self):
            col_formats = [
                "%-{0:d}s".format(width + 2) for width in self.max_col_size
            ]
            return " ".join(col_formats) + "\n", len(self.max_col_size)

        def expandlist(self, cells, n):
            # An empty row is rendered as a line of dashes.
            if not cells:
                return ["-" * width for width in self.max_col_size]
            n -= len(cells)
            if n > 0:
                cells += [""] * n
            return cells

        def __str__(self):
            row_format, ncols = self.getformat()
            rendered = [
                row_format % tuple(self.expandlist(row, ncols))
                for row in self.rows
            ]
            return "".join(rendered)

    table = FormatTable()
    table.add(["KeySym", "Mod", "Command", "Desc"])
    table.add([])

    entries = []
    for (ks, kmm), k in self.keyMap.items():
        if not k.commands:
            continue
        # Only the first command of a binding is shown.
        first = k.commands[0]
        name = ", ".join(xcbq.rkeysyms.get(ks, ("<unknown>", )))
        modifiers = ", ".join(utils.translate_modifiers(kmm))
        positional = [repr(value) for value in first.args]
        keyword = [
            "%s = %s" % (keyword, repr(value))
            for keyword, value in first.kwargs.items()
        ]
        allargs = ", ".join(positional + keyword)
        entries.append((
            name,
            str(modifiers),
            "{0:s}({1:s})".format(first.name, allargs),
            k.desc,
        ))
    entries.sort()

    for entry in entries:
        table.add(entry)
    return str(table)
def cmd_to_layout_index(self, index, group=None):
    """Switch to the layout with the given index in self.layouts.

    Parameters
    ==========
    index :
        Index of the layout in the list of layouts.
    group :
        Group name. If not specified, the current group is assumed.
    """
    # A falsy group name means "use the current group".
    target = self.groupMap.get(group) if group else self.currentGroup
    target.toLayoutIndex(index)
def cmd_next_layout(self, group=None):
    """Switch to the next layout.

    Parameters
    ==========
    group :
        Group name. If not specified, the current group is assumed
    """
    # A falsy group name means "use the current group".
    target = self.groupMap.get(group) if group else self.currentGroup
    target.nextLayout()
def cmd_prev_layout(self, group=None):
    """Switch to the previous layout.

    Parameters
    ==========
    group :
        Group name. If not specified, the current group is assumed
    """
    # A falsy group name means "use the current group".
    target = self.groupMap.get(group) if group else self.currentGroup
    target.prevLayout()
def cmd_screens(self):
    """Return a list of dictionaries providing information on all screens

    Each dictionary holds the screen's index, group name (None when the
    screen has no group), x, y, width, height, and a ``gaps`` dictionary with
    the geometry of the top/bottom/left/right gap, or None where unset.
    """
    def geometry_or_none(gap):
        return gap.geometry() if gap else None

    described = []
    for scr in self.screens:
        described.append({
            "index": scr.index,
            "group": scr.group.name if scr.group is not None else None,
            "x": scr.x,
            "y": scr.y,
            "width": scr.width,
            "height": scr.height,
            "gaps": {
                "top": geometry_or_none(scr.top),
                "bottom": geometry_or_none(scr.bottom),
                "left": geometry_or_none(scr.left),
                "right": geometry_or_none(scr.right),
            },
        })
    return described
def cmd_simulate_keypress(self, modifiers, key):
    """Simulates a keypress on the focused window.

    Raises ``command.CommandError`` when ``key`` is not a known key name.
    When a modifier cannot be translated, the offending KeyError argument is
    returned instead of dispatching the key press.

    Parameters
    ==========
    modifiers :
        A list of modifier specification strings. Modifiers can be one of
        "shift", "lock", "control" and "mod1" - "mod5".
    key :
        Key specification.

    Examples
    ========
        simulate_keypress(["control", "mod2"], "k")
    """
    # FIXME: This needs to be done with sendevent, once we have that fixed.
    keysym = xcbq.keysyms.get(key)
    if keysym is None:
        raise command.CommandError(u"Unknown key: {0:s}".format(key))
    keycode = self.conn.first_sym_to_code[keysym]

    class DummyEv(object):
        pass

    try:
        state = utils.translate_masks(modifiers)
    except KeyError as err:
        return err.args[0]

    # Minimal stand-in carrying the two attributes set here, which is then
    # passed to the KeyPress handler.
    fake_event = DummyEv()
    fake_event.detail = keycode
    fake_event.state = state
    self.handle_KeyPress(fake_event)
def cmd_execute(self, cmd, args):
    """Executes the specified command, replacing the current process

    ``self.stop()`` is called first; ``cmd`` and ``args`` are then passed
    unchanged to ``os.execv``.
    """
    self.stop()
    # execv does not return on success: this process image is replaced.
    os.execv(cmd, args)
def cmd_restart(self):
    """Restart qtile using the execute command

    The interpreter is re-executed with the current ``sys.argv``, adding
    ``--no-spawn`` if it is not already present. The current state is pickled
    and passed along as a ``--with-state=`` argument, replacing any such
    argument already on the command line. If the state cannot be pickled, the
    error is logged and no state argument is passed, so the new process never
    receives a truncated pickle.
    """
    argv = [sys.executable] + sys.argv
    if '--no-spawn' not in argv:
        argv.append('--no-spawn')
    # Drop state handed over by an earlier restart; it is stale by now.
    argv = [s for s in argv if not s.startswith('--with-state')]

    buf = io.BytesIO()
    try:
        pickle.dump(QtileState(self), buf, protocol=0)
    except Exception:
        # A failed dump can leave partial data in the buffer, so the state
        # argument is only appended on success.
        logger.exception("Unable to pickle qtile state")
    else:
        argv.append('--with-state=' + buf.getvalue().decode())

    self.cmd_execute(sys.executable, argv)
def cmd_spawn(self, cmd):
    """Run cmd in a shell.

    cmd may be a string, which is parsed by shlex.split, or a list (similar
    to subprocess.Popen).

    The command is started from a twice-forked process via ``os.execvp``.
    The intermediate child reports the second fork's pid through a pipe and
    that value is returned as an int.

    Examples
    ========

        spawn("firefox")

        spawn(["xterm", "-T", "Temporary terminal"])
    """
    argv = shlex.split(cmd) if isinstance(cmd, six.string_types) else list(cmd)

    read_end, write_end = os.pipe()
    child = os.fork()
    if child < 0:
        os.close(read_end)
        os.close(write_end)
        return child

    if child != 0:
        # Parent: wait for the intermediate child to exit, then read the pid
        # it wrote for us.
        os.close(write_end)
        os.waitpid(child, 0)
        # 1024 bytes should be enough for any pid. :)
        reported = os.read(read_end, 1024)
        os.close(read_end)
        return int(reported)

    # Intermediate child.
    os.close(read_end)
    # close qtile's stdin, stdout, stderr so the called process doesn't
    # pollute our xsession-errors.
    for std_fd in (0, 1, 2):
        os.close(std_fd)

    grandchild = os.fork()
    if grandchild != 0:
        # Here it doesn't matter if fork failed or not, we just write
        # its return code and exit.
        os.write(write_end, str(grandchild).encode())
        os.close(write_end)
        # sys.exit raises SystemExit, which will then be caught by our
        # top level catchall and we'll end up with two qtiles; os._exit
        # actually calls exit.
        os._exit(0)

    # Grandchild: this is the process that becomes the spawned command.
    os.close(write_end)
    # Open /dev/null as stdin, stdout, stderr
    try:
        null_fd = os.open(os.devnull, os.O_RDWR)
    except OSError:
        # This shouldn't happen, catch it just in case
        pass
    else:
        # For Python >=3.4, need to set file descriptor to inheritable
        try:
            os.set_inheritable(null_fd, True)
        except AttributeError:
            pass
        # Again, this shouldn't happen, but we should just check
        if null_fd > 0:
            os.dup2(null_fd, 0)
        os.dup2(null_fd, 1)
        os.dup2(null_fd, 2)

    try:
        os.execvp(argv[0], argv)
    except OSError as e:
        logger.error("failed spawn: \"{0}\"\n{1}".format(cmd, e))
    # Only reached when execvp failed.
    os._exit(1)
def cmd_status(self):
    """Return "OK" if Qtile is running"""
    # Being able to answer at all is the health check; the value is fixed.
    status = "OK"
    return status
def cmd_sync(self):
    """Sync the X display. Should only be used for development"""
    connection = self.conn
    connection.flush()
def cmd_to_screen(self, n):
    """Warp focus to screen n, where n is a 0-based screen number

    Returns the result of ``self.toScreen(n)``.

    Examples
    ========

        to_screen(0)
    """
    outcome = self.toScreen(n)
    return outcome
def cmd_next_screen(self):
    """Move to next screen

    Wraps around to the first screen after the last one.
    """
    here = self.screens.index(self.currentScreen)
    following = (here + 1) % len(self.screens)
    return self.toScreen(following)
def cmd_prev_screen(self):
    """Move to the previous screen

    Wraps around to the last screen before the first one.
    """
    here = self.screens.index(self.currentScreen)
    preceding = (here - 1) % len(self.screens)
    return self.toScreen(preceding)
def cmd_windows(self):
    """Return info for each client window

    Windows that are instances of ``window.Internal`` are left out.
    """
    infos = []
    for win in self.windowMap.values():
        if isinstance(win, window.Internal):
            continue
        infos.append(win.info())
    return infos
def cmd_internal_windows(self):
    """Return info for each internal window (bars, for example)

    Only windows that are instances of ``window.Internal`` are included.
    """
    infos = []
    for win in self.windowMap.values():
        if not isinstance(win, window.Internal):
            continue
        infos.append(win.info())
    return infos
def cmd_qtile_info(self):
    """Returns a dictionary of info on the Qtile instance

    The dictionary has a single key, ``socketname``, holding ``self.fname``.
    """
    return {"socketname": self.fname}
def cmd_shutdown(self):
    """Quit Qtile"""
    # All shutdown work is delegated to stop(); nothing is returned.
    self.stop()
def cmd_switch_groups(self, groupa, groupb):
    """Switch position of groupa to groupb

    Does nothing when either name is not a known group. After swapping the
    two entries of ``self.groups`` the "setgroup" hook is fired.
    """
    known = self.groupMap
    if not (groupa in known and groupb in known):
        return

    pos_a = self.groups.index(known[groupa])
    pos_b = self.groups.index(known[groupb])
    ordering = self.groups
    ordering[pos_a], ordering[pos_b] = ordering[pos_b], ordering[pos_a]
    hook.fire("setgroup")

    # update window _NET_WM_DESKTOP
    for swapped in (self.groups[pos_a], self.groups[pos_b]):
        for win in swapped.windows:
            win.group = swapped
def find_window(self, wid):
    """Focus the window registered under ``wid``, if there is one.

    When the window's group is not on any screen, that group is first put
    on the current screen. Unknown ids are ignored.
    """
    win = self.windowMap.get(wid)
    if not win:
        return
    owner = win.group
    if not owner.screen:
        self.currentScreen.setGroup(owner)
    win.group.focus(win, False)
def cmd_findwindow(self, prompt="window", widget="prompt"):
    """Launch prompt widget to find a window of the given name

    An error is logged and nothing else happens when no widget of that name
    exists.

    Parameters
    ==========
    prompt :
        Text with which to prompt user (default: "window")
    widget :
        Name of the prompt widget (default: "prompt")
    """
    prompt_widget = self.widgetMap.get(widget)
    if prompt_widget:
        prompt_widget.startInput(
            prompt, self.find_window, "window", strict_completer=True
        )
        return
    logger.error("No widget named '{0:s}' present.".format(widget))
def cmd_next_urgent(self):
    """Focus next window with urgent hint

    The first urgent window found in ``self.windowMap`` is used: its group
    is brought to the screen and the window is focused. Nothing happens when
    no window is urgent.
    """
    try:
        urgent = [win for win in self.windowMap.values() if win.urgent]
        target = urgent[0]
        target.group.cmd_toscreen()
        target.group.focus(target)
    except IndexError:
        # no window had urgent set
        pass
def cmd_togroup(self, prompt="group", widget="prompt"):
    """Launch prompt widget to move current window to a given group

    A warning is logged when there is no current window, and an error when
    no widget of the given name exists; in both cases nothing else happens.

    Parameters
    ==========
    prompt :
        Text with which to prompt user (default: "group")
    widget :
        Name of the prompt widget (default: "prompt")
    """
    if not self.currentWindow:
        logger.warning("No window to move")
        return

    prompt_widget = self.widgetMap.get(widget)
    if prompt_widget:
        prompt_widget.startInput(
            prompt, self.moveToGroup, "group", strict_completer=True
        )
    else:
        logger.error("No widget named '{0:s}' present.".format(widget))
def cmd_switchgroup(self, prompt="group", widget="prompt"):
    """Launch prompt widget to switch to a given group to the current screen

    A warning is logged and nothing else happens when no widget of the given
    name exists.

    Parameters
    ==========
    prompt :
        Text with which to prompt user (default: "group")
    widget :
        Name of the prompt widget (default: "prompt")
    """
    def bring_to_screen(name):
        # Empty input means the prompt was dismissed without a choice.
        if not name:
            return
        try:
            self.groupMap[name].cmd_toscreen()
        except KeyError:
            logger.info(u"No group named '{0:s}' present.".format(name))

    prompt_widget = self.widgetMap.get(widget)
    if prompt_widget:
        prompt_widget.startInput(
            prompt, bring_to_screen, "group", strict_completer=True
        )
    else:
        logger.warning("No widget named '{0:s}' present.".format(widget))
def cmd_spawncmd(self, prompt="spawn", widget="prompt",
                 command="%s", complete="cmd"):
    """Spawn a command using a prompt widget, with tab-completion.

    The text entered is substituted into ``command`` with ``%`` and passed
    to ``cmd_spawn``; empty input spawns nothing.

    Parameters
    ==========
    prompt :
        Text with which to prompt user (default: "spawn: ").
    widget :
        Name of the prompt widget (default: "prompt").
    command :
        command template (default: "%s").
    complete :
        Tab completion function (default: "cmd")
    """
    def spawn_from_input(text):
        if text:
            self.cmd_spawn(command % text)

    try:
        self.widgetMap[widget].startInput(prompt, spawn_from_input, complete)
    except KeyError:
        logger.error("No widget named '{0:s}' present.".format(widget))
def cmd_qtilecmd(self, prompt="command",
                 widget="prompt", messenger="xmessage"):
    """Execute a Qtile command using the client syntax

    Tab completion aids navigation of the command tree

    An error is logged and nothing else happens when no widget of the given
    name exists.

    Parameters
    ==========
    prompt :
        Text to display at the prompt (default: "command: ")
    widget :
        Name of the prompt widget (default: "prompt")
    messenger :
        Command to display output, set this to None to disable (default:
        "xmessage")
    """
    def f(cmd):
        if not cmd:
            return
        # c here is used in eval() below
        c = command.CommandRoot(self)  # noqa
        # NOTE(review): eval() runs whatever was typed at the prompt inside
        # this process. Left as is, but it must never be fed text that does
        # not come from the local user.
        try:
            result = eval(u'c.{0:s}'.format(cmd))
        except (
                command.CommandError,
                command.CommandException,
                AttributeError) as err:
            logger.error(err)
            result = None
        if result is not None:
            from pprint import pformat
            message = pformat(result)
            if messenger:
                # Pass the message as a single argv entry: pformat output
                # routinely contains quotes, which would break a command
                # line assembled by string interpolation.
                self.cmd_spawn(shlex.split(messenger) + [message])
            logger.info(result)

    # .get() rather than indexing, so a missing widget reaches the error
    # branch below instead of raising KeyError.
    mb = self.widgetMap.get(widget)
    if not mb:
        logger.error("No widget named {0:s} present.".format(widget))
        return
    mb.startInput(prompt, f, "qshell")
def cmd_addgroup(self, group):
    """Add a group with the given name

    Returns the result of ``self.addGroup(group)``.
    """
    outcome = self.addGroup(group)
    return outcome
def cmd_delgroup(self, group):
    """Delete a group with the given name

    Returns the result of ``self.delGroup(group)``.
    """
    outcome = self.delGroup(group)
    return outcome
def cmd_add_rule(self, match_args, rule_args, min_priorty=False):
    """Add a dgroup rule, returns rule_id needed to remove it

    A warning is logged and None is returned when ``self.dgroups`` is not
    set.

    Parameters
    ==========
    match_args :
        config.Match arguments
    rule_args :
        config.Rule arguments
    min_priorty :
        If the rule is added with minimum priority (last) (default: False)
    """
    if not self.dgroups:
        logger.warning('No dgroups created')
        return

    new_rule = Rule(Match(**match_args), **rule_args)
    return self.dgroups.add_rule(new_rule, min_priorty)
def cmd_remove_rule(self, rule_id):
    """Remove a dgroup rule by rule_id"""
    rules = self.dgroups
    rules.remove_rule(rule_id)
def cmd_run_external(self, full_path):
    """Run external Python script

    The file at ``full_path`` is imported as a module and its ``main`` is
    called with this object as the only argument. ``sys.stdout`` is
    redirected for the duration of the call and restored afterwards.

    Returns the captured output followed by a description of any error that
    was raised (with a traceback, except for ImportError).
    """
    def format_error(path, e):
        s = """Can't call "main" from "{path}"\n\t{err_name}: {err}"""
        return s.format(path=path, err_name=e.__class__.__name__, err=e)

    module_name = os.path.splitext(os.path.basename(full_path))[0]
    dir_path = os.path.dirname(full_path)
    err_str = ""
    # print() writes text on Python 3 but byte strings on Python 2, so the
    # capture buffer has to match the interpreter.
    if sys.version_info[0] >= 3:
        local_stdout = io.StringIO()
    else:
        local_stdout = io.BytesIO()
    old_stdout = sys.stdout
    sys.stdout = local_stdout
    try:
        module = _import_module(module_name, dir_path)
        module.main(self)
    except ImportError as e:
        err_str += format_error(full_path, e)
    except (Exception, SystemExit) as e:
        # SystemExit is included so a script that calls sys.exit() is
        # reported to the caller instead of propagating out of the command.
        err_str += traceback.format_exc()
        # Format the caught instance itself; re-instantiating its class can
        # fail for exceptions whose constructor needs several arguments.
        err_str += format_error(full_path, e)
    finally:
        sys.stdout = old_stdout
        # Read the buffer before closing it: getvalue() raises ValueError on
        # a closed stream.
        output = local_stdout.getvalue()
        local_stdout.close()
    return output + err_str
def cmd_hide_show_bar(self, position="all"):
    """Toggle visibility of a given bar

    With "all", the new state is the opposite of the first bar found (looking
    at left, right, top, bottom in that order) and is applied to every bar of
    the current screen. The current group is laid out again whenever a bar
    was changed.

    Parameters
    ==========
    position :
        one of: "top", "bottom", "left", "right", or "all" (default: "all")
    """
    if position in ("top", "bottom", "left", "right"):
        bar = getattr(self.currentScreen, position)
        if not bar:
            logger.warning(
                "Not found bar in position '%s' for hide/show." % position)
            return
        bar.show(not bar.is_show())
        self.currentGroup.layoutAll()
        return

    if position != "all":
        logger.error("Invalid position value:{0:s}".format(position))
        return

    screen = self.currentScreen
    new_state = None
    for bar in (screen.left, screen.right, screen.top, screen.bottom):
        if not bar:
            continue
        if new_state is None:
            # The first bar present decides the state for all of them.
            new_state = not bar.is_show()
        bar.show(new_state)

    if new_state is None:
        logger.warning("Not found bar for hide/show.")
    else:
        self.currentGroup.layoutAll()
def cmd_get_state(self):
    """Get pickled state for restarting qtile

    The state is pickled with protocol 0, decoded to text, logged on a
    single line (newlines removed) and returned unmodified.
    """
    pickled = pickle.dumps(QtileState(self), protocol=0)
    state = pickled.decode()
    logger.info('State = ')
    logger.info(state.replace('\n', ''))
    return state
def cmd_tracemalloc_toggle(self):
    """Toggle tracemalloc status

    Running tracemalloc is required for qtile-top

    Raises ``command.CommandError`` when the tracemalloc module could not be
    imported.
    """
    # tracemalloc is None when the import at the top of the file failed.
    if not tracemalloc:
        logger.warning('No tracemalloc module')
        raise command.CommandError("No tracemalloc module")

    if not tracemalloc.is_tracing():
        tracemalloc.start()
    else:
        tracemalloc.stop()
def cmd_tracemalloc_dump(self):
    """Dump tracemalloc snapshot

    Returns ``[False, "Trace not started"]`` when tracing is off, otherwise
    ``[True, path]`` where ``path`` is the dump file written inside the cache
    directory. Raises ``command.CommandError`` when the tracemalloc module
    is unavailable.
    """
    if not tracemalloc:
        logger.warning('No tracemalloc module')
        raise command.CommandError("No tracemalloc module")

    if not tracemalloc.is_tracing():
        return [False, "Trace not started"]

    dump_path = os.path.join(get_cache_dir(), "qtile_tracemalloc.dump")
    snapshot = tracemalloc.take_snapshot()
    snapshot.dump(dump_path)
    return [True, dump_path]
def cmd_run_extention(self, cls):
    """Extentions should run from command run()

    ``cls`` is instantiated with this object as its only argument and the
    new instance's ``run()`` is called.
    """
    extension = cls(self)
    extension.run()