configuration-language-server/confls/helpers/systemd.py

463 lines
14 KiB
Python
Raw Permalink Normal View History

2021-09-09 18:31:23 +02:00
#!/usr/bin/python3
import pathlib
import logging
import gzip
import pypandoc
import re
import subprocess
from collections import Counter
import threading
import queue
SYSTEMD_ARGUMENT = re.compile(r'\*(\w+)\=\*\s{1,}((>.*\s{1,}){1,})')
SYSTEMD_UNIT = re.compile(r'\\\[([A-Z][a-z]\w+)\\\]')
SYSTEMD_DIGG = re.compile(r'\*\*(.*)\*\*')
DOC_PATH = "/usr/share/man"
DIRECTIVES_ARGUMENT = re.compile(r"(\w+)=\s+(.*)\(.\)")
log = logging.getLogger(__name__)
class ManPage(threading.Thread):
INSTANCES = []
@classmethod
def is_parsed(cls, path):
if str(path) in cls.INSTANCES:
return True
return False
def __init__(self, db, arg):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self.db = db
self._arg = arg
def main(self):
if ManPage.is_parsed(str(self._arg)):
self.logger.debug("{} already parsed".format(self._arg))
else:
ManPage.INSTANCES.append(str(self._arg))
s = subprocess.Popen("man --where {}".format(self._arg).split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
if path.exists() and not path.is_dir():
self.parse(path)
self.db.update(self)
def parse(self, path):
self.logger.debug("parsing {}".format(path))
with gzip.open(path, "rb") as fp:
man = fp.read().decode("utf-8")
md = pypandoc.convert_text(man, format="man", to="md")
matches = SYSTEMD_ARGUMENT.finditer(md)
if matches:
for match in matches:
name = match.group(1)
doc = match.group(2)
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, doc=(self._arg, doc))
self.db.update(arg)
self.logger.debug("adding unknown {}".format(name))
else:
arg = known_args[0]
arg.doc = (self._arg, doc)
self.logger.debug("updating known {} with doc".format(name))
@property
def path(self):
return pathlib.Path(self._path)
@property
def md(self):
return self._md
@property
def man(self):
return self._man
@property
def args(self):
return self._arguments
@property
def units(self):
units = set(self._units)
return units
class SystemdDirective(ManPage):
def __init__(self):
s = subprocess.Popen("man --where systemd.directives".split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
ManPage.__init__(self, path)
@property
def plain(self):
return self._plain
def parse(self):
with gzip.open(self.path, "rb") as fp:
self._man = fp.read().decode("utf-8")
self._plain = pypandoc.convert_text(self._man, format="man", to="plain", extra_args=("--wrap=none",))
matches = DIRECTIVES_ARGUMENT.finditer(self.plain)
man_collection = set()
self.test = []
if matches:
for match in matches:
try:
name = match.group(1)
self.test.append(name)
args = match.group(2)
args = args.replace(",", "")
args = args.split()
for arg in args:
arg = arg.split("(")[0]
if arg in man_collection:
continue
man_collection.add(arg)
s = subprocess.Popen("man --where {}".format(arg).split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
if path.exists():
if not ManPage.is_parsed(path):
man = ManPage(path)
except Exception as e:
print(e)
class SystemdUnit(object):
INSTANCES = []
@classmethod
def exists(cls, name):
if name in [n.name for n in cls.INSTANCES]:
return True
return False
@classmethod
def get_unit(cls, name):
unit = [u for u in cls.INSTANCES if u.name == name.lower()]
return unit[0]
def __init__(self, name):
self._name = name
if name not in [n.name for n in self.__class__.INSTANCES]:
self.__class__.INSTANCES.append(self)
self._data = []
def add_arguments(self, arguments):
self._data.extend(arguments)
@property
def name(self):
return self._name.lower()
@property
def man(self):
for path, man in self._data.items():
yield man
@property
def units(self):
for path, man in self._data.items():
for unit in man.units:
yield unit
@property
def arguments(self):
for arg in self._data:
yield arg
def search(self, pattern):
pattern = pattern.lower()
flat = [a for a in self.arguments if pattern in a.name.lower()]
for f in flat:
yield f
class DataBase(threading.Thread):
def __init__(self):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self._db = []
self._queue = queue.Queue()
self.start()
def setup_collection_threads(self):
self.t_bootstrap = BootStrap(self)
self.t_bootstrap.start()
self.t_directive = Directive(self)
self.t_directive.start()
def main(self):
self.logger.info("starting database thread")
self.setup_collection_threads()
while True:
data = self._queue.get()
if isinstance(data, bool):
self.logger.critical("stopping")
break
elif isinstance(data, threading.Thread):
self.logger.critical("joining {}".format(data.name))
data.join()
if not isinstance(data, ManPage):
self.logger.info("scanning manpages...")
for arg in self.get_all_args():
for path in arg.path:
if not ManPage.is_parsed(path):
t_manpage = ManPage(self, path)
t_manpage.start()
elif isinstance(data, list):
self._db.extend(data)
elif isinstance(data, SystemdArgument):
self._db.append(data)
def update(self, data):
self._queue.put(data)
def stats(self):
count = Counter(self._db)
print(len(count))
def get_all_args(self):
args = [a for a in self._db]
return args
def get_unit_args(self, unit):
args = [a for a in self._db if unit in a.units]
return args
def search(self, pattern, unit=None):
if unit is None:
args = self.get_all_args()
else:
args = self.get_unit_args(unit)
pattern = pattern.lower()
flat = [a for a in args if pattern in a.name.lower()]
for f in flat:
yield f
class SystemdArgument(object):
def __init__(self, name, value=None, unit=None, doc=None, path=None):
self._name = name
self._units = []
self._path = []
self._doc = []
self._value = value
self.units = unit
self.path = path
@property
def name(self):
return self._name
@property
def value(self):
return self._value
@property
def doc(self):
# doc = pypandoc.convert("\n".join(self._doc), format="md", to="plain")
# return doc.strip().replace("\n", "")
doc = "Doc:\n\n"
for src, content in self._doc:
doc += "{}:\n\n{}\n\n".format(src, content.replace(">", ""))
return doc
@doc.setter
def doc(self, data):
if data is not None:
self._doc.append(data)
@property
def doc_md(self):
return self._doc
@property
def path(self):
return self._path
@path.setter
def path(self, data):
if data is not None:
if isinstance(data, list):
self._path.extend(data)
else:
self._path.append(data)
@property
def units(self):
return self._units
@units.setter
def units(self, data):
if data is not None:
self._units.append(data)
DUMP_ARGS = re.compile(r"(\w+)=(.*)")
DUMP_UNIT = re.compile(r"\[([A-Z]\w+)\]")
class BootStrap(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self.db = db
def main(self):
self._parse_config_dump()
self._data_from_config_dump()
self.logger.debug("done")
self.db.update(self)
def _set_key(self, line):
matches = DUMP_UNIT.finditer(line)
if matches:
for match in matches:
unit = match.group(1)
return unit
else:
return None
def _split_argument(self, line):
matches = DUMP_ARGS.finditer(line)
if matches:
for match in matches:
arg = match.group(1)
val = match.group(2)
return arg, val
else:
return None
def _parse_config_dump(self):
s = subprocess.Popen("systemd --dump-configuration --no-pager".split(), stdout=subprocess.PIPE)
s.wait()
dump = s.stdout.read().decode("utf-8")
data = {}
for line in dump.split():
new_key = self._set_key(line)
if new_key is not None:
key = new_key
if key not in data.keys():
data[key] = []
continue
arg = self._split_argument(line)
if arg is None:
continue
data[key].append(arg)
self._data = data
def _data_from_config_dump(self):
data = []
for key, possible_values in self._data.items():
for name, value in possible_values:
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, value=value, unit=key)
self.db.update(arg)
self.logger.debug("adding unknown {}, for {}".format(name, key))
else:
arg = known_args[0]
arg.value = value
arg.unit = key
self.logger.debug("updating known {}".format(name))
class Directive(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
s = subprocess.Popen("man --where systemd.directives".split(), stdout=subprocess.PIPE)
self._path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
self.db = db
def main(self):
self.parse()
self.logger.debug("done")
self.db.update(self)
def parse(self):
self.logger.debug("opening man page to parse")
with gzip.open(self._path, "rb") as fp:
self._man = fp.read().decode("utf-8")
self._plain = pypandoc.convert_text(self._man, format="man", to="plain", extra_args=("--wrap=none",))
matches = DIRECTIVES_ARGUMENT.finditer(self._plain)
if matches:
for match in matches:
try:
name = match.group(1)
mans = match.group(2)
mans = mans.replace(",", "")
mans = mans.split()
mans = [m.split("(")[0] for m in mans]
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, value=None, unit=None, doc="None", path=mans)
self.db.update(arg)
self.logger.debug("adding unknown {}".format(name))
else:
arg = known_args[0]
arg.path = mans
self.logger.debug("updating known {}".format(name))
#for arg in args:
# arg = arg.split("(")[0]
# if arg in man_collection:
# continue
# man_collection.add(arg)
# s = subprocess.Popen("man --where {}".format(arg).split(), stdout=subprocess.PIPE)
# path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
# if path.exists():
# if not ManPage.is_parsed(path):
# man = ManPage(path)
except Exception as e:
pass
class Completor(object):
def __init__(self):
self.logger = log.getChild(self.__class__.__name__)
self.db = DataBase()
def stats(self):
print(len(self.db.get_all_args()))
def get_units(self):
units = [a.units for a in self.db.get_all_args()]
unique = []
for unit in units:
if isinstance(unit, list):
unique.extend(unit)
else:
unique.append(unit)
unique = set(unique)
return unique
def get_all(self, pattern, unit=None):
return list(self.db.search(pattern, unit))
def get_one(self, pattern):
data = list(self.db.search(pattern))
if data:
return data[0]
if __name__ == "__main__":
log = logging.root
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
log.info("hello world")
app = Completor()
while True:
result = input("?")
if result == "s":
app.stats()
else:
for i in app.get_all(result, None):
print(i.name, i.path, i.units, i.doc)