2010年11月21日日曜日

Concurrency Utilities - Java

OverlayWeaverのIterativeRoutingDriverクラスのrouteメソッドを読んでいて思ったこと。
java.util.concurrent.ExecutorServiceってなんだ?

ってことで調べてみました。


まず、並行プログラミングをするとき、よくThreadクラスを使って行いますね。しかし、これを使いこなすのはちょっと難しい。ってことで、J2SE 5.0からは並行プログラミングを容易にするのためのライブラリConcurrency Utilitiesが導入されました。これで、スレッドを簡単に?操れるみたい。
(ライブラリは、java.util.concurrentに含まれています。)


そんでExecutorServiceというのは、そのライブラリの中に含まれているタスクの非同期実行を担当するExectuorインターフェイスの派生インタフェースです。
そして、実際に使用するのはその実装クラスであるThreadPoolExecutorクラスで、これを利用するとスレッド・プール方式での処理が実行可能となる。(参考資料2のイグゼキュタの仕組みの項目を見るとわかりやすいです。)

詳しい説明は、参考資料を見てください。

いや、別に説明するのがめんどくさいわけではないんだからね。ξ゚⊿゚)ξ


とりあえず、勉強がてら作ってみたプログラムを載せときます。

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ThreadPoolExecutorTest {

 public static void main(final String[] args) {
  new ThreadPoolExecutorTest().start();
 }

 public void start() {
  System.out.println("開始");

  // Runnableを使う場合
  final BlockingQueue queue = new LinkedBlockingQueue();
  for (int i = 0; i < 10; i++) {// タスクをqueueに事前に積んでおき実行することができる
   queue.add(new Task(1));
  }

  System.out.println("タスク実行その1");
  final ExecutorService executerService = new ThreadPoolExecutor(3, 3,
    500, TimeUnit.MILLISECONDS, queue,
    new ThreadPoolExecutor.CallerRunsPolicy());
  
  System.out.println("タスク実行その2");
  for (int i = 0; i < 10; i++) {
   executerService.execute(new Task(2));// タスクの実行
  }
  
  // Callableを使う場合
  System.out.println("タスク実行その3");
  int timeout = 1000;
  for (int i = 0; i < 10; i++) {
   if (i == 5) {
    timeout = 50;
   }
   Future future = executerService.submit((Callable) new Task(3));// タスクの実行
   try {
    // タスクの結果を取得する。ただし、設定した時間までに処理が終わらなかったらタイムアウトする。
    Date time = (Date) future.get(timeout, TimeUnit.MILLISECONDS);
    System.out.println("タスク終了時間: " + time);
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (ExecutionException e) {
    e.printStackTrace();
   } catch (TimeoutException e) {
    System.err.println("タスクがタイムアウトしました");
    e.printStackTrace();
   }
  }

  // 必ずshutdownを実行する。実行しないとスレッドが開放されない。
  System.out.println("ShutDown実行。");
  executerService.shutdown();
  System.out.println("終了");
 }

 private static class Task implements Runnable, Callable {
  int taskNumber = 0;

  Task(int taskNumber) {
   this.taskNumber = taskNumber;
  }

  @Override
  public Date call() {
   try {
    System.out.println(taskNumber + "タスク開始");
    MILLISECONDS.sleep(100L);// 100 ミリ秒のスリープ
    System.out.println(taskNumber + "タスク終了");
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   return new Date();
  }

  @Override
  public void run() {
   try {
    Date time = this.call();
    System.out.println(taskNumber + "タスク終了時間:" + time);
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
}

参考資料
1. 「Java SE 6完全攻略」第49回 Concurrency Utilitiesの変更点 その1 
2. 第6回 並行プログラミング用ライブラリ(1)――Excecutorの仕組み

0 件のコメント:

コメントを投稿