?!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
概要
Java的JUC(java.util.concurrent)包中的锁包括"独占?quot;?quot;׃n?quot;。在“Java多线E系?-“JUC锁?2?互斥锁ReentrantLock ”中Q对Java的独占锁q行了说明。本章对Java的“共享锁”进行介l,JUC中的׃n锁有CountDownLatch, CyclicBarrier, Semaphore, ReentrantReadWriteLock{;本章会以ReentrantReadWriteLock本对׃n锁进行说明。内容包括:
ReadWriteLock ?ReentrantReadWriteLock介绍
ReadWriteLock ?ReentrantReadWriteLock函数列表
参考代?ZJDK1.7.0_40)
获取׃n?/p>
释放׃n?/p>
公^׃n锁和非公q_享锁
ReentrantReadWriteLockCZ
ReadWriteLock ?ReentrantReadWriteLock介绍
ReadWriteLockQ顾名思义Q是d锁。它l护了一对相关的???“读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作?/p>
“读取锁”用于只L作,它是“共享锁”,能同时被多个U程获取?/p>
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线E锁获取?/p>
注意Q不能同时存在读取锁和写入锁Q?/p>
ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类QReentrantReadWriteLock包括子类ReadLock和WriteLock?/p>
ReadWriteLock ?ReentrantReadWriteLock函数列表
ReadWriteLock函数列表
// q回用于d操作的锁?/p>
Lock readLock()
// q回用于写入操作的锁?/p>
Lock writeLock()
ReentrantReadWriteLock函数列表
复制代码
// 创徏一个新?ReentrantReadWriteLockQ默认是采用“非公^{略”?/p>
ReentrantReadWriteLock()
// 创徏一个新?ReentrantReadWriteLockQfair是“公q策略”。fair为trueQ意味着公^{略Q否则,意味着非公q策略?/p>
ReentrantReadWriteLock(boolean fair)
// q回当前拥有写入锁的U程Q如果没有这LU程Q则q回 null?/p>
protected Thread getOwner()
// q回一?collectionQ它包含可能正在{待获取d锁的U程?/p>
protected Collection<Thread> getQueuedReaderThreads()
// q回一?collectionQ它包含可能正在{待获取d或写入锁的线E?/p>
protected Collection<Thread> getQueuedThreads()
// q回一?collectionQ它包含可能正在{待获取写入锁的U程?/p>
protected Collection<Thread> getQueuedWriterThreads()
// q回{待获取d或写入锁的线E估计数目?/p>
int getQueueLength()
// 查询当前U程在此锁上保持的重入读取锁数量?/p>
int getReadHoldCount()
// 查询为此锁保持的d锁数量?/p>
int getReadLockCount()
// q回一?collectionQ它包含可能正在{待与写入锁相关的给定条件的那些U程?/p>
protected Collection<Thread> getWaitingThreads(Condition condition)
// q回正等待与写入锁相关的l定条g的线E估计数目?/p>
int getWaitQueueLength(Condition condition)
// 查询当前U程在此锁上保持的重入写入锁数量?/p>
int getWriteHoldCount()
// 查询是否l定U程正在{待获取d或写入锁?/p>
boolean hasQueuedThread(Thread thread)
// 查询是否所有的U程正在{待获取d或写入锁?/p>
boolean hasQueuedThreads()
// 查询是否有些U程正在{待与写入锁有关的给定条件?/p>
boolean hasWaiters(Condition condition)
// 如果此锁公qx设|ؓ tureQ则q回 true?/p>
boolean isFair()
// 查询是否某个U程保持了写入锁?/p>
boolean isWriteLocked()
// 查询当前U程是否保持了写入锁?/p>
boolean isWriteLockedByCurrentThread()
// q回用于d操作的锁?/p>
ReentrantReadWriteLock.ReadLock readLock()
// q回用于写入操作的锁?/p>
ReentrantReadWriteLock.WriteLock writeLock()
复制代码
参考代?ZJDK1.7.0_40)
ReentrantReadWriteLock的完?a target="_blank" style="margin: 0px; padding: 0px; color: rgb(51, 51, 51); text-decoration: none;">源码
View Code
AQS的完整源?/p>
View Code
其中Q共享锁源码相关的代码如下:
复制代码
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
// ReentrantReadWriteLock的AQS对象
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取“共享锁?/p>
public void lock() {
sync.acquireShared(1);
}
// 如果U程是中断状态,则抛Z场,否则试获取׃n锁?/p>
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 试获取“共享锁?/p>
public boolean tryLock() {
return sync.tryReadLock();
}
// 在指定时间内Q尝试获取“共享锁?/p>
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放“共享锁?/p>
public void unlock() {
sync.releaseShared(1);
}
// 新徏条g
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
复制代码
说明Q?/p>
ReadLock中的sync是一个Sync对象QSyncl承于AQSc,即Sync是一个锁。ReentrantReadWriteLock中也有一个Sync对象Q而且ReadLock中的sync和ReentrantReadWriteLock中的sync是对应关pR即ReentrantReadWriteLock和ReadLock׃n同一个AQS对象Q共享同一把锁?/p>
ReentrantReadWriteLock中Sync的定义如下:
final Sync sync;
下面Q分别从“获取共享锁”和“释攑օ享锁”两个方面对׃n锁进行说明?/p>
获取׃n?/p>
获取׃n锁的思想(即lock函数的步?Q是先通过tryAcquireShared()试获取׃n锁。尝试成功的话,则直接返回;试p|的话Q则通过doAcquireShared()不断的@环ƈ试获取锁,若有需要,则阻塞等待。doAcquireShared()在@环中每次试获取锁时Q都是通过tryAcquireShared()来进行尝试的。下面看看“获取共享锁”的详细程?/p>
1. lock()
lock()在ReadLock中,源码如下Q?/p>
public void lock() {
sync.acquireShared(1);
}
2. acquireShared()
Syncl承于AQSQacquireShared()定义在AQS中。源码如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
说明QacquireShared()首先会通过tryAcquireShared()来尝试获取锁?/p>
试成功的话Q则不再做Q何动?因ؓ已经成功获取到锁??/p>
试p|的话Q则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回?/p>
3. tryAcquireShared()
tryAcquireShared()定义在ReentrantReadWriteLock.java的Sync中,源码如下Q?/p>
复制代码
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取“锁”的状?/p>
int c = getState();
// 如果“锁”是“互斥锁”,q且获取锁的U程不是currentU程Q则q回-1?/p>
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取“读取锁”的׃n计数
int r = sharedCount(c);
// 如果“不需要阻塞等待”,q且“读取锁”的׃n计数于MAX_COUNTQ?/p>
// 则通过CAS函数更新“锁的状态”,“读取锁”的׃n计数+1?/p>
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// W?ơ获取“读取锁”?/p>
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果惌获取锁的U程(current)是第1个获取锁(firstReader)的线E?/p>
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该U程获取“读取锁”的ơ数?/p>
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 该U程获取“读取锁”的ơ数+1?/p>
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
说明QtryAcquireShared()的作用是试获取“共享锁”?/p>
如果在尝试获取锁Ӟ“不需要阻塞等待”ƈ且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及“当前线E获取读取锁的次?1”?/p>
否则Q通过fullTryAcquireShared()获取d锁?/p>
4. fullTryAcquireShared()
fullTryAcquireShared()在ReentrantReadWriteLock中定义,源码如下Q?/p>
复制代码
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取“锁”的状?/p>
int c = getState();
// 如果“锁”是“互斥锁”,q且获取锁的U程不是currentU程Q则q回-1?/p>
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果“需要阻塞等待”?/p>
// (01) 当“需要阻塞等待”的U程是第1个获取锁的线E的话,则l往下执行?/p>
// (02) 当“需要阻塞等待”的U程获取锁的ơ数=0Ӟ则返?1?/p>
} else if (readerShouldBlock()) {
// 如果惌获取锁的U程(current)是第1个获取锁(firstReader)的线E?/p>
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前U程获取锁的计数=0,则返?1?/p>
if (rh.count == 0)
return -1;
}
}
// 如果“不需要阻塞等待”,则获取“读取锁”的׃nl计敎ͼ
// 如果׃nl计数超qMAX_COUNTQ则抛出异常?/p>
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 线E获取“读取锁”的ơ数+1?/p>
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第1ơ获取“读取锁”,则更新firstReader和firstReaderHoldCount?/p>
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果惌获取锁的U程(current)是第1个获取锁(firstReader)的线E,
// 则将firstReaderHoldCount+1?/p>
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 更新U程的获取“读取锁”的׃n计数
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
复制代码
说明QfullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超q限制”等{进行处理。如果不需要阻塞等待,q且锁的׃n计数没有过限制Q则通过CAS试获取锁,q返??/p>
5. doAcquireShared()
doAcquireShared()定义在AQS函数中,源码如下Q?/p>
复制代码
private void doAcquireShared(int arg) {
// addWaiter(Node.SHARED)的作用是Q创建“当前线E”对应的节点Qƈ该U程d到CLH队列中?/p>
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取“node”的前一节点
final Node p = node.predecessor();
// 如果“当前线E”是CLH队列的表_则尝试获取共享锁?/p>
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“当前线E”不是CLH队列的表_则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()q行d{待。若d{待q程中,U程被中断过Q则讄interrupted为true?/p>
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
说明QdoAcquireShared()的作用是获取׃n锁?/p>
它会首先创徏U程对应的CLH队列的节点,然后该节点d到CLH队列中。CLH队列是管理获取锁的等待线E的队列?/p>
如果“当前线E”是CLH队列的表_则尝试获取共享锁Q否则,则需要通过shouldParkAfterFailedAcquire()判断是否d{待Q需要的话,则通过parkAndCheckInterrupt()q行d{待?/p>
doAcquireShared()会通过for循环Q不断的q行上面的操作;目的是获取׃n锁。需要注意的是:doAcquireShared()在每一ơ尝试获取锁Ӟ是通过tryAcquireShared()来执行的Q?/p>
若读者对CLH队列QshouldParkAfterFailedAcquire(), parkAndCheckInterrupt(){内容的l节感兴,可以参考?a target="_blank" style="margin: 0px; padding: 0px; color: rgb(51, 51, 51); text-decoration: none;">Java多线E系?-“JUC锁?2?互斥锁ReentrantLock”?/p>
释放׃n?/p>
释放׃n锁的思想Q是先通过tryReleaseShared()试释放׃n锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线E”,q返回trueQ否则的话,q回flase?/p>
1. unlock()
public void unlock() {
sync.releaseShared(1);
}
说明Q该函数实际上调用releaseShared(1)释放׃n锁?/p>
2. releaseShared()
releaseShared()在AQS中实玎ͼ源码如下Q?/p>
复制代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
说明QreleaseShared()的目的是让当前线E释攑֮所持有的共享锁?/p>
它首先会通过tryReleaseShared()d试释攑օ享锁。尝试成功,则直接返回;试p|Q则通过doReleaseShared()去释攑օ享锁?/p>
3. tryReleaseShared()
tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下Q?/p>
复制代码
protected final boolean tryReleaseShared(int unused) {
// 获取当前U程Q即释放׃n锁的U程?/p>
Thread current = Thread.currentThread();
// 如果惌释放锁的U程(current)是第1个获取锁(firstReader)的线E,
// q且“第1个获取锁的线E获取锁的次数?1Q则讄firstReader为nullQ?/p>
// 否则Q将“第1个获取锁的线E的获取ơ数?1?/p>
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
// 获取rh对象Qƈ更新“当前线E获取锁的信息”?/p>
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取锁的状?/p>
int c = getState();
// 锁的获取次?1?/p>
int nextc = c - SHARED_UNIT;
// 通过CAS更新锁的状态?/p>
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
说明QtryReleaseShared()的作用是试释放׃n锁?/p>
4. doReleaseShared()
doReleaseShared()定义在AQS中,源码如下Q?/p>
复制代码
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为nullQƈ且头节点不等于tail节点?/p>
if (h != null && h != tail) {
// 获取头节点对应的U程的状?/p>
int ws = h.waitStatus;
// 如果头节点对应的U程是SIGNAL状态,则意味着“头节点的下一个节Ҏ对应的线E”需要被unpark唤醒?/p>
if (ws == Node.SIGNAL) {
// 讄“头节点对应的线E状态”ؓI状态。失败的话,则l@环?/p>
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节Ҏ对应的线E”?/p>
unparkSuccessor(h);
}
// 如果头节点对应的U程是空状态,则设|“文件点对应的线E所拥有的共享锁”ؓ其它U程获取锁的I状态?/p>
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则l@环。否则,退出@环?/p>
if (h == head) // loop if head changed
break;
}
}
复制代码
说明QdoReleaseShared()会释䏀共享锁”。它会从前往后的遍历CLH队列Q依ơ“唤醒”然后“执行”队列中每个节点对应的线E;最l的目的是让q些U程释放它们所持有的锁?/p>
公^׃n锁和非公q_享锁
和互斥锁ReentrantLock一PReadLock也分为公q锁和非公^锁?/p>
公^锁和非公q锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()是不同的?/p>
公^锁的readerShouldBlock()的源码如下:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
在公q_享锁中,如果在当前线E的前面有其他线E在{待获取׃n锁,则返回trueQ否则,q回false?/p>
非公q锁的readerShouldBlock()的源码如下:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
在非公^׃n锁中Q它会无视当前线E的前面是否有其他线E在{待获取׃n锁。只要该非公q_享锁对应的线E不为nullQ则q回true?/p>
ReentrantReadWriteLockCZ
复制代码
1 import java.util.concurrent.locks.ReadWriteLock;
2 import java.util.concurrent.locks.ReentrantReadWriteLock;
3
4 public class ReadWriteLockTest1 {
5
6 public static void main(String[] args) {
7 // 创徏账户
8 MyCount myCount = new MyCount("4238920615242830", 10000);
9 // 创徏用户Qƈ指定账户
10 User user = new User("Tommy", myCount);
11
12 // 分别启动3个“读取̎户金钱”的U程 ?3个“设|̎户金钱”的U程
13 for (int i=0; i<3; i++) {
14 user.getCash();
15 user.setCash((i+1)*1000);
16 }
17 }
18 }
19
20 class User {
21 private String name; //用户?nbsp;
22 private MyCount myCount; //所要操作的账户
23 private ReadWriteLock myLock; //执行操作所需的锁对象
24
25 User(String name, MyCount myCount) {
26 this.name = name;
27 this.myCount = myCount;
28 this.myLock = new ReentrantReadWriteLock();
29 }
30
31 public void getCash() {
32 new Thread() {
33 public void run() {
34 myLock.readLock().lock();
35 try {
36 System.out.println(Thread.currentThread().getName() +" getCash start");
37 myCount.getCash();
38 Thread.sleep(1);
39 System.out.println(Thread.currentThread().getName() +" getCash end");
40 } catch (InterruptedException e) {
41 } finally {
42 myLock.readLock().unlock();
43 }
44 }
45 }.start();
46 }
47
48 public void setCash(final int cash) {
49 new Thread() {
50 public void run() {
51 myLock.writeLock().lock();
52 try {
53 System.out.println(Thread.currentThread().getName() +" setCash start");
54 myCount.setCash(cash);
55 Thread.sleep(1);
56 System.out.println(Thread.currentThread().getName() +" setCash end");
57 } catch (InterruptedException e) {
58 } finally {
59 myLock.writeLock().unlock();
60 }
61 }
62 }.start();
63 }
64 }
65
66 class MyCount {
67 private String id; //账号
68 private int cash; //账户余额
69
70 MyCount(String id, int cash) {
71 this.id = id;
72 this.cash = cash;
73 }
74
75 public String getId() {
76 return id;
77 }
78
79 public void setId(String id) {
80 this.id = id;
81 }
82
83 public int getCash() {
84 System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash);
85 return cash;
86 }
87
88 public void setCash(int cash) {
89 System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash);
90 this.cash = cash;
91 }
92 }
复制代码
q行l果Q?/p>
复制代码
Thread-0 getCash start
Thread-2 getCash start
Thread-0 getCash cash=10000
Thread-2 getCash cash=10000
Thread-0 getCash end
Thread-2 getCash end
Thread-1 setCash start
Thread-1 setCash cash=1000
Thread-1 setCash end
Thread-3 setCash start
Thread-3 setCash cash=2000
Thread-3 setCash end
Thread-4 getCash start
Thread-4 getCash cash=2000
Thread-4 getCash end
Thread-5 setCash start
Thread-5 setCash cash=3000
Thread-5 setCash end
复制代码
l果说明Q?/p>
(01) 观察Thread0和Thread-2的运行结果,我们发现QThread-0启动q获取到“读取锁”,在它q没q行完毕的时候,Thread-2也启动了q且也成功获取到“读取锁”?/p>
因此Q“读取锁”支持被多个U程同时获取?/p>
(02) 观察Thread-1,Thread-3,Thread-5q三个“写入锁”的U程。只要“写入锁”被某线E获取,则该U程q行完毕了,才释放该锁?/p>
因此Q“写入锁”不支持被多个线E同时获取?/p>