463 lines
14 KiB
Python
463 lines
14 KiB
Python
|
#!/usr/bin/python3
|
||
|
import pathlib
|
||
|
import logging
|
||
|
import gzip
|
||
|
import pypandoc
|
||
|
import re
|
||
|
import subprocess
|
||
|
from collections import Counter
|
||
|
import threading
|
||
|
import queue
|
||
|
|
||
|
# Matches a markdown option entry of the form ``*Name=*`` followed by
# whitespace and one or more ``>``-prefixed lines.  Group 1 is the name,
# group 2 is the whole run of ``>`` lines.
SYSTEMD_ARGUMENT = re.compile(r'\*(\w+)\=\*\s{1,}((>.*\s{1,}){1,})')
# Matches a backslash-escaped bracketed word such as ``\[Unit\]``; group 1 is
# the word (upper-case letter, lower-case letter, then word characters).
# Not referenced by the code visible in this file.
SYSTEMD_UNIT = re.compile(r'\\\[([A-Z][a-z]\w+)\\\]')
# Matches text wrapped in double asterisks; group 1 is the wrapped text.
# Not referenced by the code visible in this file.
SYSTEMD_DIGG = re.compile(r'\*\*(.*)\*\*')
# Not referenced by the code visible in this file.
DOC_PATH = "/usr/share/man"

# Matches ``Name=`` followed by whitespace and a run of text that ends in a
# one-character parenthesised suffix such as ``(5)``.  Group 1 is the name,
# group 2 is everything between the whitespace and that final suffix.
DIRECTIVES_ARGUMENT = re.compile(r"(\w+)=\s+(.*)\(.\)")

# Module-level logger; the thread classes and Completor create child loggers
# from it via ``log.getChild``.
log = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class ManPage(threading.Thread):
    """Worker thread that extracts option documentation from one man page.

    The thread resolves the page name with ``man --where``, decompresses the
    file with gzip, converts it to markdown through pypandoc and feeds every
    ``SYSTEMD_ARGUMENT`` match into the database.  When it is finished (or has
    failed) it hands itself to ``db.update`` so the database can join it.
    """

    # Page names (as strings) that a worker has already claimed.
    INSTANCES = []
    # Makes the check-and-append on INSTANCES atomic across worker threads.
    _INSTANCES_LOCK = threading.Lock()

    @classmethod
    def is_parsed(cls, path):
        """Return True when ``str(path)`` has already been claimed."""
        return str(path) in cls.INSTANCES

    @classmethod
    def _claim(cls, page):
        """Register *page* as claimed; return False if it already was."""
        key = str(page)
        with cls._INSTANCES_LOCK:
            if key in cls.INSTANCES:
                return False
            cls.INSTANCES.append(key)
            return True

    def __init__(self, db, arg):
        """Prepare (but do not start) a worker.

        Args:
            db: object providing ``update(item)`` and ``get_all_args()``.
            arg: name of the man page, as understood by ``man --where``.
        """
        threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
        self.logger = log.getChild(self.__class__.__name__)
        self.db = db
        self._arg = arg
        # Backing fields of the read-only properties below; initialised here
        # so that reading a property before parsing does not raise.
        self._path = None
        self._md = ""
        self._man = ""
        self._arguments = []
        self._units = []

    def main(self):
        """Thread body: parse the page once and always report back to the db."""
        try:
            if not ManPage._claim(self._arg):
                self.logger.debug("{} already parsed".format(self._arg))
                return
            path = self._locate()
            if path is not None:
                self.parse(path)
        except Exception:
            # Top-level boundary of the thread: log instead of dying silently.
            self.logger.exception("failed to parse {}".format(self._arg))
        finally:
            # The database joins every thread it receives, so this must be
            # sent even when parsing failed.
            self.db.update(self)

    def _locate(self):
        """Return the page's file as a Path, or None when it cannot be used."""
        found = subprocess.run(
            ["man", "--where", str(self._arg)],
            stdout=subprocess.PIPE,
            check=False,
        )
        location = found.stdout.decode("utf-8").strip()
        if not location:
            return None
        candidate = pathlib.Path(location)
        if candidate.exists() and not candidate.is_dir():
            return candidate
        return None

    def parse(self, path):
        """Read the gzip-compressed page at *path* and record its options.

        Unknown option names are pushed to the database as new
        ``SystemdArgument`` objects; known ones get the text attached through
        their ``doc`` setter.
        """
        self.logger.debug("parsing {}".format(path))
        with gzip.open(path, "rb") as fp:
            man = fp.read().decode("utf-8")
        md = pypandoc.convert_text(man, format="man", to="md")
        self._path, self._man, self._md = path, man, md
        for match in SYSTEMD_ARGUMENT.finditer(md):
            name = match.group(1)
            doc = match.group(2)
            # Looked up on every match because other threads keep adding
            # arguments to the database while this one runs.
            known = next((a for a in self.db.get_all_args() if a.name == name), None)
            if known is None:
                arg = SystemdArgument(name, doc=(self._arg, doc))
                self.db.update(arg)
                self.logger.debug("adding unknown {}".format(name))
            else:
                arg = known
                arg.doc = (self._arg, doc)
                self.logger.debug("updating known {} with doc".format(name))
            self._arguments.append(arg)

    @property
    def path(self):
        """Path of the parsed file, or None when nothing was parsed yet."""
        if self._path is None:
            return None
        return pathlib.Path(self._path)

    @property
    def md(self):
        """Markdown rendering produced by the last ``parse`` call."""
        return self._md

    @property
    def man(self):
        """Raw man source read by the last ``parse`` call."""
        return self._man

    @property
    def args(self):
        """Arguments that ``parse`` created or updated for this page."""
        return self._arguments

    @property
    def units(self):
        """Set built from ``_units``; ``parse`` does not populate that list."""
        return set(self._units)
|
||
|
|
||
|
|
||
|
class SystemdDirective(ManPage):
    """Reader for the ``systemd.directives`` index page.

    The page is converted to plain text and scanned with
    ``DIRECTIVES_ARGUMENT``; every man page referenced by a directive is
    handed to a ``ManPage`` worker once.

    NOTE(review): nothing in this file instantiates this class; the
    ``Directive`` thread covers the same page.  Confirm whether it is still
    needed.
    """

    PAGE = "systemd.directives"

    def __init__(self, db=None):
        """Locate the index page.

        Args:
            db: database handed to the spawned ``ManPage`` workers.  When it
                is None, ``parse`` only collects names and spawns nothing.
        """
        ManPage.__init__(self, db, self.PAGE)
        located = subprocess.run(
            ["man", "--where", self.PAGE],
            stdout=subprocess.PIPE,
            check=False,
        )
        self._path = pathlib.Path(located.stdout.decode("utf-8").strip())
        self._man = ""
        self._plain = ""
        # Directive names in the order they were matched.
        self.test = []

    @property
    def plain(self):
        """Plain-text rendering produced by the last ``parse`` call."""
        return self._plain

    def parse(self, path=None):
        """Scan the index page and start a reader per referenced man page.

        Args:
            path: file to read; defaults to the location found in
                ``__init__``.  The parameter exists because ``ManPage.main``
                calls ``self.parse(path)``.
        """
        source = self.path if path is None else pathlib.Path(path)
        with gzip.open(source, "rb") as fp:
            self._man = fp.read().decode("utf-8")
        self._plain = pypandoc.convert_text(
            self._man, format="man", to="plain", extra_args=("--wrap=none",)
        )
        man_collection = set()
        self.test = []
        for match in DIRECTIVES_ARGUMENT.finditer(self.plain):
            self.test.append(match.group(1))
            for reference in match.group(2).replace(",", "").split():
                # "systemd.exec(5)" -> "systemd.exec"
                page = reference.split("(")[0]
                if page in man_collection:
                    continue
                man_collection.add(page)
                self._spawn_reader(page)

    def _spawn_reader(self, page):
        """Start a ManPage worker for *page* unless it was already claimed."""
        if self.db is None or ManPage.is_parsed(page):
            return
        try:
            ManPage(self.db, page).start()
        except RuntimeError:
            # Best effort: one page that cannot be started must not abort
            # the scan of the remaining directives.
            self.logger.exception("could not start reader for {}".format(page))
|
||
|
|
||
|
|
||
|
class SystemdUnit(object):
    """A named unit section together with the arguments attached to it.

    Units are registered in the class-level ``INSTANCES`` list; names are
    compared case-insensitively.
    """

    # Every unit registered so far (first instance per lower-cased name).
    INSTANCES = []

    @classmethod
    def exists(cls, name):
        """Return True when a unit called *name* (any case) is registered."""
        wanted = name.lower()
        return any(unit.name == wanted for unit in cls.INSTANCES)

    @classmethod
    def get_unit(cls, name):
        """Return the registered unit called *name* (any case).

        Raises:
            IndexError: when no such unit is registered.
        """
        wanted = name.lower()
        for unit in cls.INSTANCES:
            if unit.name == wanted:
                return unit
        raise IndexError("no unit named {!r}".format(name))

    def __init__(self, name):
        """Create the unit and register it unless the name is already taken."""
        self._name = name
        self._data = []
        # ``name`` is lower-cased by the property, so the duplicate check has
        # to be case-insensitive as well.
        if not self.__class__.exists(name):
            self.__class__.INSTANCES.append(self)

    def add_arguments(self, arguments):
        """Append every item of *arguments* to this unit."""
        self._data.extend(arguments)

    @property
    def name(self):
        """Lower-cased unit name."""
        return self._name.lower()

    @property
    def man(self):
        """Yield the man pages listed in each stored argument's ``path``.

        NOTE(review): ``_data`` is a flat list of arguments, so the pages are
        read from the arguments themselves -- confirm this is the intended
        meaning of ``man``.
        """
        for arg in self._data:
            for page in arg.path:
                yield page

    @property
    def units(self):
        """Yield the entries of each stored argument's ``units``."""
        for arg in self._data:
            for unit in arg.units:
                yield unit

    @property
    def arguments(self):
        """Yield the stored arguments in insertion order."""
        for arg in self._data:
            yield arg

    def search(self, pattern):
        """Yield arguments whose name contains *pattern*, ignoring case."""
        needle = pattern.lower()
        for arg in list(self.arguments):
            if needle in arg.name.lower():
                yield arg
|
||
|
|
||
|
|
||
|
class DataBase(threading.Thread):
    """Thread that owns the argument list and serialises all writes to it.

    Producers call ``update``; the thread consumes the queue in ``main``:

    * a ``bool`` stops the loop,
    * a ``threading.Thread`` is joined (and, unless it is a ``ManPage``,
      triggers a scan for man pages that still need a reader),
    * a ``list`` is appended item by item,
    * a ``SystemdArgument`` is appended.
    """

    def __init__(self):
        """Create the queue and start the thread immediately."""
        threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
        self.logger = log.getChild(self.__class__.__name__)
        self._db = []
        self._queue = queue.Queue()
        # Man pages for which a reader was already started by this thread.
        self._scheduled = set()
        self.start()

    def setup_collection_threads(self):
        """Start the BootStrap and Directive collector threads."""
        self.t_bootstrap = BootStrap(self)
        self.t_bootstrap.start()
        self.t_directive = Directive(self)
        self.t_directive.start()

    def main(self):
        """Thread body: start the collectors, then drain the queue forever."""
        self.logger.info("starting database thread")
        self.setup_collection_threads()
        while True:
            data = self._queue.get()
            if isinstance(data, bool):
                self.logger.info("stopping")
                break
            if isinstance(data, threading.Thread):
                self.logger.debug("joining {}".format(data.name))
                data.join()
                if not isinstance(data, ManPage):
                    self._schedule_manpages()
            elif isinstance(data, list):
                self._db.extend(data)
            elif isinstance(data, SystemdArgument):
                self._db.append(data)

    def _schedule_manpages(self):
        """Start one ManPage reader per page that has none yet."""
        self.logger.info("scanning manpages...")
        for arg in self.get_all_args():
            for page in arg.path:
                key = str(page)
                # ``is_parsed`` only turns true once a reader runs, so pages
                # started here are tracked separately to avoid spawning one
                # thread per argument that mentions the page.
                if key in self._scheduled or ManPage.is_parsed(page):
                    continue
                self._scheduled.add(key)
                ManPage(self, page).start()

    def update(self, data):
        """Queue *data* for the database thread (see the class docstring)."""
        self._queue.put(data)

    def stats(self):
        """Print the number of distinct stored arguments."""
        count = Counter(self._db)
        print(len(count))

    def get_all_args(self):
        """Return a new list holding every stored argument."""
        return list(self._db)

    def get_unit_args(self, unit):
        """Return the stored arguments whose ``units`` contain *unit*."""
        return [a for a in self._db if unit in a.units]

    def search(self, pattern, unit=None):
        """Yield arguments whose name contains *pattern*, ignoring case.

        Args:
            pattern: substring to look for.
            unit: when given, only arguments of that unit are considered.
        """
        if unit is None:
            args = self.get_all_args()
        else:
            args = self.get_unit_args(unit)
        needle = pattern.lower()
        for arg in [a for a in args if needle in a.name.lower()]:
            yield arg
|
||
|
|
||
|
|
||
|
class SystemdArgument(object):
    """One configuration option with its value, units, man pages and docs.

    ``units``, ``path`` and ``doc`` are accumulating properties: assigning to
    them adds to the stored list instead of replacing it, and assigning None
    is a no-op.
    """

    def __init__(self, name, value=None, unit=None, doc=None, path=None):
        """Create the argument.

        Args:
            name: option name.
            value: value description, stored as given.
            unit: unit name, or list of unit names, the option belongs to.
            doc: ``(source, text)`` pair to record as documentation.
            path: man page name, or list of names, that mention the option.
        """
        self._name = name
        self._units = []
        self._path = []
        self._doc = []
        self._value = value
        self.units = unit
        self.path = path
        self.doc = doc

    @staticmethod
    def _merge(target, data):
        """Add *data* (an item or a list of items) to *target*, skipping
        None and entries that are already present."""
        if data is None:
            return
        items = data if isinstance(data, list) else [data]
        for item in items:
            if item not in target:
                target.append(item)

    @property
    def name(self):
        """Option name."""
        return self._name

    @property
    def value(self):
        """Value description."""
        return self._value

    @value.setter
    def value(self, data):
        self._value = data

    @property
    def doc(self):
        """All recorded documentation as text, with ``>`` characters removed."""
        parts = ["Doc:\n\n"]
        for src, content in self._doc:
            parts.append("{}:\n\n{}\n\n".format(src, content.replace(">", "")))
        return "".join(parts)

    @doc.setter
    def doc(self, data):
        """Record a ``(source, text)`` pair.

        A bare string is stored with the source ``"unknown"``; an empty
        string or the placeholder string ``"None"`` is ignored.

        Raises:
            TypeError, ValueError: when *data* is neither a string nor a
                two-item sequence.
        """
        if data is None:
            return
        if isinstance(data, str):
            if data.strip() in ("", "None"):
                return
            data = ("unknown", data)
        source, content = data
        self._doc.append((source, content))

    @property
    def doc_md(self):
        """The recorded ``(source, text)`` pairs, unformatted."""
        return self._doc

    @property
    def path(self):
        """Man page names that mention this option."""
        return self._path

    @path.setter
    def path(self, data):
        self._merge(self._path, data)

    @property
    def units(self):
        """Unit names this option belongs to."""
        return self._units

    @units.setter
    def units(self, data):
        self._merge(self._units, data)
|
||
|
|
||
|
|
||
|
# Matches ``Name=rest``; group 1 is the word before ``=``, group 2 is
# everything after it up to the end of the line.
DUMP_ARGS = re.compile(r"(\w+)=(.*)")
# Matches a bracketed word that starts with an upper-case letter, such as
# ``[Unit]``; group 1 is the word without the brackets.
DUMP_UNIT = re.compile(r"\[([A-Z]\w+)\]")
|
||
|
|
||
|
|
||
|
class BootStrap(threading.Thread):
    """Collector thread that seeds the database from systemd's own dump.

    It runs ``systemd --dump-configuration --no-pager``, groups the
    ``Name=value`` tokens under the preceding ``[Section]`` token and turns
    every pair into a ``SystemdArgument`` (or updates the one already known).
    When finished, or after a failure, it hands itself to ``db.update`` so
    the database can join it.
    """

    def __init__(self, db):
        """Prepare (but do not start) the collector for database *db*."""
        threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
        self.logger = log.getChild(self.__class__.__name__)
        self.db = db
        # Section name -> list of (name, value) pairs; filled by
        # _parse_config_dump.
        self._data = {}

    def main(self):
        """Thread body: collect, publish, and always report back to the db."""
        try:
            self._parse_config_dump()
            self._data_from_config_dump()
            self.logger.debug("done")
        except Exception:
            # Top-level boundary of the thread: log instead of dying silently.
            self.logger.exception("bootstrap from configuration dump failed")
        finally:
            # The database joins this thread and only then scans for man
            # pages, so it must be notified even after a failure.
            self.db.update(self)

    def _set_key(self, line):
        """Return the first ``[Section]`` name found in *line*, else None."""
        found = DUMP_UNIT.search(line)
        return found.group(1) if found else None

    def _split_argument(self, line):
        """Return ``(name, value)`` for the first ``Name=value`` in *line*,
        else None."""
        found = DUMP_ARGS.search(line)
        if found is None:
            return None
        return found.group(1), found.group(2)

    def _parse_config_dump(self):
        """Run systemd and store the grouped dump in ``self._data``."""
        # run() reads the pipe while waiting; calling wait() before reading
        # a PIPE can block forever once the pipe buffer is full.
        result = subprocess.run(
            "systemd --dump-configuration --no-pager".split(),
            stdout=subprocess.PIPE,
            check=False,
        )
        dump = result.stdout.decode("utf-8")
        data = {}
        key = None
        # NOTE(review): split() breaks on every whitespace, not only on line
        # ends, so a value containing a space is cut at that space -- confirm
        # whether splitlines() was intended.
        for line in dump.split():
            new_key = self._set_key(line)
            if new_key is not None:
                key = new_key
                data.setdefault(key, [])
                continue
            if key is None:
                # Assignment seen before any section header: nothing to
                # attach it to.
                continue
            arg = self._split_argument(line)
            if arg is None:
                continue
            data[key].append(arg)
        self._data = data

    def _data_from_config_dump(self):
        """Publish every collected ``(name, value)`` pair to the database."""
        for key, possible_values in self._data.items():
            for name, value in possible_values:
                # Looked up on every pair because other threads keep adding
                # arguments to the database while this one runs.
                known = next(
                    (a for a in self.db.get_all_args() if a.name == name), None
                )
                if known is None:
                    arg = SystemdArgument(name, value=value, unit=key)
                    self.db.update(arg)
                    self.logger.debug("adding unknown {}, for {}".format(name, key))
                    continue
                try:
                    known.value = value
                except AttributeError:
                    # ``value`` may be exposed as a read-only property; keep
                    # the stored value rather than abort the whole import.
                    self.logger.debug("value of {} is read-only".format(name))
                known.units = key
                self.logger.debug("updating known {}".format(name))
|
||
|
|
||
|
|
||
|
class Directive(threading.Thread):
    """Collector thread that reads the ``systemd.directives`` index page.

    Every ``DIRECTIVES_ARGUMENT`` match yields an option name and the man
    pages listed after it; these are stored as the ``path`` of a new or
    already known ``SystemdArgument``.  When finished, or after a failure,
    the thread hands itself to ``db.update`` so the database can join it.
    """

    def __init__(self, db):
        """Locate the index page with ``man --where`` for database *db*."""
        threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
        self.logger = log.getChild(self.__class__.__name__)
        located = subprocess.run(
            "man --where systemd.directives".split(),
            stdout=subprocess.PIPE,
            check=False,
        )
        self._path = pathlib.Path(located.stdout.decode("utf-8").strip())
        self.db = db

    def main(self):
        """Thread body: parse the page and always report back to the db."""
        try:
            self.parse()
            self.logger.debug("done")
        except Exception:
            # Top-level boundary of the thread: log instead of dying silently.
            self.logger.exception("could not parse {}".format(self._path))
        finally:
            # The database joins this thread and only then scans for man
            # pages, so it must be notified even after a failure.
            self.db.update(self)

    def parse(self):
        """Read the gzip-compressed page and record every directive on it."""
        self.logger.debug("opening man page to parse")
        with gzip.open(self._path, "rb") as fp:
            self._man = fp.read().decode("utf-8")
        self._plain = pypandoc.convert_text(
            self._man, format="man", to="plain", extra_args=("--wrap=none",)
        )
        for match in DIRECTIVES_ARGUMENT.finditer(self._plain):
            try:
                self._record(match.group(1), match.group(2))
            except Exception:
                # Best effort per entry: log it and keep going.
                self.logger.exception(
                    "skipping directive entry {!r}".format(match.group(0))
                )

    def _record(self, name, references):
        """Store the man pages named in *references* for option *name*."""
        # "systemd.exec(5), systemd.unit(5)" -> ["systemd.exec", "systemd.unit"]
        mans = [ref.split("(")[0] for ref in references.replace(",", "").split()]
        # Looked up on every entry because other threads keep adding
        # arguments to the database while this one runs.
        known = next((a for a in self.db.get_all_args() if a.name == name), None)
        if known is None:
            arg = SystemdArgument(name, value=None, unit=None, doc=None, path=mans)
            self.db.update(arg)
            self.logger.debug("adding unknown {}".format(name))
        else:
            known.path = mans
            self.logger.debug("updating known {}".format(name))
|
||
|
|
||
|
|
||
|
class Completor(object):
    """Small facade over a freshly created ``DataBase``."""

    def __init__(self):
        """Create the child logger and the database."""
        self.logger = log.getChild(type(self).__name__)
        self.db = DataBase()

    def stats(self):
        """Print how many arguments the database currently holds."""
        total = len(self.db.get_all_args())
        print(total)

    def get_units(self):
        """Return the set of all unit names found on the stored arguments."""
        collected = []
        for owners in [argument.units for argument in self.db.get_all_args()]:
            if not isinstance(owners, list):
                collected.append(owners)
                continue
            collected.extend(owners)
        return set(collected)

    def get_all(self, pattern, unit=None):
        """Return the list of arguments matching *pattern* (and *unit*)."""
        hits = self.db.search(pattern, unit)
        return list(hits)

    def get_one(self, pattern):
        """Return the first argument matching *pattern*, or None."""
        hits = list(self.db.search(pattern))
        return hits[0] if hits else None
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Interactive prompt: "s" prints the argument count, anything else is
    # used as a search pattern.  Ctrl-D or Ctrl-C ends the session.
    log = logging.root
    log.addHandler(logging.StreamHandler())
    log.setLevel(logging.INFO)
    log.info("hello world")
    app = Completor()
    try:
        while True:
            result = input("?")
            if result == "s":
                app.stats()
                continue
            for i in app.get_all(result, None):
                print(i.name, i.path, i.units, i.doc)
    except (EOFError, KeyboardInterrupt):
        pass
    finally:
        # The database thread is not a daemon and only leaves its loop when
        # it receives a bool; without this the process would never exit.
        app.db.update(True)
|