Java高并发 - 17.读写锁分离设计模式

在多线程的情况下访问共享资源,需要对资源进行同步操作以防止数据不一致的情况发生,通常可以使用synchronized关键字或者显式锁。

对资源的访问一般包括两种类型的动作——读和写。多个线程同时对资源进行读操作并不会引起数据不一致的问题,这时直接采用排他的方式加锁,就显得过于简单粗暴。可以通过实现一个读和写分离的锁提升程序性能。

1. 接口定义

Lock接口

public interface Lock {
	// 获取显式锁,没有获得锁的线程将被阻塞
	void lock() throws InterruptedException;
	// 释放获取的锁
	void unlock();
}

ReadWriteLock接口

public interface ReadWriteLock {
	// 创建reader锁
	Lock readLock();
	
	// 创建write锁
	Lock writeLock();
	
	// 获取当前有多少线程正在执行写操作
	int getWritingWriters();
	
	// 获取当前有多少线程正在等待获取写入锁
	int getWaitingWriters();
	
	// 获取当前有多少线程正在等待获取reader锁
	int getReadingReaders();
	
	// 工厂方法,创建ReadWriteLock
	static ReadWriteLock readWriteLock() {
		return new ReadWriteLockImpl();
	}
	
	// 工厂方法,创建ReadWriteLock,并且传入preferWriter
	static ReadWriteLock readWriteLock(boolean preferWriter) {
		return new ReadWriteLockImpl(preferWriter);
	}
}

ReadWriteLock虽然名字有Lock,但它并不是Lock,主要用于创建read lock和write lock,并且提供了查询功能用于查询当前有多少个reader和writer以及waiting中的writer。根据前面的分析,如果reader的个数大于0,则意味着writer的个数等于0,反之writer的个数大于0(事实上只能是1),则reader的个数等于0。

2. 程序实现

ReadWriteLockImpl像是一个工厂类,可以通过它创建不同类型的锁。

class ReadWriteLockImpl implements ReadWriteLock {
	// 定义对象锁
	private final Object MUTEX = new Object();
	
	// 当前有多少个线程正在写
	private int writingWriters = 0;
	// 当前有多少个线程正在等待写入
	private int waitingWriters = 0;
	// 当前有多少个线程正在读
	private int readingReaders = 0;
	// read和write的偏好设置
	private boolean preferWriter;
	
	// 构造ReadWriteLockImpl并且传入preferWriter
	public ReadWriteLockImpl(boolean preferWriter) {
		this.preferWriter = preferWriter;
	}
	
	// 创建read lock
	public Lock readLock() {
		return new ReadLock(this);
	}
	
	// 创建write lock
	public Lock writeLock() {
		return new WriteLock(this);
	}
	
	// 使写的线程数量增加
	void incrementWritingWriters() {
		this.writingWriters++;
	}
	
	// 使等待写的线程数量增加
	void incrementWaitingWriters() {
		this.waitingWriters++;
	}
	
	// 使读的线程数量增加
	void incrementReadingReaders() {
		this.readingReaders++;
	}
	
	// 使写的线程数量减少
	void decrementWritingWriters() {
		this.writingWriters--;
	}
	
	// 使等待写的线程数量减少
	void decrementWaitingWriters() {
		this.waitingWriters--;
	}
	
	// 使读的线程数量减少
	void decrementReadingReaders() {
		this.readingReaders--;
	}
	
	// 获取当前有多少个线程正在进行写操作
	public int getWritingWriters() {
		return this.writingWriters;
	}
	
	// 获取当前有多少个线程正在等待获取写入锁
	public int getWaitingWriters() {
		return this.waitingWriters;
	}
	
	// 获取当前有多少个线程正在进行读操作
	public int getReadingReaders() {
		return this.readingReaders;
	}
	
	// 获取对象锁
	Object getMutex() {
		return this.MUTEX;
	}
	
	// 获取当前是否偏向写锁
	boolean getPreferWriter() {
		return this.preferWriter;
	}
	
	// 设置写锁偏好
	void changePrefer(boolean preferWriter) {
		this.preferWriter = preferWriter;
	}
}

ReadLock读锁

class ReadLock implements Lock {
	private final ReadWriteLockImpl readWriteLock;
	
	ReadLock(ReadWriteLockImpl readWriteLock) {
		this.readWriteLock = readWriteLock;
	}
	
	@Override
	public void lock() throws InterruptedException {
		// 使用Mutex作为锁
		synchronized (readWriteLock.getMutex()) {
			// 若此时有线程在进行写操作,或者有写线程在等待并且偏向写时,就会无法获得读锁,只能被挂起
			while (readWriteLock.getWritingWriters() > 0
					|| (readWriteLock.getPreferWriter()
					&& readWriteLock.getWaitingWriters() > 0)) {
				readWriteLock.getMutex().wait();
			}
			readWriteLock.incrementReadingReaders();
		}
	}
	
	@Override
	public void unlock() {
		synchronized (readWriteLock.getMutex()) {
			// 释放锁的过程就是使得当前reading的数量减一
			readWriteLock.decrementReadingReaders();
			// 将preferWriter设置为true,可以使得writer线程获得更多机会
			readWriteLock.changePrefer(true);
			// 通知唤醒与Mutex关联monitor waitset中的线程
			readWriteLock.getMutex().notifyAll();
		}
	}
}

WriteLock写锁

class WriteLock implements Lock {
	private final ReadWriteLockImpl readWriteLock;
	
	WriteLock(ReadWriteLockImpl readWriteLock) {
		this.readWriteLock = readWriteLock;
	}
	
	@Override
	public void lock() throws InterruptedException {
		synchronized (readWriteLock.getMutex()) {
			try {
				// 首先使等待获取写入锁的数量加一
				readWriteLock.incrementWaitingWriters();
				// 如果此时有其他线程正在进行读操作或者写操作,当前线程将被挂起
				while (readWriteLock.getReadingReaders() > 0
						|| readWriteLock.getWritingWriters() > 0) {
					readWriteLock.getMutex().wait();
				}
			} finally {
				// 成功获取到写入锁,使得等待获取写入锁的计数器减一
				this.readWriteLock.decrementWaitingWriters();
			}
			// 将正在写入的线程数量加一
			readWriteLock.incrementWritingWriters();
		}
	}
	
	@Override
	public void unlock() {
		synchronized (readWriteLock.getMutex()) {
			// 减少正在写入锁的线程计数器
			readWriteLock.decrementWritingWriters();
			// 将偏好状态修改为false,可以使得读锁能被快速获得
			readWriteLock.changePrefer(false);
			// 通知唤醒其他在Mutex monitor waitset中的线程
			readWriteLock.getMutex().notifyAll();
		}
	}
}

3. 读写锁的使用

public class ShareData {
	// 定义共享数据(资源)
	private final List<Character> container = new ArrayList<>();
	// 构造ReadWriteLock
	private final ReadWriteLock readWriteLock = ReadWriteLock.readWriteLock();
	// 创建读锁
	private final Lock readLock = readWriteLock.readLock();
	// 创建写锁
	private final Lock writeLock = readWriteLock.writeLock();
	private final int length;
	
	public ShareData(int length) {
		this.length = length;
		for (int i=0; i<length; i++) {
			container.add(i, 'c');
		}
	}
	
	public char[] read() throws InterruptedException {
		try {
			// 首先锁住读锁
			readLock.lock();
			char[] newBuffer = new char[length];
			for (int i=0; i<length; i++) {
				newBuffer[i] = container.get(i);
			}
			slowly();
			return newBuffer;
		} finally {
			// 当操作结束后,将锁释放
			readLock.unlock();
		}
	}
	
	public void write(char c) throws InterruptedException {
		try {
			// 首先锁住写锁
			writeLock.lock();
			for (int i=0; i<length; i++) {
				this.container.add(i, c);
			}
			slowly();
		} finally {
			writeLock.unlock();
		}
	}
	
	// 模拟耗时操作
	public void slowly() {
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch(InterruptedException e) {
			e.printStackTrace();
		}
	}
}

ShareData中涉及对数据的读写操作,因此它是需要进行线程同步控制。首先创建一个ReadWriteLock工厂类,然后用该工厂分别创建ReadLock和WriteLock的实例,在read方法中使用ReadLock对其进行加锁,而在write方法中则使用WriteLock。

public class ReadWriteLockTest {
	private final static String text = "This is the example of ReadWriteLock";
	
	public static void main(String[] args) {
		// 定义共享数据
		final ShareData shareData = new ShareData(50);
		// 创建两个线程进行数据写操作
		for (int i=0; i<2; i++) {
			new Thread(() -> {
				for (int index = 0; index < text.length(); index++) {
					try {
						char c = text.charAt(index);
						shareData.write(c);
						System.out.println(currentThread() + " write " + c);
					} catch(InterruptedException e) {
					}
				}
			}).start();
		}
		
		for (int i=0; i<10; i++) {
			new Thread(() -> {
				while (true) {
					try {
						System.out.println(currentThread() + " read " +
								new String(shareData.read()));
					} catch(InterruptedException e) {
						e.printStackTrace();
					}
				}
			}).start();
		}
	}
}