Source code for flask_dance.consumer.oauth2

import json
import logging

import flask
from flask import current_app, redirect, request, url_for
from oauthlib.common import generate_token
from oauthlib.oauth2 import MissingCodeError
from werkzeug.utils import cached_property
from werkzeug.wrappers import Response

from .base import (
    BaseOAuthConsumerBlueprint,
    oauth_authorized,
    oauth_before_login,
    oauth_error,
)
from .requests import OAuth2Session

log = logging.getLogger(__name__)


[docs] class OAuth2ConsumerBlueprint(BaseOAuthConsumerBlueprint): """ A subclass of :class:`flask.Blueprint` that sets up OAuth 2 authentication. """
[docs] def __init__( self, name, import_name, client_id=None, client_secret=None, *, client=None, auto_refresh_url=None, auto_refresh_kwargs=None, scope=None, state=None, static_folder=None, static_url_path=None, template_folder=None, url_prefix=None, subdomain=None, url_defaults=None, root_path=None, login_url=None, authorized_url=None, base_url=None, authorization_url=None, authorization_url_params=None, token_url=None, token_url_params=None, redirect_url=None, redirect_to=None, session_class=None, storage=None, rule_kwargs=None, use_pkce=False, code_challenge_method="S256", **kwargs, ): """ Most of the constructor arguments are forwarded either to the :class:`flask.Blueprint` constructor or the :class:`requests_oauthlib.OAuth2Session` constructor, including ``**kwargs`` (which is forwarded to :class:`~requests_oauthlib.OAuth2Session`). Only the arguments that are relevant to Flask-Dance are documented here. Args: base_url: The base URL of the OAuth provider. If specified, all URLs passed to this instance will be resolved relative to this URL. authorization_url: The URL specified by the OAuth provider for obtaining an `authorization grant <https://datatracker.ietf.org/doc/html/rfc6749#section-1.3>`__. This can be an fully-qualified URL, or a path that is resolved relative to the ``base_url``. authorization_url_params (dict): A dict of extra key-value pairs to include in the query string of the ``authorization_url``, beyond those necessary for a standard OAuth 2 authorization grant request. token_url: The URL specified by the OAuth provider for obtaining an `access token <https://datatracker.ietf.org/doc/html/rfc6749#section-1.4>`__. This can be an fully-qualified URL, or a path that is resolved relative to the ``base_url``. token_url_params (dict): A dict of extra key-value pairs to include in the query string of the ``token_url``, beyond those necessary for a standard OAuth 2 access token request. login_url: The URL route for the ``login`` view that kicks off the OAuth dance. This string will be :ref:`formatted <python:formatstrings>` with the instance so that attributes can be interpolated. Defaults to ``/{bp.name}``, so that the URL is based on the name of the blueprint. authorized_url: The URL route for the ``authorized`` view that completes the OAuth dance. This string will be :ref:`formatted <python:formatstrings>` with the instance so that attributes can be interpolated. Defaults to ``/{bp.name}/authorized``, so that the URL is based on the name of the blueprint. redirect_url: When the OAuth dance is complete, redirect the user to this URL. redirect_to: When the OAuth dance is complete, redirect the user to the URL obtained by calling :func:`~flask.url_for` with this argument. If you do not specify either ``redirect_url`` or ``redirect_to``, the user will be redirected to the root path (``/``). session_class: The class to use for creating a Requests session between the consumer (your website) and the provider (e.g. Google). Defaults to :class:`~flask_dance.consumer.requests.OAuth2Session`. storage: A token storage class, or an instance of a token storage class, to use for this blueprint. Defaults to :class:`~flask_dance.consumer.storage.session.SessionStorage`. rule_kwargs (dict, optional): Additional arguments that should be passed when adding the login and authorized routes. Defaults to ``None``. use_pkce: If true then the authorization flow will follow the PKCE (Proof Key for Code Exchange). For more details please refer to `RFC7636 <https://www.rfc-editor.org/rfc/rfc7636#section-4.1>`__ code_challenge_method: Code challenge method to be used in authorization code flow with PKCE instead of client secret. It will be used only if ``use_pkce`` is set to True. Defaults to ``S256``. """ BaseOAuthConsumerBlueprint.__init__( self, name, import_name, static_folder=static_folder, static_url_path=static_url_path, template_folder=template_folder, url_prefix=url_prefix, subdomain=subdomain, url_defaults=url_defaults, root_path=root_path, login_url=login_url, authorized_url=authorized_url, storage=storage, rule_kwargs=rule_kwargs, ) self.base_url = base_url self.session_class = session_class or OAuth2Session # passed to OAuth2Session() self._client_id = client_id self.client = client self.auto_refresh_url = auto_refresh_url self.auto_refresh_kwargs = auto_refresh_kwargs self.scope = scope self.state = state self.kwargs = kwargs self.client_secret = client_secret # used by view functions self.authorization_url = authorization_url self.authorization_url_params = authorization_url_params or {} self.token_url = token_url self.token_url_params = token_url_params or {} self.redirect_url = redirect_url self.redirect_to = redirect_to self.code_challenge_method = code_challenge_method self.use_pkce = use_pkce self.teardown_app_request(self.teardown_session)
@property def client_id(self): return self.session.client_id @client_id.setter def client_id(self, value): self.session.client_id = value # due to a bug in requests-oauthlib, we need to set this manually self.session._client.client_id = value
[docs] @cached_property def session(self): """ This is a session between the consumer (your website) and the provider (e.g. Google). It is *not* a session between a user of your website and your website. :return: """ ret = self.session_class( client_id=self._client_id, client=self.client, auto_refresh_url=self.auto_refresh_url, auto_refresh_kwargs=self.auto_refresh_kwargs, scope=self.scope, state=self.state, blueprint=self, base_url=self.base_url, **self.kwargs, ) def token_updater(token): self.token = token ret.token_updater = token_updater return self.session_created(ret)
def session_created(self, session): return session def teardown_session(self, exception=None): try: del self.session except KeyError: pass def login(self): log.debug("client_id = %s", self.client_id) self.session.redirect_uri = url_for(".authorized", _external=True) if self.use_pkce: code_verifier = generate_token(length=48) code_challenge = self.session._client.create_code_challenge( code_verifier=code_verifier, code_challenge_method=self.code_challenge_method, ) self.authorization_url_params.update( { "code_challenge_method": self.code_challenge_method, "code_challenge": code_challenge, } ) code_verifier_key = f"{self.name}_oauth_code_verifier" flask.session[code_verifier_key] = code_verifier log.debug("code_verifier = %s", code_verifier) url, state = self.session.authorization_url( self.authorization_url, state=self.state, **self.authorization_url_params ) state_key = f"{self.name}_oauth_state" flask.session[state_key] = state log.debug("state = %s", state) log.debug("redirect URL = %s", url) oauth_before_login.send(self, url=url) return redirect(url) def authorized(self): """ This is the route/function that the user will be redirected to by the provider (e.g. Google) after the user has logged into the provider's website and authorized your app to access their account. """ if self.redirect_url: next_url = self.redirect_url elif self.redirect_to: next_url = url_for(self.redirect_to) else: next_url = "/" log.debug("next_url = %s", next_url) # check for error in request args error = request.args.get("error") if error: error_desc = request.args.get("error_description") error_uri = request.args.get("error_uri") log.warning( "OAuth 2 authorization error: %s description: %s uri: %s", error, error_desc, error_uri, ) results = oauth_error.send( self, error=error, error_description=error_desc, error_uri=error_uri ) if results: for _, ret in results: if isinstance(ret, (Response, current_app.response_class)): return ret return redirect(next_url) state_key = f"{self.name}_oauth_state" if state_key not in flask.session: # can't validate state, so redirect back to login view log.info("state not found, redirecting user to login") return redirect(url_for(".login")) state = flask.session[state_key] log.debug("state = %s", state) self.session._state = state del flask.session[state_key] if self.use_pkce: code_verifier_key = f"{self.name}_oauth_code_verifier" if code_verifier_key not in flask.session: # can't find code_verifier, so redirect back to login view log.info("code_verifier not found, redirecting user to login") return redirect(url_for(".login")) code_verifier = flask.session[code_verifier_key] log.debug("code_verifier = %s", code_verifier) del flask.session[code_verifier_key] self.token_url_params["code_verifier"] = code_verifier self.session.redirect_uri = url_for(".authorized", _external=True) log.debug("client_id = %s", self.client_id) log.debug("client_secret = %s", self.client_secret) try: token = self.session.fetch_token( self.token_url, authorization_response=request.url, client_secret=self.client_secret, **self.token_url_params, ) except MissingCodeError as e: e.args = ( e.args[0], "The redirect request did not contain the expected parameters. Instead I got: {}".format( json.dumps(request.args) ), ) raise results = oauth_authorized.send(self, token=token) or [] set_token = True for func, ret in results: if isinstance(ret, (Response, current_app.response_class)): return ret if ret == False: set_token = False if set_token: try: self.token = token except ValueError as error: log.warning("OAuth 2 authorization error: %s", str(error)) oauth_error.send(self, error=error) return redirect(next_url)