beta push

This commit is contained in:
waldek 2021-09-09 18:31:23 +02:00
commit e7b93c5dda
13 changed files with 1116 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
venv/
build/
*.egg-info/
dist/
__pycache__/

19
confls/__init__.py Normal file
View File

@ -0,0 +1,19 @@
# Copyright 2017 Palantir Technologies, Inc.
import os
import sys
import pluggy
#from ._version import get_versions
if sys.version_info[0] < 3:
from future.standard_library import install_aliases
install_aliases()
#__version__ = get_versions()['version']
#del get_versions
PYLS = 'confls'
hookspec = pluggy.HookspecMarker(PYLS)
hookimpl = pluggy.HookimplMarker(PYLS)
IS_WIN = os.name == 'nt'

121
confls/__main__.py Normal file
View File

@ -0,0 +1,121 @@
# Copyright 2017 Palantir Technologies, Inc.
import argparse
import logging
import logging.config
import sys
from .configuration_ls import ConfigurationLanguageServer, start_io_lang_server, start_tcp_lang_server
LOG_FORMAT = "%(asctime)s UTC - %(levelname)s - %(name)s - %(message)s"
def add_arguments(parser):
parser.description = "Configuration Language Server"
parser.add_argument(
"--tcp", action="store_true",
help="Use TCP server instead of stdio"
)
parser.add_argument(
"--host", default="127.0.0.1",
help="Bind to this address"
)
parser.add_argument(
"--port", type=int, default=2087,
help="Bind to this port"
)
parser.add_argument(
'--check-parent-process', action="store_true",
help="Check whether parent process is still alive using os.kill(ppid, 0) "
"and auto shut down language server process when parent process is not alive."
"Note that this may not work on a Windows machine."
)
log_group = parser.add_mutually_exclusive_group()
log_group.add_argument(
"--log-config",
help="Path to a JSON file containing Python logging config."
)
log_group.add_argument(
"--log-file",
help="Redirect logs to the given file instead of writing to stderr."
"Has no effect if used with --log-config."
)
parser.add_argument(
'-v', '--verbose', action='count', default=0,
help="Increase verbosity of log output, overrides log config file"
)
def main():
parser = argparse.ArgumentParser()
add_arguments(parser)
args = parser.parse_args()
_configure_logger(args.verbose, args.log_config, args.log_file)
if args.tcp:
start_tcp_lang_server(args.host, args.port, args.check_parent_process,
ConfigurationLanguageServer)
else:
stdin, stdout = _binary_stdio()
start_io_lang_server(stdin, stdout, args.check_parent_process,
ConfigurationLanguageServer)
def _binary_stdio():
"""Construct binary stdio streams (not text mode).
This seems to be different for Window/Unix Python2/3, so going by:
https://stackoverflow.com/questions/2850893/reading-binary-data-from-stdin
"""
PY3K = sys.version_info >= (3, 0)
if PY3K:
# pylint: disable=no-member
stdin, stdout = sys.stdin.buffer, sys.stdout.buffer
else:
# Python 2 on Windows opens sys.stdin in text mode, and
# binary data that read from it becomes corrupted on \r\n
if sys.platform == "win32":
# set sys.stdin to binary mode
# pylint: disable=no-member,import-error
import os
import msvcrt
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
stdin, stdout = sys.stdin, sys.stdout
return stdin, stdout
def _configure_logger(verbose=0, log_config=None, log_file=None):
root_logger = logging.root
if log_config:
with open(log_config, 'r') as f:
logging.config.dictConfig(json.load(f))
else:
formatter = logging.Formatter(LOG_FORMAT)
if log_file:
log_handler = logging.handlers.RotatingFileHandler(
log_file, mode='a', maxBytes=50*1024*1024,
backupCount=10, encoding=None, delay=0
)
else:
log_handler = logging.StreamHandler()
log_handler.setFormatter(formatter)
root_logger.addHandler(log_handler)
if verbose == 0:
level = logging.WARNING
elif verbose == 1:
level = logging.INFO
elif verbose >= 2:
level = logging.DEBUG
root_logger.setLevel(level)
if __name__ == '__main__':
main()

View File

56
confls/config/config.py Normal file
View File

@ -0,0 +1,56 @@
import logging
import logging.config
import pyls.config.config as config
import pyls.uris as uris
import pyls._utils as _utils
import pkg_resources
import pluggy
from confls import hookspecs, PYLS
log = logging.getLogger(__name__)
class ConfigurationConfig(config.Config):
def __init__(self, root_uri, init_opts, process_id, capabilities):
self._root_path = uris.to_fs_path(root_uri)
self._root_uri = root_uri
self._init_opts = init_opts
self._process_id = process_id
self._capabilities = capabilities
self._settings = {}
self._plugin_settings = {}
self._config_sources = {}
self._pm = pluggy.PluginManager(PYLS)
self._pm.trace.root.setwriter(log.debug)
self._pm.enable_tracing()
self._pm.add_hookspecs(hookspecs)
# Pluggy will skip loading a plugin if it throws a DistributionNotFound exception.
# However I don't want all plugins to have to catch ImportError and re-throw. So here we'll filter
# out any entry points that throw ImportError assuming one or more of their dependencies isn't present.
for entry_point in pkg_resources.iter_entry_points(PYLS):
try:
entry_point.load()
except ImportError as e:
log.warning("Failed to load %s entry point '%s': %s", PYLS, entry_point.name, e)
self._pm.set_blocked(entry_point.name)
# Load the entry points into pluggy, having blocked any failing ones
self._pm.load_setuptools_entrypoints(PYLS)
for name, plugin in self._pm.list_name_plugin():
if plugin is not None:
log.info("Loaded pyls plugin %s from %s", name, plugin)
for plugin_conf in self._pm.hook.pyls_settings(config=self):
self._plugin_settings = _utils.merge_dicts(self._plugin_settings, plugin_conf)
self._update_disabled_plugins()

148
confls/configuration_ls.py Normal file
View File

@ -0,0 +1,148 @@
import logging
import string
import logging.config
import threading
import socketserver
from functools import partial
from pyls.python_ls import PythonLanguageServer, _StreamHandlerWrapper, PARENT_PROCESS_WATCH_INTERVAL, flatten, merge
import pyls.lsp as lsp
from pyls.workspace import Workspace
import pyls.uris as uris
import pyls._utils as _utils
from .config import config
log = logging.getLogger(__name__)
def start_tcp_lang_server(bind_addr, port, check_parent_process, handler_class):
if not issubclass(handler_class, PythonLanguageServer):
raise ValueError('Handler class must be an instance of PythonLanguageServer')
def shutdown_server(check_parent_process, *args):
# pylint: disable=unused-argument
if check_parent_process:
log.debug('Shutting down server')
# Shutdown call must be done on a thread, to prevent deadlocks
stop_thread = threading.Thread(target=server.shutdown)
stop_thread.start()
# Construct a custom wrapper class around the user's handler_class
wrapper_class = type(
handler_class.__name__ + 'Handler',
(_StreamHandlerWrapper,),
{'DELEGATE_CLASS': partial(handler_class,
check_parent_process=check_parent_process),
'SHUTDOWN_CALL': partial(shutdown_server, check_parent_process)}
)
server = socketserver.TCPServer((bind_addr, port), wrapper_class, bind_and_activate=False)
server.allow_reuse_address = True
try:
server.server_bind()
server.server_activate()
log.info('Serving %s on (%s, %s)', handler_class.__name__, bind_addr, port)
server.serve_forever()
finally:
log.info('Shutting down')
server.server_close()
def start_io_lang_server(rfile, wfile, check_parent_process, handler_class):
if not issubclass(handler_class, PythonLanguageServer):
raise ValueError('Handler class must be an instance of PythonLanguageServer')
log.info('Starting %s IO language server', handler_class.__name__)
server = handler_class(rfile, wfile, check_parent_process)
server.start()
class ConfigurationLanguageServer(PythonLanguageServer):
def __init__(self, rx, tx, check_parent_process):
PythonLanguageServer.__init__(self, rx, tx, check_parent_process)
def m_initialize(self, processId=None, rootUri=None, rootPath=None, initializationOptions=None, **_kwargs):
log.debug('Language server initialized with %s %s %s %s', processId, rootUri, rootPath, initializationOptions)
if rootUri is None:
rootUri = uris.from_fs_path(rootPath) if rootPath is not None else ''
self.workspaces.pop(self.root_uri, None)
self.root_uri = rootUri
self.config = config.ConfigurationConfig(rootUri, initializationOptions or {},
processId, _kwargs.get('capabilities', {}))
self.workspace = Workspace(rootUri, self._endpoint, self.config)
self.workspaces[rootUri] = self.workspace
self._dispatchers = self._hook('pyls_dispatchers')
self._hook('pyls_initialize')
if self._check_parent_process and processId is not None and self.watching_thread is None:
def watch_parent_process(pid):
# exit when the given pid is not alive
if not _utils.is_process_alive(pid):
log.info("parent process %s is not alive, exiting!", pid)
self.m_exit()
else:
threading.Timer(PARENT_PROCESS_WATCH_INTERVAL, watch_parent_process, args=[pid]).start()
self.watching_thread = threading.Thread(target=watch_parent_process, args=(processId,))
self.watching_thread.daemon = True
self.watching_thread.start()
# Get our capabilities
return {'capabilities': self.capabilities()}
def completions(self, doc_uri, position):
completions = self._hook('pyls_completions', doc_uri, position=position)
return {
'isIncomplete': False,
'items': flatten(completions)
}
def capabilities(self):
server_capabilities = {
'codeActionProvider': True,
'codeLensProvider': {
'resolveProvider': False, # We may need to make this configurable
},
'completionProvider': {
'resolveProvider': False, # We know everything ahead of time
'triggerCharacters': [l for l in string.ascii_letters]
},
'documentFormattingProvider': True,
'documentHighlightProvider': True,
'documentRangeFormattingProvider': True,
'documentSymbolProvider': True,
'definitionProvider': True,
'executeCommandProvider': {
'commands': flatten(self._hook('pyls_commands'))
},
'hoverProvider': True,
'referencesProvider': True,
'renameProvider': True,
'foldingRangeProvider': True,
'signatureHelpProvider': {
'triggerCharacters': [],
},
'hover': {
"contentFormat": "markdown",
},
'textDocumentSync': {
'change': lsp.TextDocumentSyncKind.INCREMENTAL,
'save': {
'includeText': True,
},
'openClose': True,
},
'workspace': {
'workspaceFolders': {
'supported': True,
'changeNotifications': True
}
},
'experimental': merge(self._hook('pyls_experimental_capabilities'))
}
log.info('Server capabilities: %s', server_capabilities)
return server_capabilities

View File

@ -0,0 +1,3 @@
from .systemd import Completor
SYSTEMD_COMPLETOR = Completor()

462
confls/helpers/systemd.py Normal file
View File

@ -0,0 +1,462 @@
#!/usr/bin/python3
import pathlib
import logging
import gzip
import pypandoc
import re
import subprocess
from collections import Counter
import threading
import queue
SYSTEMD_ARGUMENT = re.compile(r'\*(\w+)\=\*\s{1,}((>.*\s{1,}){1,})')
SYSTEMD_UNIT = re.compile(r'\\\[([A-Z][a-z]\w+)\\\]')
SYSTEMD_DIGG = re.compile(r'\*\*(.*)\*\*')
DOC_PATH = "/usr/share/man"
DIRECTIVES_ARGUMENT = re.compile(r"(\w+)=\s+(.*)\(.\)")
log = logging.getLogger(__name__)
class ManPage(threading.Thread):
INSTANCES = []
@classmethod
def is_parsed(cls, path):
if str(path) in cls.INSTANCES:
return True
return False
def __init__(self, db, arg):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self.db = db
self._arg = arg
def main(self):
if ManPage.is_parsed(str(self._arg)):
self.logger.debug("{} already parsed".format(self._arg))
else:
ManPage.INSTANCES.append(str(self._arg))
s = subprocess.Popen("man --where {}".format(self._arg).split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
if path.exists() and not path.is_dir():
self.parse(path)
self.db.update(self)
def parse(self, path):
self.logger.debug("parsing {}".format(path))
with gzip.open(path, "rb") as fp:
man = fp.read().decode("utf-8")
md = pypandoc.convert_text(man, format="man", to="md")
matches = SYSTEMD_ARGUMENT.finditer(md)
if matches:
for match in matches:
name = match.group(1)
doc = match.group(2)
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, doc=(self._arg, doc))
self.db.update(arg)
self.logger.debug("adding unknown {}".format(name))
else:
arg = known_args[0]
arg.doc = (self._arg, doc)
self.logger.debug("updating known {} with doc".format(name))
@property
def path(self):
return pathlib.Path(self._path)
@property
def md(self):
return self._md
@property
def man(self):
return self._man
@property
def args(self):
return self._arguments
@property
def units(self):
units = set(self._units)
return units
class SystemdDirective(ManPage):
def __init__(self):
s = subprocess.Popen("man --where systemd.directives".split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
ManPage.__init__(self, path)
@property
def plain(self):
return self._plain
def parse(self):
with gzip.open(self.path, "rb") as fp:
self._man = fp.read().decode("utf-8")
self._plain = pypandoc.convert_text(self._man, format="man", to="plain", extra_args=("--wrap=none",))
matches = DIRECTIVES_ARGUMENT.finditer(self.plain)
man_collection = set()
self.test = []
if matches:
for match in matches:
try:
name = match.group(1)
self.test.append(name)
args = match.group(2)
args = args.replace(",", "")
args = args.split()
for arg in args:
arg = arg.split("(")[0]
if arg in man_collection:
continue
man_collection.add(arg)
s = subprocess.Popen("man --where {}".format(arg).split(), stdout=subprocess.PIPE)
path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
if path.exists():
if not ManPage.is_parsed(path):
man = ManPage(path)
except Exception as e:
print(e)
class SystemdUnit(object):
INSTANCES = []
@classmethod
def exists(cls, name):
if name in [n.name for n in cls.INSTANCES]:
return True
return False
@classmethod
def get_unit(cls, name):
unit = [u for u in cls.INSTANCES if u.name == name.lower()]
return unit[0]
def __init__(self, name):
self._name = name
if name not in [n.name for n in self.__class__.INSTANCES]:
self.__class__.INSTANCES.append(self)
self._data = []
def add_arguments(self, arguments):
self._data.extend(arguments)
@property
def name(self):
return self._name.lower()
@property
def man(self):
for path, man in self._data.items():
yield man
@property
def units(self):
for path, man in self._data.items():
for unit in man.units:
yield unit
@property
def arguments(self):
for arg in self._data:
yield arg
def search(self, pattern):
pattern = pattern.lower()
flat = [a for a in self.arguments if pattern in a.name.lower()]
for f in flat:
yield f
class DataBase(threading.Thread):
def __init__(self):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self._db = []
self._queue = queue.Queue()
self.start()
def setup_collection_threads(self):
self.t_bootstrap = BootStrap(self)
self.t_bootstrap.start()
self.t_directive = Directive(self)
self.t_directive.start()
def main(self):
self.logger.info("starting database thread")
self.setup_collection_threads()
while True:
data = self._queue.get()
if isinstance(data, bool):
self.logger.critical("stopping")
break
elif isinstance(data, threading.Thread):
self.logger.critical("joining {}".format(data.name))
data.join()
if not isinstance(data, ManPage):
self.logger.info("scanning manpages...")
for arg in self.get_all_args():
for path in arg.path:
if not ManPage.is_parsed(path):
t_manpage = ManPage(self, path)
t_manpage.start()
elif isinstance(data, list):
self._db.extend(data)
elif isinstance(data, SystemdArgument):
self._db.append(data)
def update(self, data):
self._queue.put(data)
def stats(self):
count = Counter(self._db)
print(len(count))
def get_all_args(self):
args = [a for a in self._db]
return args
def get_unit_args(self, unit):
args = [a for a in self._db if unit in a.units]
return args
def search(self, pattern, unit=None):
if unit is None:
args = self.get_all_args()
else:
args = self.get_unit_args(unit)
pattern = pattern.lower()
flat = [a for a in args if pattern in a.name.lower()]
for f in flat:
yield f
class SystemdArgument(object):
def __init__(self, name, value=None, unit=None, doc=None, path=None):
self._name = name
self._units = []
self._path = []
self._doc = []
self._value = value
self.units = unit
self.path = path
@property
def name(self):
return self._name
@property
def value(self):
return self._value
@property
def doc(self):
# doc = pypandoc.convert("\n".join(self._doc), format="md", to="plain")
# return doc.strip().replace("\n", "")
doc = "Doc:\n\n"
for src, content in self._doc:
doc += "{}:\n\n{}\n\n".format(src, content.replace(">", ""))
return doc
@doc.setter
def doc(self, data):
if data is not None:
self._doc.append(data)
@property
def doc_md(self):
return self._doc
@property
def path(self):
return self._path
@path.setter
def path(self, data):
if data is not None:
if isinstance(data, list):
self._path.extend(data)
else:
self._path.append(data)
@property
def units(self):
return self._units
@units.setter
def units(self, data):
if data is not None:
self._units.append(data)
DUMP_ARGS = re.compile(r"(\w+)=(.*)")
DUMP_UNIT = re.compile(r"\[([A-Z]\w+)\]")
class BootStrap(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
self.db = db
def main(self):
self._parse_config_dump()
self._data_from_config_dump()
self.logger.debug("done")
self.db.update(self)
def _set_key(self, line):
matches = DUMP_UNIT.finditer(line)
if matches:
for match in matches:
unit = match.group(1)
return unit
else:
return None
def _split_argument(self, line):
matches = DUMP_ARGS.finditer(line)
if matches:
for match in matches:
arg = match.group(1)
val = match.group(2)
return arg, val
else:
return None
def _parse_config_dump(self):
s = subprocess.Popen("systemd --dump-configuration --no-pager".split(), stdout=subprocess.PIPE)
s.wait()
dump = s.stdout.read().decode("utf-8")
data = {}
for line in dump.split():
new_key = self._set_key(line)
if new_key is not None:
key = new_key
if key not in data.keys():
data[key] = []
continue
arg = self._split_argument(line)
if arg is None:
continue
data[key].append(arg)
self._data = data
def _data_from_config_dump(self):
data = []
for key, possible_values in self._data.items():
for name, value in possible_values:
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, value=value, unit=key)
self.db.update(arg)
self.logger.debug("adding unknown {}, for {}".format(name, key))
else:
arg = known_args[0]
arg.value = value
arg.unit = key
self.logger.debug("updating known {}".format(name))
class Directive(threading.Thread):
def __init__(self, db):
threading.Thread.__init__(self, target=self.main, name=self.__class__.__name__)
self.logger = log.getChild(self.__class__.__name__)
s = subprocess.Popen("man --where systemd.directives".split(), stdout=subprocess.PIPE)
self._path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
self.db = db
def main(self):
self.parse()
self.logger.debug("done")
self.db.update(self)
def parse(self):
self.logger.debug("opening man page to parse")
with gzip.open(self._path, "rb") as fp:
self._man = fp.read().decode("utf-8")
self._plain = pypandoc.convert_text(self._man, format="man", to="plain", extra_args=("--wrap=none",))
matches = DIRECTIVES_ARGUMENT.finditer(self._plain)
if matches:
for match in matches:
try:
name = match.group(1)
mans = match.group(2)
mans = mans.replace(",", "")
mans = mans.split()
mans = [m.split("(")[0] for m in mans]
all_args = self.db.get_all_args()
known_args = [a for a in all_args if a.name == name]
if not len(known_args):
arg = SystemdArgument(name, value=None, unit=None, doc="None", path=mans)
self.db.update(arg)
self.logger.debug("adding unknown {}".format(name))
else:
arg = known_args[0]
arg.path = mans
self.logger.debug("updating known {}".format(name))
#for arg in args:
# arg = arg.split("(")[0]
# if arg in man_collection:
# continue
# man_collection.add(arg)
# s = subprocess.Popen("man --where {}".format(arg).split(), stdout=subprocess.PIPE)
# path = pathlib.Path(s.stdout.read().decode("utf-8").strip())
# if path.exists():
# if not ManPage.is_parsed(path):
# man = ManPage(path)
except Exception as e:
pass
class Completor(object):
def __init__(self):
self.logger = log.getChild(self.__class__.__name__)
self.db = DataBase()
def stats(self):
print(len(self.db.get_all_args()))
def get_units(self):
units = [a.units for a in self.db.get_all_args()]
unique = []
for unit in units:
if isinstance(unit, list):
unique.extend(unit)
else:
unique.append(unit)
unique = set(unique)
return unique
def get_all(self, pattern, unit=None):
return list(self.db.search(pattern, unit))
def get_one(self, pattern):
data = list(self.db.search(pattern))
if data:
return data[0]
if __name__ == "__main__":
log = logging.root
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
log.info("hello world")
app = Completor()
while True:
result = input("?")
if result == "s":
app.stats()
else:
for i in app.get_all(result, None):
print(i.name, i.path, i.units, i.doc)

120
confls/hookspecs.py Normal file
View File

@ -0,0 +1,120 @@
from confls import hookspec
@hookspec
def pyls_code_actions(config, workspace, document, range, context):
pass
@hookspec
def pyls_code_lens(config, workspace, document):
pass
@hookspec
def pyls_commands(config, workspace):
"""The list of command strings supported by the server.
Returns:
List[str]: The supported commands.
"""
@hookspec
def pyls_completions(config, workspace, document, position):
pass
@hookspec
def pyls_definitions(config, workspace, document, position):
pass
@hookspec
def pyls_dispatchers(config, workspace):
pass
@hookspec
def pyls_document_did_open(config, workspace, document):
pass
@hookspec
def pyls_document_did_save(config, workspace, document):
pass
@hookspec
def pyls_document_highlight(config, workspace, document, position):
pass
@hookspec
def pyls_document_symbols(config, workspace, document):
pass
@hookspec(firstresult=True)
def pyls_execute_command(config, workspace, command, arguments):
pass
@hookspec
def pyls_experimental_capabilities(config, workspace):
pass
@hookspec
def pyls_folding_range(config, workspace, document):
pass
@hookspec(firstresult=True)
def pyls_format_document(config, workspace, document):
pass
@hookspec(firstresult=True)
def pyls_format_range(config, workspace, document, range):
pass
@hookspec(firstresult=True)
def pyls_hover(config, workspace, document, position):
pass
@hookspec
def pyls_initialize(config, workspace):
pass
@hookspec
def pyls_initialized():
pass
@hookspec
def pyls_lint(config, workspace, document, is_saved):
pass
@hookspec
def pyls_references(config, workspace, document, position, exclude_declaration):
pass
@hookspec(firstresult=True)
def pyls_rename(config, workspace, document, position, new_name):
pass
@hookspec
def pyls_settings(config):
pass
@hookspec(firstresult=True)
def pyls_signature_help(config, workspace, document, position):
pass

View File

View File

@ -0,0 +1,120 @@
# Copyright 2017 Palantir Technologies, Inc.
import logging
import subprocess
import re
import string
from pyls import _utils, lsp
from confls import hookimpl
from confls.helpers import SYSTEMD_COMPLETOR
log = logging.getLogger(__name__)
def setup():
s = subprocess.Popen("systemd --dump-configuration".split(), stdout=subprocess.PIPE)
lines = s.stdout.read().decode("utf-8")
args = lines.split()
args.append(None)
keys = re.findall(r"\[\w+\]", lines)
indexes = [args.index(key) for key in keys]
keys.append(None)
indexes.append(len(args))
slices = {key:(args.index(key) + 1, args.index(keys[keys.index(key) + 1])) for key in keys[:-1]}
data = {}
for key, value in slices.items():
start, end = value[0], value[1]
section_args = args[start:end]
t = []
for section_arg in section_args:
split = re.split(r"\=", section_arg)
try:
a = split[0]
b = split[1]
d = {}
d['key'] = a
d['val'] = b
t.append(d)
except:
print(split)
data[key] = t
return data
ARGS = setup()
#DIRECTIVE = helper.SystemdDirective()
def get_active_key(document, position):
lines = list(document.lines)[0:position["line"]]
lines.reverse()
for line in lines:
line = line.strip()
if re.match(r"\[\w+\]", line):
key = line
return key
return None
@hookimpl
def pyls_completions(config, document, position):
"""Get formatted completions for current code position"""
settings = config.plugin_settings('systemd', document_path=document.path)
key = get_active_key(document, position)
line = document.lines[position["line"]]
word = line[0:position['character']]
print(word, key)
try:
if key is None or "[" in word:
t = list(SYSTEMD_COMPLETOR.get_units())
completions = [_format_section(c) for c in t]
return completions
else:
key = key.replace("[", "").replace("]","")#.lower()
t = list(SYSTEMD_COMPLETOR.get_all(word, key))
completions = [_format_argument(c) for c in t]
return completions
except Exception as e:
print(e)
return None
def _format_argument(d):
completion = {
'label': "{}".format(d.name),
'detail': "Value type: {}".format(d.value),
'documentation': "Found in: " + " ".join(d.path),
'sortText': d.name,
'insertText': d.name,
}
return completion
def _format_section(d):
completion = {
'label': "{}".format(d),
'detail': "details about {}".format(d),
'documentation': "doc...",
'sortText': d,
'insertText': d,
}
return completion
def _label(definition):
sig = definition.get_signatures()
if definition.type in ('function', 'method') and sig:
params = ', '.join(param.name for param in sig[0].params)
return '{}({})'.format(definition.name, params)
return definition.name
def _detail(definition):
try:
return definition.parent().full_name or ''
except AttributeError:
return definition.full_name or ''

View File

@ -0,0 +1,20 @@
# Copyright 2017 Palantir Technologies, Inc.
import logging
from confls import hookimpl
from pyls import _utils
from confls.helpers import SYSTEMD_COMPLETOR
log = logging.getLogger(__name__)
@hookimpl
def pyls_hover(document, position):
code_position = _utils.position_to_jedi_linecolumn(document, position)
word = document.word_at_position(position)
details = SYSTEMD_COMPLETOR.get_one(word)
if details is not None:
return {'contents': details.doc, "contentFormat": "markdown"}
else:
return

42
setup.py Normal file
View File

@ -0,0 +1,42 @@
#!/usr/bin/env python
import sys
from setuptools import find_packages, setup
README = "test"
install_requires = [
'python-language-server',
]
setup(
name='configuration-language-server',
# Versions should comply with PEP440. For a discussion on single-sourcing
# the version across setup.py and the project code, see
# https://packaging.python.org/en/latest/single_source_version.html
description='Configuration Language Server for the Language Server Protocol',
install_requires=install_requires,
long_description=README,
# The project's main homepage.
url='https://github.com/palantir/python-language-server',
author='Palantir Technologies, Inc.',
# You can just specify the packages manually here if your project is
# simple. Or you can use find_packages().
packages=find_packages(exclude=['contrib', 'docs', 'test', 'test.*']),
entry_points={
'console_scripts': [
'confls = confls.__main__:main',
],
'confls': [
'systemd = confls.plugins.completion_systemd',
'hover = confls.plugins.hover_systemd',
]
},
)