# Source code for the pallas query-info module.

# Copyright 2020 Akamai Technologies, Inc
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
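# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.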
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Encapsulation of information returned by the Athena GetQueryExecution method.
"""

from __future__ import annotations

import datetime as dt
from typing import Any, Mapping, Optional, cast

from pallas.exceptions import get_error

# Decimal unit prefixes in ascending order; format_size steps through them,
# dividing the value by 1000 at each step.
unit_prefixes = ["k", "M", "G", "T"]

def format_price(v: float) -> str:
    """
    Format price in dollars.

    Amounts above one dollar are rendered as dollars with two decimals
    (``$2.50``); anything else is rendered in cents (``5.00¢``).

    :param v: price in dollars.
    :return: human readable price.
    """
    if v > 1:
        return f"${v:.2f}"
    # One dollar or less: switch to cents so small prices stay readable.
    return f"{100 * v:.2f}¢"

def format_size(v: float) -> str:
    """
    Format size in bytes.

    This function assumes that 1kB = 1000B.

    Sizes below 1000 bytes are shown without decimals; larger sizes are
    scaled to the first prefix that brings the value under 1000 and shown
    with two decimals. Values beyond the largest known prefix keep that
    largest prefix.

    :param v: size in bytes.
    :return: human readable size.
    """
    if v < 1000:
        return f"{v:.0f}B"
    prefix = ""
    for prefix in unit_prefixes:
        v /= 1000
        if v < 1000:
            # Stop at the first prefix that gives a value under 1000.
            break
    return f"{v:.2f}{prefix}B"

def format_time(v: dt.timedelta) -> str:
    """
    Format time.

    Durations shorter than one minute are shown in seconds with one
    decimal (``30.5s``); longer ones as whole minutes and seconds
    (``2min 5s``).

    :param v: duration to format.
    :return: human readable duration.
    """
    if v < dt.timedelta(minutes=1):
        return f"{v.total_seconds():.1f}s"
    # timedelta.seconds excludes the days component, so derive the whole
    # seconds from total_seconds() to stay correct for durations >= 1 day.
    minutes, seconds = divmod(int(v.total_seconds()), 60)
    return f"{minutes}min {seconds}s"

class QueryInfo:
    """
    Information about query execution.

    Instances are returned by the :meth:`.Query.get_info` method.

    :param data: data returned by Athena GetQueryExecution API method.
    """

    def __init__(self, data: Mapping[str, Any]) -> None:
        self._data = data

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {str(self)!r}>"

    def __str__(self) -> str:
        """
        Return summary info about the query execution.

        This is included in logs generated by the Athena client.
        """
        return (
            f"{self.state}, "
            f"scanned {format_size(self.scanned_bytes)} "
            f"in {format_time(self.execution_time)}, "
            f"approx. price {format_price(self.approx_price)}"
        )

    @property
    def execution_id(self) -> str:
        """ID of the query execution."""
        return cast(str, self._data["QueryExecutionId"])

    @property
    def sql(self) -> str:
        """SQL query executed."""
        rv = self._data["Query"]
        return cast(str, rv)

    @property
    def output_location(self) -> str:
        """URI of output location on S3 for the query."""
        rv = self._data["ResultConfiguration"].get("OutputLocation")
        return cast(str, rv)

    @property
    def database(self) -> str | None:
        """Name of database."""
        rv = self._data["QueryExecutionContext"].get("Database")
        return cast(Optional[str], rv)

    @property
    def finished(self) -> bool:
        """Whether the query execution finished."""
        return self.state in ("SUCCEEDED", "FAILED", "CANCELLED")

    @property
    def succeeded(self) -> bool:
        """Whether the query execution finished successfully."""
        return self.state == "SUCCEEDED"

    @property
    def state(self) -> str:
        """State of the query execution."""
        rv = self._data["Status"]["State"]
        return cast(str, rv)

    @property
    def state_reason(self) -> str | None:
        """Reason of the state of the query execution."""
        rv = self._data["Status"].get("StateChangeReason")
        return cast(Optional[str], rv)

    @property
    def scanned_bytes(self) -> int:
        """Data scanned by Athena (0 when the statistic is missing)."""
        rv = self._data["Statistics"].get("DataScannedInBytes", 0)
        return cast(int, rv)

    @property
    def execution_time(self) -> dt.timedelta:
        """Time spent by Athena (zero when the statistic is missing)."""
        milliseconds = self._data["Statistics"].get("TotalExecutionTimeInMillis", 0)
        return dt.timedelta(milliseconds=milliseconds)

    @property
    def approx_price(self) -> float:
        """
        Approximate price of the query execution in dollars.

        Computed from the scanned bytes at a fixed rate per 10**12 bytes.
        """
        # NOTE(review): the per-TB rate is hard-coded; confirm it matches
        # the current Athena pricing.
        price_per_tb = 5
        return price_per_tb * self.scanned_bytes / 10 ** 12

    def check(self) -> None:
        """
        Raises :class:`.AthenaQueryError` (or its subclass) if the query failed.

        Does not raise if the query is still running.
        """
        if self.finished and not self.succeeded:
            raise get_error(self.execution_id, self.state, self.state_reason)