summaryrefslogtreecommitdiffstats
path: root/third_party/python/taskcluster/taskcluster/aio/asyncclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/taskcluster/taskcluster/aio/asyncclient.py')
-rw-r--r--third_party/python/taskcluster/taskcluster/aio/asyncclient.py306
1 files changed, 306 insertions, 0 deletions
diff --git a/third_party/python/taskcluster/taskcluster/aio/asyncclient.py b/third_party/python/taskcluster/taskcluster/aio/asyncclient.py
new file mode 100644
index 0000000000..5882d81ad2
--- /dev/null
+++ b/third_party/python/taskcluster/taskcluster/aio/asyncclient.py
@@ -0,0 +1,306 @@
+"""This module is used to interact with taskcluster rest apis"""
+
+from __future__ import absolute_import, division, print_function
+
+import os
+import logging
+from six.moves import urllib
+
+import mohawk
+import mohawk.bewit
+import aiohttp
+
+from .. import exceptions
+from .. import utils
+from ..client import BaseClient, createTemporaryCredentials
+from . import asyncutils, retry
+
+log = logging.getLogger(__name__)
+
+
+# Default configuration
+_defaultConfig = config = {
+ 'credentials': {
+ 'clientId': os.environ.get('TASKCLUSTER_CLIENT_ID'),
+ 'accessToken': os.environ.get('TASKCLUSTER_ACCESS_TOKEN'),
+ 'certificate': os.environ.get('TASKCLUSTER_CERTIFICATE'),
+ },
+ 'maxRetries': 5,
+ 'signedUrlExpiration': 15 * 60,
+}
+
+
+def createSession(*args, **kwargs):
+ """ Create a new aiohttp session. This passes through all positional and
+ keyword arguments to the asyncutils.createSession() constructor.
+
+ It's preferred to do something like
+
+ async with createSession(...) as session:
+ queue = Queue(session=session)
+ await queue.ping()
+
+ or
+
+ async with createSession(...) as session:
+ async with Queue(session=session) as queue:
+ await queue.ping()
+
+ in the client code.
+ """
+ return asyncutils.createSession(*args, **kwargs)
+
+
+class AsyncBaseClient(BaseClient):
+ """ Base Class for API Client Classes. Each individual Client class
+ needs to set up its own methods for REST endpoints and Topic Exchange
+ routing key patterns. The _makeApiCall() and _topicExchange() methods
+ help with this.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(AsyncBaseClient, self).__init__(*args, **kwargs)
+ self._implicitSession = False
+ if self.session is None:
+ self._implicitSession = True
+
+ def _createSession(self):
+ """ If self.session isn't set, don't create an implicit.
+
+ To avoid `session.close()` warnings at the end of tasks, and
+ various strongly-worded aiohttp warnings about using `async with`,
+ let's set `self.session` to `None` if no session is passed in to
+ `__init__`. The `asyncutils` functions will create a new session
+ per call in that case.
+ """
+ return None
+
+ async def _makeApiCall(self, entry, *args, **kwargs):
+ """ This function is used to dispatch calls to other functions
+ for a given API Reference entry"""
+
+ x = self._processArgs(entry, *args, **kwargs)
+ routeParams, payload, query, paginationHandler, paginationLimit = x
+ route = self._subArgsInRoute(entry, routeParams)
+
+ if paginationLimit and 'limit' in entry.get('query', []):
+ query['limit'] = paginationLimit
+
+ if query:
+ _route = route + '?' + urllib.parse.urlencode(query)
+ else:
+ _route = route
+ response = await self._makeHttpRequest(entry['method'], _route, payload)
+
+ if paginationHandler:
+ paginationHandler(response)
+ while response.get('continuationToken'):
+ query['continuationToken'] = response['continuationToken']
+ _route = route + '?' + urllib.parse.urlencode(query)
+ response = await self._makeHttpRequest(entry['method'], _route, payload)
+ paginationHandler(response)
+ else:
+ return response
+
+ async def _makeHttpRequest(self, method, route, payload):
+ """ Make an HTTP Request for the API endpoint. This method wraps
+ the logic about doing failure retry and passes off the actual work
+ of doing an HTTP request to another method."""
+
+ url = self._constructUrl(route)
+ log.debug('Full URL used is: %s', url)
+
+ hawkExt = self.makeHawkExt()
+
+ # Serialize payload if given
+ if payload is not None:
+ payload = utils.dumpJson(payload)
+
+ async def tryRequest(retryFor):
+ # Construct header
+ if self._hasCredentials():
+ sender = mohawk.Sender(
+ credentials={
+ 'id': self.options['credentials']['clientId'],
+ 'key': self.options['credentials']['accessToken'],
+ 'algorithm': 'sha256',
+ },
+ ext=hawkExt if hawkExt else {},
+ url=url,
+ content=payload if payload else '',
+ content_type='application/json' if payload else '',
+ method=method,
+ )
+
+ headers = {'Authorization': sender.request_header}
+ else:
+ log.debug('Not using hawk!')
+ headers = {}
+ if payload:
+ # Set header for JSON if payload is given, note that we serialize
+ # outside this loop.
+ headers['Content-Type'] = 'application/json'
+
+ try:
+ response = await asyncutils.makeSingleHttpRequest(
+ method, url, payload, headers, session=self.session
+ )
+ except aiohttp.ClientError as rerr:
+ return retryFor(exceptions.TaskclusterConnectionError(
+ "Failed to establish connection",
+ superExc=rerr
+ ))
+
+ status = response.status
+ if status == 204:
+ return None
+
+ # Catch retryable errors and go to the beginning of the loop
+ # to do the retry
+ if 500 <= status and status < 600:
+ try:
+ response.raise_for_status()
+ except Exception as exc:
+ return retryFor(exc)
+
+ # Throw errors for non-retryable errors
+ if status < 200 or status >= 300:
+ # Parse messages from errors
+ data = {}
+ try:
+ data = await response.json()
+ except Exception:
+ pass # Ignore JSON errors in error messages
+ # Find error message
+ message = "Unknown Server Error"
+ if isinstance(data, dict) and 'message' in data:
+ message = data['message']
+ else:
+ if status == 401:
+ message = "Authentication Error"
+ elif status == 500:
+ message = "Internal Server Error"
+ else:
+ message = "Unknown Server Error %s\n%s" % (str(status), str(data)[:1024])
+ # Raise TaskclusterAuthFailure if this is an auth issue
+ if status == 401:
+ raise exceptions.TaskclusterAuthFailure(
+ message,
+ status_code=status,
+ body=data,
+ superExc=None
+ )
+ # Raise TaskclusterRestFailure for all other issues
+ raise exceptions.TaskclusterRestFailure(
+ message,
+ status_code=status,
+ body=data,
+ superExc=None
+ )
+
+ # Try to load JSON
+ try:
+ await response.release()
+ return await response.json()
+ except (ValueError, aiohttp.client_exceptions.ContentTypeError):
+ return {"response": response}
+
+ return await retry.retry(self.options['maxRetries'], tryRequest)
+
+ async def __aenter__(self):
+ if self._implicitSession and not self.session:
+ self.session = createSession()
+ return self
+
+ async def __aexit__(self, *args):
+ if self._implicitSession and self.session:
+ await self.session.close()
+ self.session = None
+
+
+def createApiClient(name, api):
+ api = api['reference']
+
+ attributes = dict(
+ name=name,
+ __doc__=api.get('description'),
+ classOptions={},
+ funcinfo={},
+ )
+
+ # apply a default for apiVersion; this can be removed when all services
+ # have apiVersion
+ if 'apiVersion' not in api:
+ api['apiVersion'] = 'v1'
+
+ copiedOptions = ('exchangePrefix',)
+ for opt in copiedOptions:
+ if opt in api:
+ attributes['classOptions'][opt] = api[opt]
+
+ copiedProperties = ('serviceName', 'apiVersion')
+ for opt in copiedProperties:
+ if opt in api:
+ attributes[opt] = api[opt]
+
+ for entry in api['entries']:
+ if entry['type'] == 'function':
+ def addApiCall(e):
+ async def apiCall(self, *args, **kwargs):
+ return await self._makeApiCall(e, *args, **kwargs)
+ return apiCall
+ f = addApiCall(entry)
+
+ docStr = "Call the %s api's %s method. " % (name, entry['name'])
+
+ if entry['args'] and len(entry['args']) > 0:
+ docStr += "This method takes:\n\n"
+ docStr += '\n'.join(['- ``%s``' % x for x in entry['args']])
+ docStr += '\n\n'
+ else:
+ docStr += "This method takes no arguments. "
+
+ if 'input' in entry:
+ docStr += "This method takes input ``%s``. " % entry['input']
+
+ if 'output' in entry:
+ docStr += "This method gives output ``%s``" % entry['output']
+
+ docStr += '\n\nThis method does a ``%s`` to ``%s``.' % (
+ entry['method'].upper(), entry['route'])
+
+ f.__doc__ = docStr
+ attributes['funcinfo'][entry['name']] = entry
+
+ elif entry['type'] == 'topic-exchange':
+ def addTopicExchange(e):
+ def topicExchange(self, *args, **kwargs):
+ return self._makeTopicExchange(e, *args, **kwargs)
+ return topicExchange
+
+ f = addTopicExchange(entry)
+
+ docStr = 'Generate a routing key pattern for the %s exchange. ' % entry['exchange']
+ docStr += 'This method takes a given routing key as a string or a '
+ docStr += 'dictionary. For each given dictionary key, the corresponding '
+ docStr += 'routing key token takes its value. For routing key tokens '
+ docStr += 'which are not specified by the dictionary, the * or # character '
+ docStr += 'is used depending on whether or not the key allows multiple words.\n\n'
+ docStr += 'This exchange takes the following keys:\n\n'
+ docStr += '\n'.join(['- ``%s``' % x['name'] for x in entry['routingKey']])
+
+ f.__doc__ = docStr
+
+ # Add whichever function we created
+ f.__name__ = str(entry['name'])
+ attributes[entry['name']] = f
+
+ return type(utils.toStr(name), (BaseClient,), attributes)
+
+
+__all__ = [
+ 'createTemporaryCredentials',
+ 'config',
+ 'BaseClient',
+ 'createApiClient',
+]