JMH 的基本使用与 JCTools 的性能优化

JMH 学习笔记 + JCTools 性能优化效果示例。

废话不多说,直接上代码。

依赖

<!--
  Maven dependencies needed to compile and run the demo below.
  No <version> elements are given; versions are presumably supplied by a parent POM /
  dependencyManagement section that is not shown here (TODO confirm for the two entries
  added for Lombok and Hutool).
-->
<dependencies>
    <!-- JUnit 5 -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
    </dependency>
    <dependency>
        <groupId>org.junit.platform</groupId>
        <artifactId>junit-platform-runner</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- JCTools: provides org.jctools.queues.MpscUnboundedArrayQueue used by the demo -->
    <dependency>
        <groupId>org.jctools</groupId>
        <artifactId>jctools-core</artifactId>
    </dependency>
    <!-- JMH: core API plus the annotation processor that generates the benchmark harness -->
    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.openjdk.jmh</groupId>
        <artifactId>jmh-generator-annprocess</artifactId>
    </dependency>
    <!-- Lombok: the demo imports lombok.AllArgsConstructor (compile-time only) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    <!-- Hutool: the demo imports cn.hutool.core.util.ObjectUtil -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-core</artifactId>
    </dependency>
</dependencies>

Demo

import cn.hutool.core.util.ObjectUtil;
import lombok.AllArgsConstructor;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.*;

/**
 * JMH benchmark comparing a JDK {@link LinkedBlockingQueue} with a JCTools
 * {@link MpscUnboundedArrayQueue} in a many-producers / single-consumer scenario.
 *
 * <p>Each benchmark invocation runs one "round": {@value #THREAD_COUNT} producer tasks are
 * released together through a {@link CyclicBarrier}, each offers {@value #EVENT_PER_THREAD}
 * events to the queue under test, and one dedicated consumer thread polls until it has received
 * every event of the round.
 *
 * @author ChenDifan
 * @date 2021-02-25
 */
// Benchmark mode: throughput only. With Mode.All the whole benchmark is repeated once per mode,
// each run reporting a different metric.
@BenchmarkMode(Mode.Throughput)
// Warm-up: 3 iterations of 3 seconds each, so that at least one full round fits in an iteration.
@Warmup(iterations = 3, time = 3)
// Measurement after warm-up: 5 iterations of 3 seconds each.
@Measurement(iterations = 5, time = 3)
// Number of JMH threads calling the benchmark method (the producers/consumer are created by the
// benchmark itself, not by JMH).
@Threads(1)
// Number of forked JVM processes per benchmark method.
@Fork(1)
// State lifecycle: Scope.Benchmark shares one instance of this class among all benchmark threads.
@State(value = Scope.Benchmark)
// Time unit used when reporting results.
@OutputTimeUnit(TimeUnit.MINUTES)
public class ProducerConsumerBenchmarkTests {

    /** Number of events each producer task offers per round. */
    private static final int EVENT_PER_THREAD = 10000;
    /** Number of producer tasks (and pool threads) per round. */
    private static final int THREAD_COUNT = 100;

    /**
     * Producer for the {@link BlockingQueue} benchmark: waits on the shared barrier, offers
     * {@code eventPerThread} events, then counts down {@code mainLatch}.
     *
     * <p>Instances only hold references to shared, thread-safe collaborators, so one instance
     * may be executed by several threads at once.
     */
    @AllArgsConstructor
    static class BlockingQueueProduceTask implements Runnable {

        private final int eventPerThread;
        private final CyclicBarrier barrier;
        private final BlockingQueue<Object> notifierQueue;
        private final CountDownLatch mainLatch;

        @Override
        public void run() {
            awaitStart();
            doProduce();
        }

        /**
         * Blocks until all producers have arrived. A failed wait is reported but does not abort
         * the task: the consumer counts on receiving every event, so producing must still happen.
         */
        private void awaitStart() {
            try {
                this.barrier.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this task.
                Thread.currentThread().interrupt();
                System.out.println(e.getMessage());
            } catch (BrokenBarrierException e) {
                System.out.println(e.getMessage());
            }
        }

        private void doProduce() {
            for (int i = 0; i < this.eventPerThread; i++) {
                // The benchmark sizes the queue to hold one full round, and a round is fully
                // drained before the next one starts, so this non-blocking offer is not
                // expected to fail.
                this.notifierQueue.offer(new Object());
            }
            this.mainLatch.countDown();
        }
    }

    /**
     * Consumer for the {@link BlockingQueue} benchmark: busy-polls the queue until
     * {@code THREAD_COUNT * EVENT_PER_THREAD} non-null events have been received.
     */
    @AllArgsConstructor
    static class BlockingQueueConsumerTask implements Runnable {

        private final BlockingQueue<Object> notifierQueue;

        @Override
        public void run() {
            doConsume();
        }

        private void doConsume() {
            int remaining = THREAD_COUNT * EVENT_PER_THREAD;
            while (remaining > 0) {
                // poll() returns null when the queue is momentarily empty; just try again.
                if (ObjectUtil.isNotNull(notifierQueue.poll())) {
                    remaining--;
                }
            }
        }
    }

    /**
     * Producer for the JCTools benchmark: waits on the shared barrier, offers
     * {@code eventPerThread} events, then counts down {@code mainLatch}.
     *
     * <p>Instances only hold references to shared collaborators, so one instance may be executed
     * by several threads at once.
     */
    @AllArgsConstructor
    static class NonBlockingQueueProduceTask implements Runnable {

        private final int eventPerThread;
        private final CyclicBarrier barrier;
        private final MpscUnboundedArrayQueue<Object> notifierQueue;
        private final CountDownLatch mainLatch;

        @Override
        public void run() {
            awaitStart();
            doProduce();
        }

        /**
         * Blocks until all producers have arrived. A failed wait is reported but does not abort
         * the task: the consumer counts on receiving every event, so producing must still happen.
         */
        private void awaitStart() {
            try {
                this.barrier.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this task.
                Thread.currentThread().interrupt();
                System.out.println(e.getMessage());
            } catch (BrokenBarrierException e) {
                System.out.println(e.getMessage());
            }
        }

        private void doProduce() {
            for (int i = 0; i < this.eventPerThread; i++) {
                this.notifierQueue.offer(new Object());
            }
            this.mainLatch.countDown();
        }
    }

    /**
     * Consumer for the JCTools benchmark: busy-polls the queue until
     * {@code THREAD_COUNT * EVENT_PER_THREAD} non-null events have been received.
     * Only one instance runs per round, matching the single-consumer contract of the queue.
     */
    @AllArgsConstructor
    static class NonBlockingQueueConsumerTask implements Runnable {

        private final MpscUnboundedArrayQueue<Object> notifierQueue;

        @Override
        public void run() {
            doConsume();
        }

        private void doConsume() {
            int remaining = THREAD_COUNT * EVENT_PER_THREAD;
            while (remaining > 0) {
                // poll() returns null when the queue is momentarily empty; just try again.
                if (ObjectUtil.isNotNull(notifierQueue.poll())) {
                    remaining--;
                }
            }
        }
    }

    private BlockingQueue<Object> notifierBlockingQueue;
    private MpscUnboundedArrayQueue<Object> notifierQueue;
    private CyclicBarrier barrier;
    private ThreadPoolExecutor producerGroup;

    /**
     * Creates the queues, the start barrier and the producer pool.
     * Level.Trial runs this once per benchmark run (per fork); other levels are available.
     */
    @Setup(value = Level.Trial)
    public void setup() {
        notifierBlockingQueue = new LinkedBlockingQueue<>(THREAD_COUNT * EVENT_PER_THREAD);
        // NOTE(review): for MpscUnboundedArrayQueue the constructor argument is the chunk size,
        // not a capacity limit; here it pre-sizes the first chunk to hold a whole round.
        notifierQueue = new MpscUnboundedArrayQueue<>(THREAD_COUNT * EVENT_PER_THREAD);
        barrier = new CyclicBarrier(THREAD_COUNT);
        // Fixed pool of THREAD_COUNT workers so every producer of a round can wait on the
        // barrier at the same time. The work queue is unbounded on purpose: a SynchronousQueue
        // hand-off rejects a task whenever no worker is already idle-waiting, which can happen
        // at the start of a round if workers from the previous round have not yet gone back to
        // polling for work.
        producerGroup = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
                0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    /**
     * One round of produce/consume through the JDK {@link LinkedBlockingQueue}.
     *
     * @throws Throwable if the calling thread is interrupted while waiting for the round
     */
    @Benchmark
    public void testBlockingQueue() throws Throwable {
        CountDownLatch mainLatch = new CountDownLatch(THREAD_COUNT);
        runRound(
                new BlockingQueueConsumerTask(notifierBlockingQueue),
                new BlockingQueueProduceTask(EVENT_PER_THREAD, barrier, notifierBlockingQueue, mainLatch),
                mainLatch);
    }

    /**
     * One round of produce/consume through the JCTools {@link MpscUnboundedArrayQueue}.
     *
     * @throws Throwable if the calling thread is interrupted while waiting for the round
     */
    @Benchmark
    public void testNonBlockingQueue() throws Throwable {
        CountDownLatch mainLatch = new CountDownLatch(THREAD_COUNT);
        runRound(
                new NonBlockingQueueConsumerTask(notifierQueue),
                new NonBlockingQueueProduceTask(EVENT_PER_THREAD, barrier, notifierQueue, mainLatch),
                mainLatch);
    }

    /**
     * Runs one full round: starts the consumer thread, schedules the producer task
     * {@code THREAD_COUNT} times on the pool, then waits for producers and consumer to finish.
     *
     * @param consumerTask task run on a dedicated consumer thread
     * @param producerTask task scheduled once per producer; must be safe to run concurrently
     * @param mainLatch    latch counted down once by every producer when it is done
     * @throws InterruptedException if interrupted while waiting for the round to complete
     */
    private void runRound(Runnable consumerTask, Runnable producerTask, CountDownLatch mainLatch)
            throws InterruptedException {
        // Make the barrier reusable for this round.
        barrier.reset();
        Thread consumer = new Thread(consumerTask);

        consumer.start();

        for (int i = 0; i < THREAD_COUNT; i++) {
            producerGroup.submit(producerTask);
        }

        // -- concurrent load submitted, wait until all producers are done --
        mainLatch.await();

        // -- wait until the consumer has received every event --
        consumer.join();
    }

    /** Stops the producer pool once the benchmark run is over. */
    @TearDown
    public void tearDown() {
        producerGroup.shutdownNow();
    }

    /**
     * Runs the benchmarks of this class and writes the results as JSON.
     *
     * @param args unused
     * @throws RunnerException if JMH fails to run the benchmarks
     */
    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(ProducerConsumerBenchmarkTests.class.getSimpleName())
                // The JSON output can be uploaded to http://deepoove.com/jmh-visual-chart to render charts
                .result("ProducerConsumerBenchmarkTests_Result.json")
                .resultFormat(ResultFormatType.JSON).build();
        new Runner(opt).run();
    }

}

效果

在我自己的 MacBook Pro 上测得的结果:

(此处原为基准测试结果截图 image-20210225230406192,图片未随正文保留。)