Skip to content

feat: Implement historical pricing for message cost recalculation #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"""

from decimal import Decimal
from typing import Dict
from alembic import op
import sqlalchemy as sa
import logging
Expand All @@ -17,7 +16,8 @@
from aleph.db.accessors.cost import make_costs_upsert_query
from aleph.db.accessors.messages import get_message_by_item_hash
from aleph.services.cost import _is_confidential_vm, get_detailed_costs, CostComputableContent
from aleph.types.cost import ProductComputeUnit, ProductPrice, ProductPriceOptions, ProductPriceType, ProductPricing
from aleph.services.pricing_utils import build_default_pricing_model
from aleph.types.cost import ProductPriceType
from aleph.types.db_session import DbSession

logger = logging.getLogger("alembic")
Expand All @@ -30,48 +30,6 @@
depends_on = None


hardcoded_initial_price: Dict[ProductPriceType, ProductPricing] = {
ProductPriceType.PROGRAM: ProductPricing(
ProductPriceType.PROGRAM,
ProductPrice(
ProductPriceOptions("0.05", "0.000000977"),
ProductPriceOptions("200", "0.011")
),
ProductComputeUnit(1, 2048, 2048)
),
ProductPriceType.PROGRAM_PERSISTENT: ProductPricing(
ProductPriceType.PROGRAM_PERSISTENT,
ProductPrice(
ProductPriceOptions("0.05", "0.000000977"),
ProductPriceOptions("1000", "0.055")
),
ProductComputeUnit(1, 20480, 2048)
),
ProductPriceType.INSTANCE: ProductPricing(
ProductPriceType.INSTANCE,
ProductPrice(
ProductPriceOptions("0.05", "0.000000977"),
ProductPriceOptions("1000", "0.055")
),
ProductComputeUnit(1, 20480, 2048)
),
ProductPriceType.INSTANCE_CONFIDENTIAL: ProductPricing(
ProductPriceType.INSTANCE_CONFIDENTIAL,
ProductPrice(
ProductPriceOptions("0.05", "0.000000977"),
ProductPriceOptions("2000", "0.11")
),
ProductComputeUnit(1, 20480, 2048)
),
ProductPriceType.STORAGE: ProductPricing(
ProductPriceType.STORAGE,
ProductPrice(
ProductPriceOptions("0.333333333"),
)
),
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we shouldn't have to modify a migration already executed on the rest of the CCNs to prevent unconsistencies on the database.

def _get_product_instance_type(
content: InstanceContent
) -> ProductPriceType:
Expand Down Expand Up @@ -112,12 +70,15 @@ def do_calculate_costs() -> None:

logger.debug("INIT: CALCULATE COSTS FOR: %r", msg_item_hashes)

# Build the initial pricing model from DEFAULT_PRICE_AGGREGATE
initial_pricing_model = build_default_pricing_model()

for item_hash in msg_item_hashes:
message = get_message_by_item_hash(session, item_hash)
if message:
content = message.parsed_content
type = _get_product_price_type(content)
pricing = hardcoded_initial_price[type]
pricing = initial_pricing_model[type]
costs = get_detailed_costs(session, content, message.item_hash, pricing)

if len(costs) > 0:
Expand Down
122 changes: 122 additions & 0 deletions src/aleph/services/pricing_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""
Utility functions for pricing model creation and management.
"""

import datetime as dt
from typing import Dict, List, Union

from aleph.db.accessors.aggregates import (
get_aggregate_elements,
merge_aggregate_elements,
)
from aleph.db.models import AggregateElementDb
from aleph.toolkit.constants import (
DEFAULT_PRICE_AGGREGATE,
PRICE_AGGREGATE_KEY,
PRICE_AGGREGATE_OWNER,
)
from aleph.types.cost import ProductPriceType, ProductPricing
from aleph.types.db_session import DbSession


def build_pricing_model_from_aggregate(
aggregate_content: Dict[Union[ProductPriceType, str], dict]
) -> Dict[ProductPriceType, ProductPricing]:
"""
Build a complete pricing model from an aggregate content dictionary.

This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
content into a dictionary of ProductPricing objects that can be used by the cost
calculation functions.

Args:
aggregate_content: Dictionary containing pricing information with ProductPriceType as keys

Returns:
Dictionary mapping ProductPriceType to ProductPricing objects
"""
pricing_model: Dict[ProductPriceType, ProductPricing] = {}

for price_type, pricing_data in aggregate_content.items():
try:
price_type = ProductPriceType(price_type)
pricing_model[price_type] = ProductPricing.from_aggregate(
price_type, aggregate_content
)
except (KeyError, ValueError) as e:
# Log the error but continue processing other price types
import logging

logger = logging.getLogger(__name__)
logger.warning(f"Failed to parse pricing for {price_type}: {e}")

return pricing_model


def build_default_pricing_model() -> Dict[ProductPriceType, ProductPricing]:
"""
Build the default pricing model from DEFAULT_PRICE_AGGREGATE constant.

Returns:
Dictionary mapping ProductPriceType to ProductPricing objects
"""
return build_pricing_model_from_aggregate(DEFAULT_PRICE_AGGREGATE)


def get_pricing_aggregate_history(session: DbSession) -> List[AggregateElementDb]:
"""
Get all pricing aggregate updates in chronological order.

Args:
session: Database session

Returns:
List of AggregateElementDb objects ordered by creation_datetime
"""
aggregate_elements = get_aggregate_elements(
session=session, owner=PRICE_AGGREGATE_OWNER, key=PRICE_AGGREGATE_KEY
)
return list(aggregate_elements)


def get_pricing_timeline(
session: DbSession,
) -> List[tuple[dt.datetime, Dict[ProductPriceType, ProductPricing]]]:
"""
Get the complete pricing timeline with timestamps and pricing models.

This function returns a chronologically ordered list of pricing changes,
useful for processing messages in chronological order and applying the
correct pricing at each point in time.

This properly merges aggregate elements up to each point in time to create
the cumulative pricing state, similar to how _update_aggregate works.

Args:
session: Database session

Returns:
List of tuples containing (timestamp, pricing_model)
"""
pricing_elements = get_pricing_aggregate_history(session)

timeline = []

# Add default pricing as the initial state
timeline.append(
(dt.datetime.min.replace(tzinfo=dt.timezone.utc), build_default_pricing_model())
)

# Build cumulative pricing models by merging elements up to each timestamp
elements_so_far = []
for element in pricing_elements:
elements_so_far.append(element)

# Merge all elements up to this point to get the cumulative state
merged_content = merge_aggregate_elements(elements_so_far)

# Build pricing model from the merged content
pricing_model = build_pricing_model_from_aggregate(merged_content)
timeline.append((element.creation_datetime, pricing_model))

return timeline
24 changes: 15 additions & 9 deletions src/aleph/toolkit/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from typing import Dict, Union

from aleph.types.cost import ProductPriceType

KiB = 1024
MiB = 1024 * 1024
GiB = 1024 * 1024 * 1024
Expand All @@ -8,8 +12,8 @@
PRICE_AGGREGATE_OWNER = "0xFba561a84A537fCaa567bb7A2257e7142701ae2A"
PRICE_AGGREGATE_KEY = "pricing"
PRICE_PRECISION = 18
DEFAULT_PRICE_AGGREGATE = {
"program": {
DEFAULT_PRICE_AGGREGATE: Dict[Union[ProductPriceType, str], dict] = {
ProductPriceType.PROGRAM: {
"price": {
"storage": {"payg": "0.000000977", "holding": "0.05"},
"compute_unit": {"payg": "0.011", "holding": "200"},
Expand All @@ -28,8 +32,8 @@
"memory_mib": 2048,
},
},
"storage": {"price": {"storage": {"holding": "0.333333333"}}},
"instance": {
ProductPriceType.STORAGE: {"price": {"storage": {"holding": "0.333333333"}}},
ProductPriceType.INSTANCE: {
"price": {
"storage": {"payg": "0.000000977", "holding": "0.05"},
"compute_unit": {"payg": "0.055", "holding": "1000"},
Expand All @@ -48,8 +52,10 @@
"memory_mib": 2048,
},
},
"web3_hosting": {"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}},
"program_persistent": {
ProductPriceType.WEB3_HOSTING: {
"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}
},
ProductPriceType.PROGRAM_PERSISTENT: {
"price": {
"storage": {"payg": "0.000000977", "holding": "0.05"},
"compute_unit": {"payg": "0.055", "holding": "1000"},
Expand All @@ -68,7 +74,7 @@
"memory_mib": 2048,
},
},
"instance_gpu_premium": {
ProductPriceType.INSTANCE_GPU_PREMIUM: {
"price": {
"storage": {"payg": "0.000000977"},
"compute_unit": {"payg": "0.56"},
Expand All @@ -93,7 +99,7 @@
"memory_mib": 6144,
},
},
"instance_confidential": {
ProductPriceType.INSTANCE_CONFIDENTIAL: {
"price": {
"storage": {"payg": "0.000000977", "holding": "0.05"},
"compute_unit": {"payg": "0.11", "holding": "2000"},
Expand All @@ -112,7 +118,7 @@
"memory_mib": 2048,
},
},
"instance_gpu_standard": {
ProductPriceType.INSTANCE_GPU_STANDARD: {
"price": {
"storage": {"payg": "0.000000977"},
"compute_unit": {"payg": "0.28"},
Expand Down
Loading
Loading