Source code for pallas.caching

# Copyright 2020 Akamai Technologies, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Caching of Athena queries.
"""

from __future__ import annotations

import hashlib
import logging
from urllib.parse import urlencode

from pallas.results import QueryResults
from pallas.storage import NotFoundError, Storage, storage_from_uri

logger = logging.getLogger(__name__)


def _get_execution_key(database: str | None, sql: str) -> str:
    parts = {}
    if database is not None:
        parts["database"] = database
    parts["sql"] = sql
    encoded = urlencode(parts).encode("utf-8")
    digest = hashlib.sha256(encoded).hexdigest()
    return f"query-{digest}"


def _get_results_key(execution_id: str) -> str:
    return f"results-{execution_id}"


def _load_execution_id(
    storage: Storage | None, database: str | None, sql: str
) -> str | None:
    if storage is None:
        return None
    key = _get_execution_key(database, sql)
    try:
        execution_id = storage.get(key)
    except NotFoundError:
        return None
    logger.debug(
        f"Query execution loaded from cache {storage}{key}:"
        f" QueryExecutionId={execution_id!r}"
    )
    return execution_id


def _save_execution_id(
    storage: Storage | None, database: str | None, sql: str, execution_id: str
) -> None:
    if storage is None:
        return
    key = _get_execution_key(database, sql)
    storage.set(key, execution_id)
    logger.debug(
        f"Query execution saved to cache {storage}{key}:"
        f" QueryExecutionId={execution_id!r}"
    )


def _has_results(storage: Storage | None, execution_id: str) -> bool:
    if storage is None:
        return False
    key = _get_results_key(execution_id)
    if not storage.has(key):
        return False
    logger.debug(
        f"Query results are available in cache {storage}{key}:"
        f" QueryExecutionId={execution_id!r}"
    )
    return True


def _load_results(storage: Storage | None, execution_id: str) -> QueryResults | None:
    if storage is None:
        return None
    key = _get_results_key(execution_id)
    try:
        with storage.reader(key) as stream:
            results = QueryResults.load(stream)
    except NotFoundError:
        return None
    logger.debug(
        f"Query results loaded from cache {storage}{key}:"
        f" QueryExecutionId={execution_id!r}: {len(results)} rows"
    )
    return results


def _save_results(
    storage: Storage | None, execution_id: str, results: QueryResults
) -> None:
    if storage is None:
        return
    key = _get_results_key(execution_id)
    with storage.writer(key) as stream:
        results.save(stream)
    logger.debug(
        f"Query results saved to cache {storage}{key}:"
        f" QueryExecutionId={execution_id!r}: {len(results)} rows"
    )


[docs]class AthenaCache: """ Caches queries and its results. Athena always stores results in S3, so it is possible to retrieve past results without manually copying data. This class can hold a reference to two instances of cache storage: - local, which caches both query execution IDs and query results - remote, which cache query execution IDs only. It is possible to configure one the backends, both of them, or none of them. Queries cached in the local storage can be executed without an internet connection. Queries cached in the remote storage are not executed twice, but results have to be downloaded from AWS. In theory, it is possible to use remote backend for the local cache (or vice versa), but we assume that the local cache is actually stored locally Instance of this class is returned by the :attr:`.Athena.cache` property. It can be updated to reconfigure the caching. """ local_storage: Storage | None = None remote_storage: Storage | None = None #: Can be set to False to disable caching completely. #: #: Can be updated to enable or disable the caching. enabled: bool = True #: Can be set to False to disable reading the cache. #: #: Can be updated to reconfigure the caching. read: bool = True #: Can be set to False to disable writing the cache. #: #: Can be updated to reconfigure the caching. write: bool = True #: Whether to return failed queries found in cache. #: #: When this is false, failed queries found in cache are ignored. failed: bool = False @property def local(self) -> str | None: """ URI of storage for local cache. Can be updated to reconfigure the caching. """ if self.local_storage is None: return None return self.local_storage.uri @local.setter def local(self, uri: str | None) -> None: if uri is None: self.local_storage = None else: self.local_storage = storage_from_uri(uri) @property def remote(self) -> str | None: """ URI of storage for remote cache. Can be updated to reconfigure the caching. """ if self.remote_storage is None: return None return self.remote_storage.uri @remote.setter def remote(self, uri: str | None) -> None: if uri is None: self.remote_storage = None else: self.remote_storage = storage_from_uri(uri)
[docs] def load_execution_id(self, database: str | None, sql: str) -> str | None: """ Retrieve cached query execution ID for the given SQL. Looks into both the local and the remote storage. """ if not (self.enabled and self.read): return None execution_id = _load_execution_id(self.local_storage, database, sql) if execution_id is not None: return execution_id execution_id = _load_execution_id(self.remote_storage, database, sql) if execution_id is not None: _save_execution_id(self.local_storage, database, sql, execution_id) return execution_id return None
[docs] def save_execution_id( self, database: str | None, sql: str, execution_id: str ) -> None: """ Store cached query execution ID for the given SQL. Updates both the local and the remote storage. """ if not (self.enabled and self.write): return _save_execution_id(self.remote_storage, database, sql, execution_id) _save_execution_id(self.local_storage, database, sql, execution_id)
[docs] def has_results(self, execution_id: str) -> bool: """ Checks whether results are cached for the given execution ID. Looks into the local storage only. """ if not (self.enabled and self.read): return False return _has_results(self.local_storage, execution_id)
[docs] def load_results(self, execution_id: str) -> QueryResults | None: """ Retrieve cached results for the given execution ID. Looks into the local storage only. """ if not (self.enabled and self.read): return None return _load_results(self.local_storage, execution_id)
[docs] def save_results(self, execution_id: str, results: QueryResults) -> None: """ Store cached results for the given SQL. Updates the local storage only. """ if not (self.enabled and self.write): return _save_results(self.local_storage, execution_id, results)