Java7 ForkJoin入门实例源码教程



Java7 ForkJoin入门实例源码教程。Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。

有两个核心类ForkJoinPool和ForkJoinTask。

ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:

客户端非fork/join调用 内部调用fork/join
异步执行 execute(ForkJoinTask) ForkJoinTask.fork
等待获取结果 invoke(ForkJoinTask) ForkJoinTask.invoke
执行,获取Futrue submit(ForkJoinTask) ForkJoinTask.fork(ForkJoinTasks are Futures)

ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class MyForkJoinTask extends ForkJoinTask {
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    private V value;
    private boolean success = false;
    @Override
    public V getRawResult() {
        return value;
    }
    @Override
    protected void setRawResult(V value) {
        this.value = value;
    }
    @Override
    protected boolean exec() {
        System.out.println("exec");
        return this.success;
    }
    public boolean isSuccess() {
        return success;
    }
    public void setSuccess(boolean isSuccess) {
        this.success = isSuccess;
    }
}

测试ForkJoinPool.invoke(…):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Test
    public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyForkJoinTask task = new MyForkJoinTask();
        task.setSuccess(true);
        task.setRawResult("test");
        String invokeResult = forkJoinPool.invoke(task);
        assertEquals(invokeResult, "test");
    }
    @Test
    public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask task = new MyForkJoinTask();
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                task.complete("test");
            }
        }).start();
        // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
        String result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
    @Test
    public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask task = new MyForkJoinTask();
        task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
        ForkJoinTask result = forkJoinPool.submit(task);
        result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
    }

测试ForkJoinPool.submit(…):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Test
    public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask task = new MyForkJoinTask();
        task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
        ForkJoinTask result = forkJoinPool.submit(task);
        result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
    }
    @Test
    public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask task = new MyForkJoinTask();
        forkJoinPool.submit(task);
        Thread.sleep(1000);
    }
    @Test
    public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        final MyForkJoinTask task = new MyForkJoinTask();
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                task.complete("test");
            }
        }).start();
        ForkJoinTask result = forkJoinPool.submit(task);
        // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
        result.get();
        Thread.sleep(1000);
    }

测试ForkJoinPool.execute(…):

1
2
3
4
5
6
@Test
   public void testForkJoinExecute() throws InterruptedException, ExecutionException {
       ForkJoinPool forkJoinPool = new ForkJoinPool();
       MyForkJoinTask task = new MyForkJoinTask();
       forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
   }

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。
分支/合并的完整过程如下:

下面列举一个分治算法的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MaximumFinder extends RecursiveTask<Integer> {
  private static final int SEQUENTIAL_THRESHOLD = 5;
  private final int[] data;
  private final int start;
  private final int end;
  public MaximumFinder(int[] data, int start, int end) {
    this.data = data;
    this.start = start;
    this.end = end;
  }
  public MaximumFinder(int[] data) {
    this(data, 0, data.length);
  }
  @Override
  protected Integer compute() {
    final int length = end - start;
    if (length < SEQUENTIAL_THRESHOLD) {
      return computeDirectly();
    }
    final int split = length / 2;
    final MaximumFinder left = new MaximumFinder(data, start, start + split);
    left.fork();
    final MaximumFinder right = new MaximumFinder(data, start + split, end);
    return Math.max(right.compute(), left.join());
  }
  private Integer computeDirectly() {
    System.out.println(Thread.currentThread() + ' computing: ' + start
                       + ' to ' + end);
    int max = Integer.MIN_VALUE;
    for (int i = start; i < end; i++) {
      if (data[i] > max) {
        max = data[i];
      }
    }
    return max;
  }
  public static void main(String[] args) {
    // create a random data set
    final int[] data = new int[1000];
    final Random random = new Random();
    for (int i = 0; i < data.length; i++) {
      data[i] = random.nextInt(100);
    }
    // submit the task to the pool
    final ForkJoinPool pool = new ForkJoinPool(4);
    final MaximumFinder finder = new MaximumFinder(data);
    System.out.println(pool.invoke(finder));
  }
}