Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework async and sync requests #112

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ private async Task<bool> CheckValidNanoFrameworkSerialDeviceAsync(SerialDeviceIn
}

}

// default to false
return false;
}
Expand Down Expand Up @@ -570,7 +570,7 @@ protected virtual void OnDeviceEnumerationCompleted()

public async Task<bool> ConnectDeviceAsync(NanoDeviceBase device)
{
if(await ConnectSerialDeviceAsync((device as NanoDevice<NanoSerialDevice>).Device.DeviceInformation as SerialDeviceInformation, device.DeviceBase as SerialDevice))
if (await ConnectSerialDeviceAsync((device as NanoDevice<NanoSerialDevice>).Device.DeviceInformation as SerialDeviceInformation, device.DeviceBase as SerialDevice))
{
if (device.DeviceBase == null)
{
Expand Down Expand Up @@ -609,7 +609,7 @@ public void DisconnectDevice(NanoDeviceBase device)

// kill debug engine
device.DebugEngine.Stop();
device.DebugEngine = null;
device.DebugEngine = null;

EventHandlerForSerialDevice.Current.CloseDevice();
}
Expand Down Expand Up @@ -647,7 +647,7 @@ public void DisconnectDevice(SerialDevice device)
public async Task<uint> SendBufferAsync(byte[] buffer, TimeSpan waiTimeout, CancellationToken cancellationToken)
{
// device must be connected
if (EventHandlerForSerialDevice.Current.IsDeviceConnected)
if (EventHandlerForSerialDevice.Current.IsDeviceConnected && !cancellationToken.IsCancellationRequested)
{
DataWriter outputStreamWriter = new DataWriter(EventHandlerForSerialDevice.Current.Device.OutputStream);

Expand Down Expand Up @@ -687,7 +687,7 @@ public async Task<uint> SendBufferAsync(byte[] buffer, TimeSpan waiTimeout, Canc
public async Task<byte[]> ReadBufferAsync(uint bytesToRead, TimeSpan waiTimeout, CancellationToken cancellationToken)
{
// device must be connected
if (EventHandlerForSerialDevice.Current.IsDeviceConnected)
if (EventHandlerForSerialDevice.Current.IsDeviceConnected && !cancellationToken.IsCancellationRequested)
{
DataReader inputStreamReader = new DataReader(EventHandlerForSerialDevice.Current.Device.InputStream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ public Converter CreateConverter()

public async Task<bool> SendAsync(MessageRaw raw, CancellationToken cancellationToken)
{
_sendSemaphore.WaitOne();
_sendSemaphore.WaitOne();

// check for cancellation request
if (cancellationToken.IsCancellationRequested)
{
// cancellation requested
return false;
}

try
{
Expand Down Expand Up @@ -116,9 +123,9 @@ public uint GetUniqueEndpointId()
throw new NotImplementedException();
}

private Task<uint> SendRawBufferAsync(byte[] buffer, TimeSpan waiTimeout, CancellationToken cancellationToken)
private async Task<uint> SendRawBufferAsync(byte[] buffer, TimeSpan waiTimeout, CancellationToken cancellationToken)
{
return App.SendBufferAsync(buffer, waiTimeout, cancellationToken);
return await App.SendBufferAsync(buffer, waiTimeout, cancellationToken);
}

internal async Task<int> ReadBufferAsync(byte[] buffer, int offset, int bytesToRead, TimeSpan waitTimeout, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,42 +414,40 @@ public void Dispose()

private IncomingMessage PerformSyncRequest(uint command, uint flags, object payload, int millisecondsTimeout = 5000)
{
var t = Task.Run(() => {
OutgoingMessage message = new OutgoingMessage(_controlller.GetNextSequenceId(), CreateConverter(), command, flags, payload);
OutgoingMessage message = new OutgoingMessage(_controlller.GetNextSequenceId(), CreateConverter(), command, flags, payload);

return PerformRequestAsync(message, _cancellationTokenSource.Token, millisecondsTimeout);
}, _cancellationTokenSource.Token.AddTimeout(new TimeSpan(0, 0, 0, 0, millisecondsTimeout)));

return t.Result;
return PerformRequestAsync(message, _cancellationTokenSource.Token, millisecondsTimeout).Result;
}

private IncomingMessage PerformSyncRequest(OutgoingMessage message, int millisecondsTimeout = 5000)
{
var t = Task.Run(() => {
return PerformRequestAsync(message, _cancellationTokenSource.Token, millisecondsTimeout);
}, _cancellationTokenSource.Token.AddTimeout(new TimeSpan(0, 0, 0, 0, millisecondsTimeout)));

return t.Result;
return PerformRequestAsync(message, _cancellationTokenSource.Token, millisecondsTimeout).Result;
}

public async Task<IncomingMessage> PerformRequestAsync(OutgoingMessage message, CancellationToken cancellationToken, int millisecondsTimeout = 5000)
public Task<IncomingMessage> PerformRequestAsync(OutgoingMessage message, CancellationToken cancellationToken, int millisecondsTimeout = 5000)
{
WireProtocolRequest request = new WireProtocolRequest(message, cancellationToken, millisecondsTimeout);
_requestsStore.Add(request);

try
{

await request.PerformRequestAsync(_controlller).ConfigureAwait(true);
// Start a background task that will complete tcs1.Task
Task.Factory.StartNew(() =>
{
var dummy = request.PerformRequestAsync(_controlller);
});
}
catch(Exception)
catch (Exception ex)
{
// perform request failed, remove it from store
_requestsStore.Remove(request.OutgoingMessage.Header);

request.TaskCompletionSource.SetException(ex);

return null;
}

var task = Convert<IncomingMessage>(request.TaskCompletionSource.Task);
return await task;
return request.TaskCompletionSource.Task;
}

private List<IncomingMessage> PerformRequestBatch(List<OutgoingMessage> messages, int timeout = 1000)
Expand All @@ -465,19 +463,6 @@ private List<IncomingMessage> PerformRequestBatch(List<OutgoingMessage> messages
return replies;
}

private static async Task<T> Convert<T>(Task<object> task)
{
try
{
var result = await task;
return (T)result;
}
catch
{
return default(T);
}
}

public Commands.Monitor_Ping.Reply GetConnectionSource()
{
IncomingMessage reply = PerformSyncRequest(Commands.c_Monitor_Ping, 0, null);
Expand Down Expand Up @@ -522,6 +507,10 @@ public bool ProcessMessage(IncomingMessage message, bool isReply)

return true;
}
else
{
reply.TaskCompletionSource.TrySetResult(null);
}
}
else
{
Expand Down Expand Up @@ -895,7 +884,7 @@ private EndPointRegistration.OutboundRequest RpcSend_Setup(Commands.Debugging_Me
cmd.m_addr = addr;
cmd.m_data = data;

IncomingMessage reply = PerformSyncRequest(Commands.c_Debugging_Messaging_Send, 0, cmd);
IncomingMessage reply = PerformSyncRequest(Commands.c_Debugging_Messaging_Send, 0, cmd);
if (reply != null)
{
Commands.Debugging_Messaging_Send.Reply res = reply.Payload as Commands.Debugging_Messaging_Send.Reply;
Expand Down Expand Up @@ -1936,7 +1925,7 @@ public bool SuspendThread(uint pid)
{
return reply.IsPositiveAcknowledge();
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class WireProtocolRequest
private CommandEventHandler _callback;

public CancellationToken CancellationToken { get; }
public TaskCompletionSource<object> TaskCompletionSource { get; }
public TaskCompletionSource<IncomingMessage> TaskCompletionSource { get; }

public DateTimeOffset Expires { get; }
public OutgoingMessage OutgoingMessage { get; }
Expand All @@ -30,10 +30,10 @@ public WireProtocolRequest(OutgoingMessage outgoingMessage, CancellationToken ca
Expires = DateTime.Now.AddMilliseconds(millisecondsTimeout);

// https://blogs.msdn.microsoft.com/pfxteam/2009/06/02/the-nature-of-taskcompletionsourcetresult/
TaskCompletionSource = new TaskCompletionSource<object>();
TaskCompletionSource = new TaskCompletionSource<IncomingMessage>();
CancellationToken = cancellationToken;
}

internal async Task<bool> PerformRequestAsync(IController controller)
{
Debug.WriteLine($"Performing request");
Expand Down