# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright 2018 Nextdoor.com, Inc
"""
:mod:`kingpin.actors.base`
^^^^^^^^^^^^^^^^^^^^^^^^^^
Base Actor object class
An Actor object is a class that executes a single logical action
on a resource as part of your deployment structure. For example, you
may have an Actor that launches a server array in RightScale, or you
may have one that sends an email.
Each Actor object should do one thing, and one thing only. It's responsible
for being able to execute the operation in both 'dry' and 'non-dry' modes.
The behavior for 'dry' mode can contain real API calls, but should not make
any live changes. It is up to the developer of the Actor to define what
'dry' mode looks like for that particular action.
"""
import inspect
import json
import logging
import os
import sys
import time
from tornado import gen
from tornado import httpclient
from tornado import httputil
from kingpin import utils
from kingpin.actors import exceptions
from kingpin.actors.utils import timer
from kingpin.constants import REQUIRED, STATE
log = logging.getLogger(__name__)

__author__ = 'Matt Wise <matt@nextdoor.com>'

# If super-debug logging is enabled, then we turn on the URLLIB3 HTTP
# request logging. This is extremely verbose and insecure, but useful
# for troubleshooting. URLLIB3 is used by several actors (aws, rightscale),
# so we do this setup here in the base actor class.
if os.getenv('URLLIB_DEBUG', None):
    utils.super_httplib_debug_logging()

# Allow the user to override the default_timeout for all actors by setting an
# environment variable.
#
# NOTE: os.getenv() returns a string when the variable is set, so this value
# is either the int 3600 or a str such as '600'. Consumers must convert it
# to a number before doing arithmetic with it.
DEFAULT_TIMEOUT = os.getenv('DEFAULT_TIMEOUT', 3600)
class LogAdapter(logging.LoggerAdapter):
    """Simple Actor Logging Adapter.

    Provides a common logging format for actors that uses the actor's
    description and dry parameter as a prefix to the supplied log message.
    """

    def process(self, msg, kwargs):
        """Prefix a log message with the adapter's 'dry' and 'desc' values.

        Args:
            msg: The log message supplied by the caller.
            kwargs: Keyword arguments of the logging call; passed through
                untouched.

        Returns:
            A (message, kwargs) tuple where message is formatted as
            '[<dry><desc>] <msg>'. Both 'dry' and 'desc' are read from
            self.extra, so a KeyError is raised if either is missing.
        """
        return ('[%s%s] %s' % (self.extra['dry'], self.extra['desc'], msg),
                kwargs)
class BaseActor(object):
    """Abstract base class for Actor objects."""

    # {
    #   'option_name': (type, default, "Long description of the option"),
    # }
    #
    # If `default` is REQUIRED then the option requires user specified input
    #
    # Example:
    # {
    #   'room': (str, REQUIRED, 'Hipchat room to notify'),
    #   'from': (str, 'Kingpin', 'User that sends the message')
    # }
    all_options = {}

    # Default description format
    desc = "{actor}"

    # Set the default timeout for the gen.with_timeout() wrapper that we use to
    # monitor and control the length of execution of a single Actor.
    default_timeout = DEFAULT_TIMEOUT

    # Context separators. These define the left-and-right identifiers of a
    # 'contextual token' in the actor. By default this is { and }, so a
    # contextual token looks like '{KEY}'.
    left_context_separator = '{'
    right_context_separator = '}'

    # Ensure that at __init__ time, if the self._options dict is not completely
    # filled in properly (meaning there are no left-over {KEY}'s), we throw an
    # exception. This will change in the future when we have some concept of a
    # second 'global runtime context object'.
    strict_init_context = True

    def __init__(self, desc=None, options=None, dry=False,
                 warn_on_failure=False, condition=True, init_context=None,
                 init_tokens=None, timeout=None):
        """Initializes the Actor.

        Args:
            desc: (Str) description of the action being executed.
            options: (Dict) Key/Value pairs that have the options
                for this action. Values should be primitives. Defaults to
                an empty dict.
            dry: (Bool) Whether this Actor runs in dry mode. Stored on the
                actor and used to prefix its log messages with 'DRY: '.
            warn_on_failure: (Bool) Whether this actor ignores its return
                value and always succeeds (but warns).
            condition: (Bool) Whether to run this actor.
            init_context: (Dict) Key/Value pairs used at instantiation
                time to replace {KEY} strings in the actor definition.
                This is usually driven by the group.Sync/Async actors.
                Defaults to an empty dict.
            init_tokens: (Dict) Key/Value pairs passed into the actor that can
                be used for token replacement. Typically this is os.environ()
                plus some custom tokens. Set generally by the misc.Macro
                actor. Defaults to an empty dict.
            timeout: (Str/Int/Float) Timeout in seconds for the actor. When
                None, the class-level default_timeout is used.

        Raises:
            exceptions.InvalidOptions: if context tokens cannot be filled in
                or the supplied options fail validation.
        """
        self._type = '%s.%s' % (self.__module__, self.__class__.__name__)

        # None is used as the default instead of a literal {} so that no
        # dict object is ever shared between unrelated actor instances.
        self._options = {} if options is None else options
        self._desc = desc
        self._dry = dry
        self._warn_on_failure = warn_on_failure
        self._condition = condition
        self._init_context = {} if init_context is None else init_context
        self._init_tokens = {} if init_tokens is None else init_tokens

        self._timeout = timeout
        if timeout is None:
            self._timeout = self.default_timeout

        # Fill in any options with the supplied initialization context. Be
        # strict about this -- but in the future, when we have a
        # runtime_context object, we may loosen this restriction).
        self._fill_in_contexts(context=self._init_context,
                               strict=self.strict_init_context)

        self._setup_log()
        self._setup_defaults()
        self._validate_options()  # Relies on _setup_log() above

        self.log.debug('Initialized (warn_on_failure=%s, '
                       'strict_init_context=%s)' %
                       (warn_on_failure, self.strict_init_context))

    def __repr__(self):
        """Returns a nice name/description of the actor.

        Either the user has supplied a custom desc parameter to the actor,
        giving it a useful description for them. On the other hand, if an actor
        defines a custom ActorClass.desc field, that field is interpreted by
        this method and any variables that can be swapped in dynamically are.

        For example, if misc.Sleep.desc is 'Sleeping {sleep}s', this method
        will fill in the value of the option 'sleep' into the string, and then
        use that for the representation of the object.
        """
        if self._desc:
            return self._desc

        return self.__class__.desc.format(actor=self._type, **self._options)

    def _setup_log(self):
        """Create a customized logging object based on the LogAdapter."""
        name = '%s.%s' % (self.__module__, self.__class__.__name__)
        logger = logging.getLogger(name)
        dry_str = 'DRY: ' if self._dry else ''

        # The actor itself is passed as 'desc'; it is rendered through
        # __repr__ each time a message is formatted.
        self.log = LogAdapter(logger, {'desc': self, 'dry': dry_str})

    def _setup_defaults(self):
        """Populate options with defaults if they aren't set."""
        for option, definition in self.all_options.items():
            if option not in self._options:
                default = definition[1]
                if default is not REQUIRED:
                    self._options.update({option: default})

    def _validate_options(self):
        """Validate self._options against the definitions in all_options.

        Checks that every REQUIRED option is present, warns about options
        that are not declared in all_options, converts string values of
        bool-typed options into real booleans (in place), and verifies the
        type of every remaining value.

        Raises:
            exceptions.InvalidOptions: if one or more problems were found.
                Each individual problem is logged at critical level first.
        """
        # `basestring` only exists on Python 2, where it covers both `str`
        # and `unicode`. Fall back to `str` elsewhere.
        try:
            string_types = basestring  # noqa: F821
        except NameError:
            string_types = str

        # Loop through all_options, and find the required ones
        required = [opt_name
                    for (opt_name, definition) in self.all_options.items()
                    if definition[1] is REQUIRED]

        self.log.debug('Checking for required options: %s' % required)
        option_errors = []
        option_warnings = []

        for opt in required:
            if opt not in self._options:
                description = self.all_options[opt][2]
                option_errors.append('Option "%s" is required: %s' % (
                    opt, description))

        for opt, value in self._options.items():
            if opt not in self.all_options:
                option_warnings.append('Option "%s" is not expected by %s.' % (
                    opt, self.__class__.__name__))
                continue

            expected_type = self.all_options[opt][0]

            # Unicode is not a `str` but it is a `basestring`
            # Cast the passed value explicitly as a string
            if isinstance(value, string_types):
                value = str(value)

            # If the expected_type has an attribute 'validate', then let the
            # type itself decide whether the supplied value is acceptable.
            if hasattr(expected_type, 'validate'):
                try:
                    expected_type.validate(value)
                    continue
                except exceptions.InvalidOptions as e:
                    # NOTE: there is deliberately no `continue` here, so the
                    # generic isinstance() check below still runs and may
                    # record a second error for the same option.
                    option_errors.append(e)

            # If the option type is Bool, try to convert the strings True/False
            # into booleans. If this doesn't work, silently move on and let the
            # failure get caught below.
            if expected_type is bool:
                try:
                    value = self.str2bool(value, strict=True)
                    self._options[opt] = value
                except exceptions.InvalidOptions as e:
                    self.log.warning(e)

            if not (value is None or isinstance(value, expected_type)):
                message = 'Option "%s" has to be %s and is %s.' % (
                    opt, expected_type, type(value))
                option_errors.append(message)

        for w in option_warnings:
            self.log.warning(w)

        if option_errors:
            for e in option_errors:
                self.log.critical(e)
            raise exceptions.InvalidOptions(
                'Found %s issue(s) with passed options.' % len(option_errors))

    def option(self, name):
        """Return the value for a given Actor option (None if unset)."""
        return self._options.get(name)

    def readfile(self, path):
        """Return file contents as a string.

        Args:
            path: Path of the file to read.

        Raises:
            InvalidOptions if file is not found, or readable.
        """
        try:
            with open(path) as f:
                contents = f.read()
        except IOError as e:
            raise exceptions.InvalidOptions(e)

        return contents

    @gen.coroutine
    def timeout(self, f, *args, **kwargs):
        """Wraps a Coroutine method in a timeout.

        Used to wrap the self.execute() method in a timeout that will raise an
        ActorTimedOut exception if an actor takes too long to execute.

        *Note, Tornado 4+ does not allow you to actually kill a task on the
        IOLoop.* This means that all we are doing here is notifying the caller
        (through the raised exception) that a problem has happened.

        Fairly simple Actors should actually 'stop executing' when this
        exception is raised. Complex actors with very unique behaviors though
        (like the rightscale.server_array.Execute actor) have the ability to
        continue to execute in the background until the Kingpin application
        quits. It is not the job of this method to try to kill these actors,
        but just to let the user know that a failure has happened.

        Args:
            f: The coroutine method to call.
            *args, **kwargs: Passed through to ``f``.

        Raises:
            exceptions.ActorTimedOut: if ``f`` does not finish in time.
            ValueError: if the configured timeout is not numeric.
            gen.Return: with the result of ``f``.
        """
        self.log.debug('%s.%s() deadline: %s(s)' %
                       (self._type, f.__name__, self._timeout))

        # The timeout may be a string (for example when it comes from the
        # DEFAULT_TIMEOUT environment variable), and the string '0' is
        # truthy. Convert to a number *before* testing for "no timeout" so
        # that '0' disables the timeout just like 0 and None do. This is
        # also done before the work is started so a bad value fails early.
        timeout_secs = float(self._timeout or 0)

        # Get our Future object but don't yield on it yet, This starts the
        # execution, but allows us to wrap it below with the
        # 'gen.with_timeout' function.
        fut = f(*args, **kwargs)

        # If no timeout is set (none, or 0), then we just yield the Future and
        # return its results.
        if not timeout_secs:
            ret = yield fut
            raise gen.Return(ret)

        # Generate a timestamp in the future at which point we will raise
        # an alarm if the actor is still executing
        deadline = time.time() + timeout_secs

        # Now we yield on the gen_with_timeout function
        try:
            ret = yield gen.with_timeout(
                deadline, fut, quiet_exceptions=(exceptions.ActorTimedOut,))
        except gen.TimeoutError:
            msg = ('%s.%s() execution exceeded deadline: %ss' %
                   (self._type, f.__name__, self._timeout))
            self.log.error(msg)
            raise exceptions.ActorTimedOut(msg)

        raise gen.Return(ret)

    def str2bool(self, v, strict=False):
        """Returns a Boolean from a variety of inputs.

        Args:
            v: String/Bool value to interpret. It is converted with str()
                and lower-cased before being compared.
            strict: Whether or not to _only_ convert the known words into
                booleans, or whether to allow "any" word to be considered True
                other than the known False words.

        Returns:
            A boolean

        Raises:
            exceptions.InvalidOptions: in strict mode, when the value is not
                one of the known true/false words.
        """
        false = ('no', 'false', 'f', '0')
        true = ('yes', 'true', 't', '1')

        string = str(v).lower()

        if strict:
            if string not in true and string not in false:
                raise exceptions.InvalidOptions(
                    'Expected [%s, %s] but got: %s' %
                    (true, false, string))

        return string not in false

    def _check_condition(self):
        """Check if specified condition allows this actor to run.

        Evaluate self._condition to figure out if this actor should run.
        The condition is interpreted by str2bool() in non-strict mode, so
        everything except the known false words ('no', 'false', 'f', '0',
        case-insensitive) allows the actor to run.
        """
        check = self.str2bool(self._condition)
        self.log.debug('Condition %s evaluates to %s' % (
            self._condition, check))
        return check

    def _fill_in_contexts(self, context=None, strict=True):
        """Parses self._options and updates it with the supplied context.

        Parses the objects self._options dict (by converting it into a JSON
        string, substituting, and then turning it back into a dict) and the
        self._desc string and replaces any {KEY}s with the values from the
        context dict that was supplied. self._condition is converted to a
        string and processed the same way.

        Args:
            context: (Dict) Key/Value pairs to substitute. Defaults to an
                empty dict.
            strict: bool whether or not to allow missing context keys to be
                skipped over.

        Raises:
            exceptions.InvalidOptions
        """
        if context is None:
            context = {}

        # Inject contexts into Description
        try:
            self._desc = utils.populate_with_tokens(
                str(self),
                context,
                self.left_context_separator,
                self.right_context_separator,
                strict=strict)
        except LookupError as e:
            msg = 'Context for description failed: %s' % e
            raise exceptions.InvalidOptions(msg)

        # Inject contexts into condition
        try:
            self._condition = utils.populate_with_tokens(
                str(self._condition),
                context,
                self.left_context_separator,
                self.right_context_separator,
                strict=strict)
        except LookupError as e:
            msg = 'Context for condition failed: %s' % e
            raise exceptions.InvalidOptions(msg)

        # Convert our self._options dict into a string for fast parsing
        options_string = json.dumps(self._options)

        # Generate a new string with the values parsed out. At this point, if
        # any value is un-matched, an exception is raised and execution fails.
        # This stops execution during a dry run, before any live changes are
        # made.
        try:
            new_options_string = utils.populate_with_tokens(
                options_string,
                context,
                self.left_context_separator,
                self.right_context_separator,
                strict=strict)
        except LookupError as e:
            msg = 'Context for options failed: %s' % e
            raise exceptions.InvalidOptions(msg)

        # Finally, convert the string back into a dict and store it. Note
        # that this replaces self._options with a brand new dict object.
        self._options = json.loads(new_options_string)

    def get_orgchart(self, parent=''):
        """Construct organizational chart describing this actor.

        Return a list of actors handled by this actor. Most actors will return
        a list of just one object. Grouping actors will return a list of all
        actors that are called.

        orgchart object:
            id: unique string identifying this actor's instance.
            class: kingpin class name
            desc: actor description
            parent_id: organizational relationship. Same as `id` above.
        """
        return [{
            'id': str(id(self)),
            'desc': self._desc,
            'class': self.__class__.__name__,
            # 'options': self._options,  # May include tokens & ENV vars
            'parent_id': parent,
        }]

    @gen.coroutine
    @timer
    def execute(self):
        """Executes an actor and yields the results when its finished.

        Calls an actors private _execute() method and either returns the result
        (through gen.Return) or handles any exceptions that are raised.

        RecoverableActorFailure exceptions are potentially swallowed up (and
        warned) if the self._warn_on_failure flag is set. Otherwise, they're
        logged and re-raised. All other ActorException exceptions are caught,
        logged and re-raised.

        We have a generic catch-all exception handling block as well, because
        third party Actor classes may or may not catch all appropriate
        exceptions. This block is mainly here to prevent the entire app from
        failing due to a poorly written Actor.

        Raises:
            gen.Return(result)
        """
        self.log.debug('Beginning')

        # Any exception thats raised by an actors _execute() method will
        # automatically cause actor failure and we return right away.
        result = None

        if not self._check_condition():
            self.log.warning('Skipping execution. Condition: %s' %
                             self._condition)
            raise gen.Return()

        try:
            result = yield self.timeout(self._execute)
        except exceptions.ActorException as e:
            # If exception is not RecoverableActorFailure
            # or if warn_on_failure is not set, then escalate.
            recover = isinstance(e, exceptions.RecoverableActorFailure)
            if not recover or not self._warn_on_failure:
                self.log.critical(e)
                raise

            # Otherwise - flag this failure as a warning, and continue
            self.log.warning(e)
            self.log.warning(
                'Continuing execution even though a failure was '
                'detected (warn_on_failure=%s)' % self._warn_on_failure)
        except Exception as e:
            # We don't like general exception catch clauses like this, but
            # because actors can be written by third parties and automatically
            # imported, its impossible for us to catch every exception
            # possible. This is a failsafe thats meant to throw a strong
            # warning.
            #
            # A third party module is not guaranteed to define __author__;
            # look it up defensively so that this handler can never replace
            # the real failure with an AttributeError of its own.
            author = getattr(
                sys.modules.get(self.__module__), '__author__', 'unknown')
            log.critical('Unexpected exception caught! '
                         'Please contact the author (%s) and provide them '
                         'with this stacktrace' % author)
            self.log.exception(e)
            raise exceptions.ActorException(e)
        else:
            self.log.debug('Finished successfully, return value: %s' % result)

        # If we got here, we're exiting the actor cleanly and moving on.
        raise gen.Return(result)
class EnsurableBaseActor(BaseActor):
    """Base Class for Actors that "ensure" the state of a resource.

    Many of our actors have a goal of ensuring that a particular resource is in
    a given state. This leads to a ton of boiler plate code to "get" the state
    of something, "compare" that to the desired state, and then maybe "set" the
    state.

    This actor provides a framework allowing the user to simply write the
    getters and setters (and optionally compare), and lets the rest of the
    actor handle the order of operations.

    **Required Methods:**

    :`_set_state`: Creates or destroys the resource depending on
                   the 'state' parameter that was passed in.

                   *Note: The 'state' parameter is automatically added to
                   the options. You do not need to define it.*
    :`_get_state`: Gets the current state of the resource.
    :`_set_[option]`: A 'setter' for each option name passed in.
    :`_get_[option]`: A 'getter' for each option name passed in.

    **Optional Methods:**

    :`_precache`: Called before any setters/getters are triggered. Used
                  to optionally populate a cache of data to make the
                  getters faster. For example, if you can make one API
                  call to get all of the data about a resource, then
                  store that data locally for fast access.

    :`_compare_[option]`: Optionally you can write your own comparison
                          method if you're not doing a pure string
                          comparison between the source and destination.

    **Examples**

    .. code-block:: python

        class MyClass(base.EnsurableBaseActor):

            all_options = {
                'name': (str, REQUIRED, 'Name of thing'),
                'description': (str, None, 'Description of thing')
            }

            unmanaged_options = ['name']

            @gen.coroutine
            def _set_state(self):
                if self.option('state') == 'absent':
                    yield self.conn.delete_resource(
                        name=self.option('name'))
                else:
                    yield self.conn.create_resource(
                        name=self.option('name'),
                        desc=self.option('description'))

            @gen.coroutine
            def _set_description(self):
                yield self.conn.set_desc_of_resource(
                    name=self.option('name'),
                    desc=self.option('description'))

            @gen.coroutine
            def _get_description(self):
                desc = yield self.conn.get_desc_of_resource(
                    name=self.option('name'))
                # The getter must hand the current value back, because the
                # default comparer compares it with the desired option.
                raise gen.Return(desc)
    """

    # A list of option names that are _not_ automatically managed. These are
    # useful if you have special behaviors like 'commit' on change, or if you
    # have parameters that are unmutable ('name').
    unmanaged_options = []

    def __init__(self, *args, **kwargs):
        """Adds the 'state' option, validates inputs and gathers methods.

        Accepts the same arguments as BaseActor.__init__().

        Raises:
            exceptions.UnrecoverableActorFailure: if a getter or setter
                method is missing for one of the ensurable options.
        """
        # The 'state' parameter is a given, so make sure its set. Work on a
        # per-instance copy: `all_options` is a class attribute (possibly the
        # one inherited from BaseActor), and mutating it in place would leak
        # the 'state' option into every other class sharing that dict.
        # .copy() preserves the mapping type (e.g. OrderedDict) and order.
        options = self.all_options.copy()
        options['state'] = (
            STATE, 'present', 'Desired state: present or absent')
        self.all_options = options

        # Now go ahead and validate all of the user inputs the normal way
        super(EnsurableBaseActor, self).__init__(*args, **kwargs)

        # Generate a list of options that will be ensured. Built as a real
        # list (rather than dict.keys() + remove()) so that it behaves the
        # same on Python 2 and Python 3.
        self._ensurable_options = [
            option for option in self.all_options
            if option not in self.unmanaged_options]

        # Finally, do a class validation... make sure that we have actual
        # getter/setter methods for each of the options. This populates dicts
        # that provide references to the actual methods for execution later.
        self._gather_methods()

    def _gather_methods(self):
        """Generates pointers to the Getter and Setter methods.

        Walks through the list of options in self._ensurable_options and
        discovers the pointers to the getter/setter methods. If any are
        missing, throws an exception quickly. When no `_compare_[option]`
        method exists, a default one doing an equality check is attached to
        the instance.

        Raises:
            exceptions.UnrecoverableActorFailure: if a getter or a setter is
                missing for an option.
        """
        self.setters = {}
        self.getters = {}
        self.comparers = {}

        for option in self._ensurable_options:
            setter = '_set_%s' % option
            getter = '_get_%s' % option
            comparer = '_compare_%s' % option

            if not self._is_method(getter) or not self._is_method(setter):
                raise exceptions.UnrecoverableActorFailure(
                    'Invalid Actor Code Detected in %s: '
                    'Unable to find required methods: %s, %s'
                    % (self.__class__.__name__, setter, getter))

            if not self._is_method(comparer):
                # `option=option` binds the current loop value as a default
                # argument; a plain closure would see only the last option.
                @gen.coroutine
                def _comparer(option=option):
                    existing = yield self.getters[option]()
                    new = self.option(option)
                    raise gen.Return(existing == new)

                setattr(self, comparer, _comparer)

            self.setters[option] = getattr(self, setter)
            self.getters[option] = getattr(self, getter)
            self.comparers[option] = getattr(self, comparer)

    def _is_method(self, name):
        """Return True if this object has a bound method called `name`."""
        return hasattr(self, name) and inspect.ismethod(getattr(self, name))

    @gen.coroutine
    def _precache(self):
        """Override this method to pre-cache data in your actor.

        This method can be overridden to go off and pre-fetch data for your
        actors _set and _get methods. This helps if you can execute a single
        API call that gets most of the data you need, before any of the actual
        get/set operations take place.
        """
        raise gen.Return()

    @gen.coroutine
    def _get_state(self):
        """Getter for the 'state' option. Must be overridden."""
        raise NotImplementedError('_get_state is required for Ensurable')

    @gen.coroutine
    def _set_state(self):
        """Setter for the 'state' option. Must be overridden."""
        raise NotImplementedError('_set_state is required for Ensurable')

    @gen.coroutine
    def _ensure(self, option):
        """Compares the desired state with the actual state of a resource.

        Uses the getter for a resource option to determine its current state,
        and then compares it with the desired state. Generally does a simple
        equality comparison of the states, but user can optionally define
        their own comparison mechanism as well.

        If the states do not match, then the setter method is called.

        Args:
            option: Name of the option to ensure.
        """
        equals = yield self.comparers[option]()

        if equals:
            self.log.debug('Option "%s" matches' % option)
            raise gen.Return()

        self.log.debug('Option "%s" DOES NOT match, calling setter' % option)
        yield self.setters[option]()

    @gen.coroutine
    def _execute(self):
        """A pretty simple execution pipeline for the actor.

        Runs _precache(), then ensures the 'state' option. If the desired
        state is 'absent' nothing else is done; otherwise every remaining
        ensurable option is ensured in turn.

        Note: An OrderedDict can be used instead of a plain dict when order
        actually matters for the option setting.
        """
        yield self._precache()

        yield self._ensure('state')

        if self.option('state') == 'absent':
            raise gen.Return()

        for option in self._ensurable_options:
            # We've already managed state .. so make sure we skip the state
            # option and only manage the others.
            if option != 'state':
                yield self._ensure(option)
class HTTPBaseActor(BaseActor):
    """Abstract base class for an HTTP-client based Actor object.

    This class provides common methods for getting access to asynchronous
    HTTP clients, wrapping the executions in appropriate try/except blocks,
    timeouts, etc.

    If you're writing an Actor that uses a remote REST API, this is the
    base class you should subclass from.
    """

    # Headers sent with every request made through _fetch(). Subclasses may
    # override this attribute.
    headers = None

    def _get_http_client(self):
        """Returns an asynchronous web client object

        The object is created through tornado's httpclient.AsyncHTTPClient().
        """
        return httpclient.AsyncHTTPClient()

    def _get_method(self, post):
        """Returns the appropriate HTTP Method based on the supplied Post data.

        Args:
            post: The post body you intend to submit in the URL request

        Returns:
            'POST' when a body is supplied (anything other than None,
            including an empty string), otherwise 'GET'.
        """
        # If there is no post data, set the request method to GET
        if post is not None:
            return 'POST'
        else:
            return 'GET'

    def _generate_escaped_url(self, url, args):
        """Takes in a dictionary of arguments and returns a URL line.

        Sorts the arguments so that the returned string is predictable and in
        alphabetical order. Effectively wraps the tornado.httputil.url_concat
        method, strips out arguments whose value is falsy, and lowercases
        Bool values.

        Note that *every* falsy value is dropped, not only None: arguments
        set to False, 0 or '' do not appear in the generated URL.

        Args:
            url: (Str) The URL to append the arguments to
            args: (Dict) Key/Value arguments. Values should be primitives.

        Returns:
            A URL encoded string like this: <url>?foo=bar&abc=xyz
        """
        # Build a fresh dict in a single pass instead of rewriting the
        # dictionary while iterating over it. items() is available on both
        # Python 2 and Python 3 (iteritems() is Python 2 only).
        cleaned = {}
        for key, value in args.items():
            # Skip arguments without a (truthy) value
            if not value:
                continue

            # Convert Bool values to lowercase strings ('true')
            if isinstance(value, bool):
                value = str(value).lower()

            cleaned[key] = value

        # Now generate the URL
        full_url = httputil.url_concat(url, sorted(cleaned.items()))
        self.log.debug('Generated URL: %s' % full_url)

        return full_url

    # TODO: Add a retry/backoff timer here. If the remote endpoint returns
    # garbled data (ie, maybe a 500 error or something else thats not in
    # JSON format, we should back off and try again.
    @gen.coroutine
    def _fetch(self, url, post=None, auth_username=None, auth_password=None):
        """Executes a web request asynchronously and yields the body.

        Args:
            url: (Str) The full url path of the API call
            post: (Str) POST body data to submit (if any)
            auth_username: (str) HTTP auth username
            auth_password: (str) HTTP auth password

        Raises:
            exceptions.UnparseableResponseFromEndpoint: if the response body
                is not valid JSON.
            gen.Return: with the JSON-decoded response body.
        """
        # Generate the full request URL and log out what we're doing...
        self.log.debug('Making HTTP request to %s with data: %s' % (url, post))

        # Create the http_request object
        http_client = self._get_http_client()
        http_request = httpclient.HTTPRequest(
            url=url,
            method=self._get_method(post),
            body=post,
            headers=self.headers,
            auth_username=auth_username,
            auth_password=auth_password,
            follow_redirects=True,
            max_redirects=10)

        # Execute the request and raise any exception. Exceptions are not
        # caught here because they are unique to the API endpoints, and thus
        # should be handled by the individual Actor that called this method.
        http_response = yield http_client.fetch(http_request)

        try:
            body = json.loads(http_response.body)
        except ValueError as e:
            raise exceptions.UnparseableResponseFromEndpoint(
                'Unable to parse response from remote API as JSON: %s' % e)

        # Receive a successful return
        raise gen.Return(body)