// named-pipe-client.js const net = require('net'); const { EventEmitter } = require('events'); const { v4: uuidv4 } = require('uuid'); /** * 基于命名管道的读写锁客户端 * 提供对NamedPipeLockServer的客户端接口,允许应用程序请求和释放读写锁 * 支持自动重连和超时机制 */ class LockClient extends EventEmitter { /** * 创建NamedPipeRWLock实例 * @param {string} resource - 要锁定的资源名称 * @param {string} pipePath - 服务器命名管道路径 * @param {Object} options - 配置选项 * @param {number} options.timeout - 锁请求超时时间(毫秒),默认30000 * @param {number} options.retryInterval - 重连间隔(毫秒),默认1000 * @param {number} options.maxRetries - 最大重连次数,默认5 * @param {object} options.connect * @param {string} options.connect.pipePath - 命名管道路径 * @param {string} options.connect.host - 服务器主机名或IP地址 * @param {number} options.connect.port - 命名管道端口号 * @param {object} options.logger */ constructor(resource, options = {}) { super(); this.resource = resource; this._connectInfo = options.connect || {}; this.timeout = options.timeout || 30000; this.retryInterval = options.retryInterval || 1000; this.maxRetries = options.maxRetries || 5; this.socket = null; this.requestId = null; this.isLocked = false; this.lockType = null; this.timeoutHandle = null; this.retryCount = 0; this._logger = options.logger || console; } /** * 连接到锁服务器 * @returns {Promise} 连接成功时resolve */ connect() { } /** * 请求读锁 * @returns {Promise} 锁获取成功时resolve */ async readLock() { return this.acquireLock('readLock'); } /** * 请求写锁 * @returns {Promise} 锁获取成功时resolve */ async writeLock() { return this.acquireLock('writeLock'); } /** * 获取指定类型的锁 * @param {string} type - 锁类型 ('readLock' 或 'writeLock') * @returns {Promise} 锁获取成功时resolve */ async acquireLock(type) { await this.ensureConnected(); return new Promise((resolve, reject) => { if (this.isLocked) { reject(new Error('Lock already held')); return; } this.requestId = uuidv4(); this.lockType = type; // 发送锁请求 this.sendMessage({ type, resource: this.resource, requestId: this.requestId }); // 设置超时 this.timeoutHandle = setTimeout(() => { this.cleanup(); reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`)); }, this.timeout); // 监听锁授予事件 const onLockGranted = (data) => { if (data.requestId === this.requestId) { this.isLocked = true; clearTimeout(this.timeoutHandle); this.removeListener('lockGranted', onLockGranted); resolve(); } }; this.on('lockGranted', onLockGranted); // 监听错误事件 const onError = (errorData) => { if (errorData.requestId === this.requestId) { clearTimeout(this.timeoutHandle); this.removeListener('error', onError); reject(new Error(errorData.message)); } }; this.on('error', onError); }); } /** * 释放当前持有的锁 * @returns {Promise} */ async unlock() { if (!this.isLocked || !this.socket) return; this.sendMessage({ type: 'unlock', resource: this.resource }); this.cleanup(); this._logger.debug(`Lock released for resource: ${this.resource}`); } async release(){ await this.unlock(); } /** * 处理与服务器的连接断开 */ handleDisconnect() { this._logger.debug('Disconnected from lock server'); this.cleanup(); this.socket = null; // 自动重连逻辑 if (this.retryCount < this.maxRetries) { this.retryCount++; this._logger.debug(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`); setTimeout(() => { this.ensureConnected().catch(error => { this._logger.error('Reconnection failed:', error); }); }, this.retryInterval); } } /** * 向服务器发送消息 * @param {Object} message - 要发送的消息对象 */ sendMessage(message) { } /** * 确保客户端已连接到服务器 * @returns {Promise} */ async ensureConnected() { if (!this.socket || this.socket.destroyed) { await this.connect(); } } /** * 清理内部状态 */ cleanup() { this.isLocked = false; this.lockType = null; this.requestId = null; if (this.timeoutHandle) { clearTimeout(this.timeoutHandle); this.timeoutHandle = null; } } /** * 关闭连接 */ close() { } } module.exports = LockClient;