#! /usr/bin/python # by: http://en.opensuse.org/User:Mvidner # license: http://creativecommons.org/licenses/by/3.0/ # based on http://alban.apinc.org/blog/dbusmessagesboxpy/ VERSION = "0.1" print "dbus-spy %s: monitor, recorder, chart generator" % VERSION norpm = False import sys import os import time from optparse import OptionParser try: import dbus import dbus.service import _dbus_bindings except: print "Install dbus-1-python.rpm" norpm = True try: import gobject except: print "Install python-gobject2.rpm" norpm = True # python-gnome.rpm has gconf for nm-applet... if norpm: sys.exit(1) from dbus.mainloop.glib import DBusGMainLoop DBusGMainLoop(set_as_default=True) # distinguish msgo: message object, and msgd: message dictionary (made from the object) class Output: def start(self): pass def msgd_handler(self, msgd): pass def stop(self): pass # now unused def msgo_dumper(m): print "MSG -------" print "type", m.get_type() print "destination", m.get_destination() print "sender", m.get_sender() print "path", m.get_path() print "interface", m.get_interface() print "member", m.get_member() print "signature", m.get_signature() print "args_list", m.get_args_list() print "auto_start", m.get_auto_start() print "error_name", m.get_error_name() print "no_reply", m.get_no_reply() print "reply_serial", m.get_reply_serial() print "serial", m.get_serial() def msgd_dumper(m): print "MSG -------", m["TIME"] print "type", m["type"] print "destination", m["destination"] print "sender", m["sender"] print "path", m["path"] print "interface", m["interface"] print "member", m["member"] print "signature", m["signature"] print "args_list", m["args_list"] print "auto_start", m["auto_start"] print "error_name", m["error_name"] print "no_reply", m["no_reply"] print "reply_serial", m["reply_serial"] print "serial", m["serial"] TYPE = ["-", "Call", "Return", "Error", "Signal",] ARROW = ["?", "=>", ">>", "=>>", "=>*",] def default(abbr, val, defval): if (val != defval): return "%s: %s" % (abbr, val) else: return "" def msgd_printer(m): print m["TIME"], TYPE[m["type"]], default("S", m["serial"], 0), default("RS", m["reply_serial"], 0), default("NR", m["no_reply"], False), \ m["sender"], "->", m["destination"], default("AS", m["auto_start"], True), default("ERR", m["error_name"], None) print " ", m["path"], "%s.%s(%s) ((%s))" % (m["interface"], m["member"], ",".join(map(str, m["args_list"])), m["signature"]) class Handler(Output): def __init__(self, msgd_handler): """msgd_handler: callable""" self.msgd_handler = msgd_handler class Chart(Output): def __init__(self, fname): self.fname = fname def start(self): self.seen = {} self.names = {} self.serials = {} self.msgs = [] self.lines = [] def sniff(self, m): """infer more info about bus traffic""" src = m["sender"] dest = m["destination"] serial = m["serial"] rserial = m["reply_serial"] ifc = m["interface"] call = m["member"] args = m["args_list"] if ifc == "org.freedesktop.DBus": if TYPE[m["type"]] == "Signal": if call == "NameOwnerChanged": (nname, nfrom, nto) = args #print "NOC '%s'\t%s->%s" % (nname, nfrom, nto) elif call == "NameAcquired": nname = args[0] #print "NA '%s'" % nname # TODO get replies to # calls GetNameOwner, RequestName, NameHasOwner elif TYPE[m["type"]] == "Call": if call == "GetNameOwner": #print "GNO", args pass elif call == "RequestName": #print "RN", args self.names[src] = args[0] # remember serial number if name is interesting if TYPE[m["type"]] == "Call": if serial != 0 and self.interesting_name(dest): self.serials.setdefault(src, {}) self.serials[src][serial] = dest #print "INTERESTING:", src, serial, dest # match by reply serial numbers if rserial != 0: try: self.names[src] = self.serials[dest][rserial] except: print "REPLY?! %s = (%s,%d)" % (src, dest, rserial) def interesting_name(self, conn): return not self.names.has_key(conn) and conn[0] != ":" def name(self, conn): return self.names.get(conn, conn) def msgd_handler(self, m): self.msgs.append(m) # store because the names will only be known later #print ".", self.sniff(m) def post_msgd_handler(self, m): src = self.name(m["sender"]) dest = self.name(m["destination"]) self.seen[src] = True if dest != None: self.seen[dest] = True arrow = ARROW[m["type"]] T = TYPE[m["type"]] if T == "Signal": dest = "" else: dest = '"%s"' % dest label = m["member"] if T == "Error": label = m["error_name"] elif T == "Return": r = m["args_list"] if len(r) == 0: label = "." else: label = repr(r[0]).replace('"', '||') label = label[:333] line = '"%s" %s %s [ label = "%s" ];\n' % (src, arrow, dest, label) self.lines.append(line) def stop(self): for m in self.msgs: self.post_msgd_handler(m) self.f = open(self.fname, "w") # TODO comment inside f self.f.write("msc {\n") # names = map(lambda s: s.replace(".", "\\n"), sorted(self.seen.keys())) names = self.seen.keys() self.f.write(" %s;\n" % ",".join('"%s"' % d for d in names)) for l in self.lines: self.f.write(l) self.f.write("}\n") self.f.close() class Recorder(Output): def __init__(self, fname): self.fname = fname def start(self): self.f = open(self.fname, "w") # TODO comment inside f self.f.write("[\n") def msgd_handler(self, msgd): self.f.write(str(msgd)+",\n") def stop(self): self.f.write("]\n") self.f.close() class Filter(Output): def __init__(self, filter, out): self.filter = filter self.out = out def start(self): self.out.start() def msgd_handler(self, msgd): if self.match(msgd): self.out.msgd_handler(msgd) def stop(self): self.out.stop() def match(self, msgd): # quick and dirty: match substring in msgd python representation return repr(msgd).find(self.filter) != -1 class NotFilter(Filter): def msgd_handler(self, msgd): if not self.match(msgd): self.out.msgd_handler(msgd) class Input: def start(self): pass def run(self): #print self.__class__ self.start() self.out.start() self.run_inner() self.out.stop() self.stop() def run_inner(self): pass def stop(self): pass class Monitor(Input): def __init__(self, which_bus, out): """which_bus: "system" or "session" out: Output""" if which_bus == "system": self.bus = dbus.SystemBus() else: self.bus = dbus.SessionBus() self.out = out def start(self): self.bus.add_match_string("") self.bus.add_message_filter(self.msg_filter) def stop(self): self.bus.remove_message_filter(self.msg_filter) self.bus.remove_match_string("") def msgd_maker(self, m): items = ["args_list", "auto_start", "destination", "error_name", "interface", "member", "no_reply", "path", "reply_serial", "sender", "serial", "signature", "type",] # TODO make more efficient by compile() c = "{\n" for i in items: c = c + " '%s': m.get_%s(),\n" % (i, i) c = c + "}\n" try: dump = eval(c) except Exception, e: print c print e dump["TIME"] = time.time() return dump # msg is dbus.lowlevel.Message def msg_filter(self, abus, msg): try: msgd = self.msgd_maker(msg) self.out.msgd_handler(msgd) # exceptions in signal handlers are eaten by the bindings, # so we better do something ourselves # http://bugs.freedesktop.org/show_bug.cgi?id=9980 except Exception, e: print e def run_inner(self): loop = gobject.MainLoop() print "Press Ctrl-C to stop." try: loop.run() except: print " Loop exited" DBUS_MESSAGE_TYPE_METHOD_CALL = 1 DBUS_MESSAGE_TYPE_METHOD_RETURN = 2 DBUS_MESSAGE_TYPE_ERROR = 3 DBUS_MESSAGE_TYPE_SIGNAL = 4 class Reader(Input): def __init__(self, fname, out): self.f = open(fname, "r") self.out = out try: slurp = self.f.read() self.messages = eval(slurp) # TODO! protect from evil code except Exception, e: print e self.messages = [] # reconstructing messages is too hard. let's work with dicts instead @staticmethod def newmsg(msg_d): print repr(msg_d) print msg_d.__class__ t = msg_d["type"] if t == DBUS_MESSAGE_TYPE_METHOD_CALL: msg = dbus.lowlevel.MethodCallMessage(**msg_d) elif t == DBUS_MESSAGE_TYPE_METHOD_RETURN: msg = dbus.lowlevel.MethodReturnMessage(**msg_d) elif t == DBUS_MESSAGE_TYPE_ERROR: msg = dbus.lowlevel.ErrorMessage(**msg_d) elif t == DBUS_MESSAGE_TYPE_SIGNAL: msg = dbus.lowlevel.SignalMessage(path=msg_d["path"], interface=msg_d["interface"], method=msg_d["member"]) for k, v in msg_d.iteritems(): eval("msg.set_%s(v)" % k) return msg def run_inner(self): for msg_d in self.messages: self.out.msgd_handler(msg_d) op = OptionParser() # INPUT op.add_option("-b", "--bus", dest="bus", help="monitor messages from the bus") op.add_option("-r", "--read", dest="read", help="read messages from file") # OUTPUT op.add_option("-d", "--dump", dest="dump", action="store_true", default=False, help="dump seen messages") op.add_option("-p", "--print", dest="printer", action="store_true", default=False, help="print seen messages") op.add_option("-w", "--write", dest="writeto", help="write seen messages to file") op.add_option("-c", "--chart", dest="chart", help="write MSC chart to file") # FILTERS - TODO op.add_option("-f", "--filter", dest="filter", help="pass only matching messages") op.add_option("-F", "--notfilter", dest="notfilter", help="pass only nonmatching messages") (options, args) = op.parse_args() # TODO multiple outputs: multiplexer(fork) if options.dump: out = Handler(msgd_dumper) elif options.chart != None: out = Chart(options.chart) elif options.writeto != None: out = Recorder(options.writeto) elif options.printer: out = Handler(msgd_printer) else: print "Default output: pretty printer" out = Handler(msgd_printer) # TODO better chaining, multiple filters of same kind if options.filter != None: out = Filter(options.filter, out) if options.notfilter != None: out = NotFilter(options.notfilter, out) if options.bus != None: input = Monitor(options.bus, out) elif options.read != None: input = Reader(options.read, out) else: print "Default input: system monitor" input = Monitor("system", out) input.run()