// Copyright (c) The Geekeey Authors
// SPDX-License-Identifier: EUPL-1.2

using System.Buffers;
using System.Text;

namespace Geekeey.Extensions.Process;

/// <summary>
/// Represents a pipe for the process's standard output or standard error stream.
/// </summary>
public abstract partial class PipeTarget
{
    /// <summary>
    /// Reads the binary content from the origin stream and pushes it into the pipe.
    /// Origin stream represents the process's standard output or standard error stream.
    /// </summary>
    /// <param name="origin">The stream to read the data from.</param>
    /// <param name="cancellationToken">Token used to abort the copy operation.</param>
    /// <returns>A task that completes once the pipe has consumed the origin stream.</returns>
    public abstract Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default);
}

public partial class PipeTarget
{
    /// <summary>
    /// Pipe target whose <see cref="CopyFromAsync(Stream, CancellationToken)" /> simply forwards to a delegate.
    /// </summary>
    private class AnonymousPipeTarget(Func<Stream, CancellationToken, Task> func) : PipeTarget
    {
        /// <inheritdoc />
        public override async Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default)
            => await func(origin, cancellationToken);
    }

    /// <summary>
    /// Pipe target that replicates the origin stream to several inner targets.
    /// </summary>
    private class AggregatePipeTarget(IReadOnlyList<PipeTarget> targets) : PipeTarget
    {
        /// <summary>
        /// The inner targets that receive a copy of the data.
        /// </summary>
        public IReadOnlyList<PipeTarget> Targets { get; } = targets;

        /// <inheritdoc />
        public override async Task CopyFromAsync(Stream origin, CancellationToken cancellationToken = default)
        {
            // linked source so that a failing target can abort the read loop and its siblings
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            // create a separate sub-stream for each target
            var targetSubStreams = new Dictionary<PipeTarget, MemoryBufferStream>();
            foreach (var target in Targets)
            {
                // the same target instance may be listed more than once; it is still served by a single
                // sub-stream, so do not allocate a second one that would be overwritten and never disposed
                if (!targetSubStreams.ContainsKey(target))
                {
                    targetSubStreams.Add(target, new MemoryBufferStream());
                }
            }

            try
            {
                // start piping in the background
                async Task StartCopyAsync(KeyValuePair<PipeTarget, MemoryBufferStream> targetSubStream)
                {
                    var (target, subStream) = targetSubStream;

                    try
                    {
                        // ReSharper disable once AccessToDisposedClosure
                        await target.CopyFromAsync(subStream, cts.Token);
                    }
                    catch
                    {
                        // abort the operation if any of the targets fail
                        // ReSharper disable once AccessToDisposedClosure
                        await cts.CancelAsync();
                        throw;
                    }
                }

                var readingTask = Task.WhenAll(targetSubStreams.Select(StartCopyAsync));

                try
                {
                    // read from the main stream and replicate the data to each sub-stream
                    using var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.Stream);

                    while (true)
                    {
                        var bytesRead = await origin.ReadAsync(buffer.Memory, cts.Token);
                        if (bytesRead <= 0)
                        {
                            break;
                        }

                        foreach (var (_, subStream) in targetSubStreams)
                        {
                            await subStream.WriteAsync(buffer.Memory[..bytesRead], cts.Token);
                        }
                    }

                    // report that transmission is complete
                    foreach (var (_, subStream) in targetSubStreams)
                    {
                        await subStream.ReportCompletionAsync(cts.Token);
                    }
                }
                finally
                {
                    // wait for all targets to finish and maybe propagate exceptions
                    // NOTE(review): if reading from origin fails with a non-cancellation error, cts is not
                    // cancelled on this path - confirm the targets cannot stay blocked on their sub-stream.
                    await readingTask;
                }
            }
            finally
            {
                foreach (var (_, stream) in targetSubStreams)
                {
                    await stream.DisposeAsync();
                }
            }
        }
    }
}

public partial class PipeTarget
{
    /// <summary>
    /// Pipe target that discards all data. Functionally equivalent to a null device.
    /// </summary>
    /// <remarks>
    /// Using this target results in the corresponding stream (standard output or standard error) not being opened for
    /// the underlying process at all. In the vast majority of cases, this behavior should be functionally equivalent to
    /// piping to a null stream, but without the performance overhead of consuming and discarding unneeded data. This
    /// may be undesirable in certain situations, in which case it's recommended to pipe to a null stream explicitly
    /// using <see cref="ToStream(Stream)" /> with <see cref="Stream.Null" />.
    /// </remarks>
    public static PipeTarget Null { get; } = Create((_, cancellationToken) =>
        !cancellationToken.IsCancellationRequested
            ? Task.CompletedTask
            : Task.FromCanceled(cancellationToken));

    /// <summary>
    /// Creates an anonymous pipe target with the <see cref="CopyFromAsync(Stream, CancellationToken)" /> method
    /// implemented by the specified asynchronous delegate.
    /// </summary>
    /// <param name="func">Delegate invoked with the origin stream and the cancellation token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func" /> is <see langword="null" />.</exception>
    public static PipeTarget Create(Func<Stream, CancellationToken, Task> func)
    {
        ArgumentNullException.ThrowIfNull(func);

        return new AnonymousPipeTarget(func);
    }

    /// <summary>
    /// Creates an anonymous pipe target with the <see cref="CopyFromAsync(Stream, CancellationToken)" /> method
    /// implemented by the specified synchronous delegate.
    /// </summary>
    /// <param name="action">Delegate invoked with the origin stream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action" /> is <see langword="null" />.</exception>
    public static PipeTarget Create(Action<Stream> action)
    {
        // validate eagerly: the delegate is only captured here and would otherwise fail when first invoked
        ArgumentNullException.ThrowIfNull(action);

        return Create(
            (origin, _) =>
            {
                action(origin);
                return Task.CompletedTask;
            });
    }

    /// <summary>
    /// Creates a pipe target that writes to the specified stream.
    /// </summary>
    /// <param name="stream">The stream that receives the data.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream" /> is <see langword="null" />.</exception>
    public static PipeTarget ToStream(Stream stream)
    {
        ArgumentNullException.ThrowIfNull(stream);

        return Create(
            async (origin, cancellationToken) =>
                await origin.CopyToAsync(stream, cancellationToken));
    }

    /// <summary>
    /// Creates a pipe target that writes to the specified file.
    /// </summary>
    /// <param name="filePath">Path of the file to create (or overwrite) when piping starts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filePath" /> is <see langword="null" />.</exception>
    public static PipeTarget ToFile(string filePath)
    {
        ArgumentNullException.ThrowIfNull(filePath);

        return Create(
            async (origin, cancellationToken) =>
            {
                await using var target = File.Create(filePath);
                await origin.CopyToAsync(target, cancellationToken);
            });
    }

    /// <summary>
    /// Creates a pipe target that writes to the specified string builder.
    /// </summary>
    /// <param name="stringBuilder">The string builder that receives the decoded text.</param>
    /// <param name="encoding">The encoding used to decode the origin stream.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stringBuilder" /> or <paramref name="encoding" /> is <see langword="null" />.
    /// </exception>
    public static PipeTarget ToStringBuilder(StringBuilder stringBuilder, Encoding encoding)
    {
        ArgumentNullException.ThrowIfNull(stringBuilder);
        ArgumentNullException.ThrowIfNull(encoding);

        return Create(
            async (origin, cancellationToken) =>
            {
                // leaveOpen: true - the origin stream is not owned by this target
                using var reader = new StreamReader(origin, encoding, false, BufferSizes.StreamReader, true);
                using var buffer = MemoryPool<char>.Shared.Rent(BufferSizes.StreamReader);

                // cancellation is observed by ReadAsync and surfaces as OperationCanceledException, like the
                // other targets, instead of silently ending the loop with truncated output
                while (true)
                {
                    var charsRead = await reader.ReadAsync(buffer.Memory, cancellationToken);
                    if (charsRead <= 0)
                    {
                        break;
                    }

                    stringBuilder.Append(buffer.Memory[..charsRead]);
                }
            });
    }

    /// <summary>
    /// Creates a pipe target that writes to the specified string builder.
    /// Uses <see cref="Console.OutputEncoding" /> for decoding.
    /// </summary>
    /// <param name="stringBuilder">The string builder that receives the decoded text.</param>
    public static PipeTarget ToStringBuilder(StringBuilder stringBuilder)
        => ToStringBuilder(stringBuilder, Console.OutputEncoding);

    /// <summary>
    /// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
    /// </summary>
    /// <param name="func">Delegate invoked with each line and the cancellation token.</param>
    /// <param name="encoding">The encoding used to decode the origin stream.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="func" /> or <paramref name="encoding" /> is <see langword="null" />.
    /// </exception>
    public static PipeTarget ToDelegate(Func<string, CancellationToken, Task> func, Encoding encoding)
    {
        ArgumentNullException.ThrowIfNull(func);
        ArgumentNullException.ThrowIfNull(encoding);

        return Create(
            async (origin, cancellationToken) =>
            {
                // leaveOpen: true - the origin stream is not owned by this target
                using var reader = new StreamReader(origin, encoding, false, BufferSizes.StreamReader, true);

                while (await reader.ReadLineAsync(cancellationToken) is { } line)
                {
                    await func(line, cancellationToken);
                }
            });
    }

    /// <summary>
    /// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
    /// Uses <see cref="Console.OutputEncoding" /> for decoding.
    /// </summary>
    /// <param name="func">Delegate invoked with each line and the cancellation token.</param>
    public static PipeTarget ToDelegate(Func<string, CancellationToken, Task> func)
        => ToDelegate(func, Console.OutputEncoding);

    /// <summary>
    /// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
    /// </summary>
    /// <param name="func">Delegate invoked with each line.</param>
    /// <param name="encoding">The encoding used to decode the origin stream.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="func" /> or <paramref name="encoding" /> is <see langword="null" />.
    /// </exception>
    public static PipeTarget ToDelegate(Func<string, Task> func, Encoding encoding)
    {
        // validate here: the delegate is wrapped in a lambda below, which would hide a null until invocation
        ArgumentNullException.ThrowIfNull(func);

        return ToDelegate(
            async (line, _) => await func(line),
            encoding);
    }

    /// <summary>
    /// Creates a pipe target that invokes the specified asynchronous delegate on every line written to the stream.
    /// Uses <see cref="Console.OutputEncoding" /> for decoding.
    /// </summary>
    /// <param name="func">Delegate invoked with each line.</param>
    public static PipeTarget ToDelegate(Func<string, Task> func)
        => ToDelegate(func, Console.OutputEncoding);

    /// <summary>
    /// Creates a pipe target that invokes the specified synchronous delegate on every line written to the stream.
    /// </summary>
    /// <param name="action">Delegate invoked with each line.</param>
    /// <param name="encoding">The encoding used to decode the origin stream.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="action" /> or <paramref name="encoding" /> is <see langword="null" />.
    /// </exception>
    public static PipeTarget ToDelegate(Action<string> action, Encoding encoding)
    {
        // validate here: the delegate is wrapped in a lambda below, which would hide a null until invocation
        ArgumentNullException.ThrowIfNull(action);

        return ToDelegate(
            line =>
            {
                action(line);
                return Task.CompletedTask;
            },
            encoding);
    }

    /// <summary>
    /// Creates a pipe target that invokes the specified synchronous delegate on every line written to the stream.
    /// Uses <see cref="Console.OutputEncoding" /> for decoding.
    /// </summary>
    /// <param name="action">Delegate invoked with each line.</param>
    public static PipeTarget ToDelegate(Action<string> action)
        => ToDelegate(action, Console.OutputEncoding);

    /// <summary>
    /// Creates a pipe target that replicates data over multiple inner targets.
    /// </summary>
    /// <param name="targets">The targets to replicate the data to; nested merged targets are flattened.</param>
    /// <returns>
    /// <see cref="Null" /> when no effective target remains, the single remaining target when there is exactly
    /// one, otherwise a target that replicates the data to all of them.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="targets" /> is <see langword="null" />.</exception>
    public static PipeTarget Merge(IEnumerable<PipeTarget> targets)
    {
        ArgumentNullException.ThrowIfNull(targets);

        // optimize targets to avoid unnecessary piping
        var optimizedTargets = OptimizeTargets(targets);

        return optimizedTargets.Count switch
        {
            // avoid merging if there are no targets
            0 => Null,
            // avoid merging if there's only one target
            1 => optimizedTargets[0],
            _ => new AggregatePipeTarget(optimizedTargets)
        };

        static IReadOnlyList<PipeTarget> OptimizeTargets(IEnumerable<PipeTarget> targets)
        {
            var result = new List<PipeTarget>();

            // unwrap merged targets
            UnwrapTargets(targets, result);

            // filter out no-op
            result.RemoveAll(t => t == Null);

            return result;
        }

        static void UnwrapTargets(IEnumerable<PipeTarget> targets, ICollection<PipeTarget> output)
        {
            foreach (var target in targets)
            {
                if (target is AggregatePipeTarget mergedTarget)
                {
                    UnwrapTargets(mergedTarget.Targets, output);
                }
                else
                {
                    output.Add(target);
                }
            }
        }
    }

    /// <summary>
    /// Creates a pipe target that replicates data over multiple inner targets.
    /// </summary>
    /// <param name="targets">The targets to replicate the data to.</param>
    public static PipeTarget Merge(params PipeTarget[] targets)
        => Merge((IEnumerable<PipeTarget>)targets);
}