Skip to content

Commit

Permalink
129 improve websocket client (#130)
Browse files Browse the repository at this point in the history
Added streaming client connection events.
Implementing IDisposable in streaming rpc client.
Allowing reconnection after disconnection.
  • Loading branch information
tiago18c committed Jun 29, 2021
1 parent 0089446 commit 46e2f4f
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/Solnet.Examples/SolnetStreamingRpcTester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class SolnetStreamingRpcTester
static void Example(string[] args)
{
IStreamingRpcClient c = ClientFactory.GetStreamingClient(Cluster.MainNet);
c.Init().Wait();
c.ConnectAsync().Wait();

var sub = c.SubscribeAccountInfo(
"4tSvZvnbyzHXLMTiFonMyxZoHmFqau1XArcRCVHLZ5gX",
Expand Down
4 changes: 3 additions & 1 deletion src/Solnet.Rpc/Core/Sockets/IWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ namespace Solnet.Rpc.Core.Sockets
internal interface IWebSocket : IDisposable
{
WebSocketState State { get; }

string CloseStatusDescription { get; }
WebSocketCloseStatus? CloseStatus { get; }
Task ConnectAsync(Uri uri, CancellationToken cancellationToken);
Task CloseAsync(CancellationToken cancellationToken);
ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken);
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken);
ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken);
Expand Down
83 changes: 78 additions & 5 deletions src/Solnet.Rpc/Core/Sockets/StreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ namespace Solnet.Rpc.Core.Sockets
/// <summary>
/// Base streaming Rpc client class that abstracts the websocket handling.
/// </summary>
internal abstract class StreamingRpcClient
internal abstract class StreamingRpcClient : IDisposable
{
private SemaphoreSlim _sem;

/// <summary>
/// The web socket client abstraction.
/// </summary>
protected readonly IWebSocket ClientSocket;
protected IWebSocket ClientSocket;

private bool disposedValue;

/// <summary>
/// The logger instance.
Expand All @@ -25,6 +29,12 @@ internal abstract class StreamingRpcClient
/// <inheritdoc cref="IStreamingRpcClient.NodeAddress"/>
public Uri NodeAddress { get; }

/// <inheritdoc cref="IStreamingRpcClient.State"/>
public WebSocketState State => ClientSocket.State;

/// <inheritdoc cref="IStreamingRpcClient.ConnectionStateChangedEvent"/>
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;

/// <summary>
/// The internal constructor that setups the client.
/// </summary>
Expand All @@ -36,16 +46,42 @@ protected StreamingRpcClient(string url, ILogger logger, IWebSocket socket = def
NodeAddress = new Uri(url);
ClientSocket = socket ?? new WebSocketWrapper(new ClientWebSocket());
_logger = logger;
_sem = new SemaphoreSlim(1, 1);
}

/// <summary>
/// Initializes the websocket connection and starts receiving messages asynchronously.
/// </summary>
/// <returns>Returns the task representing the asynchronous task.</returns>
public async Task Init()
public async Task ConnectAsync()
{
_sem.Wait();
if (ClientSocket.State != WebSocketState.Open)
{
await ClientSocket.ConnectAsync(NodeAddress, CancellationToken.None).ConfigureAwait(false);
_ = Task.Run(StartListening);
ConnectionStateChangedEvent?.Invoke(this, State);
}
_sem.Release();
}

/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
public async Task DisconnectAsync()
{
await ClientSocket.ConnectAsync(NodeAddress, CancellationToken.None).ConfigureAwait(false);
_ = Task.Run(StartListening);
_sem.Wait();
if (ClientSocket.State == WebSocketState.Open)
{
await ClientSocket.CloseAsync(CancellationToken.None);

//notify at the end of StartListening loop, given that it should end as soon as we terminate connection here
//and will also notify when there is a non-user triggered disconnection event

// handle disconnection cleanup
ClientSocket.Dispose();
ClientSocket = new WebSocketWrapper(new ClientWebSocket());
CleanupSubscriptions();
}
_sem.Release();
}

/// <summary>
Expand All @@ -66,6 +102,7 @@ private async Task StartListening()
}
}
_logger?.LogDebug(new EventId(), $"Stopped reading messages. ClientSocket.State changed to {ClientSocket.State}");
ConnectionStateChangedEvent?.Invoke(this, State);
}

/// <summary>
Expand Down Expand Up @@ -115,5 +152,41 @@ private async Task ReadNextMessage(CancellationToken cancellationToken = default
/// </summary>
/// <param name="messagePayload">The message payload.</param>
protected abstract void HandleNewMessage(Memory<byte> messagePayload);

/// <summary>
/// Clean up subscription objects after disconnection.
/// </summary>
protected abstract void CleanupSubscriptions();

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
ClientSocket.Dispose();
_sem.Dispose();
}

// TODO: free unmanaged resources (unmanaged objects) and override finalizer
// TODO: set large fields to null
disposedValue = true;
}
}

// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
// ~StreamingRpcClient()
// {
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
// Dispose(disposing: false);
// }

/// <inheritdoc cref="IDisposable.Dispose"/>
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
7 changes: 7 additions & 0 deletions src/Solnet.Rpc/Core/Sockets/WebSocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ internal WebSocketWrapper(ClientWebSocket webSocket)
this.webSocket = webSocket;
}

public WebSocketCloseStatus? CloseStatus => webSocket.CloseStatus;

public string CloseStatusDescription => webSocket.CloseStatusDescription;

public WebSocketState State => webSocket.State;

public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
Expand All @@ -22,6 +26,9 @@ public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescriptio
public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
=> webSocket.ConnectAsync(uri, cancellationToken);

public Task CloseAsync(CancellationToken cancellationToken)
=> webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);

public ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
=> webSocket.ReceiveAsync(buffer, cancellationToken);

Expand Down
22 changes: 19 additions & 3 deletions src/Solnet.Rpc/IStreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@
using Solnet.Rpc.Models;
using Solnet.Rpc.Types;
using System;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace Solnet.Rpc
{
/// <summary>
/// Represents the streaming RPC client for the solana API.
/// </summary>
public interface IStreamingRpcClient
public interface IStreamingRpcClient : IDisposable
{
/// <summary>
/// Current connection state.
/// </summary>
WebSocketState State { get; }

/// <summary>
/// Event triggered when the connection status changes between connected and disconnected.
/// </summary>
event EventHandler<WebSocketState> ConnectionStateChangedEvent;

/// <summary>
/// The address this client connects to.
/// </summary>
Expand Down Expand Up @@ -180,9 +191,14 @@ public interface IStreamingRpcClient
void Unsubscribe(SubscriptionState subscription);

/// <summary>
/// Asynchronously initializes the client connection asynchronously.
/// Asynchronously initializes the client connection and starts listening for socket messages.
/// </summary>
/// <returns>The task object representing the asynchronous operation.</returns>
Task ConnectAsync();
/// <summary>
/// Asynchronously disconnects and removes all running subscriptions.
/// </summary>
/// <returns>The task object representing the asynchronous operation.</returns>
Task Init();
Task DisconnectAsync();
}
}
17 changes: 17 additions & 0 deletions src/Solnet.Rpc/SolanaStreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ internal SolanaStreamingRpcClient(string url, ILogger logger = null, IWebSocket
{
}

/// <inheritdoc cref="StreamingRpcClient.CleanupSubscriptions"/>
protected override void CleanupSubscriptions()
{
foreach(var sub in confirmedSubscriptions)
{
sub.Value.ChangeState(SubscriptionStatus.Unsubscribed, "Connection terminated");
}

foreach (var sub in unconfirmedRequests)
{
sub.Value.ChangeState(SubscriptionStatus.Unsubscribed, "Connection terminated");
}
unconfirmedRequests.Clear();
confirmedSubscriptions.Clear();
}


/// <inheritdoc cref="StreamingRpcClient.HandleNewMessage(Memory{byte})"/>
protected override void HandleNewMessage(Memory<byte> messagePayload)
{
Expand Down
34 changes: 17 additions & 17 deletions test/Solnet.Rpc.Test/SolanaStreamingClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void SubscribeAccountInfoTest()

const string pubKey = "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12";

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeAccountInfo(pubKey, action);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -143,7 +143,7 @@ public void SubscribeAccountInfoTestProcessed()

const string pubKey = "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12";

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeAccountInfo(pubKey, action, Types.Commitment.Processed);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -180,7 +180,7 @@ public void UnsubscribeTest()

const string pubKey = "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12";

sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeAccountInfo(pubKey, action);
sub.SubscriptionChanged += (_, e) =>
{
Expand Down Expand Up @@ -217,7 +217,7 @@ public void SubscribeLogsMentionTest()

const string pubKey = "11111111111111111111111111111111";

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeLogInfo(pubKey, action);

_socketMock.Verify(s => s.SendAsync(It.IsAny<ReadOnlyMemory<byte>>(),
Expand Down Expand Up @@ -247,7 +247,7 @@ public void SubscribeLogsMentionConfirmed()

const string pubKey = "11111111111111111111111111111111";

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeLogInfo(pubKey, action, Types.Commitment.Confirmed);

_socketMock.Verify(s => s.SendAsync(It.IsAny<ReadOnlyMemory<byte>>(),
Expand Down Expand Up @@ -276,7 +276,7 @@ public void SubscribeLogsAllTest()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeLogInfo(Types.LogsSubscriptionType.All, action);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -311,7 +311,7 @@ public void SubscribeLogsAllProcessed()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeLogInfo(Types.LogsSubscriptionType.All, action, Types.Commitment.Processed);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -346,7 +346,7 @@ public void SubscribeLogsWithErrors()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeLogInfo(Types.LogsSubscriptionType.All, action, Types.Commitment.Processed);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -382,7 +382,7 @@ public void SubscribeProgramTest()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeProgram("11111111111111111111111111111111", action);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -419,7 +419,7 @@ public void SubscribeProgramConfirmed()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeProgram("11111111111111111111111111111111", action, Types.Commitment.Confirmed);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -456,7 +456,7 @@ public void SubscribeSlotTest()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
_ = sut.SubscribeSlotInfo(action);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -491,7 +491,7 @@ public void SubscribeRootTest()

var sut = new SolanaStreamingRpcClient("wss://api.mainnet-beta.solana.com/", null, _socketMock.Object);

sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeRoot(action);
_subConfirmEvent.Set();

Expand Down Expand Up @@ -527,7 +527,7 @@ public void SubscribeSignatureTest()
SubscriptionEvent evt = null;


sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeSignature("4orRpuqStpJDvcpBy3vDSV4TDTGNbefmqYUnG2yVnKwjnLFqCwY4h5cBTAKakKek4inuxHF71LuscBS1vwSLtWcx", action);
sub.SubscriptionChanged += (s, e) =>
{
Expand Down Expand Up @@ -573,7 +573,7 @@ public void SubscribeSignature_ErrorNotificationTest()
SubscriptionEvent evt = null;


sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeSignature("4orRpuqStpJDvcpBy3vDSV4TDTGNbefmqYUnG2yVnKwjnLFqCwY4h5cBTAKakKek4inuxHF71LuscBS1vwSLtWcx", action);
sub.SubscriptionChanged += (s, e) =>
{
Expand Down Expand Up @@ -623,7 +623,7 @@ public void SubscribeSignatureProcessed()
SubscriptionEvent evt = null;


sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeSignature("4orRpuqStpJDvcpBy3vDSV4TDTGNbefmqYUnG2yVnKwjnLFqCwY4h5cBTAKakKek4inuxHF71LuscBS1vwSLtWcx", action, Types.Commitment.Processed);
sub.SubscriptionChanged += (s, e) =>
{
Expand Down Expand Up @@ -666,7 +666,7 @@ public void SubscribeBadAccountTest()

const string pubKey = "invalidkey1";

sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeAccountInfo(pubKey, action);
SubscriptionEvent subEvent = null;
sub.SubscriptionChanged += (sub, evt) =>
Expand Down Expand Up @@ -747,7 +747,7 @@ public void SubscribeAccountBigPayloadTest()

const string pubKey = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";

sut.Init().Wait();
sut.ConnectAsync().Wait();
var sub = sut.SubscribeAccountInfo(pubKey, actionMock.Object);
SubscriptionEvent subEvent = null;
sub.SubscriptionChanged += (sub, evt) =>
Expand Down

0 comments on commit 46e2f4f

Please sign in to comment.