Source code for rcontrol.core

# -*- coding: utf-8

# This file is part of rcontrol.
#
# rcontrol is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# rcontrol is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with rcontrol. If not, see <http://www.gnu.org/licenses/>.

import sys
import threading
import six
from collections import OrderedDict
from rcontrol import fs
import abc
import warnings


[docs]class BaseTaskError(Exception): """Raised on a task error. All tasks errors inherit from this."""
class TaskError(BaseTaskError): """Raised on a task error""" def __init__(self, session, task, msg): self.session = session self.task = task self.rawmsg = msg BaseTaskError.__init__(self, "%s: %s (%s)" % (session, task, msg))
[docs]class TimeoutError(TaskError): """Raise on a command timeout error"""
[docs]class ExitCodeError(TaskError): """Raised when the exit code of a command is unexpected"""
[docs]class TaskErrors(BaseTaskError): """A list of task errors""" def __init__(self, errors): self.errors = errors BaseTaskError.__init__(self, '\n'.join(str(e) for e in self.errors))
@six.add_metaclass(abc.ABCMeta)
[docs]class Task(object): """ Represent an asynchronous task. :param session: the session that is responsible of the task. It it accessible via the **session** attribute on the instance. :param on_done: if not None, should be a callback that takes the instance task as the parameter. It is called when the task is done (finished or timed out). If defined, :meth:`error_handled` will return True. """ def __init__(self, session, on_done=None): self.session = session self.__on_done = on_done self.explicit_wait = False # register the task instance to the session session._register_task(self) def _unregister(self): # this must be called by subclasses when the task needs to be # unregistered from the session. This is called from a thread, # when the task is finished (or for a timeout) self.session._unregister_task(self) if self.__on_done: self.__on_done(self)
[docs] def error_handled(self): """ Return True if the error must **not** be reported while using :meth:`BaseSession.wait_for_tasks`. By default, the error is handled if **on_done** was specified in the constructor. """ return bool(self.__on_done)
@abc.abstractmethod
[docs] def is_running(self): """ Return True if the task is running. """
@abc.abstractmethod
[docs] def error(self): """ Return an instance of a :class:`BaseTaskError` or None. """
[docs] def raise_if_error(self): """ Check if an error occured and raise it if any. """ error = self.error() if error: raise error
@abc.abstractmethod def _wait(self, raise_if_error): pass
[docs] def wait(self, raise_if_error=True): """ Block and wait until the task is finished. :param raise_if_error: if True, call :meth:`raise_if_error` at the end. """ self.explicit_wait = True return self._wait(raise_if_error=raise_if_error)
def _async(meth, name): def new_meth(self, *args, **kwargs): on_done = kwargs.pop('on_done', None) return ThreadableTask(self, meth, (self,) + args, kwargs, on_done=on_done) new_meth.__name__ = name new_meth.__doc__ = """ Asynchronous version of :meth:`%s`. This method returns an instance of a :class:`ThreadableTask`. Note that you can use the **on_done** keyword argument to define a callback that will be called at the end of the execution (see the :class:`Task` constructor). """ % meth.__name__ return new_meth class TaskCache(object): def __init__(self): self._cache = set() def update(self, tasks): self._cache.update(id(t) for t in tasks) def __contains__(self, task): return id(task) in self._cache @six.add_metaclass(abc.ABCMeta)
[docs]class BaseSession(object): """ Represent an abstraction of a session on a remote or local machine. """ def __init__(self, auto_close=True): # a lock for tasks and silent errors access self._lock = threading.Lock() self._tasks = [] # silent errors are errors from tasks that are not waited # explicitly. As a task is unregistered from the session once # it is finished, we save in this list the errors of tasks # that are finished before wait_for_tasks is called. self._silent_errors = [] self.auto_close = auto_close def _register_task(self, task): assert isinstance(task, Task) with self._lock: self._tasks.append(task) def _unregister_task(self, task): with self._lock: try: self._tasks.remove(task) except ValueError: pass # this should not happen # keep silent error if not task.error_handled() and not task.explicit_wait: error = task.error() if error: self._silent_errors.append(error)
[docs] def tasks(self): """ Return a copy of the currently active tasks. """ with self._lock: return self._tasks[:]
[docs] def wait_for_tasks(self, raise_if_error=True): """ Wait for the running tasks launched from this session. If any errors are encountered, they are raised or returned depending on **raise_if_error**. Note that this contains errors reported from silently finished tasks (tasks ran and finished in backround without explicit wait call on them). Tasks started from another task callback (like on_finished) are also waited here. This is not required to call this method explictly if you use the :class:`BaseSession` or the :class:`SessionManager` with the **with** keyword. :param raise_if_error: If True, errors are raised using :class:`TaskErrors`. Else the errors are returned as a list. """ errors = [] # in case tasks do not unregister themselves we do not want to # loop infinitely tasks_seen = TaskCache() # we do a while loop to ensure that tasks started from callbacks # are waited too. while True: with self._lock: # bring back to life silent errors errors.extend(self._silent_errors) tasks = [t for t in self._tasks if t not in tasks_seen] if not tasks: with self._lock: # now clean the silent errors self._silent_errors = [] break for task in tasks: task.wait(raise_if_error=False) if not task.error_handled(): error = task.error() if error: errors.append(error) with self._lock: # now clean the silent errors self._silent_errors = [] tasks_seen.update(tasks) if raise_if_error and errors: raise TaskErrors(errors) return errors
@abc.abstractmethod
[docs] def open(self, filename, mode='r', bufsize=-1): """ Return an opened file object. :param filename: the file path to open :param mode: the mode used to open the file :param bufsize: buffer size """
@abc.abstractmethod
[docs] def execute(self, command, **kwargs): """ Execute a command in an asynchronous way. Return an instance of a subclass of a :class:`CommandTask`. :param command: the command to execute (a string) :param kwargs: named arguments passed to the constructor of the class:`CommandTask` subclass. """
@abc.abstractmethod
[docs] def walk(self, top, topdown=True, onerror=None, followlinks=False): """ Walk the file system. Equivalent to os.walk. """
@abc.abstractmethod
[docs] def mkdir(self, path): """ Create a directory. Equivalent to os.mkdir. """
@abc.abstractmethod
[docs] def exists(self, path): """ Return True if the path exists. Equivalent to os.path.exists. """
@abc.abstractmethod
[docs] def isdir(self, path): """ Return True if the path is a directory. Equivalent to os.path.isdir. """
@abc.abstractmethod
[docs] def s_copy_file(self, src, dest_os, dest, chunk_size=16384): """ Copy a file from this session to another session. :param src: full path of the file to copy in this session :param dest_os: session to copy to :param dest: full path of the file to copy in the dest session """ fs.copy_file(self, src, dest_os, dest, chunk_size=chunk_size)
copy_file = _async(s_copy_file, "copy_file")
[docs] def s_copy_dir(self, src, dest_session, dest, chunk_size=16384): """ Recursively copy a directory from a session to another one. **dest** must not exist, it will be created automatically. :param src: path of the dir to copy in this session :param dest_session: session to copy to :param dest: path of the dir to copy in the dest session (must not exists) """ fs.copy_dir(self, src, dest_session, dest, chunk_size=chunk_size)
copy_dir = _async(s_copy_dir, "copy_dir")
[docs] def close(self): """ Close the session. """
def __enter__(self): return self def __exit__(self, type, value, traceback): errors = self.wait_for_tasks(raise_if_error=False) if self.auto_close: self.close() if errors: if value is None: # no exceptions in the with block -> let's raise # the errors raise TaskErrors(errors) else: # TODO: for now, just print errors if any for error in errors: print('ERROR: %s' % error)
[docs]class SessionManager(OrderedDict): """ A specialized OrderedDict that keep sessions instances. It can be used like a namespace: :: sess_manager.local = LocalSession() # equivalent to: # sess_manager['local'] = LocalSession() It should be used inside a **with** block, to wait for pending tasks and close sessions if needed automatically. """ def __setitem__(self, name, value): if not isinstance(name, six.string_types): raise TypeError('key must be an str instance') if not isinstance(value, BaseSession): raise TypeError('only BaseSession instances can be set') OrderedDict.__setitem__(self, name, value) def __setattr__(self, name, value): if isinstance(value, BaseSession): self[name] = value else: self.__dict__[name] = value def __getattr__(self, name): try: return self[name] except KeyError: raise AttributeError('%r does not exists' % name) def __delattr__(self, name): try: del self[name] except KeyError: OrderedDict.__delattr__(self, name)
[docs] def wait_for_tasks(self, raise_if_error=True): """ Wait for the running tasks lauched from the sessions. Note that it also wait for tasks that are started from other tasks callbacks, like on_finished. :param raise_if_error: if True, raise all possible encountered errors using :class:`TaskErrors`. Else the errors are returned as a list. """ errors = [] tasks_seen = TaskCache() while True: for session in self.values(): errs = session.wait_for_tasks(raise_if_error=False) errors.extend(errs) # look for tasks created after the wait (in callbacks of # tasks from different sessions) tasks = [] for session in self.values(): tasks.extend(session.tasks()) # if none, then just break - else loop to wait for them if not any(t for t in tasks if t not in tasks_seen): break if raise_if_error and errors: raise TaskErrors(errors) return errors
[docs] def close(self): """ close the sessions. """ for session in self.values(): session.close()
def __enter__(self): return self def __exit__(self, type, value, traceback): errors = self.wait_for_tasks(raise_if_error=False) for session in self.values(): if session.auto_close: session.close() if errors: if value is None: # no exceptions in the with block -> let's raise # the errors raise TaskErrors(errors) else: # TODO: for now, just print errors if any for error in errors: print('ERROR: %s' % error)
[docs]class CommandTask(Task): """ Base class that execute a command in an asynchronous way. It uses an internal stream reader (a subclass of :class:`streamreader.StreamsReader`) :param session: the session that run this command :param reader_class: the :class:`streamreader.StreamsReader` class to use :param command: the command to execute (a string) :param expected_exit_code: the expected exit code of the command. If None, there is no exit code expected. :param combine_stderr: if None, stderr and stdout will be automatically combined unless stderr_callback is defined. You can force to combine stderr or stdout by passing True or False. :param timeout: timeout in seconds for the task. If None, no timeout is set - else timeout_callback is called if the command has not finished in time. :param output_timeout: timeout in seconds for waiting output. If None, no timeout is set - else timeout_callback is called if there is no output in time. :param on_finished: a callable that takes one parameter, the command task instance. Called when the command is finished, but not on timeout. :param on_timeout: a callable that takes one parameter, the command task instance. Called on timeout. :param on_stdout: a callable that takes two parameter, the command task instance and the line read. Called on line read from stdout and possibly from stderr if streams are combined.. :param on_stderr: a callable that takes two parameter, the command task instance and the line read. Called on line read from stderr. """ def __init__(self, session, reader_class, command, expected_exit_code=0, combine_stderr=None, timeout=None, output_timeout=None, on_finished=None, on_timeout=None, on_stdout=None, on_stderr=None, on_done=None, # deprecated aliases finished_callback=None, timeout_callback=None, stdout_callback=None, stderr_callback=None): Task.__init__(self, session, on_done=on_done) if combine_stderr is None: combine_stderr = not stderr_callback self._combine_stderr = combine_stderr self.__exit_code = None self.__expected_exit_code = expected_exit_code self.__timed_out = False def _warn(name): msg = ("You should use on_%s instead of %s_callback" " in new code (it will be removed soon)") % (name, name) warnings.warn(msg) if finished_callback: _warn("finished") on_finished = finished_callback if timeout_callback: _warn("timeout") on_timeout = timeout_callback if stdout_callback: _warn("stdout") on_stdout = stdout_callback if stderr_callback: _warn("stderr") on_stderr = stderr_callback self.__finished_callback = on_finished self.__timeout_callback = on_timeout self.__stdout_callback = on_stdout self.__stderr_callback = on_stderr self._reader = reader_class( stdout_callback=self._on_stdout, stderr_callback=self._on_stderr, timeout=timeout, output_timeout=output_timeout, timeout_callback=self._on_timeout, finished_callback=self._on_finished ) def _set_exit_code(self, exit_code): self.__exit_code = exit_code def _on_stdout(self, line): if self.__stdout_callback: self.__stdout_callback(self, line) def _on_stderr(self, line): if self.__stderr_callback: self.__stderr_callback(self, line) def _on_timeout(self): self.__timed_out = True self._unregister() if self.__timeout_callback: self.__timeout_callback(self) def _on_finished(self): self._unregister() if self.__finished_callback: self.__finished_callback(self)
[docs] def timed_out(self): """ Return True if a timeout occured. """ return self.__timed_out
[docs] def is_running(self): """ Return True if the command is still running. """ return self._reader.is_alive()
[docs] def error(self): """ Return an instance of Exception if any, else None. Actually check for a :class:`TimeoutError` or a :class:`ExitCodeError`. """ if self.__timed_out: return TimeoutError(self.session, self, "timeout") if self.__exit_code is not None and \ self.__expected_exit_code is not None and \ self.__exit_code != self.__expected_exit_code: return ExitCodeError(self.session, self, 'bad exit code: Got %s' % self.__exit_code)
[docs] def exit_code(self): """ Return the exit code of the command, or None if the command is not finished yet. """ return self.__exit_code
def _wait(self, raise_if_error): if self._reader.is_alive(): self._reader.thread.join() if raise_if_error: self.raise_if_error() return self.__exit_code
[docs]class ThreadableTask(Task): """ A task ran in a background thread. """ def __init__(self, session, callable, args, kwargs, on_done=None): Task.__init__(self, session, on_done=on_done) # Set up exception handling self.exception = None def wrapper(*args, **kwargs): try: callable(*args, **kwargs) except Exception: self.exception = TaskError(session, self, sys.exc_info()[1]) finally: self._unregister() # Kick off thread name = getattr(callable, '__name__', None) thread = threading.Thread(None, wrapper, name, args, kwargs) thread.setDaemon(True) thread.start() # Make thread available to instantiator self.thread = thread def is_running(self): return self.thread.is_alive() def error(self): return self.exception def _wait(self, raise_if_error): if self.thread.is_alive(): self.thread.join() if raise_if_error: self.raise_if_error()