Written by
Yitong Huang
on
on
Java高并发 - 17.读写锁分离设计模式
在多线程的情况下访问共享资源,需要对资源进行同步操作以防止数据不一致的情况发生,通常可以使用synchronized关键字或者显式锁。
对资源的访问一般包括两种类型的动作——读和写。多个线程同时对资源进行读操作并不会引起数据不一致的问题,这时直接采用排他的方式加锁,就显得过于简单粗暴。可以通过实现一个读和写分离的锁提升程序性能。
1. 接口定义
Lock接口
public interface Lock {
// 获取显式锁,没有获得锁的线程将被阻塞
void lock() throws InterruptedException;
// 释放获取的锁
void unlock();
}
ReadWriteLock接口
public interface ReadWriteLock {
// 创建reader锁
Lock readLock();
// 创建write锁
Lock writeLock();
// 获取当前有多少线程正在执行写操作
int getWritingWriters();
// 获取当前有多少线程正在等待获取写入锁
int getWaitingWriters();
// 获取当前有多少线程正在等待获取reader锁
int getReadingReaders();
// 工厂方法,创建ReadWriteLock
static ReadWriteLock readWriteLock() {
return new ReadWriteLockImpl();
}
// 工厂方法,创建ReadWriteLock,并且传入preferWriter
static ReadWriteLock readWriteLock(boolean preferWriter) {
return new ReadWriteLockImpl(preferWriter);
}
}
ReadWriteLock虽然名字有Lock,但它并不是Lock,主要用于创建read lock和write lock,并且提供了查询功能用于查询当前有多少个reader和writer以及waiting中的writer。根据前面的分析,如果reader的个数大于0,则意味着writer的个数等于0,反之writer的个数大于0(事实上只能是1),则reader的个数等于0。
2. 程序实现
ReadWriteLockImpl像是一个工厂类,可以通过它创建不同类型的锁。
class ReadWriteLockImpl implements ReadWriteLock {
// 定义对象锁
private final Object MUTEX = new Object();
// 当前有多少个线程正在写
private int writingWriters = 0;
// 当前有多少个线程正在等待写入
private int waitingWriters = 0;
// 当前有多少个线程正在读
private int readingReaders = 0;
// read和write的偏好设置
private boolean preferWriter;
// 构造ReadWriteLockImpl并且传入preferWriter
public ReadWriteLockImpl(boolean preferWriter) {
this.preferWriter = preferWriter;
}
// 创建read lock
public Lock readLock() {
return new ReadLock(this);
}
// 创建write lock
public Lock writeLock() {
return new WriteLock(this);
}
// 使写的线程数量增加
void incrementWritingWriters() {
this.writingWriters++;
}
// 使等待写的线程数量增加
void incrementWaitingWriters() {
this.waitingWriters++;
}
// 使读的线程数量增加
void incrementReadingReaders() {
this.readingReaders++;
}
// 使写的线程数量减少
void decrementWritingWriters() {
this.writingWriters--;
}
// 使等待写的线程数量减少
void decrementWaitingWriters() {
this.waitingWriters--;
}
// 使读的线程数量减少
void decrementReadingReaders() {
this.readingReaders--;
}
// 获取当前有多少个线程正在进行写操作
public int getWritingWriters() {
return this.writingWriters;
}
// 获取当前有多少个线程正在等待获取写入锁
public int getWaitingWriters() {
return this.waitingWriters;
}
// 获取当前有多少个线程正在进行读操作
public int getReadingReaders() {
return this.readingReaders;
}
// 获取对象锁
Object getMutex() {
return this.MUTEX;
}
// 获取当前是否偏向写锁
boolean getPreferWriter() {
return this.preferWriter;
}
// 设置写锁偏好
void changePrefer(boolean preferWriter) {
this.preferWriter = preferWriter;
}
}
ReadLock读锁
class ReadLock implements Lock {
private final ReadWriteLockImpl readWriteLock;
ReadLock(ReadWriteLockImpl readWriteLock) {
this.readWriteLock = readWriteLock;
}
@Override
public void lock() throws InterruptedException {
// 使用Mutex作为锁
synchronized (readWriteLock.getMutex()) {
// 若此时有线程在进行写操作,或者有写线程在等待并且偏向写时,就会无法获得读锁,只能被挂起
while (readWriteLock.getWritingWriters() > 0
|| (readWriteLock.getPreferWriter()
&& readWriteLock.getWaitingWriters() > 0)) {
readWriteLock.getMutex().wait();
}
readWriteLock.incrementReadingReaders();
}
}
@Override
public void unlock() {
synchronized (readWriteLock.getMutex()) {
// 释放锁的过程就是使得当前reading的数量减一
readWriteLock.decrementReadingReaders();
// 将preferWriter设置为true,可以使得writer线程获得更多机会
readWriteLock.changePrefer(true);
// 通知唤醒与Mutex关联monitor waitset中的线程
readWriteLock.getMutex().notifyAll();
}
}
}
WriteLock写锁
class WriteLock implements Lock {
private final ReadWriteLockImpl readWriteLock;
WriteLock(ReadWriteLockImpl readWriteLock) {
this.readWriteLock = readWriteLock;
}
@Override
public void lock() throws InterruptedException {
synchronized (readWriteLock.getMutex()) {
try {
// 首先使等待获取写入锁的数量加一
readWriteLock.incrementWaitingWriters();
// 如果此时有其他线程正在进行读操作或者写操作,当前线程将被挂起
while (readWriteLock.getReadingReaders() > 0
|| readWriteLock.getWritingWriters() > 0) {
readWriteLock.getMutex().wait();
}
} finally {
// 成功获取到写入锁,使得等待获取写入锁的计数器减一
this.readWriteLock.decrementWaitingWriters();
}
// 将正在写入的线程数量加一
readWriteLock.incrementWritingWriters();
}
}
@Override
public void unlock() {
synchronized (readWriteLock.getMutex()) {
// 减少正在写入锁的线程计数器
readWriteLock.decrementWritingWriters();
// 将偏好状态修改为false,可以使得读锁能被快速获得
readWriteLock.changePrefer(false);
// 通知唤醒其他在Mutex monitor waitset中的线程
readWriteLock.getMutex().notifyAll();
}
}
}
3. 读写锁的使用
public class ShareData {
// 定义共享数据(资源)
private final List<Character> container = new ArrayList<>();
// 构造ReadWriteLock
private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();
// 创建读锁
private final Lock readLock = readWriteLock.readLock();
// 创建写锁
private final Lock writeLock = readWriteLock.writeLock();
private final int length;
public ShareData(int length) {
this.length = length;
for (int i=0; i<length; i++) {
container.add(i, 'c');
}
}
public char[] read() throws InterruptedException {
try {
// 首先锁住读锁
readLock.lock();
char[] newBuffer = new char[length];
for (int i=0; i<length; i++) {
newBuffer[i] = container.get(i);
}
slowly();
return newBuffer;
} finally {
// 当操作结束后,将锁释放
readLock.unlock();
}
}
public void write(char c) throws InterruptedException {
try {
// 首先锁住写锁
writeLock.lock();
for (int i=0; i<length; i++) {
this.container.add(i, c);
}
slowly();
} finally {
writeLock.unlock();
}
}
// 模拟耗时操作
public void slowly() {
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
ShareData中涉及对数据的读写操作,因此它是需要进行线程同步控制。首先创建一个ReadWriteLock工厂类,然后用该工厂分别创建ReadLock和WriteLock的实例,在read方法中使用ReadLock对其进行加锁,而在write方法中则使用WriteLock。
public class ReadWriteLockTest {
private final static String text = "This is the example of ReadWriteLock";
public static void main(String[] args) {
// 定义共享数据
final ShareData shareData = new ShareData(50);
// 创建两个线程进行数据写操作
for (int i=0; i<2; i++) {
new Thread(() -> {
for (int index = 0; index < text.length(); index++) {
try {
char c = text.charAt(index);
shareData.write(c);
System.out.println(currentThread() + " write " + c);
} catch(InterruptedException e) {
}
}
}).start();
}
for (int i=0; i<10; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(currentThread() + " read " +
new String(shareData.read()));
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}