diff --git a/__tests__/namedpipe-lock.test.js b/__tests__/namedpipe-lock.test.js new file mode 100644 index 0000000..b30bbe1 --- /dev/null +++ b/__tests__/namedpipe-lock.test.js @@ -0,0 +1,236 @@ +const NamedPipeLockServer = require('../lock.namedpipe'); +const NamedPipeRWLock = require('../lock-client.namedpipe'); +const fs = require('fs'); +const os = require('os'); +const path = require('path'); + +jest.setTimeout(30000); +// Helper function to create a unique pipe path for testing +const createTestPipePath = () => { + if (process.platform === 'win32') { + return `\\\\.\\pipe\\rwlock-test-${Date.now()}`; + } else { + return path.join(os.tmpdir(), `rwlock-test-${Date.now()}.sock`); + } +}; + +describe('NamedPipeLock', () => { + let server; + let pipePath; + + // 在所有测试之前启动服务器 + beforeAll(async () => { + pipePath = createTestPipePath(); + server = new NamedPipeLockServer(pipePath); + server.start(); + }); + + // 在所有测试完成后停止服务器 + afterAll(() => { + server.stop(); + // 清理管道文件 (仅限Unix系统) + if (process.platform !== 'win32' && fs.existsSync(pipePath)) { + fs.unlinkSync(pipePath); + } + }); + + describe('Basic Lock Operations', () => { + test('should acquire and release read lock without waiting when not locked', async () => { + const lock = new NamedPipeRWLock('resource1', pipePath); + + const startTime = Date.now(); + await lock.readLock(); + const lockAcquiredTime = Date.now(); + + // 应该几乎立即获得锁(无需等待) + expect(lockAcquiredTime - startTime).toBeLessThan(100); + + await lock.unlock(); + lock.close(); + }); + + test('should acquire and release write lock without waiting when not locked', async () => { + const lock = new NamedPipeRWLock('resource2', pipePath); + + const startTime = Date.now(); + await lock.writeLock(); + const lockAcquiredTime = Date.now(); + + // 应该几乎立即获得锁(无需等待) + expect(lockAcquiredTime - startTime).toBeLessThan(100); + + await lock.unlock(); + lock.close(); + }); + + test('should allow consecutive acquisitions and releases without queueing', async () => { + const lock = new NamedPipeRWLock('resource3', pipePath); + + // 第一次获取 + await lock.readLock(); + expect(lock.isLocked).toBe(true); + await lock.unlock(); + + // 第二次获取 + await lock.writeLock(); + expect(lock.isLocked).toBe(true); + await lock.unlock(); + + // 第三次获取 + await lock.readLock(); + expect(lock.isLocked).toBe(true); + await lock.unlock(); + + lock.close(); + }); + }); + + describe('Multiple Clients', () => { + test('should handle multiple concurrent read locks', async () => { + const lock1 = new NamedPipeRWLock('sharedResource', pipePath); + const lock2 = new NamedPipeRWLock('sharedResource', pipePath); + const lock3 = new NamedPipeRWLock('sharedResource', pipePath); + + // 所有读锁应该能够同时获取 + await Promise.all([ + lock1.readLock(), + lock2.readLock(), + lock3.readLock() + ]); + + expect(lock1.isLocked).toBe(true); + expect(lock2.isLocked).toBe(true); + expect(lock3.isLocked).toBe(true); + + // 释放所有锁 + await Promise.all([ + lock1.unlock(), + lock2.unlock(), + lock3.unlock() + ]); + + lock1.close(); + lock2.close(); + lock3.close(); + }); + + test('should queue write lock when read locks exist', async () => { + const readLock1 = new NamedPipeRWLock('queuedResource', pipePath); + const readLock2 = new NamedPipeRWLock('queuedResource', pipePath); + const writeLock = new NamedPipeRWLock('queuedResource', pipePath); + + // 先获取两个读锁 + await readLock1.readLock(); + await readLock2.readLock(); + + // 尝试获取写锁,应该会被阻塞 + let writeLockAcquired = false; + const writeLockPromise = writeLock.writeLock().then(() => { + writeLockAcquired = true; + }); + + // 等待一小段时间,写锁不应该被获取 + await new Promise(resolve => setTimeout(resolve, 50)); + expect(writeLockAcquired).toBe(false); + + // 释放一个读锁 + await readLock1.unlock(); + + // 等待一小段时间,写锁仍然不应该被获取(还有一个读锁) + await new Promise(resolve => setTimeout(resolve, 50)); + expect(writeLockAcquired).toBe(false); + + // 释放最后一个读锁 + await readLock2.unlock(); + + // 现在写锁应该能被获取 + await new Promise(resolve => setTimeout(resolve, 100)); + expect(writeLockAcquired).toBe(true); + + // 释放写锁 + await writeLock.unlock(); + + readLock1.close(); + readLock2.close(); + writeLock.close(); + }); + + test('should queue read locks when write lock exists', async () => { + const writeLock = new NamedPipeRWLock('queuedResource2', pipePath); + const readLock1 = new NamedPipeRWLock('queuedResource2', pipePath); + const readLock2 = new NamedPipeRWLock('queuedResource2', pipePath); + + // 先获取写锁 + await writeLock.writeLock(); + + // 尝试获取读锁,应该会被阻塞 + let readLockAcquired = false; + const readLockPromise = readLock1.readLock().then(() => { + readLockAcquired = true; + }); + + // 等待一小段时间,读锁不应该被获取 + await new Promise(resolve => setTimeout(resolve, 50)); + expect(readLockAcquired).toBe(false); + + // 再尝试获取另一个读锁,也应该被阻塞 + let readLock2Acquired = false; + const readLock2Promise = readLock2.readLock().then(() => { + readLock2Acquired = true; + }); + + // 等待一小段时间,第二个读锁也不应该被获取 + await new Promise(resolve => setTimeout(resolve, 50)); + expect(readLock2Acquired).toBe(false); + + // 释放写锁 + await writeLock.unlock(); + + // 现在读锁应该能被获取 + await new Promise(resolve => setTimeout(resolve, 100)); + expect(readLockAcquired).toBe(true); + expect(readLock2Acquired).toBe(true); + + // 释放读锁 + await Promise.all([ + readLock1.unlock(), + readLock2.unlock() + ]); + + writeLock.close(); + readLock1.close(); + readLock2.close(); + }); + }); + + describe('Error Handling', () => { + test('should reject when trying to acquire lock while already holding one', async () => { + const lock = new NamedPipeRWLock('errorResource', pipePath); + + await lock.readLock(); + + // 尝试在已经持有锁的情况下再获取锁 + await expect(lock.writeLock()).rejects.toThrow('Lock already held'); + + await lock.unlock(); + lock.close(); + }); + + test('should handle lock acquisition timeout', async () => { + const lock = new NamedPipeRWLock('timeoutResource', pipePath, { + timeout: 100 // 设置很短的超时时间 + }); + + // 模拟一个永远不会释放的锁场景 + const blockingLock = new NamedPipeRWLock('timeoutResource', pipePath); + await blockingLock.writeLock(); + + // 尝试获取已经被占用的锁,应该会超时 + await expect(lock.writeLock()).rejects.toThrow(/timeout/); + + await blockingLock.unlock(); + blockingLock.close(); + lock.close(); + }); + }); +}); \ No newline at end of file diff --git a/lock-client.namedpipe.js b/lock-client.namedpipe.js new file mode 100644 index 0000000..cb49f98 --- /dev/null +++ b/lock-client.namedpipe.js @@ -0,0 +1,255 @@ +// named-pipe-client.js +const net = require('net'); +const { EventEmitter } = require('events'); +const { v4: uuidv4 } = require('uuid'); + +/** + * 基于命名管道的读写锁客户端 + * 提供对NamedPipeLockServer的客户端接口,允许应用程序请求和释放读写锁 + * 支持自动重连和超时机制 + */ +class NamedPipeRWLock extends EventEmitter { + /** + * 创建NamedPipeRWLock实例 + * @param {string} resource - 要锁定的资源名称 + * @param {string} pipePath - 服务器命名管道路径 + * @param {Object} options - 配置选项 + * @param {number} options.timeout - 锁请求超时时间(毫秒),默认30000 + * @param {number} options.retryInterval - 重连间隔(毫秒),默认1000 + * @param {number} options.maxRetries - 最大重连次数,默认5 + */ + constructor(resource, pipePath = '\\\\.\\pipe\\rwlock-server', options = {}) { + super(); + this.resource = resource; + this.pipePath = pipePath; + this.timeout = options.timeout || 30000; + this.retryInterval = options.retryInterval || 1000; + this.maxRetries = options.maxRetries || 5; + + this.socket = null; + this.requestId = null; + this.isLocked = false; + this.lockType = null; + this.timeoutHandle = null; + this.retryCount = 0; + } + + /** + * 连接到锁服务器 + * @returns {Promise} 连接成功时resolve + */ + async connect() { + return new Promise((resolve, reject) => { + if (this.socket && !this.socket.destroyed) { + resolve(); + return; + } + + this.socket = net.createConnection(this.pipePath, () => { + console.log(`Connected to lock server at ${this.pipePath}`); + this.retryCount = 0; + resolve(); + }); + + this.socket.on('error', (error) => { + console.error('Connection error:', error); + reject(error); + }); + + this.socket.on('data', data => this.handleMessage(data)); + this.socket.on('close', () => this.handleDisconnect()); + this.socket.on('end', () => this.handleDisconnect()); + }); + } + + /** + * 请求读锁 + * @returns {Promise} 锁获取成功时resolve + */ + async readLock() { + return this.acquireLock('readLock'); + } + + /** + * 请求写锁 + * @returns {Promise} 锁获取成功时resolve + */ + async writeLock() { + return this.acquireLock('writeLock'); + } + + /** + * 获取指定类型的锁 + * @param {string} type - 锁类型 ('readLock' 或 'writeLock') + * @returns {Promise} 锁获取成功时resolve + */ + async acquireLock(type) { + await this.ensureConnected(); + + return new Promise((resolve, reject) => { + if (this.isLocked) { + reject(new Error('Lock already held')); + return; + } + + this.requestId = uuidv4(); + this.lockType = type; + + // 发送锁请求 + this.sendMessage({ + type, + resource: this.resource, + requestId: this.requestId + }); + + // 设置超时 + this.timeoutHandle = setTimeout(() => { + this.cleanup(); + reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`)); + }, this.timeout); + + // 监听锁授予事件 + const onLockGranted = (data) => { + if (data.requestId === this.requestId) { + this.isLocked = true; + clearTimeout(this.timeoutHandle); + this.removeListener('lockGranted', onLockGranted); + resolve(); + } + }; + + this.on('lockGranted', onLockGranted); + + // 监听错误事件 + const onError = (errorData) => { + if (errorData.requestId === this.requestId) { + clearTimeout(this.timeoutHandle); + this.removeListener('error', onError); + reject(new Error(errorData.message)); + } + }; + + this.on('error', onError); + }); + } + + /** + * 释放当前持有的锁 + * @returns {Promise} + */ + async unlock() { + if (!this.isLocked || !this.socket) return; + + this.sendMessage({ + type: 'unlock', + resource: this.resource + }); + + this.cleanup(); + console.log(`Lock released for resource: ${this.resource}`); + } + + /** + * 处理从服务器收到的消息 + * @param {Buffer} data - 接收的数据 + */ + handleMessage(data) { + try { + const messages = data.toString().split('\n').filter(msg => msg.trim()); + + for (const msg of messages) { + const message = JSON.parse(msg); + + switch (message.type) { + case 'lockGranted': + this.emit('lockGranted', message); + break; + case 'error': + this.emit('error', message); + break; + default: + console.warn('Unknown message type:', message.type); + } + } + } catch (error) { + console.error('Error parsing message:', error, 'Raw data:', data.toString()); + } + } + + /** + * 处理与服务器的连接断开 + */ + handleDisconnect() { + console.log('Disconnected from lock server'); + this.cleanup(); + this.socket = null; + + // 自动重连逻辑 + if (this.retryCount < this.maxRetries) { + this.retryCount++; + console.log(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`); + + setTimeout(() => { + this.ensureConnected().catch(error => { + console.error('Reconnection failed:', error); + }); + }, this.retryInterval); + } + } + + /** + * 向服务器发送消息 + * @param {Object} message - 要发送的消息对象 + */ + sendMessage(message) { + if (this.socket && !this.socket.destroyed) { + try { + this.socket.write(JSON.stringify(message) + '\n'); + } catch (error) { + console.error('Error sending message:', error); + this.handleDisconnect(); + } + } else { + console.error('Socket not connected'); + this.handleDisconnect(); + } + } + + /** + * 确保客户端已连接到服务器 + * @returns {Promise} + */ + async ensureConnected() { + if (!this.socket || this.socket.destroyed) { + await this.connect(); + } + } + + /** + * 清理内部状态 + */ + cleanup() { + this.isLocked = false; + this.lockType = null; + this.requestId = null; + if (this.timeoutHandle) { + clearTimeout(this.timeoutHandle); + this.timeoutHandle = null; + } + } + + /** + * 关闭连接 + */ + close() { + if (this.socket) { + this.socket.removeAllListeners(); + + this.socket.end(); + this.socket.destroy(); + } + this.cleanup(); + } +} + +module.exports = NamedPipeRWLock; \ No newline at end of file diff --git a/lock.namedpipe.js b/lock.namedpipe.js new file mode 100644 index 0000000..dc43f8e --- /dev/null +++ b/lock.namedpipe.js @@ -0,0 +1,349 @@ +// named-pipe-server.js +const net = require('net'); +const fs = require('fs'); +const path = require('path'); +const { EventEmitter } = require('events'); +const async_hooks = require('async_hooks'); +const { v4: uuidv4 } = require('uuid'); + +const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : '/tmp/rwlock.sock'; + +/** + * 基于命名管道的读写锁服务器 + * 提供跨进程的读写锁功能,支持多个客户端同时请求对同一资源的读锁或写锁 + * 实现读写锁的基本语义: + * 1. 多个读操作可以同时进行 + * 2. 写操作是独占的,不允许同时进行读或其他写操作 + * 3. 锁请求按先进先出(FIFO)方式处理 + */ +class NamedPipeLockServer extends EventEmitter { + /** + * 创建一个NamedPipeLockServer实例 + * @param {string} pipePath - 命名管道路径,默认值根据操作系统确定 + */ + constructor(pipePath = DEFAULT_PIPE_PATH) { + super(); + this.pipePath = pipePath; + this.server = null; + this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] } + this.clients = new Map(); // clientId -> socket + } + + /** + * 启动服务器 + * 创建并监听命名管道,处理客户端连接和消息 + */ + start() { + // 确保管道文件不存在(Windows) + if (process.platform === 'win32') { + try { + if (fs.existsSync(this.pipePath)) { + fs.unlinkSync(this.pipePath); + } + } catch (error) { + // 忽略错误,可能管道正在使用 + } + } + + this.server = net.createServer(socket => { + const clientId = uuidv4(); + socket.clientId = clientId; + this.clients.set(clientId, socket); + + console.log(`Client connected: ${clientId}`); + + socket.on('data', data => { + try { + const messageStrs = data.toString().split('\n'); + messageStrs.forEach(messageStr => { + if (messageStr) { + const message = JSON.parse(messageStr); + this.handleMessage(clientId, message); + } + }); + // const message = JSON.parse(); + // this.handleMessage(clientId, message); + } catch (error) { + console.error('Error parsing message:', error); + this.sendError(clientId, 'Invalid message format'); + } + }); + + socket.on('close', () => { + this.handleClientDisconnect(clientId); + this.clients.delete(clientId); + console.log(`Client disconnected: ${clientId}`); + }); + + socket.on('error', error => { + console.error(`Client error (${clientId}):`, error); + this.handleClientDisconnect(clientId); + this.clients.delete(clientId); + }); + }); + + // 启动命名管道服务器 + this.server.listen(this.pipePath, () => { + console.log(`Lock server listening on named pipe: ${this.pipePath}`); + }); + + this.server.on('error', error => { + console.error('Server error:', error); + }); + } + + /** + * 处理来自客户端的消息 + * @param {string} clientId - 客户端标识符 + * @param {Object} message - 消息对象 + */ + handleMessage(clientId, message) { + const { type, resource, requestId } = message; + + if (!this.locks.has(resource)) { + this.locks.set(resource, { + readers: new Set(), + writer: null, + queue: [] + }); + } + + const lock = this.locks.get(resource); + + switch (type) { + case 'readLock': + this.handleReadLock(clientId, resource, requestId, lock); + break; + case 'writeLock': + this.handleWriteLock(clientId, resource, requestId, lock); + break; + case 'unlock': + this.handleUnlock(clientId, resource, lock); + break; + default: + this.sendError(clientId, `Unknown message type: ${type}`); + } + } + + /** + * 处理读锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {string} requestId - 请求ID + * @param {Object} lock - 锁对象 + */ + handleReadLock(clientId, resource, requestId, lock) { + if (!lock.writer) { + // 可以立即获取读锁 + lock.readers.add(clientId); + this.sendToClient(clientId, { + type: 'lockGranted', + requestId, + resource, + lockType: 'read' + }); + console.log(`Read lock granted to ${clientId} for ${resource}`); + } else { + // 加入等待队列 + lock.queue.push({ clientId, type: 'read', requestId }); + console.log(`Read lock queued for ${clientId} for ${resource}`); + } + } + + /** + * 处理写锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {string} requestId - 请求ID + * @param {Object} lock - 锁对象 + */ + handleWriteLock(clientId, resource, requestId, lock) { + if (lock.readers.size === 0 && !lock.writer) { + // 可以立即获取写锁 + lock.writer = clientId; + this.sendToClient(clientId, { + type: 'lockGranted', + requestId, + resource, + lockType: 'write' + }); + console.log(`Write lock granted to ${clientId} for ${resource}`); + } else { + // 加入等待队列 + lock.queue.push({ clientId, type: 'write', requestId }); + console.log(`Write lock queued for ${clientId} for ${resource}`); + } + } + + /** + * 处理解锁请求 + * @param {string} clientId - 客户端标识符 + * @param {string} resource - 资源名称 + * @param {Object} lock - 锁对象 + */ + handleUnlock(clientId, resource, lock) { + let released = false; + + // 移除读锁 + if (lock.readers.has(clientId)) { + lock.readers.delete(clientId); + released = true; + console.log(`Read lock released by ${clientId} for ${resource}`); + } + + // 移除写锁 + if (lock.writer === clientId) { + lock.writer = null; + released = true; + console.log(`Write lock released by ${clientId} for ${resource}`); + } + + if (released) { + // 处理等待队列 + this.processQueue(resource, lock); + } + } + + /** + * 处理等待队列中的锁请求 + * 根据读写锁规则,尽可能多地授权锁请求 + * @param {string} resource - 资源名称 + * @param {Object} lock - 锁对象 + */ + processQueue(resource, lock) { + const granted = []; + + for (let i = 0; i < lock.queue.length; i++) { + const request = lock.queue[i]; + + if (request.type === 'read') { + if (!lock.writer) { + // 可以授予读锁 + lock.readers.add(request.clientId); + this.sendToClient(request.clientId, { + type: 'lockGranted', + requestId: request.requestId, + resource, + lockType: 'read' + }); + granted.push(i); + console.log(`Queued read lock granted to ${request.clientId} for ${resource}`); + } else { + // 有写锁等待,读锁必须等待 + break; + } + } else { // write + if (lock.readers.size === 0 && !lock.writer) { + // 可以授予写锁 + lock.writer = request.clientId; + this.sendToClient(request.clientId, { + type: 'lockGranted', + requestId: request.requestId, + resource, + lockType: 'write' + }); + granted.push(i); + console.log(`Queued write lock granted to ${request.clientId} for ${resource}`); + } else { + // 写锁必须等待前面的锁释放 + break; + } + } + } + + // 移除已处理的请求 + if (granted.length > 0) { + lock.queue = lock.queue.filter((_, index) => !granted.includes(index)); + } + } + + /** + * 处理客户端断开连接 + * 释放该客户端持有的所有锁,并处理相关队列 + * @param {string} clientId - 客户端标识符 + */ + handleClientDisconnect(clientId) { + console.log(`Processing disconnect for client: ${clientId}`); + + // 释放该客户端持有的所有锁 + for (const [resource, lock] of this.locks) { + let released = false; + + if (lock.readers.has(clientId)) { + lock.readers.delete(clientId); + released = true; + } + + if (lock.writer === clientId) { + lock.writer = null; + released = true; + } + + // 如果释放了锁,重新处理队列 + if (released) { + this.processQueue(resource, lock); + } + } + } + + /** + * 向指定客户端发送消息 + * @param {string} clientId - 客户端标识符 + * @param {Object} message - 要发送的消息对象 + */ + sendToClient(clientId, message) { + const socket = this.clients.get(clientId); + if (socket && !socket.destroyed) { + try { + socket.write(JSON.stringify(message)); + } catch (error) { + console.error(`Error sending to client ${clientId}:`, error); + } + } + } + + /** + * 向客户端发送错误消息 + * @param {string} clientId - 客户端标识符 + * @param {string} errorMessage - 错误信息 + */ + sendError(clientId, errorMessage) { + this.sendToClient(clientId, { + type: 'error', + message: errorMessage + }); + } + + /** + * 停止服务器 + */ + stop() { + if (this.server) { + this.server.close(); + console.log('Lock server stopped'); + } + } +} + +// // 启动服务器 +// const pipePath = process.platform === 'win32' +// ? '\\\\.\\pipe\\rwlock-server' +// : '/tmp/rwlock.sock'; + +// const server = new NamedPipeLockServer(pipePath); +// server.start(); + +// // 优雅关闭 +// process.on('SIGINT', () => { +// console.log('Shutting down lock server...'); +// server.stop(); +// process.exit(0); +// }); + +// process.on('SIGTERM', () => { +// console.log('Shutting down lock server...'); +// server.stop(); +// process.exit(0); +// }); + +module.exports = NamedPipeLockServer; \ No newline at end of file diff --git a/package.json b/package.json index 35abb99..86274ef 100644 --- a/package.json +++ b/package.json @@ -6,10 +6,20 @@ "scripts": { "test": "jest" }, - "keywords": ["lock", "async", "file", "mutex", "synchronization", "concurrency"], + "keywords": [ + "lock", + "async", + "file", + "mutex", + "synchronization", + "concurrency" + ], "author": "nlocks contributors", "license": "MIT", "devDependencies": { "jest": "^30.2.0" + }, + "dependencies": { + "uuid": "^8.3.2" } -} \ No newline at end of file +}