# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright 2018 Nextdoor.com, Inc
"""
This package provides a quick way of creating custom API clients for JSON-based
REST APIs. The majority of the work is in the creation of a _CONFIG dictionary
for the class. This dictionary dynamically configures the object at
instantiation time with the appropriate @gen.coroutine wrapped HTTP fetch
methods.
See the documentation in docs/DEVELOPMENT.md for more details on how to use
this package to create your own API client.
"""
import logging
import types
import urllib

try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2

from tornado import gen
from tornado import httpclient
from tornado import httputil
import simplejson as json

from kingpin import utils
from kingpin.actors import exceptions
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

__author__ = 'Matt Wise <matt@nextdoor.com>'
def _mask_private_values(value, private_keys):
    """Return a copy of `value` with the values of private keys masked.

    Dictionaries are copied (never modified in place) and searched
    recursively, so a secret stored inside a nested dict (for example a token
    inside a `params` dict) is masked as well. Anything that is not a dict is
    returned untouched.

    Args:
        value: The object to sanitize (usually the kwargs of a call).
        private_keys: Collection of key names whose values must be hidden.

    Returns:
        A sanitized copy of `value` that is safe to write to a log.
    """
    if not isinstance(value, dict):
        return value

    return dict(
        (key, '****' if key in private_keys
         else _mask_private_values(val, private_keys))
        for key, val in value.items())


def _retry(*f_or_args, **options):
    """Coroutine-compatible Retry Decorator.

    This decorator provides a simple retry mechanism that compares the
    exceptions it received against a configuration table (self._EXCEPTIONS),
    and then performs the action defined in that table. For example, an
    HTTPError with a '500' code might want to retry 3 times. On the other
    hand, a 401/403 might want to throw an InvalidCredentials exception.

    self._EXCEPTIONS maps an exception class to a dict of
    `{substring: action}` entries, where `substring` is searched for in
    `str(exception)`:

      * action is an exception class: that class is raised with the original
        error message.
      * action is None: the failure is retryable; the call is attempted again
        after `delay` seconds.
      * the '' key is an optional default action, used only when no other
        substring matched.

    An exception that matches nothing and has no default is re-raised as-is,
    and so is the last exception once `retries` attempts have been made.

    The decorated object may define `_private_kwargs`, a list of keyword
    names whose values are replaced with '****' in the retry log messages.

    Args:
        retries: Maximum number of attempts (default: 3). Only honored when
            the decorator is invoked with arguments.
        delay: Seconds to sleep between attempts (default: 0.25). Only
            honored when the decorator is invoked with arguments.

    Examples:
        @_retry
        def some_func(self):
            yield ...

        @_retry(retries=5)
        def some_func(self):
            yield ...
    """
    # Defaults...
    retries = 3
    delay = 0.25

    # Have to determine if invoked as @_retry or @_retry()
    if len(f_or_args) == 1 and callable(f_or_args[0]):
        # Decorator invoked as @_retry
        _call_with_args = False
    else:
        # Decorator invoked as @_retry(args...)
        # `f` is unknown for now
        _call_with_args = True
        retries = options.pop('retries', retries)
        delay = options.pop('delay', delay)

    def decorator(f_or_self, *args, **kwargs):
        # Depending on how this decorator is invoked
        # The first argument is either the function, or the `self` object
        if not _call_with_args:
            self = f_or_self
            f = f_or_args[0]
        else:
            f = f_or_self

        def wrapper(self, *args, **kwargs):
            i = 1

            # Get a list of private kwargs to mask
            private_kwargs = getattr(self, '_private_kwargs', [])

            # For security purposes, create a patched copy of the kwargs that
            # hides passwords/tokens. This is never guaranteed to work (an
            # API could have 'foo' as their password field, and we just won't
            # know ...), but we make a best effort here. Nested dicts are
            # searched too, because secrets are frequently passed inside a
            # `params` dict rather than as a top-level keyword.
            safe_kwargs = _mask_private_values(kwargs, private_kwargs)

            while True:
                # Don't log out the first try as a 'Try' ... just do it
                if i > 1:
                    log.debug('Try (%s/%s) of %s(%s, %s)' %
                              (i, retries, f, args, safe_kwargs))

                # Attempt the method. Catch any exception listed in
                # self._EXCEPTIONS.
                try:
                    ret = yield gen.coroutine(f)(self, *args, **kwargs)
                    raise gen.Return(ret)
                except tuple(self._EXCEPTIONS.keys()) as e:
                    error = str(e)
                    if hasattr(e, 'message'):
                        error = e.message
                    log.warning('Exception raised on try %s: %s' % (i, error))

                    # If we've run out of retry attempts, raise the exception
                    if i >= retries:
                        log.debug('Raising exception: %s' % e)
                        raise e

                    # Gather the config for this exception-type from
                    # self._EXCEPTIONS. The `except` clause above also
                    # catches subclasses of the registered classes, so walk
                    # the MRO (most specific class first) instead of indexing
                    # by the exact type, which would raise a KeyError.
                    exc_conf = {}
                    for klass in type(e).__mro__:
                        if klass in self._EXCEPTIONS:
                            exc_conf = dict(self._EXCEPTIONS[klass])
                            break

                    # An empty string for the key is the default exception
                    # It's optional, but can match before others match, so we
                    # pop it before searching.
                    default_exc = exc_conf.pop('', False)
                    log.debug('Searching through %s' % exc_conf)
                    matched_exc = [exc for key, exc in exc_conf.items()
                                   if key in str(e)]

                    log.debug('Matched exceptions: %s' % matched_exc)
                    if matched_exc and matched_exc[0] is not None:
                        exception = matched_exc[0]
                        log.debug('Matched exception: %s' % exception)
                        raise exception(error)
                    elif matched_exc and matched_exc[0] is None:
                        log.debug('Exception is retryable!')
                    elif default_exc is not False:
                        raise default_exc(str(e))
                    else:
                        # Reaching this part means no exception was matched
                        # and no default was specified.
                        log.debug('No explicit behavior for this exception'
                                  ' found. Raising.')
                        raise e

                    # Must have been a retryable exception. Retry.
                    i = i + 1
                    log.debug('Retrying in %s...' % delay)
                    yield utils.tornado_sleep(delay)

                log.debug('Retrying..')

        if not _call_with_args:
            # Invoked as @_retry
            # Should return the evaluated run
            return wrapper(self, *args, **kwargs)
        else:
            # Invoked as @_retry(args...)
            # Return a wrapper that expects all the new args
            return wrapper

    return decorator
def create_http_method(name, http_method):
    """Build the get/put/delete/post coroutine for a resource.

    Called while a RestConsumer object is being initialized. The returned
    function is meant to be bound to that object; when invoked it sends the
    request through the object's `_client` and returns whatever the client's
    fetch() produced.

    Args:
        name: Name the method is bound under by the caller. It is not used
            inside this function.
        http_method: Name of the method (get, put, post, delete)

    Returns:
        A method appropriately configured, with its `__name__` set to
        `http_method`.
    """
    @gen.coroutine
    def method(self, *args, **kwargs):
        # Only keyword arguments are supported; positional ones are rejected.
        if args:
            raise exceptions.InvalidOptions('Must pass named-args (kwargs)')

        target = '%s%s' % (self._ENDPOINT, self._path)
        verb = http_method.upper()
        user = self._CONFIG.get('auth', {}).get('user')
        password = self._CONFIG.get('auth', {}).get('pass')

        response = yield self._client.fetch(
            url=target,
            method=verb,
            params=kwargs,
            auth_username=user,
            auth_password=password)
        raise gen.Return(response)

    method.__name__ = http_method
    return method
def create_method(name, config):
    """Creates a method that returns a fresh RestConsumer object.

    The returned function is meant to be bound to a RestConsumer object. When
    called, it instantiates a new object of the same class as the parent,
    configured from the parent's `_attrs[name]` entry and sharing the
    parent's client.

    The final created method accepts keyword arguments and passes them on to
    the RestConsumer object being created. This allows for passing in unique
    resource identifiers (ie, the '%res%' in '/v2/rooms/%res%/history').
    Keyword arguments given to the parent are inherited by the child; a
    keyword passed to the method overrides an inherited one of the same name.

    NOTE(review): `*args` are forwarded ahead of the `name=`/`config=`/
    `client=` keywords, so with RestConsumer.__init__'s signature a positional
    argument would collide with `name`. Callers should pass keywords only.

    Args:
        name: The name passed into the RestConsumer object
        config: The config for that object. It is not read here; the
            configuration is looked up from the parent's `_attrs[name]` when
            the method is called.

    Returns:
        A method that returns a fresh RestConsumer object
    """
    def method(self, *args, **kwargs):
        # Merge the supplied kwargs to the method with any kwargs supplied to
        # the RestConsumer parent object. This ensures that tokens replaced in
        # the 'path' variables are passed all the way down the instantiation
        # chain. A copy is updated (rather than concatenating .items()) so
        # that this works on both Python 2 and Python 3 and never modifies
        # the parent's own kwargs.
        merged_kwargs = dict(self._kwargs)
        merged_kwargs.update(kwargs)

        return self.__class__(
            name=name,
            config=self._attrs[name],
            client=self._client,
            *args, **merged_kwargs)

    method.__name__ = name
    return method
class RestConsumer(object):
    """An abstract object that self-defines its own API access methods.

    At init time, this object reads its `_CONFIG` and pre-defines all of the
    API access methods that have been described. It does not handle actual HTTP
    calls directly, but is passed in a `client` object (anything that
    subclasses the RestClient class) and leverages that for the actual web
    calls.
    """

    # Configuration read when no `config` is passed to __init__. The keys
    # read by this class are 'path', 'http_methods' and 'attrs'; the generated
    # HTTP methods additionally read 'auth'.
    _CONFIG = {}

    # Prefix that the generated HTTP methods put in front of the path.
    _ENDPOINT = None

    def __init__(self, name=None, config=None, client=None, *args, **kwargs):
        """Initialize the RestConsumer object.

        The generic RestConsumer object (with no parameters passed in) looks at
        the self.__class__._CONFIG dictionary and dynamically generates access
        methods for the various API methods.

        The GET, PUT, POST and DELETE methods optionally listed in
        CONFIG['http_methods'] represent the possible types of HTTP methods
        that the CONFIG['path'] supports. For each one of these listed, a
        @coroutine wrapped http_get/http_put/http_post/http_delete() method
        will be created in the RestConsumer that knows how to make the HTTP
        request.

        For each item listed in CONFIG['attrs'], an access method is created
        that will dynamically create and return a new RestConsumer object thats
        configured for this endpoint. These methods are not asynchronous, but
        are non-blocking.

        Args:
            name: Name of the resource method (default: None). Accepted for
                the generated access methods; not otherwise used here.
            config: The dictionary object with the configuration for this API
                endpoint call. Falls back to the class `_CONFIG`.
            client: Object whose fetch() method performs the web requests.
                A new RestClient() is created when none is supplied.
            *args: Accepted but not used.
            **kwargs: Tokens used to fill in the path. They are also stored
                and handed down to any RestConsumer created through one of the
                generated attribute methods.

        Raises:
            TypeError: If the path contains a token that cannot be filled in
                from **kwargs.
        """
        # If this isn't passed in, then get it from the class definition
        config = config or self._CONFIG

        # Get the basic options for this particular REST endpoint access object
        self._path = config.get('path', None)
        self._http_methods = config.get('http_methods', None)
        self._attrs = config.get('attrs', None)
        self._kwargs = kwargs

        # If no client was supplied, then we create a default RestClient
        self._client = client or RestClient()

        # Ensure that any tokens that need filling-in in the self._path setting
        # are pulled from the **kwargs passed into this init. This is used on
        # API paths like Hipchats '/v2/room/%res%/...' URLs.
        self._path = self._replace_path_tokens(self._path, kwargs)

        # Create all of the methods based on the self._http_methods and
        # self._attrs dict.
        self._create_methods()
        self._create_attrs()

        # Log some things
        log.debug('%s/%s initialized' %
                  (self.__class__.__name__, self._client))

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self)

    def __str__(self):
        return str(self._path)

    def _replace_path_tokens(self, path, tokens):
        """Search and replace %xxx% with values from tokens.

        Used to replace any values of %xxx% with 'xxx' from tokens. Can replace
        one, or many fields at once. The substitution itself is delegated to
        utils.populate_with_tokens().

        Args:
            path: String of the path
            tokens: A dictionary of tokens to search through.

        Returns:
            path: A modified string, or None when `path` is empty or None.

        Raises:
            TypeError: If the substitution fails with a LookupError.
        """
        if not path:
            return

        try:
            path = utils.populate_with_tokens(path, tokens)
        except LookupError as e:
            msg = 'Path (%s), tokens: (%s) error: %s' % (path, tokens, e)
            raise TypeError(msg)

        return path

    def _create_methods(self):
        """Create @gen.coroutine wrapped HTTP methods.

        Iterates through the methods described in self._http_methods and
        creates @gen.coroutine wrapped access methods that perform these
        actions. Each one is bound to this instance as `http_<name>`.
        """
        if not self._http_methods:
            return

        for name in self._http_methods:
            full_method_name = 'http_%s' % name
            method = create_http_method(full_method_name, name)
            # The two-argument form of MethodType binds the function to this
            # instance on both Python 2 and Python 3.
            setattr(self, full_method_name, types.MethodType(method, self))

    def _create_attrs(self):
        """Creates access methods to the attributes in self._attrs.

        Iterates through the attributes described in self._attrs and creates
        access methods that return RestConsumer objects for those attributes.
        """
        if not self._attrs:
            return

        for name in self._attrs:
            method = create_method(name, self._attrs[name])
            setattr(self, name, types.MethodType(method, self))
class RestClient(object):
    """Very simple REST client for the RestConsumer. Implements a
    AsyncHTTPClient(), some convenience methods for URL escaping, and a single
    fetch() method that can handle GET/POST/PUT/DELETEs.

    This code is nearly identical to the kingpin.actors.base.BaseHTTPActor
    class, but is not actor-specific.

    Args:
        client: HTTP client object used to execute the requests. A new
            tornado AsyncHTTPClient() is created when none is supplied.
        headers: Headers to pass in on every HTTP request
    """

    # Consumed by the @_retry decorator on fetch(): for each exception class,
    # maps a substring of the error text to the exception to raise instead,
    # or to None when the request should simply be retried. The '' key is the
    # default used when no other substring matches.
    _EXCEPTIONS = {
        httpclient.HTTPError: {
            '401': exceptions.InvalidCredentials,
            '403': exceptions.InvalidCredentials,
            '500': None,
            '502': None,
            '503': None,
            '504': None,

            # Represents a standard HTTP Timeout
            '599': None,

            '': exceptions.RecoverableActorFailure,
        }
    }

    def __init__(self, client=None, headers=None):
        self._client = client or httpclient.AsyncHTTPClient()
        # Keyword names whose values @_retry masks in its log output.
        self._private_kwargs = ['auth_password']
        self.headers = headers

    def _generate_escaped_url(self, url, args):
        """Takes in a dictionary of arguments and returns a URL line.

        Sorts the arguments so that the returned string is predictable and in
        alphabetical order. Effectively wraps the tornado.httputil.url_concat
        method and properly strips out None values, as well as lowercases
        Bool values.

        Only None is stripped: other falsy values (False, 0, '') are
        legitimate argument values and are kept.

        Args:
            url: (Str) The URL to append the arguments to
            args: (Dict) Key/Value arguments. Values should be primitives.

        Returns:
            A URL encoded string like this: <url>?foo=bar&abc=xyz
        """
        cleaned = {}
        for key, value in args.items():
            # Remove keys from the arguments where the value is None
            if value is None:
                continue

            # Convert all Bool values to lowercase strings
            if isinstance(value, bool):
                value = str(value).lower()

            cleaned[key] = value

        # Now generate the URL
        full_url = httputil.url_concat(url, sorted(cleaned.items()))
        log.debug('Generated URL: %s' % full_url)
        return full_url

    # TODO: Add a retry/backoff timer here. If the remote endpoint returns
    # garbled data (ie, maybe a 500 error or something else thats not in
    # JSON format, we should back off and try again.
    @gen.coroutine
    @_retry
    def fetch(self, url, method, params=None,
              auth_username=None, auth_password=None):
        """Executes a web request asynchronously and yields the body.

        Args:
            url: (Str) The full url path of the API call
            method: (Str) GET/PUT/POST/DELETE
            params: (Dict) Arguments (k/v pairs) to submit either as POST data
                    or URL argument options. Defaults to no arguments.
            auth_username: (str) HTTP auth username
            auth_password: (str) HTTP auth password

        Returns:
            The response body decoded from JSON, or the raw response body
            when it is not valid JSON.

        Raises:
            httpclient.HTTPError: Re-raised after being logged. The @_retry
                decorator may retry the call or translate the error according
                to `_EXCEPTIONS`.
        """
        # A None default (rather than a shared `{}`) avoids the mutable
        # default argument pitfall.
        if params is None:
            params = {}

        # Start with empty post data. If we're doing a PUT/POST, then just
        # url-encode the args and send them as the request body. If we're
        # doing a GET/DELETE though, convert the args into a modified URL
        # string and pass that into the client's fetch() method.
        body = None
        if method in ('PUT', 'POST'):
            body = urlencode(params) or None
        elif method in ('GET', 'DELETE') and params:
            url = self._generate_escaped_url(url, params)

        # Generate the full request URL and log out what we're doing...
        log.debug('Making %s request to %s. Data: %s' % (method, url, body))

        # Create the http_request object
        http_request = httpclient.HTTPRequest(
            url=url,
            method=method,
            body=body,
            headers=self.headers,
            auth_username=auth_username,
            auth_password=auth_password,
            follow_redirects=True,
            max_redirects=10)

        # Execute the request and raise any exception. Exceptions are not
        # handled here (only logged) because they are unique to the API
        # endpoints, and thus should be handled by the individual Actor that
        # called this method.
        log.debug('HTTP Request: %s' % http_request)
        try:
            http_response = yield self._client.fetch(http_request)
        except httpclient.HTTPError as e:
            log.critical('Request for %s failed: %s' % (url, e))
            raise

        log.debug('HTTP Response: %s' % http_response.body)

        # Not every endpoint answers with JSON; hand back the raw body when
        # it cannot be decoded.
        try:
            body = json.loads(http_response.body)
        except ValueError:
            raise gen.Return(http_response.body)

        # Receive a successful return
        raise gen.Return(body)
class SimpleTokenRestClient(RestClient):
    """Simple RestClient that appends a 'token' to every web request for
    authentication. Used in most simple APIs where a token is provided to the
    end user.

    Args:
        tokens: (dict) A dict with the token name/value(s) to append to every
                web request.
        *args, **kwargs: Passed through to RestClient.__init__().
    """

    def __init__(self, tokens, *args, **kwargs):
        super(SimpleTokenRestClient, self).__init__(*args, **kwargs)
        self._tokens = tokens

        # Register the token names as private so that they are masked when
        # calls are logged.
        self._private_kwargs.extend(tokens)

    @gen.coroutine
    def fetch(self, *args, **kwargs):
        """Adds the tokens to `params` and delegates to RestClient.fetch().

        The tokens are merged into a fresh dict, so the `params` dict supplied
        by the caller is never modified. A token overrides a caller-supplied
        parameter of the same name. `params` is expected to be passed as a
        keyword argument.

        Args:
            *args, **kwargs: Same arguments as RestClient.fetch().

        Returns:
            Whatever RestClient.fetch() returns.
        """
        # Copy before merging so the secret tokens do not leak into the
        # caller's dict; `or {}` also covers an explicit params=None.
        params = dict(kwargs.get('params') or {})
        params.update(self._tokens)
        kwargs['params'] = params

        ret = yield super(SimpleTokenRestClient, self).fetch(*args, **kwargs)
        raise gen.Return(ret)