from datetime import datetime
from sqlalchemy import JSON, Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm.exc import NoResultFound
from flask_dance.consumer.storage import BaseStorage
from flask_dance.utils import FakeCache, first
try:
from flask_login import AnonymousUserMixin
except ImportError:
AnonymousUserMixin = None
class OAuthConsumerMixin:
"""
A :ref:`SQLAlchemy declarative mixin <sqlalchemy:declarative_mixins>` with
some suggested columns for a model to store OAuth tokens:
``id``
an integer primary key
``provider``
a short name to indicate which OAuth provider issued
this token
``created_at``
an automatically generated datetime that indicates when
the OAuth provider issued this token
``token``
a :class:`JSON <sqlalchemy.types.JSON>` field to store
the actual token received from the OAuth provider
"""
@declared_attr
def __tablename__(cls):
return f"flask_dance_{cls.__name__.lower()}"
id = Column(Integer, primary_key=True)
provider = Column(String(50), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
token = Column(MutableDict.as_mutable(JSON), nullable=False)
def __repr__(self):
parts = []
parts.append(self.__class__.__name__)
if self.id:
parts.append(f"id={self.id}")
if self.provider:
parts.append(f'provider="{self.provider}"')
return "<{}>".format(" ".join(parts))
[docs]
class SQLAlchemyStorage(BaseStorage):
"""
Stores and retrieves OAuth tokens using a relational database through
the `SQLAlchemy`_ ORM.
.. _SQLAlchemy: http://www.sqlalchemy.org/
"""
[docs]
def __init__(
self,
model,
session,
user=None,
user_id=None,
user_required=None,
anon_user=None,
cache=None,
):
"""
Args:
model: The SQLAlchemy model class that represents the OAuth token
table in the database. At a minimum, it must have a
``provider`` column and a ``token`` column. If tokens are to be
associated with individual users in the application, it must
also have a ``user`` relationship to your User model.
It is recommended, though not required, that your model class
inherit from
:class:`~flask_dance.consumer.storage.sqla.OAuthConsumerMixin`.
session:
The :class:`SQLAlchemy session <sqlalchemy.orm.session.Session>`
for the database. If you're using `Flask-SQLAlchemy`_, this is
``db.session``.
user:
If you want OAuth tokens to be associated with individual users
in your application, this is a reference to the user that you
want to use for the current request. It can be an actual User
object, a function that returns a User object, or a proxy to the
User object. If you're using `Flask-Login`_, this is
:attr:`~flask.ext.login.current_user`.
user_id:
If you want to pass an identifier for a user instead of an actual
User object, use this argument instead. Sometimes it can save
a database query or two. If both ``user`` and ``user_id`` are
provided, ``user_id`` will take precendence.
user_required:
If set to ``True``, an exception will be raised if you try to
set or retrieve an OAuth token without an associated user.
If set to ``False``, OAuth tokens can be set with or without
an associated user. The default is auto-detection: it will
be ``True`` if you pass a ``user`` or ``user_id`` parameter,
``False`` otherwise.
anon_user:
If anonymous users are represented by a class in your application,
provide that class here. If you are using `Flask-Login`_,
anonymous users are represented by the
:class:`flask_login.AnonymousUserMixin` class, but you don't have
to provide that -- Flask-Dance treats it as the default.
cache:
An instance of `Flask-Caching`_. Providing a caching system is
highly recommended, but not required.
.. _Flask-SQLAlchemy: http://pythonhosted.org/Flask-SQLAlchemy/
.. _Flask-Login: https://flask-login.readthedocs.io/
.. _Flask-Caching: https://flask-caching.readthedocs.io/en/latest/
"""
self.model = model
self.session = session
self.user = user
self.user_id = user_id
if user_required is None:
self.user_required = user is not None or user_id is not None
else:
self.user_required = user_required
self.anon_user = anon_user or AnonymousUserMixin
self.cache = cache or FakeCache()
def make_cache_key(self, blueprint, user=None, user_id=None):
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
if not uid:
u = first(
_get_real_user(ref, self.anon_user)
for ref in (user, self.user, blueprint.config.get("user"))
)
uid = getattr(u, "id", u)
return "flask_dance_token|{name}|{user_id}".format(
name=blueprint.name, user_id=uid
)
[docs]
def get(self, blueprint, user=None, user_id=None):
"""When you have a statement in your code that says
"if <provider>.authorized:" (for example "if google.authorized:"),
a long string of function calls result in this function being used to
check the Flask server's cache and database for any records associated
with the current_user. The `user` and `user_id` parameters are actually
not set in that case (see base.py:token(), that's what calls this
function), so the user information is instead loaded from the
current_user (if that's what you specified when you created the
blueprint) with blueprint.config.get('user_id').
:param blueprint:
:param user:
:param user_id:
:return:
"""
# check cache
cache_key = self.make_cache_key(blueprint=blueprint, user=user, user_id=user_id)
token = self.cache.get(cache_key)
if token:
return token
# if not cached, make database queries
query = self.session.query(self.model).filter_by(provider=blueprint.name)
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
u = first(
_get_real_user(ref, self.anon_user)
for ref in (user, self.user, blueprint.config.get("user"))
)
if self.user_required and not u and not uid:
raise ValueError("Cannot get OAuth token without an associated user")
# check for user ID
if hasattr(self.model, "user_id") and uid:
query = query.filter_by(user_id=uid)
# check for user (relationship property)
elif hasattr(self.model, "user") and u:
query = query.filter_by(user=u)
# if we have the property, but not value, filter by None
elif hasattr(self.model, "user_id"):
query = query.filter_by(user_id=None)
# run query
try:
token = query.one().token
except NoResultFound:
token = None
# cache the result
self.cache.set(cache_key, token)
return token
def set(self, blueprint, token, user=None, user_id=None):
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
u = first(
_get_real_user(ref, self.anon_user)
for ref in (user, self.user, blueprint.config.get("user"))
)
if self.user_required and not u and not uid:
raise ValueError("Cannot set OAuth token without an associated user")
# if there was an existing model, delete it
existing_query = self.session.query(self.model).filter_by(
provider=blueprint.name
)
# check for user ID
has_user_id = hasattr(self.model, "user_id")
if has_user_id and uid:
existing_query = existing_query.filter_by(user_id=uid)
# check for user (relationship property)
has_user = hasattr(self.model, "user")
if has_user and u:
existing_query = existing_query.filter_by(user=u)
# queue up delete query -- won't be run until commit()
existing_query.delete()
# create a new model for this token
kwargs = {"provider": blueprint.name, "token": token}
if has_user_id and uid:
kwargs["user_id"] = uid
if has_user and u:
kwargs["user"] = u
self.session.add(self.model(**kwargs))
# commit to delete and add simultaneously
self.session.commit()
# invalidate cache
self.cache.delete(
self.make_cache_key(blueprint=blueprint, user=user, user_id=user_id)
)
def delete(self, blueprint, user=None, user_id=None):
query = self.session.query(self.model).filter_by(provider=blueprint.name)
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
u = first(
_get_real_user(ref, self.anon_user)
for ref in (user, self.user, blueprint.config.get("user"))
)
if self.user_required and not u and not uid:
raise ValueError("Cannot delete OAuth token without an associated user")
# check for user ID
if hasattr(self.model, "user_id") and uid:
query = query.filter_by(user_id=uid)
# check for user (relationship property)
elif hasattr(self.model, "user") and u:
query = query.filter_by(user=u)
# if we have the property, but not value, filter by None
elif hasattr(self.model, "user_id"):
query = query.filter_by(user_id=None)
# run query
query.delete()
self.session.commit()
# invalidate cache
self.cache.delete(
self.make_cache_key(blueprint=blueprint, user=user, user_id=user_id)
)
def _get_real_user(user, anon_user=None):
"""
Given a "user" that could be:
* a real user object
* a function that returns a real user object
* a LocalProxy to a real user object (like Flask-Login's ``current_user``)
This function returns the real user object, regardless of which we have.
"""
if hasattr(user, "_get_current_object"):
# this is a proxy
user = user._get_current_object()
if callable(user):
# this is a function
user = user()
if anon_user and isinstance(user, anon_user):
return None
return user