Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timeout handling to QuicStream #56060

Merged
merged 7 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ internal QuicStream() { }
public override bool CanRead { get { throw null; } }
public override bool CanSeek { get { throw null; } }
public override bool CanWrite { get { throw null; } }
public override bool CanTimeout { get { throw null; } }
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
public long StreamId { get { throw null; } }
Expand All @@ -101,6 +102,7 @@ public override void Flush() { }
public override int Read(System.Span<byte> buffer) { throw null; }
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int ReadTimeout { get { throw null; } set { } }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public void Shutdown() { }
Expand All @@ -114,6 +116,7 @@ public override void Write(System.ReadOnlySpan<byte> buffer) { }
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override int WriteTimeout { get { throw null; } set { } }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
Expand Down
6 changes: 6 additions & 0 deletions src/libraries/System.Net.Quic/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@
<data name="net_quic_writing_notallowed" xml:space="preserve">
<value>Writing is not allowed on stream.</value>
</data>
<data name="net_quic_timeout_use_gt_zero" xml:space="preserve">
<value>Timeout can only be set to 'System.Threading.Timeout.Infinite' or a value &gt; 0.</value>
</data>
<data name="net_quic_timeout" xml:space="preserve">
<value>Connection timed out.</value>
</data>
<data name="net_quic_ssl_option" xml:space="preserve">
<value>'{0}' is not supported by System.Net.Quic.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ internal override long StreamId

private StreamBuffer? ReadStreamBuffer => _isInitiator ? _streamState._inboundStreamBuffer : _streamState._outboundStreamBuffer;

internal override bool CanTimeout => false;

internal override int ReadTimeout
{
get => throw new InvalidOperationException();
set => throw new InvalidOperationException();
}

internal override int WriteTimeout
{
get => throw new InvalidOperationException();
set => throw new InvalidOperationException();
}

internal override bool CanRead => !_disposed && ReadStreamBuffer is not null;

internal override int Read(Span<byte> buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Quic.Implementations.MsQuic.Internal;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -192,6 +193,47 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

internal override bool CanWrite => _disposed == 0 && _canWrite;

internal override bool CanTimeout => true;

private int _readTimeout = Timeout.Infinite;

internal override int ReadTimeout
{
get
{
ThrowIfDisposed();
return _readTimeout;
}
set
{
ThrowIfDisposed();
if (value <= 0 && value != System.Threading.Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
}
_readTimeout = value;
}
}

private int _writeTimeout = Timeout.Infinite;
internal override int WriteTimeout
{
get
{
ThrowIfDisposed();
return _writeTimeout;
}
set
{
ThrowIfDisposed();
if (value <= 0 && value != System.Threading.Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
}
_writeTimeout = value;
}
}

internal override long StreamId
{
get
Expand Down Expand Up @@ -404,7 +446,6 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
var state = (State)obj!;
bool completePendingRead;

lock (state)
{
completePendingRead = state.ReadState == ReadState.PendingRead;
Expand Down Expand Up @@ -593,24 +634,54 @@ internal override int Read(Span<byte> buffer)
{
ThrowIfDisposed();
byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
CancellationTokenSource? cts = null;
try
{
int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
if (_readTimeout > 0)
{
cts = new CancellationTokenSource(_readTimeout);
}
wfurt marked this conversation as resolved.
Show resolved Hide resolved
int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length), cts != null ? cts.Token : default).AsTask().GetAwaiter().GetResult();
rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
return readLength;
}
catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
{
// sync operations do not have Cancellation
throw new IOException(SR.net_quic_timeout);
}
finally
{
ArrayPool<byte>.Shared.Return(rentedBuffer);
cts?.Dispose();
}
}

internal override void Write(ReadOnlySpan<byte> buffer)
{
ThrowIfDisposed();
CancellationTokenSource? cts = null;


if (_writeTimeout > 0)
{
cts = new CancellationTokenSource(_writeTimeout);
}

// TODO: optimize this.
WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
try
{
WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
}
catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
{
// sync operations do not have Cancellation
throw new IOException(SR.net_quic_timeout);
}
finally
{
cts?.Dispose();
}
}

// MsQuic doesn't support explicit flushing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable
{
internal abstract long StreamId { get; }

internal abstract bool CanTimeout { get; }

internal abstract bool CanRead { get; }

internal abstract int ReadTimeout { get; set; }

internal abstract int Read(Span<byte> buffer);

internal abstract ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
Expand All @@ -25,6 +29,8 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable

internal abstract void Write(ReadOnlySpan<byte> buffer);

internal abstract int WriteTimeout { get; set; }

internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool endStream, CancellationToken cancellationToken = default);
Expand Down
14 changes: 14 additions & 0 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

public override void Write(ReadOnlySpan<byte> buffer) => _provider.Write(buffer);

public override bool CanTimeout => _provider.CanTimeout;

public override int ReadTimeout
{
get => _provider.ReadTimeout;
set => _provider.ReadTimeout = value;
}

public override int WriteTimeout
{
get => _provider.WriteTimeout;
set => _provider.WriteTimeout = value;
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => _provider.WriteAsync(buffer, cancellationToken);

public override void Flush() => _provider.Flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public sealed class MsQuicQuicStreamConformanceTests : QuicStreamConformanceTest
protected override QuicImplementationProvider Provider => QuicImplementationProviders.MsQuic;
protected override bool UsableAfterCanceledReads => false;
protected override bool BlocksOnZeroByteReads => true;
protected override bool CanTimeout => true;

public MsQuicQuicStreamConformanceTests(ITestOutputHelper output)
{
Expand Down