pipe基础用法

news/2024/9/24 15:06:43
internal class Program
{static int WriteDelay = 50;static int ReadDelay = 20;static async Task Main(string[] args){/*  pauseWriterThreshold: 当PipeWriter的缓冲区大小超过pauseWriterThreshold时,会暂停写入*  resumeWriterThreshold: 当PipeWriter的缓冲区大小低于resumeWriterThreshold时,会恢复写入*  PipeOptions的pauseWriterThreshold和resumeWriterThreshold的值有以下规则:*  1. pauseWriterThreshold为-1时,使用默认值64k, resumeWriterThreshold为-1时,使用默认值32k*  2. pauseWriterThreshold和resumeWriterThreshold的值要么同时为0,要么同时大于0*  3. pauseWriterThreshold > resumeWriterThreshold*  4. 不传options时,pauseWriterThreshold和resumeWriterThreshold的值都为-1(最终分别为64k和32k)*/var options = new PipeOptions(pauseWriterThreshold: -1, resumeWriterThreshold: -1);var pipe = new Pipe(options);var writing = WriteAsync(pipe.Writer);var reading = ReadAsync(pipe.Reader);await Task.WhenAll(reading, writing);}static async Task WriteAsync(PipeWriter writer){while (true){await Task.Delay(WriteDelay);var memory = writer.GetMemory(100);try{var bytesRead = await ReadFromStreamAsync(memory);if (bytesRead == 0){break;}writer.Advance(bytesRead);// 可考虑await writer.WriteAsync, 相当于writer.Advance(bytesRead); await writer.FlushAsync();}catch (Exception ex){// 记录日志Console.WriteLine(ex);break;}var result = await writer.FlushAsync();if (result.IsCompleted){break;}}}static long Number = 0;/// <summary>/// 模拟异步读取/// </summary>/// <param name="memory"></param>/// <returns></returns>static Task<int> ReadFromStreamAsync(Memory<byte> memory){var tcs = new TaskCompletionSource<int>();Task.Run(() =>{var str = (++Number).ToString();var length = Encoding.UTF8.GetBytes(str, memory.Span);memory.Span[length] = (byte)'\n';tcs.SetResult(length + 1);Console.WriteLine("写入:" + Number);});return tcs.Task;}static async Task ReadAsync(PipeReader reader){try{while (true){await Task.Delay(ReadDelay);ReadResult result = await reader.ReadAsync();ReadOnlySequence<byte> buffer = result.Buffer;try{if (result.IsCanceled){break;}while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)){var str = Encoding.UTF8.GetString(line);Console.WriteLine("读取:" + str);}// 不要放在while之前,否则会导致数据未处理(因为本次可能返回了新数据)if (result.IsCompleted){break;}}finally{/**  第一个参数告诉pipe已处理的数据量,以便将内存池中的内存释放回去*  第二个参数告诉pipe已观察的数据量,这样相对观察数据未变化,下次read时处于阻塞状态,否则立马返回数据(观察的数据和新数据)*/reader.AdvanceTo(buffer.Start, buffer.End);}}}finally{await reader.CompleteAsync();}}static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line){var pos = buffer.PositionOf((byte)'\n');if (pos == null){line = default;return false;}line = buffer.Slice(0, pos.Value);// pos为\n, 所以offset为1buffer = buffer.Slice(buffer.GetPosition(1, pos.Value));return true;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/64180.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

APGL4SR论文阅读笔记

APGL4SR: A Generic Framework with Adaptive and Personalized Global Collaborative Information in Sequential Recommendation论文阅读笔记 Abstract 现存的问题: ​ 现有方法通常只关注序列内建模,而忽略了通过序列间建模来利用全局协作信息,从而导致推荐效果不佳。以往…

DDD学习与感悟——向屎山冲锋

软件系统是通过软件开发来解决某一个业务领域或问题单元而产生的一个交付物。而通过软件设计可以帮助我们开发出更加健壮的软件系统。因此,软件设计是从业务领域到软件开发之间的桥梁。而DDD是软件设计中的其中一种思想,旨在提供一种大型复杂软件的设计思路和规范。通过DDD思…

大数据从业者必知必会的Hive SQL调优技巧

大数据从业者必知必会的Hive SQL调优技巧 摘要:在大数据领域中,Hive SQL被广泛应用于数据仓库的数据查询和分析。然而,由于数据量庞大和复杂的查询需求,Hive SQL查询的性能往往不尽人意。本文针对Hive SQL的性能优化进行深入研究,提出了一系列可行的调优方案,并给出了相应…

.net core开源工作流程框架elsa源码阅读之容器的理解

官方文档:https://v3.elsaworkflows.io/官方文档:https://v3.elsaworkflows.io/ 这个框架的依赖注入容器,底层是靠原生的IServiceCollection,没有使用其他的三方容器;然后在这个基础上,作者进行了封装。 主要是用了Module类和继承了IFeature接口的类完成了依赖注入容器的…

22320102 张怡晨9.24

思维导图:亿图 Xmind PDF转换器 :CAJ lightPDF(免费)www.light pdf.com pdf Candy

详解Diffusion扩散模型:理论、架构与实现

本文深入探讨了Diffusion扩散模型的概念、架构设计与算法实现,详细解析了模型的前向与逆向过程、编码器与解码器的设计、网络结构与训练过程,结合PyTorch代码示例,提供全面的技术指导。关注TechLead,复旦AI博士,分享AI领域全维度知识与研究。拥有10+年AI领域研究经验、复旦…

2024.9.24第二次课

思维导图制作:使用Xmind