Skip to content

Commit

Permalink
Improvements in engine process exit and termination (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
josesimoes committed Jun 8, 2018
1 parent fd30889 commit 0743675
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public partial class SerialPort : PortBase, IPort
/// </summary>
private List<NanoDeviceBase> _tentativeNanoFrameworkDevices = new List<NanoDeviceBase>();

DataWriter outputStreamWriter = null;
DataReader inputStreamReader = null;


/// <summary>
/// Creates an Serial debug client
/// </summary>
Expand Down Expand Up @@ -661,16 +665,17 @@ public async Task<uint> SendBufferAsync(byte[] buffer, TimeSpan waiTimeout, Canc
// device must be connected
if (EventHandlerForSerialDevice.Current.IsDeviceConnected && !cancellationToken.IsCancellationRequested)
{
DataWriter outputStreamWriter = new DataWriter(EventHandlerForSerialDevice.Current.Device.OutputStream);

try
{
outputStreamWriter = new DataWriter(EventHandlerForSerialDevice.Current.Device.OutputStream);

// write buffer to device
outputStreamWriter.WriteBytes(buffer);

Task<UInt32> storeAsyncTask = outputStreamWriter.StoreAsync().AsTask(cancellationToken.AddTimeout(waiTimeout));
cancellationToken.ThrowIfCancellationRequested();

return await storeAsyncTask;
return await outputStreamWriter.StoreAsync().AsTask(cancellationToken.AddTimeout(waiTimeout));
}
catch (TaskCanceledException)
{
Expand Down Expand Up @@ -701,10 +706,12 @@ public async Task<byte[]> ReadBufferAsync(uint bytesToRead, TimeSpan waiTimeout,
// device must be connected
if (EventHandlerForSerialDevice.Current.IsDeviceConnected && !cancellationToken.IsCancellationRequested)
{
DataReader inputStreamReader = new DataReader(EventHandlerForSerialDevice.Current.Device.InputStream);

try
{
inputStreamReader = new DataReader(EventHandlerForSerialDevice.Current.Device.InputStream);

cancellationToken.ThrowIfCancellationRequested();

Task<UInt32> loadAsyncTask = inputStreamReader.LoadAsync(bytesToRead).AsTask(cancellationToken.AddTimeout(waiTimeout));

UInt32 bytesRead = await loadAsyncTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ public async Task<uint> SendBufferAsync(byte[] buffer, TimeSpan waiTimeout, Canc
}
else
{
// FIXME
// NotifyDeviceNotConnected
throw new DeviceNotConnectedException();
}

return bytesWritten;
Expand Down Expand Up @@ -625,8 +624,7 @@ public async Task<byte[]> ReadBufferAsync(uint bytesToRead, TimeSpan waiTimeout,
}
else
{
// FIXME
// NotifyDeviceNotConnected
throw new DeviceNotConnectedException();
}

// return empty byte array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public Converter CreateConverter()
return new Converter(Capabilities);
}

private void ProcessExit()
{
App.ProcessExited();
}

public async Task<bool> SendAsync(MessageRaw raw, CancellationToken cancellationToken)
{
_sendSemaphore.WaitOne();
Expand Down Expand Up @@ -95,6 +100,10 @@ public async Task<bool> SendAsync(MessageRaw raw, CancellationToken cancellation
{
// don't do anything here, as this is expected
}
catch(DeviceNotConnectedException)
{
App.ProcessExited();
}
finally
{
_sendSemaphore.Release();
Expand Down Expand Up @@ -166,8 +175,13 @@ internal async Task<int> ReadBufferAsync(byte[] buffer, int offset, int bytesToR
{
// don't do anything here, as this is expected
}
catch (DeviceNotConnectedException)
{
App.ProcessExited();
}

return bytesToReadRequested - bytesToRead;
}

}
}
104 changes: 30 additions & 74 deletions source/nanoFramework.Tools.DebugLibrary.Shared/WireProtocol/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public partial class Engine : IDisposable, IControllerHostLocal
ManualResetEvent m_evtShutdown;
ManualResetEvent m_evtPing;
TypeSysLookup m_typeSysLookup;
EngineState _state;
//bool m_fProcessExited;

private Task _backgroundProcessor;

Expand All @@ -81,13 +79,9 @@ internal Engine(NanoDeviceBase device)
// default to false
IsCRC32EnabledForWireProtocol = false;

_state.SetValue(EngineState.Value.Starting, true);

// start task to process background messages
_backgroundProcessor = Task.Factory.StartNew(() => IncomingMessagesListenerAsync(), _backgroundProcessorCancellation.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);

_state.SetValue(EngineState.Value.Started, false);

_pendingRequestsTimer = new Timer(ClearPendingRequests, null, 1000, 1000);
}

Expand All @@ -104,8 +98,6 @@ private void Initialize()

m_notifyNoise = new FifoBuffer();
m_typeSysLookup = new TypeSysLookup();
_state = new EngineState(this);
//m_fProcessExited = false;

//default capabilities, used until clr can be queried.
Capabilities = new CLRCapabilities();
Expand All @@ -120,7 +112,7 @@ private void InitializeLocal(NanoDeviceBase device)
_controlller = new Controller(this);

Device = (INanoDevice)device;

Initialize();
}

Expand Down Expand Up @@ -155,6 +147,11 @@ public async Task<bool> ConnectAsync(int timeout, bool force = false, Connection
// connect to device
if (await Device.ConnectAsync())
{
if (_backgroundProcessor.Status != TaskStatus.Running)
{
// background processor is not running, start it
ResumeProcessing();
}

Commands.Monitor_Ping cmd = new Commands.Monitor_Ping();

Expand Down Expand Up @@ -262,48 +259,28 @@ public async Task IncomingMessagesListenerAsync()
{
var reassembler = new MessageReassembler(_controlller);

while (!_backgroundProcessorCancellation.IsCancellationRequested && _state.IsRunning)
while (!_backgroundProcessorCancellation.IsCancellationRequested)
{
if (await Device.ConnectAsync())
try
{
try
{
await reassembler.ProcessAsync(_backgroundProcessorCancellation.Token);
}
catch (DeviceNotConnectedException)
await reassembler.ProcessAsync(_backgroundProcessorCancellation.Token);
}
catch (DeviceNotConnectedException)
{
ProcessExited();
}
catch (Exception ex)
{
// look for I/O exception
// 0x800703E3
if (ex.HResult == -2147023901)
{
if (_backgroundProcessorCancellation.IsCancellationRequested || !_state.IsRunning)
{
return;
}
else
{
await Task.Delay(1000);
await Device.ConnectAsync();
}
ProcessExited();
}
catch (Exception ex)
{
if (_backgroundProcessorCancellation.IsCancellationRequested || !_state.IsRunning)
{
return;
}
else
{

if (ex.GetType().Equals(typeof(AggregateException)))
{
if (ex.GetBaseException().GetType().Name == typeof(DeviceNotConnectedException).Name)
{
await Task.Delay(1000);
await Device.ConnectAsync();
}
}
else if (ex.HResult == 0x80070006)
{
return;
}
}
if (_backgroundProcessorCancellation.IsCancellationRequested)
{
break;
}
}
}
Expand Down Expand Up @@ -644,7 +621,9 @@ public void SpuriousCharacters(byte[] buf, int offset, int count)

public void ProcessExited()
{
throw new NotImplementedException();
Stop();

_eventProcessExit?.Invoke(this, null);
}

public async Task<byte[]> ReadBufferAsync(uint bytesToRead, TimeSpan waitTimeout, CancellationToken cancellationToken)
Expand All @@ -659,8 +638,6 @@ private OutgoingMessage CreateMessage(uint cmd, uint flags, object payload)

public void StopProcessing()
{
_state.SetValue(EngineState.Value.Stopping, false);

m_evtShutdown.Set();

if (_backgroundProcessor != null)
Expand All @@ -673,18 +650,14 @@ public void StopProcessing()
Task.WaitAll(_backgroundProcessor);
}
catch { }

_backgroundProcessor = null;
}
}

public void ResumeProcessing()
{
m_evtShutdown.Reset();

_state.SetValue(EngineState.Value.Resume, false);

if (_backgroundProcessor == null)
if (_backgroundProcessor == null || _backgroundProcessor.IsCompleted)
{
_backgroundProcessor = Task.Factory.StartNew(() => IncomingMessagesListenerAsync(), _backgroundProcessorCancellation.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
Expand All @@ -697,19 +670,9 @@ public void Stop()
m_evtShutdown.Set();
}

if (_state.SetValue(EngineState.Value.Stopping, false))
{
StopProcessing();

//((IController)this).ClosePort();

_state.SetValue(EngineState.Value.Stopped, false);
}
StopProcessing();
}

public bool IsRunning => _state.IsRunning;


#region RPC Support

// comment from original code REVIEW: Can this be refactored out of here to a separate class dedicated to RPC?
Expand Down Expand Up @@ -1011,10 +974,7 @@ private void RpcReceiveSendDispatch(object obj)
{
EndPointRegistration.InboundRequest ir = (EndPointRegistration.InboundRequest)obj;

if (IsRunning)
{
ir.Owner.m_ep.DispatchMessage(ir.m_msg);
}
ir.Owner.m_ep.DispatchMessage(ir.m_msg);
}

internal bool RpcReply(Commands.Debugging_Messaging_Address addr, byte[] data)
Expand Down Expand Up @@ -1081,11 +1041,7 @@ internal async Task<WireProtocolRequest> RequestAsync(OutgoingMessage message, i
{
using (CancellationTokenSource cts = new CancellationTokenSource())
{
//Checking whether IsRunning and adding the request to m_requests
//needs to be atomic to avoid adding a request after the Engine
//has been stopped.

if (!IsRunning)
if(_backgroundProcessor.Status != TaskStatus.Running)
{
throw new ApplicationException("Engine is not running or process has exited.");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\Converter.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\DeploymentBlock.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\Engine.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\EngineState.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\FifoBuffer.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\DeploymentSector.cs" />
<Compile Include="$(MSBuildThisFileDirectory)WireProtocol\IControllerRemote.cs" />
Expand Down

0 comments on commit 0743675

Please sign in to comment.