Skip to content

Commit

Permalink
Merge pull request bitrich-info#261 from badgerwithagun/coinbasepro-x…
Browse files Browse the repository at this point in the history
…change13

Rename GDAX to Coinbase Pro, bump XChange dependency to 4.3.13
  • Loading branch information
Flemingjp committed Dec 27, 2018
2 parents b1de781 + f47b2cf commit e27763b
Show file tree
Hide file tree
Showing 30 changed files with 177 additions and 169 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>info.bitrich.xchange-stream</groupId>
<artifactId>xchange-stream-parent</artifactId>
<packaging>pom</packaging>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>

<modules>
<module>xchange-stream-core</module>
Expand All @@ -19,7 +19,7 @@
<module>xchange-okcoin</module>
<module>xchange-poloniex</module>
<module>xchange-coinmate</module>
<module>xchange-gdax</module>
<module>xchange-coinbasepro</module>
<module>xchange-bitfinex</module>
<module>xchange-bitmex</module>
<module>xchange-poloniex2</module>
Expand Down Expand Up @@ -80,7 +80,7 @@
</repositories>

<properties>
<xchange.version>4.3.11</xchange.version>
<xchange.version>4.3.13</xchange.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
2 changes: 1 addition & 1 deletion service-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-pubnub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-pusher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion service-wamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-binance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitfinex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitflyer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitmex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-bitstamp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion xchange-cexio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
6 changes: 3 additions & 3 deletions xchange-gdax/pom.xml → xchange-coinbasepro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.3.12-SNAPSHOT</version>
<version>4.3.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>xchange-gdax</artifactId>
<artifactId>xchange-coinbasepro</artifactId>

<dependencies>
<dependency>
Expand All @@ -22,7 +22,7 @@
</dependency>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-gdax</artifactId>
<artifactId>xchange-coinbasepro</artifactId>
<version>${xchange.version}</version>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
package info.bitrich.xchangestream.gdax;
package info.bitrich.xchangestream.coinbasepro;

import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.coinbasepro.CoinbaseProExchange;
import org.knowm.xchange.coinbasepro.dto.account.CoinbaseProWebsocketAuthData;
import org.knowm.xchange.coinbasepro.service.CoinbaseProAccountServiceRaw;

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.gdax.GDAXExchange;
import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData;
import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw;

/**
* GDAX Streaming Exchange. Connects to live WebSocket feed.
* CoinbasePro Streaming Exchange. Connects to live WebSocket feed.
*/
public class GDAXStreamingExchange extends GDAXExchange implements StreamingExchange {
private static final String API_URI = "wss://ws-feed.gdax.com";
public class CoinbaseProStreamingExchange extends CoinbaseProExchange implements StreamingExchange {
private static final String API_URI = "wss://ws-feed.pro.coinbase.com";

private GDAXStreamingService streamingService;
private GDAXStreamingMarketDataService streamingMarketDataService;
private CoinbaseProStreamingService streamingService;
private CoinbaseProStreamingMarketDataService streamingMarketDataService;

public GDAXStreamingExchange() { }
public CoinbaseProStreamingExchange() { }

@Override
protected void initServices() {
Expand All @@ -32,18 +33,18 @@ public Completable connect(ProductSubscription... args) {
if (args == null || args.length == 0)
throw new UnsupportedOperationException("The ProductSubscription must be defined!");
ExchangeSpecification exchangeSpec = getExchangeSpecification();
this.streamingService = new GDAXStreamingService(API_URI, () -> authData(exchangeSpec));
this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService);
this.streamingService = new CoinbaseProStreamingService(API_URI, () -> authData(exchangeSpec));
this.streamingMarketDataService = new CoinbaseProStreamingMarketDataService(streamingService);
streamingService.subscribeMultipleCurrencyPairs(args);

return streamingService.connect();
}

private GDAXWebsocketAuthData authData(ExchangeSpecification exchangeSpec) {
GDAXWebsocketAuthData authData = null;
private CoinbaseProWebsocketAuthData authData(ExchangeSpecification exchangeSpec) {
CoinbaseProWebsocketAuthData authData = null;
if ( exchangeSpec.getApiKey() != null ) {
try {
GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) getAccountService();
CoinbaseProAccountServiceRaw rawAccountService = (CoinbaseProAccountServiceRaw) getAccountService();
authData = rawAccountService.getWebsocketAuthData();
}
catch (Exception e) {
Expand All @@ -56,7 +57,7 @@ private GDAXWebsocketAuthData authData(ExchangeSpecification exchangeSpec) {

@Override
public Completable disconnect() {
GDAXStreamingService service = this.streamingService;
CoinbaseProStreamingService service = streamingService;
streamingService = null;
streamingMarketDataService = null;
return service.disconnect();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package info.bitrich.xchangestream.gdax;
package info.bitrich.xchangestream.coinbasepro;

import static io.netty.util.internal.StringUtil.isNullOrEmpty;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory;
import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades;
import static org.knowm.xchange.coinbasepro.CoinbaseProAdapters.adaptOrderBook;
import static org.knowm.xchange.coinbasepro.CoinbaseProAdapters.adaptTicker;
import static org.knowm.xchange.coinbasepro.CoinbaseProAdapters.adaptTradeHistory;
import static org.knowm.xchange.coinbasepro.CoinbaseProAdapters.adaptTrades;

import java.math.BigDecimal;
import java.util.HashMap;
Expand All @@ -13,37 +13,33 @@
import java.util.SortedMap;
import java.util.TreeMap;

import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProProductBook;
import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProProductTicker;
import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProTrade;
import org.knowm.xchange.coinbasepro.dto.trade.CoinbaseProFill;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.marketdata.Trades;
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook;
import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker;
import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade;
import org.knowm.xchange.gdax.dto.trade.GDAXFill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.DeserializationFeature;

import com.fasterxml.jackson.databind.ObjectMapper;

import info.bitrich.xchangestream.coinbasepro.dto.CoinbaseProWebSocketTransaction;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;

/**
* Created by luca on 4/3/17.
*/
public class GDAXStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingMarketDataService.class);
public class CoinbaseProStreamingMarketDataService implements StreamingMarketDataService {

private final GDAXStreamingService service;
private final CoinbaseProStreamingService service;
private final Map<CurrencyPair, SortedMap<BigDecimal, String>> bids = new HashMap<>();
private final Map<CurrencyPair, SortedMap<BigDecimal, String>> asks = new HashMap<>();

GDAXStreamingMarketDataService(GDAXStreamingService service) {
CoinbaseProStreamingMarketDataService(CoinbaseProStreamingService service) {
this.service = service;
}

Expand All @@ -68,8 +64,8 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a

final int maxDepth = (args.length > 0 && args[0] instanceof Integer) ? (int) args[0] : 100;

Observable<GDAXWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), GDAXWebSocketTransaction.class));
Observable<CoinbaseProWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), CoinbaseProWebSocketTransaction.class));

return subscribedChannel
.filter(message -> !isNullOrEmpty(message.getType()) &&
Expand All @@ -81,39 +77,39 @@ public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... a
asks.put(currencyPair, new TreeMap<>());
}

GDAXProductBook productBook = s.toGDAXProductBook(bids.get(currencyPair), asks.get(currencyPair), maxDepth);
CoinbaseProProductBook productBook = s.toCoinbaseProProductBook(bids.get(currencyPair), asks.get(currencyPair), maxDepth);
return adaptOrderBook(productBook, currencyPair);
});
}

/**
* Returns an Observable of {@link GDAXProductTicker}, not converted to {@link Ticker}
* Returns an Observable of {@link CoinbaseProProductTicker}, not converted to {@link Ticker}
*
* @param currencyPair the currency pair.
* @param args optional arguments.
* @return an Observable of {@link GDAXProductTicker}.
* @return an Observable of {@link CoinbaseProProductTicker}.
*/
public Observable<GDAXProductTicker> getRawTicker(CurrencyPair currencyPair, Object... args) {
public Observable<CoinbaseProProductTicker> getRawTicker(CurrencyPair currencyPair, Object... args) {
if (!containsPair(service.getProduct().getTicker(), currencyPair))
throw new UnsupportedOperationException(String.format("The currency pair %s is not subscribed for ticker", currencyPair));

String channelName = currencyPair.base.toString() + "-" + currencyPair.counter.toString();

final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

Observable<GDAXWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), GDAXWebSocketTransaction.class));
Observable<CoinbaseProWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), CoinbaseProWebSocketTransaction.class));

return subscribedChannel
.filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") &&
message.getProductId().equals(channelName))
.map(GDAXWebSocketTransaction::toGDAXProductTicker);
.map(CoinbaseProWebSocketTransaction::toCoinbaseProProductTicker);
}

/**
* Returns the GDAX ticker converted to the normalized XChange object.
* GDAX does not directly provide ticker data via web service.
* As stated by: https://docs.gdax.com/#get-product-ticker, we can just listen for 'match' messages.
* Returns the CoinbasePro ticker converted to the normalized XChange object.
* CoinbasePro does not directly provide ticker data via web service.
* As stated by: https://docs.coinbasepro.com/#get-product-ticker, we can just listen for 'match' messages.
*
* @param currencyPair Currency pair of the ticker
* @param args optional parameters.
Expand All @@ -128,13 +124,13 @@ public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {

final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

Observable<GDAXWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), GDAXWebSocketTransaction.class));
Observable<CoinbaseProWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), CoinbaseProWebSocketTransaction.class));

return subscribedChannel
.filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("ticker") &&
message.getProductId().equals(channelName))
.map(s -> adaptTicker(s.toGDAXProductTicker(), s.toGDAXProductStats(), currencyPair));
.map(s -> adaptTicker(s.toCoinbaseProProductTicker(), s.toCoinbaseProProductStats(), currencyPair));
}

@Override
Expand All @@ -146,18 +142,18 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {

final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

Observable<GDAXWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), GDAXWebSocketTransaction.class));
Observable<CoinbaseProWebSocketTransaction> subscribedChannel = service.subscribeChannel(channelName)
.map(s -> mapper.readValue(s.toString(), CoinbaseProWebSocketTransaction.class));

return subscribedChannel
.filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") &&
message.getProductId().equals(channelName))
.map(s -> {
Trades adaptedTrades = null;
if ( s.getUserId() != null )
adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()});
adaptedTrades = adaptTradeHistory(new CoinbaseProFill[]{s.toCoinbaseProFill()});
else
adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair);
adaptedTrades = adaptTrades(new CoinbaseProTrade[]{s.toCoinbaseProTrade()}, currencyPair);
return adaptedTrades.getTrades().get(0);
}
);
Expand Down
Loading

0 comments on commit e27763b

Please sign in to comment.