پارسا کاویان پور
پارسا کاویان پور
خواندن ۱۰ دقیقه·۲ سال پیش

Non-Blocking IO


فرض کنید میخوایم یه سرور بنویسیم که بتونه کانکشن هارو هندل کنه

چه راه هایی جلوی پای ماست؟

  1. Simple blocking server ( پردازش همه چیز با یک نخ)
  2. Threaded blocking server (پردازش هر درخواست توسط نخ جداگانه)
  3. Threaded pool blocking servers(استفاده از ExecuterService)
  4. Java NIO blocking(پردازش همه چیز با یک نخ البته کتابخانه ی جدید )
  5. Java NIO non-blocking pooling
  6. Java NIO non-blocking selector


در ادامه انواع روش های بالا به همراه مزایا و معایب آنها را بررسی میکنیم.


Simple Blocking Server

کد سرور:

import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; public class Server { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); while (true) { Socket client = serverSocket.accept(); System.out.println(&quotA new client accepted.&quot); int data; while ((data = client.getInputStream().read()) != -1) { { System.out.print((char) data); } } client.close(); } } }


کد کلاینت:

import java.io.IOException; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; public class Client { public static void main(String[] args) { try { Socket socket = new Socket(&quotlocalhost&quot, 8080); Scanner scanner = new Scanner(System.in); PrintWriter pw = new PrintWriter(socket.getOutputStream()); while (true) { String msg = scanner.nextLine(); if (msg.equals(&quotclose&quot)) { scanner.close(); pw.close(); socket.close(); break; } pw.println(msg); pw.flush(); } socket.close(); } catch (IOException e) { throw new RuntimeException(e); } } }

(برای تست سرور بجز کد کلاینت میتونید از دستور telnet localhost 8080 استفاده کنید)


همانطور که ملاحظه میفرمایید, سرور تنها همزمان یک کلاینت رو پردازش میکنه. به محض اینکه کلاینت فعلی close شد , سرور توانایی پردازش کلاینت بعدی رو پیدا میکنه.

ضمنن متد accept() بلاک هست (blocking) درواقع سرور هیچکاری نمیتونه بکونه بجز اینکه کل توانش رو بذاره روی انتظار برای کانکت شدن کلاینت جدید.

اگر برنامه رو ران کنید میبینید که اگر چندین کلاینت وصل بشن تنها اولی میتونه داده بفرسته . زمانی که close رو نوشت تازه حالا میره سراغ کلاینت دومی و الی آخر !

سرور وحشتناکیه نه؟ اگر مثلا تلگرام اینطوری بود تا زمانی که اولین نفری که کانکت شده کارش تموم نشه ما نمیتونیم پیام بدیم (:


Threaded blocking server

import java.io.*; import java.net.*; import java.util.ArrayList; import java.util.List; import java.util.Scanner; public class MultiServer extends Thread { private static final List<PrintWriter> clients = new ArrayList<>(); public static void main(String[] args) { if (args.length != 1) { System.err.println(&quotMISSING PORT NUMBER!!!!&quot); System.err.println(&quotUsage: java MultiServer <Port Number>&quot); System.exit(1); } try { int portNumber = Integer.parseInt(args[0]); ServerSocket ss = new ServerSocket(portNumber); System.err.println(String.format(&quotServer with port number %d ready to start&quot, portNumber)); new MultiServer().start(); System.out.println(&quotWaiting for the client to connect&quot); while (true) { Socket sock = ss.accept(); synchronized (clients) { clients.add(new PrintWriter(sock.getOutputStream(), true)); } System.err.println(&quotA new client accepted on the port &quot.concat(String.valueOf(portNumber))); new IOThread(sock.getInputStream(), System.out).start(); } } catch (Exception e) { System.out.println(e); e.printStackTrace(); } } @Override public void run() { Scanner in = new Scanner(System.in); String line; while (in.hasNextLine()) { line = in.nextLine(); if (line.isEmpty()) { continue; } doBroadCast(&quotserver&quot, line); } } public static void doBroadCast(String sender, String input) { for (PrintWriter pw : clients) { String response = String.format(&quot%s: %s&quot, sender, input); System.out.println(&quotServer Terminal -> &quot.concat(response)); pw.println(response); } } }

همانطور که ملاحظه میکنید این کلاس داره کارها ی زیر رو انجام میده:

1- منتظر کانکشن جدید میمونه.هروقت کانکشن جدید برقرار شد دیگه با ترد خودش نمیاد منتظر داده بشینه! واگذارش میکنه به کلاسی به نام IOThread که کدش رو زیر قرار دادم.اینطوری میتونه فقط وظیفه ی منتظر کانکشن موندن رو برعهده بگیره. یعنی میتونه هر کانکشنی اومد رو اکسپت کنه و توی یک نخ دیگه بپردازه به داده هایی که میفرستند.

2-یه سری قابلیت اضافه هم نوشتم مثلا یک ترد جدا ایجاد میشه برای ورودی هایی که سرور میخواد بفرسته برای همه.

شاید سوال پیش بیاد براتون چرا synchronized(clients)?

بخاطر اینکه یه ترد نویسنده(تولید کننده) داریم و یه ترد خواننده (مصرف کننده)

حالا فرض کنید کلاینت کانکت شده ولی قبل ازینکه به لیست clients اددش کنیم یکی مسیج فرستاده و doBroadCast فراخوانی شده(این متد توسط کلاس IOThread که در زیر میبینید فراخوانی شده).اینجا باید بگیم clients قفل باشه وقتی یکی خواست کانکت شه ,وقتی کانکت شد(accept شد و اضافه شد به لیست کلاینت ها),اونوقت فرستاده بشه که همه بتونن دریافتش کنن!درواقع میخوایم کلاینت اخری که تازه کانکت شده جا نمونه!

import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; public class IOThread extends Thread { private BufferedReader input; private PrintWriter output; private String name; public IOThread(InputStream input, OutputStream output) { this.input = new BufferedReader(new InputStreamReader(input)); this.output = new PrintWriter(output, true); } @Override public void run() { try { String line; while ((line = input.readLine()) != null) { if (line.contains(&quotMY NAME IS:&quot)) { name = line.split(&quot:&quot)[1].trim(); }else MultiServer.doBroadCast(name, line); } } catch (Exception e) { e.printStackTrace(); } } }

کلاس بالا تنها وظیفه اش گوش دادن به پیام ها و ارسالش برای همه ی کلاینت هاست

مثل یه چتروم!حالا وقتشه بریم سراغ کلاس کلاینت:


import java.io.*; import java.net.*; import java.util.Date; import java.util.UUID; public class MyClient { public static void main(String[] args) { Date m = new Date(); System.out.println(m.toString()); if (args.length != 3 && args.length != 2) { System.err.println(&quotUsage: java MyClient <host name> <port number> <Optional: Client name>&quot); System.exit(1); } try { Socket s = new Socket(&quotlocalhost&quot, 8888); new Thread(new Runnable() { final BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream())); @Override public void run() { String fromServer; try { while ((fromServer = in.readLine()) != null) { System.out.println(fromServer); if (fromServer.equals(&quotShab Bekheir&quot)) { break; } } } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { PrintWriter out = new PrintWriter(s.getOutputStream(), true); BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in)); String fromUser; @Override public void run() { String name = (args.length != 3) ? UUID.randomUUID().toString().substring(0, 5) : args[2]; out.println(&quotMY NAME IS: &quot + name); try { while ((fromUser = stdIn.readLine()) != null) { out.println(fromUser); if (fromUser.equals(&quotShab Bekheir&quot)) { break; } } } catch (IOException e) { e.printStackTrace(); } } }).start(); } catch (IOException e) { System.out.println(e); } } }

خب اینجا دقیقا چندتا ترد داریم؟

واضح و مبرهن هست که کلا دوتا ترد نیازه یکی که داده بگیره و بفرسته به سرور و یکی که داده از سرور بگیره و نشون بده به کلاینت!


مشکلات پیاده سازی به روش بالا:

فرض کنید 3000 تا کلاینت میان کانکت میشن !

public class SimulateHeavyLoad { public static void main(String[] args) throws InterruptedException { final Socket[] sockets = new Socket[3000]; // Create 3000 connections for (int x = 0; x < sockets.length; x++) { try { sockets[x] = new Socket(&quotlocalhost&quot, 8080); } catch (final IOException e) { System.err.println(e); } } // To enable the sockets to live. for awhile Thread.sleep(Long.MAX_VALUE); } }

یعنی حدودا 6000 تا ترد باید ایجاد بشه توی سرور و بسته هم نشن. یا خدا!احتمالا آتیش میگیره سرور.

به هرحال احتمالا ارور های مختلفی میده. Crash میکنه. OutOfMemory میده و یا...

اگر هم منابع هم به طرز ایده آلی زیاد باشه به هرحال تعداد ترد هایی که میتونیم ایجاد کنیم محدودیت داره!

با این وجود برنامه ای مثل تلگرام که ممکنه میلیون ها کاربر بهش وصل شن از چه مکانیزمی استفاده میکنه؟ قطعا //از این روش نه!

شاید یه روش به ذهنمون برسه* برای مدیریت روش بالا.

روشی که به ذهنمون میرسه:

بیایم تعداد ترد هارو مدیریت کنیم! مثلا بگیم فقط 50 تا ترد ساخته بشه و بیشتر دیگه ساخته نشه اینطوری به Crash / OutOfMemoryError و... بر نمیخوریم!پس بریم سراغ روش سوم.


3-Threaded pool blocking servers

import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(8080)); ExecutorService executorService = Executors.newFixedThreadPool(2); while (true) { Socket client = serverSocket.accept(); System.out.println(&quotA new client accepted.&quot); executorService.submit(() -> { int data; try { while ((data = client.getInputStream().read()) != -1) { System.out.print((char) data); } } catch (IOException e) { throw new RuntimeException(e); }finally { try { client.close(); } catch (IOException e) { throw new RuntimeException(e); } } }); } } }

کد کلاینت رو همون کد کلاینت اولین مثال در نظر بگیرید و یا از telnet استفاده کنید.

برنامه ی اولی رو جوری تغییر دادیم که بتونه با چندتا کلاینت همزمان کار کنه ولی صرفا به تعدادی که ما مشخص کردیم, دوتا ترد که یعنی فقط دو نفر همزمان میتونن وصل شن.اینطوری دیگه ترد های اضافی زیادی ایجاد نمیشه و خب Cpu نمیترکه!

اما مشکل اصلی:

کاربر سوم باید منتظر بمونه تا یکی از ترد ها ازاد شه . چطوری آزاد میشه؟ با close شدن کلاینت!

4-NIO(New IO) Blocking Server

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; public class Server { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8080)); ByteBuffer buffer = ByteBuffer.allocate(80); while(true){ SocketChannel client= serverSocketChannel.accept(); for(int data;(data=client.read(buffer))!=-1;){ buffer.flip(); String s = StandardCharsets.UTF_8.decode(buffer).toString(); System.out.print(s); buffer.clear(); } } } }

این مثل همون اولی هست صرفا بوسیله ی Api جدید جاوا که NIO(منظور اینجا Non-blocking io نیست بلکه java.nio هست) نام داره نوشته شده!

کد کلاینت هم همون کد اولی هست و مثل همیشه میتونید از دستور telnet هم استفاده کنید.

حواستون باشه توی java.nio ما دیگه InputStream و OutputStream نداریم بلکه با ByteBuffer برای خواندن و نوشتن کار میکنیم.


یه سری توضیحات ریز درمورد ByteBuffer:

باید بهش سایز بدیم که توی کلاس ByteBuffer تحت متغیری به نام limit ذخیره میشه. الان ما Limit رو روی 80 ست کردیم.یعنی هر ریکوئست با هر سایزی ما هشتاد بایت هشتاد بایت میخونیمش.حالا اگر 80 تا خوندیم ولی فقط یه کاراکتر اومده بود چی؟ خب متغیری وجود داره در این کلاس به نام position که میگه چند بایت اشغال شده.

کار متد flip اینه که Position رو مجددا روی 0 تنظیم کنه و limit رو روی Position اینطوری موقع خوندن اون بیت های خالی در نظر گرفته نمیشن!.


برای اینکه چندتا کلاینت رو بتونه هندل کنه . باید non blocking باشه. پس بیاید ببینیم چطوری پیاده سازی میشه

Java NIO non-blocking pooling

import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class NIONonBlockingPollingServer { public static void main(String[] args) throws IOException { final ServerSocketChannel serverSocket = ServerSocketChannel.open(); // Creating the server on port 8080 // Binding this server on the port serverSocket.bind(new InetSocketAddress(8080)); serverSocket.configureBlocking(false); // Make Server nonBlocking final Map<SocketChannel, ByteBuffer> sockets = new ConcurrentHashMap<>(); while (true) { // Accept means it will accept the incoming connection final SocketChannel socket = serverSocket.accept(); // It may return null, as since its Non-Blocking, there may not be anything on this socket everytime. if (socket != null) { socket.configureBlocking(false); // Required, socket should also be NonBlocking // Every socket will have its own byte buffer sockets.put(socket, ByteBuffer.allocateDirect(80)); } sockets.keySet().removeIf((socketChannel) -> !socketChannel.isOpen()); // Remove socketChannel which is not opened // Iterate through sockets to see f any socket has anything to say sockets.forEach((socketCh, byteBuffer) -> { try { int data = socketCh.read(byteBuffer); if (data == -1) { closeSocket(socketCh); } else if (data != 0) { // 0 means socket has nothing to say byteBuffer.flip(); String s = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println(s); byteBuffer.compact(); } } catch (IOException e) { closeSocket(socketCh); throw new UncheckedIOException(e); } }); } } private static void closeSocket(final SocketChannel socket) { try { socket.close(); } catch (IOException ignore) { } } }

نکته : متد read اگر مقداری کمتر از صفر برگرداند به معنای بسته شدن سوکت از طرف کلاینت است.

توی io میتونید اینطوری بفهمید که بسته شده یا خیر:

if(socket.getInputStream().read(bytearray) < 0);


برای تست از کلاس کلاینت که برای اولین مثال نوشتیم یا از telnet استفاده کنید.

خب این کار میکنه ولی یه مشکل ریز داره!

وقتی میگیم non blocking هست یعنی چی؟یعنی دیگه روی متد accept نباید گیر کنه!

توی blocking سرور منتظر میموند که یه کلاینت بیاد وصل شه درسته؟و درواقع ترد همونجا نگه داشته میشد

ولی non blocking ترد رو نگه نمیداره. پس ممکنه accept چیزی که برمیگردونه null باشه!(بدیهی هست که اگر کلاینتی کانکت شه null نمیده و فقط وقتی میده که تو اون لحظه کلاینتی کانکت نشده و خیلی اتفاق میوفته این حالت). پس باید چک کنیم نال نباشه.

در ضمن هر لحظه باید تمام کلاینت هارو پیمایش کنیم و ببینیم ایا کسی چیزی فرستاده؟که با forEach این کارو انجام دادیم.

برای کانکشن های زیاد کار میکنه با یک Thread ولی خب حالت جالبی نیست.

در نهایت باید به این سوال پاسخ بدیم. Blocking IOبهتر است یا Non-blocking IO?


Blocking IO Vs Non blocking IO


با توجه به این لینک , اگر تعداد کانکشن ها بسیار زیاد است(هزاران) و هرکدام از کانکشن ها داده های نه چندان زیادی ارسال میکنند(مثلا سرور چت) استفاده از Non blocking به صرفه تر است زیرا Single thread است. اما اگر کانکشن ها انچنان زیاد نیست و پهنای باند بالاست بهتر است از IO استفاده شود.


NIO non-blocking selector

بهترین حالت موجود. در این حالت دیگه متد accept خروجی null به ما نمیده.

پیاده سازی این حالت رو در نوشته ای دیگر بررسی میکنیم.


non blockingblockingservernetworkingio and nio
شاید از این پست‌ها خوشتان بیاید