Skip to content

Commit

Permalink
Add reactive support.
Browse files Browse the repository at this point in the history
  • Loading branch information
twitchax committed Mar 14, 2019
1 parent 9ff6c14 commit b9d2918
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Latest .NET Standard 2.0.

### Examples

This library is extendable, but you can run it a few ways depending on how you have extended it.
This library is extendable, but you can run it a few ways depending on how you have extended it. For more examples, check out the [tests](src/Test/BasicTests.cs).

With no extensions, you would run a command like this.

Expand Down
28 changes: 21 additions & 7 deletions src/Core/Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using Sheller.Implementations;
using Sheller.Implementations.Shells;
using Sheller.Models;

namespace Sheller
Expand Down Expand Up @@ -78,6 +79,12 @@ internal static void CopyToStringDictionary(this IEnumerable<KeyValuePair<string
}
}

internal static void ForEach<T>(this IEnumerable<T> list, Action<T> functor)
{
foreach(var item in list)
functor(item);
}

internal static string EscapeQuotes(this string s) => s.Replace("\"", "\\\"");

internal static Task<ICommandResult> RunCommand(
Expand All @@ -86,7 +93,8 @@ internal static Task<ICommandResult> RunCommand(
IEnumerable<string> standardInputs = null,
IEnumerable<Action<string>> standardOutputHandlers = null,
IEnumerable<Action<string>> standardErrorHandlers = null,
Func<string, string, Task<string>> inputRequestHandler = null)
Func<string, string, Task<string>> inputRequestHandler = null,
ObservableCommandEvent observableCommandEvent = null)
{
var t = new Task<ICommandResult>(() =>
{
Expand All @@ -104,22 +112,28 @@ internal static Task<ICommandResult> RunCommand(
process.OutputDataReceived += (s, e) =>
{
if(e.Data == null) return;
var data = e.Data;
if(data == null) return;
standardOutput.AppendLine(e.Data);
standardOutput.AppendLine(data);
if(standardOutputHandlers != null)
foreach(var handler in standardOutputHandlers)
handler(e.Data);
handler(data);
observableCommandEvent.FireEvent(new CommandEvent(CommandEventType.StandardOutput, data));
};
process.ErrorDataReceived += (s, e) =>
{
if(e.Data == null) return;
var data = e.Data;
if(data == null) return;
standardError.AppendLine(e.Data);
standardError.AppendLine(data);
if(standardErrorHandlers != null)
foreach(var handler in standardErrorHandlers)
handler(e.Data);
handler(data);
observableCommandEvent.FireEvent(new CommandEvent(CommandEventType.StandardError, data));
};
if(inputRequestHandler != null)
Expand Down
35 changes: 35 additions & 0 deletions src/Core/Implementations/CommandEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

using System;
using Sheller.Models;

namespace Sheller.Implementations
{
/// <summary>
/// The default result type for executables.
/// </summary>
public class CommandEvent : ICommandEvent
{
/// <summary>
/// StreamType property.
/// </summary>
/// <value>The stream type of the event.</value>
public CommandEventType Type { get; private set; }

/// <summary>
/// Data property.
/// </summary>
/// <value>The string data of the event.</value>
public string Data { get; private set; }

/// <summary>
/// The CommandEvent constructor.
/// </summary>
/// <param name="type"></param>
/// <param name="data"></param>
public CommandEvent(CommandEventType type, string data)
{
Type = type;
Data = data;
}
}
}
2 changes: 1 addition & 1 deletion src/Core/Implementations/CommandResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class CommandResult : ICommandResult
/// Succeeded property.
/// </summary>
/// <value>The succeeded status of an executable.</value>
public bool Succeeded { get; }
public bool Succeeded { get; private set; }

/// <summary>
/// ExitCode property.
Expand Down
8 changes: 8 additions & 0 deletions src/Core/Implementations/Executables/Executable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ async Task<TResult> executionTask()
public TExecutable UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler) => CreateFrom(this, shell: _shell.UseInputRequestHandler(inputRequestHandler));
IExecutable IExecutable.UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler) => UseInputRequestHandler(inputRequestHandler);

/// <summary>
/// Provides an <see cref="IObservable{T}"/> to which a subscription can be placed.
/// The observable never completes, since executions can be run many times.
/// </summary>
/// <returns>A `new` instance of type <typeparamref name="TExecutable"/> with the subscribers attached to the observable.</returns>
public TExecutable WithSubscribe(Action<IObservable<ICommandEvent>> subscriber) => CreateFrom(this, shell: _shell.WithSubscribe(subscriber));
IExecutable IExecutable.WithSubscribe(Action<IObservable<ICommandEvent>> subscriber) => WithSubscribe(subscriber);

/// <summary>
/// Adds a wait <see cref="Func{T}"/> (of which there may be many) to the execution context and returns a `new` context instance.
/// </summary>
Expand Down
75 changes: 75 additions & 0 deletions src/Core/Implementations/ObservableCommandEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;
using Sheller.Models;

namespace Sheller.Implementations
{
/// <summary>
/// Observable for command events.
/// </summary>
public class ObservableCommandEvent : IObservable<ICommandEvent>
{
internal List<IObserver<ICommandEvent>> Observers { get; private set; } = new List<IObserver<ICommandEvent>>();

/// <summary>
/// The Subscribe method.
/// </summary>
/// <param name="observer">The observer.</param>
/// <returns>An <see cref="IDisposable"/>.</returns>
public IDisposable Subscribe(IObserver<ICommandEvent> observer)
{
if(!Observers.Contains(observer))
Observers.Add(observer);
return new Unsubscriber(Observers, observer);
}

internal void FireEvent(ICommandEvent commandEvent)
{
foreach(var observer in Observers)
observer.OnNext(commandEvent);
}

internal void FireError(Exception e)
{
foreach(var observer in Observers)
observer.OnError(e);
}

internal void FireCompleted()
{
foreach (var observer in Observers.ToArray())
if (Observers.Contains(observer))
observer.OnCompleted();

Observers.Clear();
}

internal static ObservableCommandEvent Merge(ObservableCommandEvent ob1, ObservableCommandEvent ob2)
{
var newObservable = new ObservableCommandEvent();

ob1.Observers.ForEach(o => newObservable.Subscribe(o));
ob2.Observers.ForEach(o => newObservable.Subscribe(o));

return newObservable;
}

private class Unsubscriber : IDisposable
{
private List<IObserver<ICommandEvent>> _observers;
private IObserver<ICommandEvent> _observer;

public Unsubscriber(List<IObserver<ICommandEvent>> observers, IObserver<ICommandEvent> observer)
{
this._observers = observers;
this._observer = observer;
}

public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
_observers.Remove(_observer);
}
}
}
}
35 changes: 32 additions & 3 deletions src/Core/Implementations/Shells/Shell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ namespace Sheller.Implementations.Shells
private IEnumerable<Action<string>> _standardErrorHandlers;
private Func<string, string, Task<String>> _inputRequestHandler;

private ObservableCommandEvent _observableCommandEvent;

private bool _throws;

/// <summary>
Expand All @@ -51,7 +53,7 @@ namespace Sheller.Implementations.Shells
/// Initializes the shell.
/// </summary>
/// <param name="shell">The name or path of the shell.</param>
public virtual TShell Initialize(string shell) => Initialize(shell, null, null, null, null, null, null);
public virtual TShell Initialize(string shell) => Initialize(shell, null, null, null, null, null, null, null);

/// <summary>
/// Initializes the shell.
Expand All @@ -62,6 +64,7 @@ namespace Sheller.Implementations.Shells
/// <param name="standardOutputHandlers">The standard output handlers for capture from the execution.</param>
/// <param name="standardErrorHandlers">The standard error handlers for capture from the execution.</param>
/// <param name="inputRequestHandler">The request handler from the execution.</param>
/// <param name="observableCommandEvent">The observable that fires on stdout/stderr.</param>
/// <param name="throws">Indicates that a non-zero exit code throws.</param>
protected virtual TShell Initialize(
string shell,
Expand All @@ -70,6 +73,7 @@ protected virtual TShell Initialize(
IEnumerable<Action<string>> standardOutputHandlers,
IEnumerable<Action<string>> standardErrorHandlers,
Func<string, string, Task<string>> inputRequestHandler,
ObservableCommandEvent observableCommandEvent,
bool? throws)
{
_shell = shell;
Expand All @@ -81,6 +85,8 @@ protected virtual TShell Initialize(
_standardErrorHandlers = standardErrorHandlers ?? new List<Action<string>>();
_inputRequestHandler = inputRequestHandler;

_observableCommandEvent = observableCommandEvent ?? new ObservableCommandEvent();

_throws = throws ?? true;

return this as TShell;
Expand All @@ -94,6 +100,7 @@ private static TShell CreateFrom(
IEnumerable<Action<string>> standardOutputHandlers = null,
IEnumerable<Action<string>> standardErrorHandlers = null,
Func<string, string, Task<string>> inputRequestHandler = null,
ObservableCommandEvent observableCommandEvent = null,
bool? throws = null) =>
new TShell().Initialize(
shell ?? old._shell,
Expand All @@ -102,6 +109,7 @@ private static TShell CreateFrom(
standardOutputHandlers ?? old._standardOutputHandlers,
standardErrorHandlers ?? old._standardErrorHandlers,
inputRequestHandler ?? old._inputRequestHandler,
observableCommandEvent ?? old._observableCommandEvent,
throws ?? old._throws
);

Expand All @@ -123,10 +131,18 @@ public virtual async Task<ICommandResult> ExecuteCommandAsync(string executable,
_standardInputs,
_standardOutputHandlers,
_standardErrorHandlers,
_inputRequestHandler);
_inputRequestHandler,
_observableCommandEvent);

if(_throws && result.ExitCode != 0)
throw new ExecutionFailedException($"The execution resulted in a non-zero exit code ({result.ExitCode}).", result);
{
var error = new ExecutionFailedException($"The execution resulted in a non-zero exit code ({result.ExitCode}).", result);
_observableCommandEvent.FireError(error);
throw error;
}

// TODO: Add a `UseSubscribeComplete` (or something like it) that instructs the observable to complete here.
//_observableCommandEvent.FireCompleted();

return result;
}
Expand Down Expand Up @@ -199,6 +215,19 @@ public virtual async Task<ICommandResult> ExecuteCommandAsync(string executable,
public TShell UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler) => CreateFrom(this, inputRequestHandler: inputRequestHandler);
IShell IShell.UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler) => UseInputRequestHandler(inputRequestHandler);

/// <summary>
/// Provides an <see cref="IObservable{T}"/> to which a subscription can be placed.
/// The observable never completes, since executions can be run many times.
/// </summary>
/// <returns>A `new` instance of type <typeparamref name="TShell"/> with the subscribers attached to the observable.</returns>
public TShell WithSubscribe(Action<IObservable<ICommandEvent>> subscriber)
{
var newObservable = new ObservableCommandEvent();
subscriber(newObservable);
return CreateFrom(this, observableCommandEvent: ObservableCommandEvent.Merge(_observableCommandEvent, newObservable));
}
IShell IShell.WithSubscribe(Action<IObservable<ICommandEvent>> subscriber) => WithSubscribe(subscriber);

/// <summary>
/// Ensures the shell context will not throw on a non-zero exit code and returns a `new` context instance.
/// </summary>
Expand Down
37 changes: 37 additions & 0 deletions src/Core/Models/ICommandEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;

namespace Sheller.Models
{
/// <summary>
/// The default result interface for executables.
/// </summary>
public interface ICommandEvent
{
/// <summary>
/// StreamType property.
/// </summary>
/// <value>The stream type of the event.</value>
CommandEventType Type { get; }

/// <summary>
/// Data property.
/// </summary>
/// <value>The string data of the event.</value>
string Data { get; }
}

/// <summary>
/// The type (stdout, stderr) of the event data.
/// </summary>
public enum CommandEventType
{
/// <summary>
/// The Standard Output type.
/// </summary>
StandardOutput,
/// <summary>
/// The Standard Error type.
/// </summary>
StandardError
}
}
14 changes: 14 additions & 0 deletions src/Core/Models/IExecutable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public interface IExecutable
/// <returns>A `new` instance of <see cref="IShell"/> with the request handler passed to this call.</returns>
IExecutable UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler);

/// <summary>
/// Provides an <see cref="IObservable{T}"/> to which a subscription can be placed.
/// The observable never completes, since executions can be run many times.
/// </summary>
/// <returns>A `new` instance of type <see cref="IExecutable"/> with the subscribers attached to the observable.</returns>
IExecutable WithSubscribe(Action<IObservable<ICommandEvent>> subscriber);

/// <summary>
/// Adds a wait <see cref="Func{T}"/> (of which there may be many) to the execution context and returns a `new` context instance.
/// </summary>
Expand Down Expand Up @@ -166,6 +173,13 @@ public interface IExecutable<out TExecutable> : IExecutable where TExecutable :
/// <returns>A `new` instance of <typeparamref name="TExecutable"/> with the request handler passed to this call.</returns>
new TExecutable UseInputRequestHandler(Func<string, string, Task<string>> inputRequestHandler);

/// <summary>
/// Provides an <see cref="IObservable{T}"/> to which a subscription can be placed.
/// The observable never completes, since executions can be run many times.
/// </summary>
/// <returns>A `new` instance of type <typeparamref name="TExecutable"/> with the subscribers attached to the observable.</returns>
new TExecutable WithSubscribe(Action<IObservable<ICommandEvent>> subscriber);

/// <summary>
/// Adds a wait <see cref="Func{T}"/> (of which there may be many) to the execution context and returns a `new` context instance.
/// </summary>
Expand Down
Loading

0 comments on commit b9d2918

Please sign in to comment.