最快的方法来初始化ConcurrentHashMap

最快的方法来初始化ConcurrentHashMap

本文介绍了最快的方法来初始化ConcurrentHashMap的值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

ConcurrentHashMap通常用于并发环境中,用于在键下聚合某些事件 - 比如计算某些字符串值的命中率。如果我们事先不知道密钥,我们需要有一个很好的方法来初始化密钥的需要,它应该是快速和安全的并发性。
这个问题的最佳模式(就效率而言)是什么?

ConcurrentHashMap is often used in concurrent environments for aggregation of some events under a key - like counting hits for some string values. In case we don't know the keys in advance we need to have a good way to initialize key on need, it should to be fast and safe in terms of concurrency.What is the best pattern (in terms of efficiency) for this problem?

我将使用带有的模型映射< String ,AtomicInteger> 声明如下:

I will use a model map with <String, AtomicInteger> declared like:

ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

但它可能是一个包含任何键值对的地图,我们需要初始化一个键 - 值对如果键中的键尚未存在并且将值状态变为记录事件。

But it could be a map with any key-value pair, where we need to initialize a key-value pair if the key does not already exists in the map and mutate the state of value to record event.

有两种流行的方法:
第一个使用 ConcurrentHashMap.putIfAbsent

AtomicInteger count = map.get(s);
if (count == null) {
    count = new AtomicInteger(0);
    AtomicInteger prevCount = map.putIfAbsent(s, count);
    if (prevCount != null) {
        count = prevCount;
    }
}
count.incrementAndGet();

第二个使用 ConcurrentHashMap.computeIfAbsent

AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
count.incrementAndGet();

哪一个更适合这项任务?还有其他方法吗?

Which one is better suited for this task? Are there other approaches?

推荐答案

不幸的是jdk1.8.0_131, computeIfAbsent 总是进入 synchronized 块,看看密钥是否已经存在,这使得它比 putIfAbsent

Unfortunatelly up to jdk1.8.0_131, the computeIfAbsent always go into the synchronized block, regardles if the key is already there or not, which makes it way slower than the putIfAbsent.

此基准确认了这一点,看来根据争用等级 putIfAbsent 是从2到比 computeIfAbsent 快50倍。

This benchmark confirms this, it appears that depending on contention level putIfAbsent is from 2 to 50 times faster than computeIfAbsent.

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHashMapTest {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsentLambda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsentLambda() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                        count.incrementAndGet();

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

}

输出:

Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLambda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLambda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLambda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLambda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLambda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLambda average time per run: 200.79536475 ns

我们可以使用 putIfAbsent 批准ach创建更快 computeIfAbsent
唯一的区别是这个新的 computeIfAbsent 可以多次调用初始化函数并发初始化相同的密钥。基准测试结果与'putIfAbsent'相同,因为它是相同的代码,这不是一个大惊喜,但如果有人想测试这个,这里是基准:

We can use the putIfAbsent approach to create faster computeIfAbsent.The only difference would be that this new computeIfAbsent could call the initialization function more than once in case of concurrent initialization of the same key. The benchmark results are identical as with the 'putIfAbsent', since it is the same code, it is not a big surprise, but in case anyone would like to test this, here is the benchmark:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class CocnurrentHashMap2Benchmark {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsent2Lambda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }


     private static void testComputeIfAbsent2Lambda() throws InterruptedException {
            final AtomicLong totalTime = new AtomicLong();
            final ConcurrentHashMap2<String, AtomicInteger> map = new ConcurrentHashMap2<String, AtomicInteger>();
            final Random random = new Random();
            ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
            for (int i = 0; i < numberOfThreads; i++) {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        long start, end;
                        for (int n = 0; n < numberOfRuns; n++) {
                            String s = strings[random.nextInt(strings.length)];
                            start = System.nanoTime();

                            AtomicInteger count = map.computeIfAbsent2(s, (k) -> new AtomicInteger(0));
                            count.incrementAndGet();

                            end = System.nanoTime();
                            totalTime.addAndGet(end - start);
                        }
                    }
                });
            }
            executorService.shutdown();
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                    + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
        }

        public static class ConcurrentHashMap2<K,V> extends ConcurrentHashMap<K,V> {

            /**
             * If there is no mapping for the key then computes and puts the mapping,
             * otherwise it simply return the value for that key.
             * In case of concurrent initialization of the same key the mappingFunction can be called more than once.
             * @param key - the key to be initialized or retrieved
             * @param mappingFunction - the function to be called for computation of initial value.
             * @return computed value if the key wasn't already in the map otherwise return the actual value for provided key.
             */
            public V computeIfAbsent2(K key, Function<K,V> mappingFunction) {
                V value = get(key);
                if (value == null) {
                    value = mappingFunction.apply(key);
                    V prevValue = putIfAbsent(key, value);
                    if (prevValue != null) {
                        value = prevValue;
                    }
                }
                return value;
            }
        }
}

结果:

Test testComputeIfAbsent2Lambda average time per run: 138.1053415 ns
Test testPutIfAbsent average time per run: 129.45236425 ns
Test testComputeIfAbsent2Lambda average time per run: 128.48006825 ns
Test testPutIfAbsent average time per run: 118.733798375 ns
Test testComputeIfAbsent2Lambda average time per run: 134.038046625 ns
Test testPutIfAbsent average time per run: 119.7947695 ns
Test testComputeIfAbsent2Lambda average time per run: 134.183876375 ns
Test testPutIfAbsent average time per run: 137.969932625 ns
Test testComputeIfAbsent2Lambda average time per run: 137.97531275 ns
Test testPutIfAbsent average time per run: 136.904379125 ns
Test testComputeIfAbsent2Lambda average time per run: 148.899750125 ns
Test testPutIfAbsent average time per run: 129.788293125 ns
Test testComputeIfAbsent2Lambda average time per run: 141.50586625 ns
Test testPutIfAbsent average time per run: 129.081558875 ns
Test testComputeIfAbsent2Lambda average time per run: 122.36628625 ns
Test testPutIfAbsent average time per run: 127.1215535 ns
Test testComputeIfAbsent2Lambda average time per run: 108.129917625 ns
Test testPutIfAbsent average time per run: 133.630786875 ns
Test testComputeIfAbsent2Lambda average time per run: 134.978805625 ns
Test testPutIfAbsent average time per run: 132.7747585 ns
Test testComputeIfAbsent2Lambda average time per run: 132.4352885 ns
Test testPutIfAbsent average time per run: 133.753792875 ns
Test testComputeIfAbsent2Lambda average time per run: 134.09569175 ns
Test testPutIfAbsent average time per run: 145.610141125 ns
Test testComputeIfAbsent2Lambda average time per run: 139.437622125 ns

如果我们比较将映射放入的速度映射当密钥尚不存在时,它会出现新的'computeIfAbsent2'也快得多。基准:

If we compare the speed of putting mapping into the map when the key doesn't already exists it appears that the new 'computeIfAbsent2' is also much faster. The benchmark:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class CocnurrentHashMap2PutBenchmark {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testComputeIfAbsent2();
            testComputeIfAbsent();
        }
    }

    private static void testComputeIfAbsent2() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap2<Integer, String> map = new ConcurrentHashMap2<Integer, String>();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        Integer key = Integer.valueOf(n);
                        start = System.nanoTime();

                        String value = map.computeIfAbsent2(key, (k) -> "value");

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        Integer key = Integer.valueOf(n);
                        start = System.nanoTime();

                        String value = map.computeIfAbsent(key, (k) -> "value");

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    public static class ConcurrentHashMap2<K, V> extends ConcurrentHashMap<K, V> {

        /**
         * If there is no mapping for the key then computes and puts the
         * mapping, otherwise it simply return the value for that key. In case
         * of concurrent initialization of the same key the mappingFunction can
         * be called more than once.
         *
         * @param key
         *            - the key to be initialized or retrieved
         * @param mappingFunction
         *            - the function to be called for computation of initial
         *            value.
         * @return computed value if the key wasn't already in the map otherwise
         *         return the actual value for provided key.
         */
        public V computeIfAbsent2(K key, Function<K, V> mappingFunction) {
            V value = get(key);
            if (value == null) {
                value = mappingFunction.apply(key);
                V prevValue = putIfAbsent(key, value);
                if (prevValue != null) {
                    value = prevValue;
                }
            }
            return value;
        }
    }
}

结果:

Test testComputeIfAbsent2 average time per run: 445.077932375 ns
Test testComputeIfAbsent average time per run: 784.786391 ns
Test testComputeIfAbsent2 average time per run: 294.10136375 ns
Test testComputeIfAbsent average time per run: 314.8724765 ns
Test testComputeIfAbsent2 average time per run: 236.56533275 ns
Test testComputeIfAbsent average time per run: 350.863664625 ns
Test testComputeIfAbsent2 average time per run: 346.19498275 ns
Test testComputeIfAbsent average time per run: 641.995172625 ns
Test testComputeIfAbsent2 average time per run: 255.441646125 ns
Test testComputeIfAbsent average time per run: 326.399150125 ns
Test testComputeIfAbsent2 average time per run: 275.626666125 ns
Test testComputeIfAbsent average time per run: 201.207314125 ns
Test testComputeIfAbsent2 average time per run: 289.19059725 ns
Test testComputeIfAbsent average time per run: 318.448059 ns
Test testComputeIfAbsent2 average time per run: 225.19701825 ns
Test testComputeIfAbsent average time per run: 306.461814125 ns
Test testComputeIfAbsent2 average time per run: 213.460366 ns
Test testComputeIfAbsent average time per run: 334.325044625 ns
Test testComputeIfAbsent2 average time per run: 256.4048955 ns
Test testComputeIfAbsent average time per run: 256.366700625 ns
Test testComputeIfAbsent2 average time per run: 231.88875575 ns
Test testComputeIfAbsent average time per run: 246.076624 ns
Test testComputeIfAbsent2 average time per run: 222.4649485 ns
Test testComputeIfAbsent average time per run: 266.505719625 ns
Test testComputeIfAbsent2 average time per run: 228.708391375 ns
Test testComputeIfAbsent average time per run: 261.866442625 ns
Test testComputeIfAbsent2 average time per run: 198.614718875 ns
Test testComputeIfAbsent average time per run: 225.43031925 ns
Test testComputeIfAbsent2 average time per run: 300.478359 ns
Test testComputeIfAbsent average time per run: 306.03640225 ns
Test testComputeIfAbsent2 average time per run: 195.0444215 ns
Test testComputeIfAbsent average time per run: 271.461982625 ns
Test testComputeIfAbsent2 average time per run: 224.306529875 ns
Test testComputeIfAbsent average time per run: 334.52790425 ns
Test testComputeIfAbsent2 average time per run: 212.217131625 ns
Test testComputeIfAbsent average time per run: 184.541579125 ns
Test testComputeIfAbsent2 average time per run: 265.417909625 ns
Test testComputeIfAbsent average time per run: 213.9811425 ns
Test testComputeIfAbsent2 average time per run: 298.76602575 ns
Test testComputeIfAbsent average time per run: 347.883728125 ns

这篇关于最快的方法来初始化ConcurrentHashMap的值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-23 16:00