AbstractQueuedSynchronizer

AbstractQueuedSynchronizer

我正在寻找以下并发语义的Java实现。我想要类似于ReadWriteLock的东西,只是它是对称的,即读和写双方都可以在许多线程之间共享,但读排斥写,反之亦然。

  • 有两个锁,我们称它们为A和B。
  • 锁A是共享的,即可能有多个线程同时持有它。锁B也是共享的,可能有多个线程同时持有它。
  • 如果任何线程持有锁A,则没有线程可以占用B –尝试占用B的线程将阻塞,直到持有A的所有线程都释放A。
  • 如果任何线程持有锁B,则没有线程可以占用A –尝试占用A的线程将阻塞,直到持有B的所有线程都释放了B。

    是否有一个现有的库类(class)可以实现这一目标?目前,我已经用ReadWriteLock近似了所需的功能,因为幸运的是,在锁B的上下文中完成的任务要少一些。但是,这感觉像是一种取巧的做法(hack),它可能会影响我的程序在高负载下的性能。

    最佳答案

    简短答案:

    在标准库中,没有任何您需要的东西。

    长答案:

    为了轻松实现自定义Lock,您应该将其子类化或委托(delegate)给AbstractQueuedSynchronizer

    以下代码是non-fair锁的示例,该锁实现了您所需的功能,包括一些(不详尽的)测试。由于您的需求具有二元性质,因此我将其称为 LeftRightLock

    这个概念非常简单:
    AbstractQueuedSynchronizer公开了一种方法,使用Compare and swap惯用语(compareAndSetState(int expect, int update))来原子地设置int的状态,我们可以用这个状态来记录持有锁的线程数:如果持有Right锁,则将其设置为正值;如果持有Left锁,则为负值。

    然后我们只需确保满足以下条件:
    -仅当内部AbstractQueuedSynchronizer的状态为zero或负数时,您才可以锁定Left
    -仅当内部AbstractQueuedSynchronizer的状态为zero或正数时,您才可以锁定Right
    LeftRightLock.java

    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Lock;
    
    /**
     * A non-fair binary lock exposing two mutually exclusive, shared sides: LEFT and RIGHT.
     *
     * <p>While LEFT is held, other threads may also acquire LEFT, but a thread trying to acquire
     * RIGHT blocks until every LEFT hold has been released. The same applies symmetrically while
     * RIGHT is held.
     *
     * <p>Holds are counted, not owned: the lock does not record which thread acquired a side, so
     * each successful {@code lockX()} / {@code tryLockX()} must be paired with exactly one
     * {@code releaseX()}. Releasing a side that is not currently held is ignored and leaves the
     * lock state untouched.
     */
    public class LeftRightLock {

        public static final int ACQUISITION_FAILED = -1;
        public static final int ACQUISITION_SUCCEEDED = 1;

        private final LeftRightSync sync = new LeftRightSync();

        /** Acquires the LEFT side, blocking while any thread holds RIGHT. */
        public void lockLeft() {
            sync.acquireShared(LockSide.LEFT.delta());
        }

        /** Acquires the RIGHT side, blocking while any thread holds LEFT. */
        public void lockRight() {
            sync.acquireShared(LockSide.RIGHT.delta());
        }

        /** Releases one LEFT hold; does nothing if LEFT is not currently held. */
        public void releaseLeft() {
            sync.releaseShared(LockSide.LEFT.delta());
        }

        /** Releases one RIGHT hold; does nothing if RIGHT is not currently held. */
        public void releaseRight() {
            sync.releaseShared(LockSide.RIGHT.delta());
        }

        /**
         * Attempts to acquire the LEFT side without blocking.
         *
         * @return {@code true} if LEFT was acquired, {@code false} if RIGHT is currently held
         */
        public boolean tryLockLeft() {
            return sync.tryAcquireShared(LockSide.LEFT.delta()) == ACQUISITION_SUCCEEDED;
        }

        /**
         * Attempts to acquire the RIGHT side without blocking.
         *
         * @return {@code true} if RIGHT was acquired, {@code false} if LEFT is currently held
         */
        public boolean tryLockRight() {
            return sync.tryAcquireShared(LockSide.RIGHT.delta()) == ACQUISITION_SUCCEEDED;
        }

        /** The two sides of the lock, each mapped to the signed unit it adds to the hold counter. */
        private enum LockSide {
            LEFT(-1), RIGHT(1);

            private final int delta;

            LockSide(int delta) {
                this.delta = delta;
            }

            int delta() {
                return delta;
            }
        }

        /**
         * Keeps a signed count of the holds on either side in the synchronizer state.
         *
         * <ul>
         * <li>A state ({@link AbstractQueuedSynchronizer#getState()}) greater than 0 means one or
         * more holds on the RIGHT lock.</li>
         * <li>A state lower than 0 means one or more holds on the LEFT lock.</li>
         * <li>A state equal to zero means no thread is holding any lock.</li>
         * </ul>
         */
        private static final class LeftRightSync extends AbstractQueuedSynchronizer {

            /**
             * Tries to add one hold on the requested side.
             *
             * @param requiredSide {@code -1} for LEFT or {@code +1} for RIGHT
             * @return {@link #ACQUISITION_SUCCEEDED} or {@link #ACQUISITION_FAILED}
             * @throws IllegalArgumentException if {@code requiredSide} is neither -1 nor +1
             * @throws IllegalStateException if the hold counter would overflow
             */
            @Override
            protected int tryAcquireShared(int requiredSide) {
                checkSide(requiredSide);
                for (;;) {
                    int current = getState();
                    if (current != 0 && !sameSign(current, requiredSide)) {
                        return ACQUISITION_FAILED;
                    }
                    // The sign of a non-zero state always matches the side being acquired here,
                    // so hitting either int bound means one more hold would wrap around and
                    // make the lock look held by the opposite side.
                    if (current == Integer.MAX_VALUE || current == Integer.MIN_VALUE) {
                        throw new IllegalStateException("Maximum lock count exceeded");
                    }
                    if (compareAndSetState(current, current + requiredSide)) {
                        return ACQUISITION_SUCCEEDED;
                    }
                }
            }

            /**
             * Tries to remove one hold from the requested side.
             *
             * @param requiredSide {@code -1} for LEFT or {@code +1} for RIGHT
             * @return {@code true} if a hold was removed (waiters are then signalled and re-check
             *         the state themselves); {@code false} if that side is not currently held
             * @throws IllegalArgumentException if {@code requiredSide} is neither -1 nor +1
             */
            @Override
            protected boolean tryReleaseShared(int requiredSide) {
                checkSide(requiredSide);
                for (;;) {
                    int current = getState();
                    // A release with no matching hold must not touch the counter: subtracting
                    // from zero would flip the state to the opposite side.
                    if (current == 0 || !sameSign(current, requiredSide)) {
                        return false;
                    }
                    if (compareAndSetState(current, current - requiredSide)) {
                        return true;
                    }
                }
            }

            /** Rejects anything other than the two side units -1 (LEFT) and +1 (RIGHT). */
            private static void checkSide(int requiredSide) {
                if (requiredSide != 1 && requiredSide != -1) {
                    throw new IllegalArgumentException("You can either lock LEFT or RIGHT (-1 or +1)");
                }
            }

            /** Tells whether two non-zero values have the same sign. */
            private static boolean sameSign(int a, int b) {
                return (a > 0) == (b > 0);
            }
        }
    }
    

    LeftRightLockTest.java
    import org.junit.Test;

    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;
    
    /**
     * Tests for {@link LeftRightLock}: mutual exclusion between the two sides, sharing within a
     * side, hold counting, and hand-over to a blocked waiter once the other side is released.
     *
     * <p>Every executor created by a test is shut down before the test returns so that no worker
     * thread outlives it.
     */
    public class LeftRightLockTest {

        private final LeftRightLock sut = new LeftRightLock();
        private int logLineSequenceNumber = 0;

        @Test(timeout = 2000)
        public void acquiringLeftLockExcludeAcquiringRightLock() throws Exception {
            sut.lockLeft();

            assertFalse("I shouldn't be able to acquire the RIGHT lock!", callOnOtherThread(sut::tryLockRight));
        }

        @Test(timeout = 2000)
        public void acquiringRightLockExcludeAcquiringLeftLock() throws Exception {
            sut.lockRight();

            assertFalse("I shouldn't be able to acquire the LEFT lock!", callOnOtherThread(sut::tryLockLeft));
        }

        @Test(timeout = 2000)
        public void theLockShouldBeReentrant() throws Exception {
            sut.lockLeft();
            assertTrue(sut.tryLockLeft());
        }

        @Test(timeout = 2000)
        public void multipleThreadShouldBeAbleToAcquireTheSameLock_Right() throws Exception {
            sut.lockRight();

            assertTrue(callOnOtherThread(sut::tryLockRight));
        }

        @Test(timeout = 2000)
        public void multipleThreadShouldBeAbleToAcquireTheSameLock_left() throws Exception {
            sut.lockLeft();

            assertTrue(callOnOtherThread(sut::tryLockLeft));
        }

        @Test(timeout = 2000)
        public void shouldKeepCountOfAllTheThreadsHoldingTheSide() throws Exception {
            CountDownLatch latchA = new CountDownLatch(1);
            CountDownLatch latchB = new CountDownLatch(1);

            Thread threadA = spawnThreadToAcquireLeftLock(latchA, sut);
            Thread threadB = spawnThreadToAcquireLeftLock(latchB, sut);

            System.out.println("Both threads have acquired the left lock.");

            try {
                latchA.countDown();
                threadA.join();
                boolean acqStatus = sut.tryLockRight();
                System.out.println("The right lock was " + (acqStatus ? "" : "not ") + "acquired");
                assertFalse("There is still a thread holding the left lock. This shouldn't succeed.", acqStatus);
            } finally {
                latchB.countDown();
                threadB.join();
            }
        }

        @Test(timeout = 5000)
        public void shouldBlockThreadsTryingToAcquireLeftIfRightIsHeld() throws Exception {
            sut.lockRight();

            CountDownLatch taskStartedLatch = new CountDownLatch(1);
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> task = executor.submit(() -> {
                    taskStartedLatch.countDown();
                    sut.lockLeft();
                    return false;
                });

                taskStartedLatch.await();
                // Give the worker time to reach the blocking acquire before checking on it.
                Thread.sleep(100);

                assertFalse(task.isDone());
            } finally {
                // The blocking acquire cannot be interrupted, so the only way to let the worker
                // finish is to release the side it is waiting for.
                sut.releaseRight();
                executor.shutdown();
            }
        }

        @Test(timeout = 2000)
        public void shouldBeFreeAfterRelease() throws Exception {
            sut.lockLeft();
            sut.releaseLeft();
            assertTrue(sut.tryLockRight());
        }

        @Test(timeout = 2000)
        public void shouldBeFreeAfterMultipleThreadsReleaseIt() throws Exception {
            CountDownLatch latch = new CountDownLatch(1);

            Thread thread1 = spawnThreadToAcquireLeftLock(latch, sut);
            Thread thread2 = spawnThreadToAcquireLeftLock(latch, sut);

            latch.countDown();

            thread1.join();
            thread2.join();

            assertTrue(sut.tryLockRight());
        }

        @Test(timeout = 2000)
        public void lockShouldBeReleasedIfNoThreadIsHoldingIt() throws Exception {
            CountDownLatch releaseLeftLatch = new CountDownLatch(1);
            CountDownLatch rightLockTaskIsRunning = new CountDownLatch(1);

            Thread leftLockThread1 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut);
            Thread leftLockThread2 = spawnThreadToAcquireLeftLock(releaseLeftLatch, sut);

            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> acquireRightLockTask = executor.submit(() -> {
                    if (sut.tryLockRight()) {
                        throw new AssertionError("The left lock should be still held, I shouldn't be able to acquire right at this point.");
                    }
                    printSynchronously("Going to be blocked on right lock");
                    rightLockTaskIsRunning.countDown();
                    sut.lockRight();
                    printSynchronously("Lock acquired!");
                    return true;
                });

                rightLockTaskIsRunning.await();

                releaseLeftLatch.countDown();
                leftLockThread1.join();
                leftLockThread2.join();

                assertTrue(acquireRightLockTask.get());
            } finally {
                // Idempotent: makes sure the LEFT holders and the worker can finish even when
                // the test fails or times out before reaching the release above.
                releaseLeftLatch.countDown();
                executor.shutdown();
            }
        }

        /**
         * Runs {@code task} on a fresh single-thread executor, waits for its result and shuts
         * the executor down again.
         */
        private boolean callOnOtherThread(Callable<Boolean> task) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                return executor.submit(task).get();
            } finally {
                executor.shutdown();
            }
        }

        /** Prints a numbered log line; synchronized so sequence numbers are not interleaved. */
        private synchronized void printSynchronously(String str) {
            System.out.println(logLineSequenceNumber++ + ")" + str);
            System.out.flush();
        }

        /**
         * Starts a thread that takes the left lock and returns once the lock has been acquired.
         * The thread keeps the lock until {@code releaseLockLatch} is counted down.
         */
        private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, LeftRightLock lock) throws InterruptedException {
            CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
            Thread thread = spawnThreadToAcquireLeftLock(releaseLockLatch, lockAcquiredLatch, lock);
            lockAcquiredLatch.await();
            return thread;
        }

        /**
         * Starts a thread that takes the left lock, signals {@code lockAcquiredLatch}, waits for
         * {@code releaseLockLatch} and then releases the lock.
         */
        private Thread spawnThreadToAcquireLeftLock(CountDownLatch releaseLockLatch, CountDownLatch lockAcquiredLatch, LeftRightLock lock) {
            Thread thread = new Thread(() -> {
                lock.lockLeft();
                printSynchronously("Thread " + Thread.currentThread() + " Acquired left lock");
                try {
                    lockAcquiredLatch.countDown();
                    releaseLockLatch.await();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread; the lock is still
                    // released below.
                    Thread.currentThread().interrupt();
                } finally {
                    lock.releaseLeft();
                }

                printSynchronously("Thread " + Thread.currentThread() + " RELEASED left lock");
            });
            thread.start();
            return thread;
        }
    }
    

  • 10-06 01:17