最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

每日速读!Pipelines

来源:博客园

用于执行高性能的I/O,且代码不复杂


(资料图)

依赖库:System.IO.Pipelines

创建

var pipe = new Pipe();PipeReader reader = pipe.Reader;PipeWriter writer = pipe.Writer;

基本用法

// 对于socket进行读写操作async Task ProcessLinesAsync(Socket socket){// 声明一个Pipe    var pipe = new Pipe();    // 创建一个写入任务,大意是将socket中接受的数据写入到pipe中    Task writing = FillPipeAsync(socket, pipe.Writer);    // 创建一个读取任务,大意是将pipe获得数据读取出来    Task reading = ReadPipeAsync(pipe.Reader);    await Task.WhenAll(reading, writing);}// 写入任务代码async Task FillPipeAsync(Socket socket, PipeWriter writer){// 定义每个buffer为512    const int minimumBufferSize = 512;// 死循环    while (true)    {        // 获取512个byte的内存        Memory memory = writer.GetMemory(minimumBufferSize);        try        {            // 尝试从socket中读取byte数组,写入到memory中            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);            //如果没有读取到数据,则不继续执行            if (bytesRead == 0)            {                break;            }            // 告知 PipeWriter 有多少数据已写入缓冲区            writer.Advance(bytesRead);        }        catch (Exception ex)        {            LogError(ex);            break;        }        // 告知PipReader,数据可使用,如果不调用则reader一直认为不可使用        FlushResult result = await writer.FlushAsync();// 写入的数据已经全部操作完成        if (result.IsCompleted)        {            break;        }    }     // 写入完成,告诉PipeReader没有多余的数据进行操作了    await writer.CompleteAsync();}// 读取写入的数据async Task ReadPipeAsync(PipeReader reader){    while (true)    {    // 读取write的数据        ReadResult result = await reader.ReadAsync();        // 获取byte数组,该数据只读        ReadOnlySequence buffer = result.Buffer;// 将result.Buffer中的数据全部提取出来,感觉类似于数据库中的游标卡尺,一步一步的读取数据        while (TryReadLine(ref buffer, out ReadOnlySequence line))        {            // Process the line.            ProcessLine(line);        }        // 告知 PipeReader 已消耗和检查了多少数据        reader.AdvanceTo(buffer.Start, buffer.End);        // 停止读取,因为在buffer中没有多余的数据了        if (result.IsCompleted)        {            break;        }    }    // 结束,告知管道可以释放内存    await reader.CompleteAsync();}// 将buffer复制到line中,里面可以自己写逻辑,例如我们的数据是以字符总数开头,以某个字符结尾等等bool TryReadLine(ref ReadOnlySequence buffer, out ReadOnlySequence line){    // 查找结尾的字符串    SequencePosition? position = buffer.PositionOf((byte)"\n");// 如果结尾字符串未找到,则返回false    if (position == null)    {        line = default;        return false;    }    // 保存要截取的数据    line = buffer.Slice(0, position.Value);    // 将剩余的数据保存起来    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));    return true;}

实际中的差异

实际使用时,网络数据复制和读取后分析占用内存大小是存在差异的,因此需要根据特定环境进行配置。

假设分析数据慢于网络复制数据,则需要让网络数据多存一些然后再进行分析:

// 设置// PauseWriterThreshold:确定在调用 PipeWriter.FlushAsync 暂停之前应缓冲多少数据。// ResumeWriterThreshold:确定在恢复对 PipeWriter.FlushAsync 的调用之前,读取器必须观察多少数据。// 缓冲了10个byte后调用FlushAsync,分析了5个byte后调用PipeWriter.FlushAsyncvar options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);var pipe = new Pipe(options);

PipeWriter.FlushAsync:

  • Pipe中的数据量超过 PauseWriterThreshold时,返回不完整的 ValueTask
  • 低于 ResumeWriterThreshold时,返回完整的 ValueTask

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

PipeScheduler

提供对异步回调运行位置的控制,默认情况下:

  • 使用当前的 SynchronizationContext。
  • 如果没有 SynchronizationContext,它将使用线程池运行回调。
public static void Main(string[] args){// 声明一个单线程的任务    var writeScheduler = new SingleThreadPipeScheduler();    var readScheduler = new SingleThreadPipeScheduler();    // 将声明的两个任务进行赋值    var options = new PipeOptions(readerScheduler: readScheduler,                                  writerScheduler: writeScheduler,                                  useSynchronizationContext: false);    var pipe = new Pipe(options);}public class SingleThreadPipeScheduler : PipeScheduler{    // 声明一个只读块,用于存放调用方法和读取的数据    private readonly BlockingCollection<(Action Action, object State)> _queue =     new BlockingCollection<(Action Action, object State)>();     // 创建单线程    private readonly Thread _thread;    public SingleThreadPipeScheduler()    {        _thread = new Thread(DoWork);        _thread.Start();    }    private void DoWork()    {    // 从只读块中获取数据        foreach (var item in _queue.GetConsumingEnumerable())        {        // 执行方法            item.Action(item.State);        }    }// 添加任务数据    public override void Schedule(Action action, object? state)    {    // 可以根据state进行判断是否进行数据解析或者其他操作        if (state is not null)        {            _queue.Add((action, state));        }    }}

流(Stream)

StreamPipeReaderOptions 允许使用以下参数控制 PipeReader实例的创建:

  • StreamPipeReaderOptions.BufferSize 是从池中租用内存时使用的最小缓冲区大小(以字节为单位),默认值为 4096
  • StreamPipeReaderOptions.LeaveOpen 标志确定在 PipeReader完成之后基础流是否保持打开状态,默认值为 false
  • StreamPipeReaderOptions.MinimumReadSize 表示分配新缓冲区之前缓冲区中剩余字节的阈值,默认值为 1024
  • StreamPipeReaderOptions.Pool 是分配内存时使用的 MemoryPool,默认值为 null

StreamPipeWriterOptions 允许使用以下参数控制 PipeWriter实例的创建:

  • StreamPipeWriterOptions.LeaveOpen 标志确定在 PipeWriter完成之后基础流是否保持打开状态,默认值为 false
  • StreamPipeWriterOptions.MinimumBufferSize 表示从 Pool 租用内存时要使用的最小缓冲区大小,默认值为 4096
  • StreamPipeWriterOptions.Pool 是分配内存时使用的 MemoryPool,默认值为 null

重要

使用 Create方法创建 PipeReaderPipeWriter实例时,需要考虑 Stream对象的生存期。 如果在读取器或编写器使用该方法完成操作后,你需要访问流,则需要在创建选项上将 LeaveOpen标志设置为 true。 否则,流将关闭。

using System.Buffers;using System.IO.Pipelines;using System.Text;class Program{    static async Task Main()    {        using var stream = File.OpenRead("Demo1.txt");// 创建一个读取的留        var reader = PipeReader.Create(stream);        var writer = PipeWriter.Create(            // 写入到命令行中            Console.OpenStandardOutput(),             // 重要中提到的还需要访问流,需要设置为常开,否则读取完比stream就自动关闭            new StreamPipeWriterOptions(leaveOpen: true));// 读取和写入的核心方法        var processMessagesTask = ProcessMessagesAsync(reader, writer);        var userCanceled = false;        var cancelProcessingTask = Task.Run(() =>        {            //点击键盘的C            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != "C")            {               //当只有输入了C,才会程序会关闭            }            userCanceled = true;            // No exceptions thrown            reader.CancelPendingRead();            writer.CancelPendingFlush();        });// 执行任务        await Task.WhenAny(cancelProcessingTask, processMessagesTask);        Console.WriteLine(            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");    }// 读取和写入的核心方法    static async Task ProcessMessagesAsync(PipeReader reader, PipeWriter writer)    {        try        {            while (true)            {                // 读取文本中的数据                ReadResult readResult = await reader.ReadAsync();                // 获得byte数组                ReadOnlySequence buffer = readResult.Buffer;                try                {                    // 如果没已取消,则跳出                    if (readResult.IsCanceled)                    {                        break;                    }// 尝试读取通道中的数据                    if (TryParseLines(ref buffer, out string message))                    {                        // 写入到通道中                        FlushResult flushResult =                            await WriteMessagesAsync(writer, message);// 如果写入完成或被取消则跳出                        if (flushResult.IsCanceled || flushResult.IsCompleted)                        {                            break;                        }                    }// 如果读取完成,则跳出                    if (readResult.IsCompleted)                    {                        // 读取完成,但是数据依然存在,则提示不正确(通过实际业务截取每段的数据,当截取后依然还有存留,表示截取的不正确)                        if (!buffer.IsEmpty)                        {                            throw new InvalidDataException("Incomplete message.");                        }                        break;                    }                }                finally                {                    // 告知读取通道已完成那些数据的读取                    reader.AdvanceTo(buffer.Start, buffer.End);                }            }        }        catch (Exception ex)        {            Console.Error.WriteLine(ex);        }        finally        {            //完成读写            await reader.CompleteAsync();            await writer.CompleteAsync();        }    }    static bool TryParseLines(        ref ReadOnlySequence buffer,        out string message)    {        SequencePosition? position;        StringBuilder outputMessage = new();        while(true)        {            // 判断回车,如果存在返回位置点            position = buffer.PositionOf((byte)"\n");// 如果位置点为空,就跳出            if (!position.HasValue)                break;// 获取这段数据            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))                        .AppendLine();// 保存剩余的数据供下次使用            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));        };// 获取截出的数据        message = outputMessage.ToString();        return message.Length != 0;    }    //将数据写入通道,本例子就是写入到控制台    static ValueTask WriteMessagesAsync(        PipeWriter writer,        string message) =>        writer.WriteAsync(Encoding.ASCII.GetBytes(message));}

关键词: