Java 中的并发工具类
CountDownLatch
/**
 * Demonstrates waiting for worker threads to complete by calling
 * {@link Thread#join()} on each of them.
 */
public class JoinCountDownLatchTest {

    /**
     * Starts two parser threads, joins both of them, and then reports that
     * all parsers are done.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread firstParser = announcer("parser1 finish!");
        Thread secondParser = announcer("parser2 finish!");

        firstParser.start();
        secondParser.start();

        // join() returns only after the target thread has terminated, so the
        // final message is always printed after both parser messages.
        firstParser.join();
        secondParser.join();
        System.out.println("all parser finish!");
    }

    /**
     * Builds an unstarted thread whose only job is to print the given message.
     *
     * @param message the line the thread prints when it runs
     * @return a new, not yet started thread
     */
    private static Thread announcer(final String message) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(message);
            }
        });
    }
}
运行结果
parser1 finish!
parser2 finish!
all parser finish!
或
parser2 finish!
parser1 finish!
all parser finish!
import java.util.concurrent.CountDownLatch;
/**
 * Demonstrates {@link CountDownLatch}: the main thread blocks in
 * {@code await()} until a worker has counted the latch down to zero.
 */
public class CountDownLatchTest {

    /** Latch that opens after two {@code countDown()} calls. */
    private static CountDownLatch count = new CountDownLatch(2);

    /**
     * Starts the worker, waits for the latch to open, then prints {@code 3}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new StepWorker()).start();
        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(3);
    }

    /** Prints 1 and then 2, counting the latch down once after each number. */
    private static final class StepWorker implements Runnable {
        @Override
        public void run() {
            for (int step = 1; step <= 2; step++) {
                System.out.println(step);
                count.countDown();
            }
        }
    }
}
运行结果
1
2
3
CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates a two-party {@link CyclicBarrier}: the main thread and one
 * helper thread each wait at the barrier and print a number once released.
 */
public class CyclicBarrierTest {

    /** Barrier shared by the main thread and the helper thread. */
    private static CyclicBarrier c = new CyclicBarrier(2);

    /**
     * Starts the helper thread (prints {@code 1}) and then arrives at the
     * barrier itself (prints {@code 2}). The order of the two lines is not fixed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread partner = new Thread(new Runnable() {
            @Override
            public void run() {
                arriveThenPrint(1);
            }
        });
        partner.start();
        arriveThenPrint(2);
    }

    /**
     * Waits at the barrier and then prints the label. A failed wait is
     * reported with a stack trace and the label is still printed.
     *
     * @param label number printed after the barrier wait returns
     */
    private static void arriveThenPrint(int label) {
        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(label);
    }
}
运行结果
1
2
或
2
1
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates a {@link CyclicBarrier} with a barrier action: the action
 * prints {@code 3} when both parties have arrived, before either party is
 * released to print its own number.
 */
public class CyclicBarrierTest2 {

    /** Two-party barrier that runs {@link A} when it trips. */
    private static CyclicBarrier c = new CyclicBarrier(2, new A());

    /**
     * Starts a helper thread that prints {@code 1} after the barrier, then
     * passes the barrier on the main thread and prints {@code 2}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runnable partnerTask = new Runnable() {
            @Override
            public void run() {
                passBarrier(1);
            }
        };
        new Thread(partnerTask).start();
        passBarrier(2);
    }

    /**
     * Waits at the barrier and then prints the label. A failed wait is
     * reported with a stack trace and the label is still printed.
     *
     * @param label number printed after the barrier wait returns
     */
    private static void passBarrier(int label) {
        try {
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(label);
    }

    /** Barrier action: prints {@code 3}. */
    private static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}
运行结果
3
1
2
或
3
2
1
import java.util.Map;
import java.util.concurrent.*;
/**
 * Demonstrates using a {@link CyclicBarrier} barrier action to merge results:
 * four pooled tasks each store a per-sheet value, and once all four have
 * reached the barrier this object's {@link #run()} sums the stored values.
 */
public class BankWaterService implements Runnable {

    /** Four-party barrier; this object is the action run when it trips. */
    private CyclicBarrier c = new CyclicBarrier(4, this);

    /** Pool with one thread per sheet task. */
    private ExecutorService executor = Executors.newFixedThreadPool(4);

    /** Per-sheet values keyed by the name of the thread that stored them. */
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    /**
     * Submits four sheet tasks and then shuts the pool down. Each task stores
     * the value 1 under its thread's name and then waits at the barrier.
     */
    private void count() {
        // The task keeps no state of its own, so one instance serves all submissions.
        Runnable sheetTask = new Runnable() {
            @Override
            public void run() {
                String worker = Thread.currentThread().getName();
                sheetBankWaterCount.put(worker, 1);
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };
        int submitted = 0;
        while (submitted < 4) {
            executor.execute(sheetTask);
            submitted++;
        }
        executor.shutdown();
    }

    /**
     * Barrier action: adds up every stored value, stores the total under the
     * key {@code "result"}, and prints it.
     */
    @Override
    public void run() {
        int total = 0;
        for (Integer sheetValue : sheetBankWaterCount.values()) {
            total += sheetValue;
        }
        sheetBankWaterCount.put("result", total);
        System.out.println(total);
    }

    /**
     * Creates a service and runs the count.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new BankWaterService().count();
    }
}
运行结果
4
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates {@link CyclicBarrier#getNumberWaiting()} and
 * {@link CyclicBarrier#isBroken()}: a thread waiting at the barrier is
 * interrupted, which breaks the barrier for every other party.
 */
public class CyclicBarrierTest3 {

    /** Two-party barrier shared by the main thread and the worker. */
    private static CyclicBarrier c = new CyclicBarrier(2);

    /**
     * Prints the number of waiting parties (1), interrupts the waiting worker,
     * and then prints {@code true} when the main thread's own {@code await()}
     * fails on the broken barrier.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for the worker
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception ignored) {
                    // Expected: this thread is interrupted while waiting at the
                    // barrier; the demo only cares about the barrier's state.
                }
            }
        });
        t.start();

        // Wait until the worker is actually parked at the barrier instead of
        // guessing with a fixed delay.
        while (t.isAlive() && c.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }
        System.out.println(c.getNumberWaiting());

        t.interrupt();
        // Let the worker finish handling the interrupt first. Without this the
        // main thread can arrive as the second party before the interrupt is
        // processed, the barrier then trips normally, and nothing is printed.
        t.join();

        try {
            c.await();
        } catch (Exception e) {
            System.out.println(c.isBroken());
        }
    }
}
运行结果
1
true
Semaphore
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demonstrates throttling with a {@link Semaphore}: thirty pooled tasks run,
 * but at most ten of them hold a permit at the same time.
 */
public class SemaphoreTest {

    /** Number of tasks submitted, and also the size of the thread pool. */
    private static final int THREAD_COUNT = 30;

    /** Pool large enough to run every task on its own thread. */
    private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    /** Limits the guarded section to ten concurrent tasks. */
    private static Semaphore s = new Semaphore(10);

    /**
     * Submits {@code THREAD_COUNT} tasks that each acquire a permit, print
     * {@code "save data!"}, and release the permit; then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // No permit was obtained: keep the interrupt visible to
                        // the pool and skip the guarded section.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        System.out.println("save data!");
                    } finally {
                        // Always hand the permit back. A leaked permit would
                        // eventually leave the remaining tasks blocked in acquire().
                        s.release();
                    }
                }
            });
        }
        executor.shutdown();
    }
}
Exchanger
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link Exchanger}: two pooled tasks swap their strings, and
 * the second task compares what it received with its own value.
 */
public class ExchangerTest {

    /** Rendezvous point at which the two tasks swap their strings. */
    private static final Exchanger<String> exgr = new Exchanger<>();

    /** Two-thread pool, one thread per exchanging task. */
    private static ExecutorService executor = Executors.newFixedThreadPool(2);

    /**
     * Submits both exchanging tasks and then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // First party: offers its record and discards whatever comes back.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    exgr.exchange("银行流水 A");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Second party: swaps records and prints the comparison.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                String ownRecord = "银行流水 B";
                String partnerRecord;
                try {
                    partnerRecord = exgr.exchange(ownRecord);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
                boolean identical = partnerRecord.equals(ownRecord);
                String report = "A 和 B 数据是否一致:" + identical
                        + ",\nA 录入的是:" + partnerRecord
                        + ",\nB 录入的是:" + ownRecord + "。";
                System.out.println(report);
            }
        });

        executor.shutdown();
    }
}
运行结果
A 和 B 数据是否一致:false,
A 录入的是:银行流水 A,
B 录入的是:银行流水 B。