Skip to content

Commit 471dd8d

Browse files
authored
Refactor and add Pyth replicator provider (#11)
* Add provider_engine to allow multiple providers * Refactor and add Pythnet replicator * Cleanup * Update coingecko product * Minor code refactor * Improve logs
1 parent 44963ee commit 471dd8d

File tree

10 files changed

+286
-144
lines changed

10 files changed

+286
-144
lines changed

config/config.toml

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
1+
[publisher]
2+
3+
# Set it to either 'coin_gecko' or 'pyth_replicator'. You need to provide
4+
# the configuration for the chosen engine as described below.
5+
provider_engine = 'pyth_replicator'
6+
7+
product_update_interval_secs = 10
8+
19
[publisher.pythd]
210
endpoint = 'ws://127.0.0.1:8910'
311

4-
[publisher.coin_gecko]
5-
update_interval_secs = 15
6-
confidence_ratio_bps = 10
12+
# [publisher.coin_gecko]
13+
# update_interval_secs = 15
14+
# confidence_ratio_bps = 10
15+
16+
# [[publisher.coin_gecko.products]]
17+
# symbol = 'Crypto.BTC/USD'
18+
# coin_gecko_id = 'bitcoin'
719

8-
[[ publisher.products ]]
9-
pythd_symbol = 'Crypto.BTC/USD'
10-
coin_gecko_id = 'bitcoin'
20+
[publisher.pyth_replicator]
21+
http_endpoint = 'https://pythnet.rpcpool.com'
22+
ws_endpoint = 'wss://pythnet.rpcpool.com'
23+
first_mapping = 'AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J'
24+
program_key = 'FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH'

publisher/__main__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import typed_settings as ts
77
import click
88
import logging
9-
from structlog import get_logger
9+
import structlog
1010

1111
_DEFAULT_CONFIG_PATH = os.path.join("config", "config.toml")
1212

13-
logging.basicConfig(
14-
level=logging.DEBUG,
15-
)
1613

17-
log = get_logger()
14+
log_level = logging._nameToLevel[os.environ.get("LOG_LEVEL", "DEBUG").upper()]
15+
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(log_level))
16+
17+
log = structlog.get_logger()
1818

1919

2020
@click.command()

publisher/coin_gecko.py

Lines changed: 0 additions & 40 deletions
This file was deleted.

publisher/config.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33

44

55
@ts.settings
6-
class Product:
7-
# The value of attr_dict["symbol"] for this product, which will be used to retrieve the price account
8-
pythd_symbol: str
6+
class CoinGeckoProduct:
7+
# Symbol name. e.g., Crypto.BTC/USD
8+
symbol: str
99
# The CoinGecko API ID for this product, used to query reference prices
1010
coin_gecko_id: str
1111

@@ -17,31 +17,28 @@ class Pythd:
1717

1818

1919
@ts.settings
20-
class CoinGecko:
20+
class CoinGeckoConfig:
2121
# How often to poll CoinGecko for price information
2222
update_interval_secs: int
2323
# The confidence interval rate (to the price) in basis points to use for CoinGecko updates
2424
confidence_ratio_bps: int
25+
products: List[CoinGeckoProduct]
2526

2627

2728
@ts.settings
28-
class AUST:
29-
# The Terra RPC node to use to query contracts
30-
terra_rpc_node: str
31-
# The Chain ID to connect to
32-
chain_id: str
33-
# The address of the Anchor Money Market contract to query the AUST exchange rate from
34-
anchor_money_market_contract_address: str
35-
# How often to query the exchange rate from the Anchor Money Market contract
36-
update_interval_secs: int
37-
# The Pythd symbol
38-
pythd_symbol: str
39-
# The confidence interval in basis points
40-
confidence_bps: int
29+
class PythReplicatorConfig:
30+
http_endpoint: str
31+
ws_endpoint: str
32+
first_mapping: str
33+
program_key: str
34+
staleness_time_in_secs: int = ts.option(default=30)
35+
account_update_interval_secs: int = ts.option(default=300)
4136

4237

4338
@ts.settings
4439
class Config:
40+
provider_engine: str
4541
pythd: Pythd
46-
products: List[Product]
47-
coin_gecko: Optional[CoinGecko] = ts.option(default=None)
42+
product_update_interval_secs: int = ts.option(default=60)
43+
coin_gecko: Optional[CoinGeckoConfig] = ts.option(default=None)
44+
pyth_replicator: Optional[PythReplicatorConfig] = ts.option(default=None)

publisher/provider.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
from typing import List, Optional, Protocol
3+
4+
Symbol = str
5+
6+
7+
@dataclass
8+
class Price:
9+
price: float
10+
conf: float
11+
12+
13+
class Provider(Protocol):
14+
def upd_products(self, product_symbols: List[Symbol]):
15+
...
16+
17+
def start(self):
18+
...
19+
20+
def latest_price(self, symbol: Symbol) -> Optional[Price]:
21+
...

publisher/providers/coin_gecko.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import asyncio
2+
from typing import Dict, List, Optional
3+
from pycoingecko import CoinGeckoAPI
4+
from structlog import get_logger
5+
6+
from publisher.provider import Price, Provider, Symbol
7+
from ..config import CoinGeckoConfig
8+
9+
log = get_logger()
10+
11+
Id = str # The "API id" of the CoinGecko price, listed on CoinGecko page for each coin.
12+
13+
USD = "usd"
14+
15+
16+
class CoinGecko(Provider):
17+
def __init__(self, config: CoinGeckoConfig) -> None:
18+
self._api: CoinGeckoAPI = CoinGeckoAPI()
19+
self._prices: Dict[Id, float] = {}
20+
self._symbol_to_id: Dict[Symbol, Id] = {
21+
product.symbol: product.coin_gecko_id for product in config.products
22+
}
23+
self._config = config
24+
25+
def start(self) -> None:
26+
asyncio.create_task(self._update_loop())
27+
28+
def upd_products(self, product_symbols: List[Symbol]) -> None:
29+
new_prices = {}
30+
for coin_gecko_product in self._config.products:
31+
if coin_gecko_product.symbol in product_symbols:
32+
id = coin_gecko_product.coin_gecko_id
33+
new_prices[id] = self._prices.get(id, None)
34+
else:
35+
raise ValueError(
36+
f"{coin_gecko_product.symbol} not found in available products"
37+
)
38+
39+
self._prices = new_prices
40+
41+
async def _update_loop(self) -> None:
42+
while True:
43+
self._update_prices()
44+
await asyncio.sleep(self._config.update_interval_secs)
45+
46+
def _update_prices(self) -> None:
47+
result = self._api.get_price(
48+
ids=list(self._prices.keys()), vs_currencies=USD, precision=18
49+
)
50+
for id_, prices in result.items():
51+
self._prices[id_] = prices[USD]
52+
log.info("updated prices from CoinGecko", prices=self._prices)
53+
54+
def _get_price(self, id: Id) -> float:
55+
return self._prices.get(id, None)
56+
57+
def latest_price(self, symbol: Symbol) -> Optional[Price]:
58+
id = self._symbol_to_id.get(symbol)
59+
if not id:
60+
return None
61+
62+
price = self._get_price(id)
63+
if not price:
64+
return None
65+
return Price(price, price * self._config.confidence_ratio_bps / 10000)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import asyncio
2+
from typing import Dict, List, Optional, Tuple
3+
from pythclient.pythclient import PythClient
4+
from pythclient.pythaccounts import PythPriceAccount
5+
import time
6+
7+
8+
from structlog import get_logger
9+
10+
from publisher.provider import Price, Provider, Symbol
11+
12+
from ..config import PythReplicatorConfig
13+
14+
log = get_logger()
15+
16+
UnixTimestamp = int
17+
18+
19+
class PythReplicator(Provider):
20+
def __init__(self, config: PythReplicatorConfig) -> None:
21+
self._config = config
22+
self._client = PythClient(
23+
solana_endpoint=config.http_endpoint,
24+
solana_ws_endpoint=config.ws_endpoint,
25+
first_mapping_account_key=config.first_mapping,
26+
program_key=config.program_key,
27+
)
28+
self._prices: Dict[
29+
str, Tuple[float | None, float | None, UnixTimestamp | None]
30+
] = {}
31+
32+
def start(self) -> None:
33+
asyncio.create_task(self._ws_loop())
34+
35+
async def _ws_loop(self) -> None:
36+
self._ws = self._client.create_watch_session()
37+
log.info("Creating Pyth replicator WS")
38+
39+
await self._ws.connect()
40+
await self._ws.program_subscribe(
41+
self._config.program_key, await self._client.get_all_accounts()
42+
)
43+
44+
asyncio.create_task(self._update_accounts_loop())
45+
46+
while True:
47+
update = await self._ws.next_update()
48+
log.debug("Received a WS update", account_key=update.key, slot=update.slot)
49+
if isinstance(update, PythPriceAccount):
50+
symbol = update.product.symbol
51+
52+
self._prices[symbol] = [
53+
update.aggregate_price,
54+
update.aggregate_price_confidence_interval,
55+
update.timestamp,
56+
]
57+
58+
log.info(
59+
"Received a price update", symbol=symbol, price=self._prices[symbol]
60+
)
61+
62+
async def _update_accounts_loop(self) -> None:
63+
while True:
64+
log.info("Update Pyth accounts")
65+
await self._client.refresh_products()
66+
await self._client.refresh_all_prices()
67+
self._ws.update_program_accounts(
68+
self._config.program_key, await self._client.get_all_accounts()
69+
)
70+
log.info("Finished updating Pyth accounts")
71+
72+
await asyncio.sleep(self._config.account_update_interval_secs)
73+
74+
def upd_products(self, _: List[Symbol]) -> None:
75+
# This provider stores all the possible feeds and
76+
# does not care about the desired products as knowing
77+
# them does not improve the performance of the replicator
78+
# websocket. Although the websocket filters the given accounts
79+
# but this filtering happens in the client-side and not on the server-side.
80+
pass
81+
82+
def latest_price(self, symbol: Symbol) -> Optional[Price]:
83+
price, conf, timestamp = self._prices.get(symbol, [None, None, None])
84+
85+
if not price or not conf or not timestamp:
86+
return None
87+
88+
if time.time() - timestamp > self._config.staleness_time_in_secs:
89+
return None
90+
91+
return Price(price, conf)

0 commit comments

Comments
 (0)