hamed hajiloo
hamed hajiloo
خواندن ۵ دقیقه·۴ سال پیش

معرفی System.Threading.Channels

در دنیای واقعی مثال های زیادی راجع به مسائل مربوط به چرخه تولید به مصرف وجود دارد.

مثلا در رستوران ها از زمان آماده سازی لوازم گرفته تا هنگام پخت غذا و تحویل سفارش کارهایی انجام میشود تا درخواست مشتری انجام شود، روش های مختلفی برای این کار وجود دارد:

  • هر قسمت منتظر این بماند که شخصی اعلام نیاز کند و سپس آن قسمت شروع به انجام وظیفه خود کند، که در این جا مشتری زمان بسیار زیادی را منتظر میماند.(پرفورمنس پایین)
  • تمام قسمت ها از قبل سفارشات آن روز را آماده داشته باشند. با این شرایط مشکلاتی از قبیل سرد شدن سفارش, نبود جا برای نگهداری سفارش و ... به وجود می آید.(استفاده غیر بهینه از منابع)
  • یک رویه مشخصی از چرخه تولید وجود داشته باشد که با تعداد و سرعت محاسبه شده ای هر قسمت از چرخه وظیفه خود را انجام دهد و محصول تولیدی را تحویل عنصر بعدی چرخه دهد.(پرفورمنس بالا و استفاده بهینه از منابع)


چرخه تولید به مصرف در برنامه نویسی

مثالی که خدمتتان ارائه شد در برنامه نویسی کاربرد فراوانی دارد و آن را تحت عنوان مسائل "Producer/consumer" میشناسیم.

Producer/consumer
Producer/consumer


فرض کنید سیستم ایمیل مارکتینگ شما قرار است یک میلیون کاربر را در حافظه لود کند و سپس به همه آن ها ایمیل ارسال کند. از زمانی که شما این کاربر ها را از دیتابیس لود میکنید هزینه بالایی بابت کوئری سنگین متحمل میشوید و همچنین بعد از لود کردن هم نگهداری این تعداد در حافظه باعث اشغال شدن مقدار زیادی از حافظه میشود. (هر چند عملیات با موفقیت انجام میشود ولی بهینه نیست!)

راه حل بهتر استفاده از سیستمی مثل چرخه رستورانی هست که مثال زده شد.

میتوانیم از Queue استفاده کنیم. سمت تولید کنند داده, داده را داخل صف قرار دهد و مصرف کننده از سمت دیگر از این داده استفاده کند.

در اینجا چالش های دیگری هم وجود دارد.

  • مثلا مصرف کننده باید مطلع شود که مقداری درون صف هست و یا این که هر موقع تولید کننده دیگر داده ای برای تولید نداشت باید به مصرف کننده این موضوع رو اطلاع دهد و قاعدتا این که سمت مصرف کننده مدام از صف بپرسد که آیا داده ای وجود دارد ایده خوبی نیست.

برای حل این چالش هم میتوان از semaphoreslim استفاده کرد.

  • چالش دیگری که وجود دارد این است که اگر برنامه ما به صورت موازی در حال کار کردن بود، بایستی صف های موجود در هر Thread از دسترسی توسط Thread دیگر مصون بمانند. (Thread safe)


همانطور که مشاهده میکنید با در نظر گرفتن این موارد میتوانید یک سیستم صف خوب برای پروژه خود ایجاد کنید یا این که از System.Threading.Channels استفاده کنید. زیرا تمام چالش های مطرح شده در این کلاس مدیریت شده است.

معرفی System.Threading.Channels

در این فضای نام، کلاس استاتیک Channel وجود دارد که برای ساخت یک چنل باید از یکی از متد های CreateBounded و CreateUnbounded استفاده کرد(هر کدام از این ها یک overload دیگر هم دارند که BoundedChannelOptions یا UnboundedChannelOptions به عنوان ورودی میگیرند که در ادامه راجع به هر کدام صحبت میشود.

public static Channel<T> CreateBounded<T>(int capacity) {} public static Channel<T> CreateBounded<T>(BoundedChannelOptions options) {} public static Channel<T> CreateUnbounded<T>() {} public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options) {}

تابع CreateBounded یک capacity به عنوان ماکسیمم سایز صف در نظر میگیرد در حالی که با استفاده از تابع CreateUnbounded هیچ محدودیتی برای سایز صف وجود ندارد (در این جا باید توجه داشت که اگر مقدار زیادی داده وجود دارد رم زیادی اشغال میشود و این موضوع خود عاملیست برای استفاده غیر صحیح از منابع)

اگر میخواهید از CreateBounded استفاده کنید، باید مشخص کنید که اگر صف پر بود چه استراتژی باید در نظر گرفته شود(منتظر بمانیم یا ...)؟

در اینجا میتوانیم از BoundedChannelOptions استفاده کنیم.

هر دوی کلاس های BoundedChannelOptions و UnboundedChannelOptions از کلاس ChannelOptions ارث بری میکنند. UnboundedChannelOptions به جز سه پراپرتی کلاس پدر ، چیز اضافی ای ندارد. ولی کلاس BoundedChannelOptions دو پراپرتی اضافه تر نیز دارد که به معرفی آنها میپردازیم.


Channel.CreateBounded<T>(new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });

پراپرتی FullMode میتواند 4 مقدار مختلف بگیرد که کاملا مشخص است هر کدام از این مقادیر چه کاری انجام میدهند.

public enum BoundedChannelFullMode { Wait, DropNewest, DropOldest, DropWrite }

حالت پیشفرض در اینجا wait است.

هم چنین با پراپرتی های دیگر این کلاس شما میتوانید مشخص کنید که فقط یک نفر میتواند داخل چنل، داده قرار دهد و بخواند یا بیشتر.

چگونه بر روی Channel داده قرار دهیم یا از آن بخوانیم؟

برای خواندن یا نوشتن بر روی چنل باید از دو کلاس ChannelReader و ChannelWriter استفاده کرد.

Channel<T> channel = Channel.CreateUnbounded<T>(); ChannelReader<T> reader = channel.Reader; ChannelWriter<T> writer = channel.Writer;

بهترین روش قرار دادن داده بر روی چنل استفاده از متد WriteAsync است. برای متوجه شدن این موضوع میتوانیم پیاده سازی این متد را ببینیم.

public async ValueTask WriteAsync(T item, CancellationToken cancellationToken) { while (await WaitToWriteAsync(cancellationToken).ConfigureAwait(false)) if (TryWrite(item)) return; throw new ChannelCompletedException(); }

همانطور که مشاهده میکنید اگر به هر دلیلی مثل پر بودن صف و ... در آن لحظه نتوانستیم مقداری داخل صف قرار دهیم، منتظر میمانیم تا صف این اجازه را به ما بدهد.


همچنین برای خواندن از روی صف نیز این موضوع صادق است.

public virtual async IAsyncEnumerable<T> ReadAllAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) while (TryRead(out T item)) yield return item; }

اعلام پایان کار Producer:

همانطور که مشاهده میکنید consumer همیشه منتظر خواندن داده میماند، پس قاعدتا Producer پس از پایان کار باید اعلام کند که دیگر داده ای برای نوشتن ندارد. در نتیجه در کلاس Writer از متد Complete میتوانیم برای انجام این کار استفاده کنیم.


جمع بندی:

برای به وجود آوردن بهترین حالت چه از لحاظ پرفورمنسی و چه از لحاظ استفاده صحیح منابع، از CreateBounded استفاده کرده و از متد های زیر برای خواندن و نوشتن بر روی آن.

//1 await channel.Writer.WriteAsync(item); //2 channel.Writer.Complete();


//3 await foreach (var item in channel.Reader.ReadAllAsync()) { Use(item); }

نمونه ساده ای از نحوه استفاده کردن از System.Threading.Channels را میتوانید در گیت هاب من مشاهده بفرمایید.


برنامه نویسیسی شارپqueuecsharp
حامد حاجیلو هستم، یک NET Full Stack Developer.
شاید از این پست‌ها خوشتان بیاید