Source code for regparse.sigcheck

# -*- coding: utf-8 -*-
"""
Tests RCS request signatures for validity
"""
from __future__ import division, print_function, unicode_literals

import hashlib, hmac, base64, logging, flask, iso8601, datetime, db

from flask.ext.restful import abort
from functools import wraps

[docs]def get_logger(): """This function does not deserve a comment""" return flask.current_app.logger
[docs]def sign( key, *parts ): """ Creates an HMAC_SHA256 signature from a key and a set of message parts :param key: The signing key to use :type key: str :returns: str -- a URL safe base64 encoded signature """ u8parts = [p.encode('utf8') for p in parts] msg = str('').join( u8parts ) logging.debug( msg ) h = hmac.new( str(key), msg, digestmod=hashlib.sha256 ) return base64.urlsafe_b64encode( h.digest() ).replace('=','')
[docs]def test_request( request ): """ Test the signature of a given request. Pulls the signature components from the request and generates a reference signature. Tests if the reference signature matches the Authorization header in the request. :param request: A flask request object containing the request to validate :returns: bool -- if the generated signature matches the authorization header """ logger = get_logger() for h in 'Authorization TimeStamp Sender'.split(): logger.debug( h+': '+request.headers.get(h,'MISSING') ) dt = request.headers.get( 'TimeStamp', None ) cid = request.headers.get( 'Sender', None ) msg_sig = request.headers.get('Authorization', None) if not (dt and cid and msg_sig): logger.warning( 'Missing data from headers, sig check failed' ) return False rqpath = request.path rqbody = request.data psk = db.auth.get_key(cid) ref_sig = sign( psk, rqpath, cid, dt, rqbody ) logger.info( 'Signature received: {0} ## Signature generated: {1}'.format(msg_sig,ref_sig) ) return ref_sig is not None and ref_sig == msg_sig
[docs]def validate(func): """ Wraps a service endpoint and checks the authentication headers. Tests the signature on the request and ensures the request time is current. Failure results in a 401 HTTP error if SIG_CHECK is enabled, if not validation will always pass. """ def validation_fail( msg ): if flask.current_app.config['SIG_CHECK']: abort(401,msg=msg) else: # be vocal if SIG_CHECK is accidentally set to False in production get_logger().warning('Authorization failure [{0}] ignored: SIG_CHECK is off in the config'.format(msg)) @wraps(func) def decorated_function(*args, **kwargs): if not test_request( flask.request ): validation_fail( 'Invalid signature' ) try: if not check_time( flask.request ): validation_fail( 'Request time out of sync' ) except iso8601.ParseError: if flask.current_app.config['SIG_CHECK']: abort(400,msg="Unparsable timestamp") else: get_logger().warning('Timestamp parse failure ignored: SIG_CHECK is off in the config') return func(*args, **kwargs) return decorated_function
[docs]def check_time( request ): """ Ensure the sent time is within 2 minutes of the current time :param sent: An ISO8601 encoded date :type sent: str :raises: ParseError -- if the datetime cannot be parsed :returns: bool -- sent time is within 2 minutes of the current time """ logger = get_logger() sent = request.headers.get( 'TimeStamp' ) dt = iso8601.parse_date( sent ) now = datetime.datetime.now( iso8601.iso8601.Utc() ) two_min = datetime.timedelta(minutes=2) logger.info( 'Header date: {0} ## Current timestamp: {1}'.format(sent,now) ) return -two_min < now-dt < two_min
if __name__ == '__main__': dt = '2007-01-25T12:00:00Z' psk = 'test' cid = '1' rqpath = '/register/22' rqbody = '{"a":1}' sig = sign( psk, rqpath, cid, dt, rqbody ) print( sig ) REQUEST_PATH = '/register/23ax5t' SENDER_ID = 'jstest' TIME_STAMP = '2014-12-05T18:28:56.714Z' REQUEST_BODY = '''{"version":"1.0.0","payload_type":"wms","en":{"service_url":"http://wms.ess-ws.nrcan.gc.ca/wms/toporama_en","service_name":"Key Areas for Birds in Coastal Regions of the Canadian Beaufort Sea - la Mue ou l'élevage des couvées (de la mi-juillet à la mi-août)","layer":"limits"},"fr":{"service_url":"http://wms.ess-ws.nrcan.gc.ca/wms/toporama_en","layer":"limits"}}''' psk = 'test_-k' sig = sign( psk, REQUEST_PATH, SENDER_ID, TIME_STAMP, REQUEST_BODY ) print( sig )