feat(namedpipe): 实现基于命名管道的读写锁功能

新增了基于命名管道的读写锁服务端和客户端实现,支持跨进程的读写锁同步机制。
主要功能包括:

- 读写锁基本语义:允许多个读者并发访问,写者独占访问
- FIFO锁请求队列,确保公平性
- 客户端自动重连与超时机制
- 完整的单元测试覆盖各种锁竞争场景
- 支持 Windows 命名管道和 Unix Domain Socket

此功能可用于需要跨进程同步访问共享资源的场景,提供可靠的并发控制能力。
This commit is contained in:
kingecg 2025-11-16 21:00:55 +08:00
parent d3637a21c5
commit a9cb0294af
4 changed files with 852 additions and 2 deletions

View File

@ -0,0 +1,236 @@
const NamedPipeLockServer = require('../lock.namedpipe');
const NamedPipeRWLock = require('../lock-client.namedpipe');
const fs = require('fs');
const os = require('os');
const path = require('path');
jest.setTimeout(30000);
// Helper function to create a unique pipe path for testing
const createTestPipePath = () => {
if (process.platform === 'win32') {
return `\\\\.\\pipe\\rwlock-test-${Date.now()}`;
} else {
return path.join(os.tmpdir(), `rwlock-test-${Date.now()}.sock`);
}
};
describe('NamedPipeLock', () => {
let server;
let pipePath;
// 在所有测试之前启动服务器
beforeAll(async () => {
pipePath = createTestPipePath();
server = new NamedPipeLockServer(pipePath);
server.start();
});
// 在所有测试完成后停止服务器
afterAll(() => {
server.stop();
// 清理管道文件 (仅限Unix系统)
if (process.platform !== 'win32' && fs.existsSync(pipePath)) {
fs.unlinkSync(pipePath);
}
});
describe('Basic Lock Operations', () => {
test('should acquire and release read lock without waiting when not locked', async () => {
const lock = new NamedPipeRWLock('resource1', pipePath);
const startTime = Date.now();
await lock.readLock();
const lockAcquiredTime = Date.now();
// 应该几乎立即获得锁(无需等待)
expect(lockAcquiredTime - startTime).toBeLessThan(100);
await lock.unlock();
lock.close();
});
test('should acquire and release write lock without waiting when not locked', async () => {
const lock = new NamedPipeRWLock('resource2', pipePath);
const startTime = Date.now();
await lock.writeLock();
const lockAcquiredTime = Date.now();
// 应该几乎立即获得锁(无需等待)
expect(lockAcquiredTime - startTime).toBeLessThan(100);
await lock.unlock();
lock.close();
});
test('should allow consecutive acquisitions and releases without queueing', async () => {
const lock = new NamedPipeRWLock('resource3', pipePath);
// 第一次获取
await lock.readLock();
expect(lock.isLocked).toBe(true);
await lock.unlock();
// 第二次获取
await lock.writeLock();
expect(lock.isLocked).toBe(true);
await lock.unlock();
// 第三次获取
await lock.readLock();
expect(lock.isLocked).toBe(true);
await lock.unlock();
lock.close();
});
});
describe('Multiple Clients', () => {
test('should handle multiple concurrent read locks', async () => {
const lock1 = new NamedPipeRWLock('sharedResource', pipePath);
const lock2 = new NamedPipeRWLock('sharedResource', pipePath);
const lock3 = new NamedPipeRWLock('sharedResource', pipePath);
// 所有读锁应该能够同时获取
await Promise.all([
lock1.readLock(),
lock2.readLock(),
lock3.readLock()
]);
expect(lock1.isLocked).toBe(true);
expect(lock2.isLocked).toBe(true);
expect(lock3.isLocked).toBe(true);
// 释放所有锁
await Promise.all([
lock1.unlock(),
lock2.unlock(),
lock3.unlock()
]);
lock1.close();
lock2.close();
lock3.close();
});
test('should queue write lock when read locks exist', async () => {
const readLock1 = new NamedPipeRWLock('queuedResource', pipePath);
const readLock2 = new NamedPipeRWLock('queuedResource', pipePath);
const writeLock = new NamedPipeRWLock('queuedResource', pipePath);
// 先获取两个读锁
await readLock1.readLock();
await readLock2.readLock();
// 尝试获取写锁,应该会被阻塞
let writeLockAcquired = false;
const writeLockPromise = writeLock.writeLock().then(() => {
writeLockAcquired = true;
});
// 等待一小段时间,写锁不应该被获取
await new Promise(resolve => setTimeout(resolve, 50));
expect(writeLockAcquired).toBe(false);
// 释放一个读锁
await readLock1.unlock();
// 等待一小段时间,写锁仍然不应该被获取(还有一个读锁)
await new Promise(resolve => setTimeout(resolve, 50));
expect(writeLockAcquired).toBe(false);
// 释放最后一个读锁
await readLock2.unlock();
// 现在写锁应该能被获取
await new Promise(resolve => setTimeout(resolve, 100));
expect(writeLockAcquired).toBe(true);
// 释放写锁
await writeLock.unlock();
readLock1.close();
readLock2.close();
writeLock.close();
});
test('should queue read locks when write lock exists', async () => {
const writeLock = new NamedPipeRWLock('queuedResource2', pipePath);
const readLock1 = new NamedPipeRWLock('queuedResource2', pipePath);
const readLock2 = new NamedPipeRWLock('queuedResource2', pipePath);
// 先获取写锁
await writeLock.writeLock();
// 尝试获取读锁,应该会被阻塞
let readLockAcquired = false;
const readLockPromise = readLock1.readLock().then(() => {
readLockAcquired = true;
});
// 等待一小段时间,读锁不应该被获取
await new Promise(resolve => setTimeout(resolve, 50));
expect(readLockAcquired).toBe(false);
// 再尝试获取另一个读锁,也应该被阻塞
let readLock2Acquired = false;
const readLock2Promise = readLock2.readLock().then(() => {
readLock2Acquired = true;
});
// 等待一小段时间,第二个读锁也不应该被获取
await new Promise(resolve => setTimeout(resolve, 50));
expect(readLock2Acquired).toBe(false);
// 释放写锁
await writeLock.unlock();
// 现在读锁应该能被获取
await new Promise(resolve => setTimeout(resolve, 100));
expect(readLockAcquired).toBe(true);
expect(readLock2Acquired).toBe(true);
// 释放读锁
await Promise.all([
readLock1.unlock(),
readLock2.unlock()
]);
writeLock.close();
readLock1.close();
readLock2.close();
});
});
describe('Error Handling', () => {
test('should reject when trying to acquire lock while already holding one', async () => {
const lock = new NamedPipeRWLock('errorResource', pipePath);
await lock.readLock();
// 尝试在已经持有锁的情况下再获取锁
await expect(lock.writeLock()).rejects.toThrow('Lock already held');
await lock.unlock();
lock.close();
});
test('should handle lock acquisition timeout', async () => {
const lock = new NamedPipeRWLock('timeoutResource', pipePath, {
timeout: 100 // 设置很短的超时时间
});
// 模拟一个永远不会释放的锁场景
const blockingLock = new NamedPipeRWLock('timeoutResource', pipePath);
await blockingLock.writeLock();
// 尝试获取已经被占用的锁,应该会超时
await expect(lock.writeLock()).rejects.toThrow(/timeout/);
await blockingLock.unlock();
blockingLock.close();
lock.close();
});
});
});

255
lock-client.namedpipe.js Normal file
View File

@ -0,0 +1,255 @@
// named-pipe-client.js
const net = require('net');
const { EventEmitter } = require('events');
const { v4: uuidv4 } = require('uuid');
/**
* 基于命名管道的读写锁客户端
* 提供对NamedPipeLockServer的客户端接口允许应用程序请求和释放读写锁
* 支持自动重连和超时机制
*/
class NamedPipeRWLock extends EventEmitter {
/**
* 创建NamedPipeRWLock实例
* @param {string} resource - 要锁定的资源名称
* @param {string} pipePath - 服务器命名管道路径
* @param {Object} options - 配置选项
* @param {number} options.timeout - 锁请求超时时间毫秒默认30000
* @param {number} options.retryInterval - 重连间隔毫秒默认1000
* @param {number} options.maxRetries - 最大重连次数默认5
*/
constructor(resource, pipePath = '\\\\.\\pipe\\rwlock-server', options = {}) {
super();
this.resource = resource;
this.pipePath = pipePath;
this.timeout = options.timeout || 30000;
this.retryInterval = options.retryInterval || 1000;
this.maxRetries = options.maxRetries || 5;
this.socket = null;
this.requestId = null;
this.isLocked = false;
this.lockType = null;
this.timeoutHandle = null;
this.retryCount = 0;
}
/**
* 连接到锁服务器
* @returns {Promise<void>} 连接成功时resolve
*/
async connect() {
return new Promise((resolve, reject) => {
if (this.socket && !this.socket.destroyed) {
resolve();
return;
}
this.socket = net.createConnection(this.pipePath, () => {
console.log(`Connected to lock server at ${this.pipePath}`);
this.retryCount = 0;
resolve();
});
this.socket.on('error', (error) => {
console.error('Connection error:', error);
reject(error);
});
this.socket.on('data', data => this.handleMessage(data));
this.socket.on('close', () => this.handleDisconnect());
this.socket.on('end', () => this.handleDisconnect());
});
}
/**
* 请求读锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async readLock() {
return this.acquireLock('readLock');
}
/**
* 请求写锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async writeLock() {
return this.acquireLock('writeLock');
}
/**
* 获取指定类型的锁
* @param {string} type - 锁类型 ('readLock' 'writeLock')
* @returns {Promise<void>} 锁获取成功时resolve
*/
async acquireLock(type) {
await this.ensureConnected();
return new Promise((resolve, reject) => {
if (this.isLocked) {
reject(new Error('Lock already held'));
return;
}
this.requestId = uuidv4();
this.lockType = type;
// 发送锁请求
this.sendMessage({
type,
resource: this.resource,
requestId: this.requestId
});
// 设置超时
this.timeoutHandle = setTimeout(() => {
this.cleanup();
reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`));
}, this.timeout);
// 监听锁授予事件
const onLockGranted = (data) => {
if (data.requestId === this.requestId) {
this.isLocked = true;
clearTimeout(this.timeoutHandle);
this.removeListener('lockGranted', onLockGranted);
resolve();
}
};
this.on('lockGranted', onLockGranted);
// 监听错误事件
const onError = (errorData) => {
if (errorData.requestId === this.requestId) {
clearTimeout(this.timeoutHandle);
this.removeListener('error', onError);
reject(new Error(errorData.message));
}
};
this.on('error', onError);
});
}
/**
* 释放当前持有的锁
* @returns {Promise<void>}
*/
async unlock() {
if (!this.isLocked || !this.socket) return;
this.sendMessage({
type: 'unlock',
resource: this.resource
});
this.cleanup();
console.log(`Lock released for resource: ${this.resource}`);
}
/**
* 处理从服务器收到的消息
* @param {Buffer} data - 接收的数据
*/
handleMessage(data) {
try {
const messages = data.toString().split('\n').filter(msg => msg.trim());
for (const msg of messages) {
const message = JSON.parse(msg);
switch (message.type) {
case 'lockGranted':
this.emit('lockGranted', message);
break;
case 'error':
this.emit('error', message);
break;
default:
console.warn('Unknown message type:', message.type);
}
}
} catch (error) {
console.error('Error parsing message:', error, 'Raw data:', data.toString());
}
}
/**
* 处理与服务器的连接断开
*/
handleDisconnect() {
console.log('Disconnected from lock server');
this.cleanup();
this.socket = null;
// 自动重连逻辑
if (this.retryCount < this.maxRetries) {
this.retryCount++;
console.log(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`);
setTimeout(() => {
this.ensureConnected().catch(error => {
console.error('Reconnection failed:', error);
});
}, this.retryInterval);
}
}
/**
* 向服务器发送消息
* @param {Object} message - 要发送的消息对象
*/
sendMessage(message) {
if (this.socket && !this.socket.destroyed) {
try {
this.socket.write(JSON.stringify(message) + '\n');
} catch (error) {
console.error('Error sending message:', error);
this.handleDisconnect();
}
} else {
console.error('Socket not connected');
this.handleDisconnect();
}
}
/**
* 确保客户端已连接到服务器
* @returns {Promise<void>}
*/
async ensureConnected() {
if (!this.socket || this.socket.destroyed) {
await this.connect();
}
}
/**
* 清理内部状态
*/
cleanup() {
this.isLocked = false;
this.lockType = null;
this.requestId = null;
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = null;
}
}
/**
* 关闭连接
*/
close() {
if (this.socket) {
this.socket.removeAllListeners();
this.socket.end();
this.socket.destroy();
}
this.cleanup();
}
}
module.exports = NamedPipeRWLock;

349
lock.namedpipe.js Normal file
View File

@ -0,0 +1,349 @@
// named-pipe-server.js
const net = require('net');
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
const async_hooks = require('async_hooks');
const { v4: uuidv4 } = require('uuid');
const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : '/tmp/rwlock.sock';
/**
* 基于命名管道的读写锁服务器
* 提供跨进程的读写锁功能支持多个客户端同时请求对同一资源的读锁或写锁
* 实现读写锁的基本语义
* 1. 多个读操作可以同时进行
* 2. 写操作是独占的不允许同时进行读或其他写操作
* 3. 锁请求按先进先出(FIFO)方式处理
*/
class NamedPipeLockServer extends EventEmitter {
/**
* 创建一个NamedPipeLockServer实例
* @param {string} pipePath - 命名管道路径默认值根据操作系统确定
*/
constructor(pipePath = DEFAULT_PIPE_PATH) {
super();
this.pipePath = pipePath;
this.server = null;
this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] }
this.clients = new Map(); // clientId -> socket
}
/**
* 启动服务器
* 创建并监听命名管道处理客户端连接和消息
*/
start() {
// 确保管道文件不存在Windows
if (process.platform === 'win32') {
try {
if (fs.existsSync(this.pipePath)) {
fs.unlinkSync(this.pipePath);
}
} catch (error) {
// 忽略错误,可能管道正在使用
}
}
this.server = net.createServer(socket => {
const clientId = uuidv4();
socket.clientId = clientId;
this.clients.set(clientId, socket);
console.log(`Client connected: ${clientId}`);
socket.on('data', data => {
try {
const messageStrs = data.toString().split('\n');
messageStrs.forEach(messageStr => {
if (messageStr) {
const message = JSON.parse(messageStr);
this.handleMessage(clientId, message);
}
});
// const message = JSON.parse();
// this.handleMessage(clientId, message);
} catch (error) {
console.error('Error parsing message:', error);
this.sendError(clientId, 'Invalid message format');
}
});
socket.on('close', () => {
this.handleClientDisconnect(clientId);
this.clients.delete(clientId);
console.log(`Client disconnected: ${clientId}`);
});
socket.on('error', error => {
console.error(`Client error (${clientId}):`, error);
this.handleClientDisconnect(clientId);
this.clients.delete(clientId);
});
});
// 启动命名管道服务器
this.server.listen(this.pipePath, () => {
console.log(`Lock server listening on named pipe: ${this.pipePath}`);
});
this.server.on('error', error => {
console.error('Server error:', error);
});
}
/**
* 处理来自客户端的消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 消息对象
*/
handleMessage(clientId, message) {
const { type, resource, requestId } = message;
if (!this.locks.has(resource)) {
this.locks.set(resource, {
readers: new Set(),
writer: null,
queue: []
});
}
const lock = this.locks.get(resource);
switch (type) {
case 'readLock':
this.handleReadLock(clientId, resource, requestId, lock);
break;
case 'writeLock':
this.handleWriteLock(clientId, resource, requestId, lock);
break;
case 'unlock':
this.handleUnlock(clientId, resource, lock);
break;
default:
this.sendError(clientId, `Unknown message type: ${type}`);
}
}
/**
* 处理读锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleReadLock(clientId, resource, requestId, lock) {
if (!lock.writer) {
// 可以立即获取读锁
lock.readers.add(clientId);
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'read'
});
console.log(`Read lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'read', requestId });
console.log(`Read lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理写锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleWriteLock(clientId, resource, requestId, lock) {
if (lock.readers.size === 0 && !lock.writer) {
// 可以立即获取写锁
lock.writer = clientId;
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'write'
});
console.log(`Write lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'write', requestId });
console.log(`Write lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理解锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
handleUnlock(clientId, resource, lock) {
let released = false;
// 移除读锁
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
console.log(`Read lock released by ${clientId} for ${resource}`);
}
// 移除写锁
if (lock.writer === clientId) {
lock.writer = null;
released = true;
console.log(`Write lock released by ${clientId} for ${resource}`);
}
if (released) {
// 处理等待队列
this.processQueue(resource, lock);
}
}
/**
* 处理等待队列中的锁请求
* 根据读写锁规则尽可能多地授权锁请求
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
processQueue(resource, lock) {
const granted = [];
for (let i = 0; i < lock.queue.length; i++) {
const request = lock.queue[i];
if (request.type === 'read') {
if (!lock.writer) {
// 可以授予读锁
lock.readers.add(request.clientId);
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'read'
});
granted.push(i);
console.log(`Queued read lock granted to ${request.clientId} for ${resource}`);
} else {
// 有写锁等待,读锁必须等待
break;
}
} else { // write
if (lock.readers.size === 0 && !lock.writer) {
// 可以授予写锁
lock.writer = request.clientId;
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'write'
});
granted.push(i);
console.log(`Queued write lock granted to ${request.clientId} for ${resource}`);
} else {
// 写锁必须等待前面的锁释放
break;
}
}
}
// 移除已处理的请求
if (granted.length > 0) {
lock.queue = lock.queue.filter((_, index) => !granted.includes(index));
}
}
/**
* 处理客户端断开连接
* 释放该客户端持有的所有锁并处理相关队列
* @param {string} clientId - 客户端标识符
*/
handleClientDisconnect(clientId) {
console.log(`Processing disconnect for client: ${clientId}`);
// 释放该客户端持有的所有锁
for (const [resource, lock] of this.locks) {
let released = false;
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
}
if (lock.writer === clientId) {
lock.writer = null;
released = true;
}
// 如果释放了锁,重新处理队列
if (released) {
this.processQueue(resource, lock);
}
}
}
/**
* 向指定客户端发送消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 要发送的消息对象
*/
sendToClient(clientId, message) {
const socket = this.clients.get(clientId);
if (socket && !socket.destroyed) {
try {
socket.write(JSON.stringify(message));
} catch (error) {
console.error(`Error sending to client ${clientId}:`, error);
}
}
}
/**
* 向客户端发送错误消息
* @param {string} clientId - 客户端标识符
* @param {string} errorMessage - 错误信息
*/
sendError(clientId, errorMessage) {
this.sendToClient(clientId, {
type: 'error',
message: errorMessage
});
}
/**
* 停止服务器
*/
stop() {
if (this.server) {
this.server.close();
console.log('Lock server stopped');
}
}
}
// // 启动服务器
// const pipePath = process.platform === 'win32'
// ? '\\\\.\\pipe\\rwlock-server'
// : '/tmp/rwlock.sock';
// const server = new NamedPipeLockServer(pipePath);
// server.start();
// // 优雅关闭
// process.on('SIGINT', () => {
// console.log('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });
// process.on('SIGTERM', () => {
// console.log('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });
module.exports = NamedPipeLockServer;

View File

@ -6,10 +6,20 @@
"scripts": {
"test": "jest"
},
"keywords": ["lock", "async", "file", "mutex", "synchronization", "concurrency"],
"keywords": [
"lock",
"async",
"file",
"mutex",
"synchronization",
"concurrency"
],
"author": "nlocks contributors",
"license": "MIT",
"devDependencies": {
"jest": "^30.2.0"
},
"dependencies": {
"uuid": "^8.3.2"
}
}