internal class Program
{static int WriteDelay = 50;static int ReadDelay = 20;static async Task Main(string[] args){/* pauseWriterThreshold: 当PipeWriter的缓冲区大小超过pauseWriterThreshold时,会暂停写入* resumeWriterThreshold: 当PipeWriter的缓冲区大小低于resumeWriterThreshold时,会恢复写入* PipeOptions的pauseWriterThreshold和resumeWriterThreshold的值有以下规则:* 1. pauseWriterThreshold为-1时,使用默认值64k, resumeWriterThreshold为-1时,使用默认值32k* 2. pauseWriterThreshold和resumeWriterThreshold的值要么同时为0,要么同时大于0* 3. pauseWriterThreshold > resumeWriterThreshold* 4. 不传options时,pauseWriterThreshold和resumeWriterThreshold的值都为-1(最终分别为64k和32k)*/var options = new PipeOptions(pauseWriterThreshold: -1, resumeWriterThreshold: -1);var pipe = new Pipe(options);var writing = WriteAsync(pipe.Writer);var reading = ReadAsync(pipe.Reader);await Task.WhenAll(reading, writing);}static async Task WriteAsync(PipeWriter writer){while (true){await Task.Delay(WriteDelay);var memory = writer.GetMemory(100);try{var bytesRead = await ReadFromStreamAsync(memory);if (bytesRead == 0){break;}writer.Advance(bytesRead);// 可考虑await writer.WriteAsync, 相当于writer.Advance(bytesRead); await writer.FlushAsync();}catch (Exception ex){// 记录日志Console.WriteLine(ex);break;}var result = await writer.FlushAsync();if (result.IsCompleted){break;}}}static long Number = 0;/// <summary>/// 模拟异步读取/// </summary>/// <param name="memory"></param>/// <returns></returns>static Task<int> ReadFromStreamAsync(Memory<byte> memory){var tcs = new TaskCompletionSource<int>();Task.Run(() =>{var str = (++Number).ToString();var length = Encoding.UTF8.GetBytes(str, memory.Span);memory.Span[length] = (byte)'\n';tcs.SetResult(length + 1);Console.WriteLine("写入:" + Number);});return tcs.Task;}static async Task ReadAsync(PipeReader reader){try{while (true){await Task.Delay(ReadDelay);ReadResult result = await reader.ReadAsync();ReadOnlySequence<byte> buffer = result.Buffer;try{if (result.IsCanceled){break;}while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)){var str = Encoding.UTF8.GetString(line);Console.WriteLine("读取:" + str);}// 不要放在while之前,否则会导致数据未处理(因为本次可能返回了新数据)if (result.IsCompleted){break;}}finally{/** 第一个参数告诉pipe已处理的数据量,以便将内存池中的内存释放回去* 第二个参数告诉pipe已观察的数据量,这样相对观察数据未变化,下次read时处于阻塞状态,否则立马返回数据(观察的数据和新数据)*/reader.AdvanceTo(buffer.Start, buffer.End);}}}finally{await reader.CompleteAsync();}}static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line){var pos = buffer.PositionOf((byte)'\n');if (pos == null){line = default;return false;}line = buffer.Slice(0, pos.Value);// pos为\n, 所以offset为1buffer = buffer.Slice(buffer.GetPosition(1, pos.Value));return true;}
}