feat(lock): 实现基于TCP和命名管道的读写锁服务端与客户端

新增支持通过TCP协议和命名管道进行通信的读写锁机制,包括服务端和客户端实现。
主要变更包括:

- 添加 LockServer 基类及 NamedPipeLockServer、TCPLockServer 实现
- 新增 LockClient 基类以及 NamedPipeRWLock 和 TcpRwLock 客户端实现
- 更新测试用例以适配新的连接配置方式,并增加对 TCP 锁的支持
- 调整 jest 超时设置以便更好地支持异步锁操作测试
- 导出新模块至 index.js 便于外部使用
```
This commit is contained in:
程广 2025-11-17 15:32:19 +08:00
parent a9cb0294af
commit 14d81e8f5c
9 changed files with 711 additions and 405 deletions

View File

@ -4,7 +4,7 @@ const fs = require('fs');
const os = require('os');
const path = require('path');
jest.setTimeout(30000);
jest.setTimeout(3000000);
// Helper function to create a unique pipe path for testing
const createTestPipePath = () => {
if (process.platform === 'win32') {
@ -17,12 +17,14 @@ const createTestPipePath = () => {
describe('NamedPipeLock', () => {
let server;
let pipePath;
let connect = {}
// 在所有测试之前启动服务器
beforeAll(async () => {
pipePath = createTestPipePath();
server = new NamedPipeLockServer(pipePath);
server.start();
connect = {pipePath}
server = new NamedPipeLockServer({pipePath});
await server.start();
});
// 在所有测试完成后停止服务器
@ -36,7 +38,7 @@ describe('NamedPipeLock', () => {
describe('Basic Lock Operations', () => {
test('should acquire and release read lock without waiting when not locked', async () => {
const lock = new NamedPipeRWLock('resource1', pipePath);
const lock = new NamedPipeRWLock('resource1',{connect});
const startTime = Date.now();
await lock.readLock();
@ -50,7 +52,7 @@ describe('NamedPipeLock', () => {
});
test('should acquire and release write lock without waiting when not locked', async () => {
const lock = new NamedPipeRWLock('resource2', pipePath);
const lock = new NamedPipeRWLock('resource2',{connect});
const startTime = Date.now();
await lock.writeLock();
@ -64,7 +66,7 @@ describe('NamedPipeLock', () => {
});
test('should allow consecutive acquisitions and releases without queueing', async () => {
const lock = new NamedPipeRWLock('resource3', pipePath);
const lock = new NamedPipeRWLock('resource3',{connect});
// 第一次获取
await lock.readLock();
@ -87,9 +89,9 @@ describe('NamedPipeLock', () => {
describe('Multiple Clients', () => {
test('should handle multiple concurrent read locks', async () => {
const lock1 = new NamedPipeRWLock('sharedResource', pipePath);
const lock2 = new NamedPipeRWLock('sharedResource', pipePath);
const lock3 = new NamedPipeRWLock('sharedResource', pipePath);
const lock1 = new NamedPipeRWLock('sharedResource',{connect});
const lock2 = new NamedPipeRWLock('sharedResource',{connect});
const lock3 = new NamedPipeRWLock('sharedResource',{connect});
// 所有读锁应该能够同时获取
await Promise.all([
@ -115,9 +117,9 @@ describe('NamedPipeLock', () => {
});
test('should queue write lock when read locks exist', async () => {
const readLock1 = new NamedPipeRWLock('queuedResource', pipePath);
const readLock2 = new NamedPipeRWLock('queuedResource', pipePath);
const writeLock = new NamedPipeRWLock('queuedResource', pipePath);
const readLock1 = new NamedPipeRWLock('queuedResource',{connect});
const readLock2 = new NamedPipeRWLock('queuedResource',{connect});
const writeLock = new NamedPipeRWLock('queuedResource',{connect});
// 先获取两个读锁
await readLock1.readLock();
@ -156,9 +158,9 @@ describe('NamedPipeLock', () => {
});
test('should queue read locks when write lock exists', async () => {
const writeLock = new NamedPipeRWLock('queuedResource2', pipePath);
const readLock1 = new NamedPipeRWLock('queuedResource2', pipePath);
const readLock2 = new NamedPipeRWLock('queuedResource2', pipePath);
const writeLock = new NamedPipeRWLock('queuedResource2',{connect});
const readLock1 = new NamedPipeRWLock('queuedResource2',{connect});
const readLock2 = new NamedPipeRWLock('queuedResource2',{connect});
// 先获取写锁
await writeLock.writeLock();
@ -205,7 +207,7 @@ describe('NamedPipeLock', () => {
describe('Error Handling', () => {
test('should reject when trying to acquire lock while already holding one', async () => {
const lock = new NamedPipeRWLock('errorResource', pipePath);
const lock = new NamedPipeRWLock('errorResource',{connect});
await lock.readLock();
@ -217,12 +219,12 @@ describe('NamedPipeLock', () => {
});
test('should handle lock acquisition timeout', async () => {
const lock = new NamedPipeRWLock('timeoutResource', pipePath, {
const lock = new NamedPipeRWLock('timeoutResource',{connect}, {
timeout: 100 // 设置很短的超时时间
});
// 模拟一个永远不会释放的锁场景
const blockingLock = new NamedPipeRWLock('timeoutResource', pipePath);
const blockingLock = new NamedPipeRWLock('timeoutResource',{connect});
await blockingLock.writeLock();
// 尝试获取已经被占用的锁,应该会超时

142
__tests__/tcp-lock.test.js Normal file
View File

@ -0,0 +1,142 @@
const TCPLockServer = require('../lock.tcp');
const TcpRwLock = require('../lock-client.tcp');
const net = require('net');
jest.setTimeout(30000);
// Helper function to check if a port is available
const isPortAvailable = (port) => {
return new Promise((resolve) => {
const tester = net.createServer();
tester.listen(port, () => {
tester.once('close', () => {
resolve(true);
});
tester.close();
});
tester.on('error', () => {
resolve(false);
});
});
};
// Find an available port for testing
const findAvailablePort = async (startPort = 7301) => {
let port = startPort;
while (!(await isPortAvailable(port))) {
port++;
}
return port;
};
describe('TCPLock', () => {
let server;
let port;
let connect = {};
// Start server before all tests
beforeAll(async () => {
port = await findAvailablePort();
connect = { host: 'localhost', port };
server = new TCPLockServer({ host: 'localhost', port });
await server.start();
});
// Stop server after all tests
afterAll(() => {
if (server) {
server.stop();
server = null;
}
});
test('should be able to create TCP lock client', () => {
const lock = new TcpRwLock('test-resource', { connect });
expect(lock).toBeInstanceOf(TcpRwLock);
expect(lock.host).toBe('localhost');
expect(lock.port).toBe(port);
});
test('should be able to connect to TCP lock server', async () => {
const lock = new TcpRwLock('test-resource', { connect });
try {
await lock.connect();
// Since there's no isConnected method, we'll check if we can send a request
expect(lock.socket).toBeDefined();
expect(lock.socket.destroyed).toBe(false);
} finally {
if (lock.socket) {
lock.close();
}
}
});
test('should be able to acquire and release read lock', async () => {
const lock = new TcpRwLock('test-resource', { connect });
try {
await lock.connect();
await lock.readLock();
expect(lock.isLocked).toBe(true);
await lock.unlock();
expect(lock.isLocked).toBe(false);
} finally {
if (lock.socket) {
lock.close();
}
}
});
test('should be able to acquire and release write lock', async () => {
const lock = new TcpRwLock('test-resource', { connect });
try {
await lock.connect();
await lock.writeLock();
expect(lock.isLocked).toBe(true);
await lock.unlock();
expect(lock.isLocked).toBe(false);
} finally {
if (lock.socket) {
lock.close();
}
}
});
test('should handle multiple clients', async () => {
const lock1 = new TcpRwLock('shared-resource', { connect });
const lock2 = new TcpRwLock('shared-resource', { connect });
try {
await Promise.all([
lock1.connect(),
lock2.connect()
]);
// Both clients should be able to acquire read locks
await Promise.all([
lock1.readLock(),
lock2.readLock()
]);
expect(lock1.isLocked).toBe(true);
expect(lock2.isLocked).toBe(true);
await Promise.all([
lock1.unlock(),
lock2.unlock()
]);
expect(lock1.isLocked).toBe(false);
expect(lock2.isLocked).toBe(false);
} finally {
if (lock1.socket) {
lock1.close();
}
if (lock2.socket) {
lock2.close();
}
}
});
});

View File

@ -2,8 +2,19 @@
const AsyncLock = require('./async-lock')
const FileLock = require('./file-lock')
const LockServer = require('./lock')
const NamedPipeLockServer = require('./lock.namedpipe')
const NamedPipeRWLock = require('./lock-client.namedpipe')
const TCPLockServer = require('./lock.tcp')
const TcpRwLock = require('./lock-client.tcp')
const LockClient = require('./lock-client')
module.exports = {
AsyncLock,
FileLock
FileLock,
LockServer,
NamedPipeLockServer,
NamedPipeRWLock,
TCPLockServer,
TcpRwLock,
LockClient
}

201
lock-client.js Normal file
View File

@ -0,0 +1,201 @@
// named-pipe-client.js
const net = require('net');
const { EventEmitter } = require('events');
const { v4: uuidv4 } = require('uuid');
/**
* 基于命名管道的读写锁客户端
* 提供对NamedPipeLockServer的客户端接口允许应用程序请求和释放读写锁
* 支持自动重连和超时机制
*/
class LockClient extends EventEmitter {
/**
* 创建NamedPipeRWLock实例
* @param {string} resource - 要锁定的资源名称
* @param {string} pipePath - 服务器命名管道路径
* @param {Object} options - 配置选项
* @param {number} options.timeout - 锁请求超时时间毫秒默认30000
* @param {number} options.retryInterval - 重连间隔毫秒默认1000
* @param {number} options.maxRetries - 最大重连次数默认5
* @param {object} options.connect
* @param {string} options.connect.pipePath - 命名管道路径
* @param {string} options.connect.host - 服务器主机名或IP地址
* @param {number} options.connect.port - 命名管道端口号
* @param {object} options.logger
*/
constructor(resource, options = {}) {
super();
this.resource = resource;
this._connectInfo = options.connect || {};
this.timeout = options.timeout || 30000;
this.retryInterval = options.retryInterval || 1000;
this.maxRetries = options.maxRetries || 5;
this.socket = null;
this.requestId = null;
this.isLocked = false;
this.lockType = null;
this.timeoutHandle = null;
this.retryCount = 0;
this._logger = options.logger || console;
}
/**
* 连接到锁服务器
* @returns {Promise<void>} 连接成功时resolve
*/
connect() {
}
/**
* 请求读锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async readLock() {
return this.acquireLock('readLock');
}
/**
* 请求写锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async writeLock() {
return this.acquireLock('writeLock');
}
/**
* 获取指定类型的锁
* @param {string} type - 锁类型 ('readLock' 'writeLock')
* @returns {Promise<void>} 锁获取成功时resolve
*/
async acquireLock(type) {
await this.ensureConnected();
return new Promise((resolve, reject) => {
if (this.isLocked) {
reject(new Error('Lock already held'));
return;
}
this.requestId = uuidv4();
this.lockType = type;
// 发送锁请求
this.sendMessage({
type,
resource: this.resource,
requestId: this.requestId
});
// 设置超时
this.timeoutHandle = setTimeout(() => {
this.cleanup();
reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`));
}, this.timeout);
// 监听锁授予事件
const onLockGranted = (data) => {
if (data.requestId === this.requestId) {
this.isLocked = true;
clearTimeout(this.timeoutHandle);
this.removeListener('lockGranted', onLockGranted);
resolve();
}
};
this.on('lockGranted', onLockGranted);
// 监听错误事件
const onError = (errorData) => {
if (errorData.requestId === this.requestId) {
clearTimeout(this.timeoutHandle);
this.removeListener('error', onError);
reject(new Error(errorData.message));
}
};
this.on('error', onError);
});
}
/**
* 释放当前持有的锁
* @returns {Promise<void>}
*/
async unlock() {
if (!this.isLocked || !this.socket) return;
this.sendMessage({
type: 'unlock',
resource: this.resource
});
this.cleanup();
this._logger.debug(`Lock released for resource: ${this.resource}`);
}
/**
* 处理与服务器的连接断开
*/
handleDisconnect() {
this._logger.debug('Disconnected from lock server');
this.cleanup();
this.socket = null;
// 自动重连逻辑
if (this.retryCount < this.maxRetries) {
this.retryCount++;
this._logger.debug(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`);
setTimeout(() => {
this.ensureConnected().catch(error => {
this._logger.error('Reconnection failed:', error);
});
}, this.retryInterval);
}
}
/**
* 向服务器发送消息
* @param {Object} message - 要发送的消息对象
*/
sendMessage(message) {
}
/**
* 确保客户端已连接到服务器
* @returns {Promise<void>}
*/
async ensureConnected() {
if (!this.socket || this.socket.destroyed) {
await this.connect();
}
}
/**
* 清理内部状态
*/
cleanup() {
this.isLocked = false;
this.lockType = null;
this.requestId = null;
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = null;
}
}
/**
* 关闭连接
*/
close() {
}
}
module.exports = LockClient;

View File

@ -1,6 +1,6 @@
// named-pipe-client.js
const net = require('net');
const { EventEmitter } = require('events');
const LockClient = require('./lock-client');
const { v4: uuidv4 } = require('uuid');
/**
@ -8,7 +8,7 @@ const { v4: uuidv4 } = require('uuid');
* 提供对NamedPipeLockServer的客户端接口允许应用程序请求和释放读写锁
* 支持自动重连和超时机制
*/
class NamedPipeRWLock extends EventEmitter {
class NamedPipeRWLock extends LockClient {
/**
* 创建NamedPipeRWLock实例
* @param {string} resource - 要锁定的资源名称
@ -18,41 +18,40 @@ class NamedPipeRWLock extends EventEmitter {
* @param {number} options.retryInterval - 重连间隔毫秒默认1000
* @param {number} options.maxRetries - 最大重连次数默认5
*/
constructor(resource, pipePath = '\\\\.\\pipe\\rwlock-server', options = {}) {
super();
constructor(resource, options = {}) {
super(options);
this.resource = resource;
this.pipePath = pipePath;
this.timeout = options.timeout || 30000;
this.retryInterval = options.retryInterval || 1000;
this.maxRetries = options.maxRetries || 5;
this.socket = null;
this.requestId = null;
this.isLocked = false;
this.lockType = null;
this.timeoutHandle = null;
this.retryCount = 0;
this.pipePath = options.connect.pipePath;
}
_createConnection(){
this.socket = net.createConnection(this.pipePath)
}
/**
* 连接到锁服务器
* @returns {Promise<void>} 连接成功时resolve
*/
async connect() {
connect() {
return new Promise((resolve, reject) => {
if (this.socket && !this.socket.destroyed) {
resolve();
return;
}
this.socket = net.createConnection(this.pipePath, () => {
console.log(`Connected to lock server at ${this.pipePath}`);
// this.socket = net.createConnection(this.pipePath, () => {
// console.log(`Connected to lock server at ${this.pipePath}`);
// this.retryCount = 0;
// resolve();
// });
this._createConnection()
this.socket.on('connect', () => {
this._logger.debug(`Connected to lock server at ${this.pipePath}`);
this.retryCount = 0;
resolve();
});
this.socket.on('error', (error) => {
console.error('Connection error:', error);
this._logger.error('Connection error:', error);
reject(error);
});
@ -62,93 +61,6 @@ class NamedPipeRWLock extends EventEmitter {
});
}
/**
* 请求读锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async readLock() {
return this.acquireLock('readLock');
}
/**
* 请求写锁
* @returns {Promise<void>} 锁获取成功时resolve
*/
async writeLock() {
return this.acquireLock('writeLock');
}
/**
* 获取指定类型的锁
* @param {string} type - 锁类型 ('readLock' 'writeLock')
* @returns {Promise<void>} 锁获取成功时resolve
*/
async acquireLock(type) {
await this.ensureConnected();
return new Promise((resolve, reject) => {
if (this.isLocked) {
reject(new Error('Lock already held'));
return;
}
this.requestId = uuidv4();
this.lockType = type;
// 发送锁请求
this.sendMessage({
type,
resource: this.resource,
requestId: this.requestId
});
// 设置超时
this.timeoutHandle = setTimeout(() => {
this.cleanup();
reject(new Error(`Lock acquisition timeout after ${this.timeout}ms`));
}, this.timeout);
// 监听锁授予事件
const onLockGranted = (data) => {
if (data.requestId === this.requestId) {
this.isLocked = true;
clearTimeout(this.timeoutHandle);
this.removeListener('lockGranted', onLockGranted);
resolve();
}
};
this.on('lockGranted', onLockGranted);
// 监听错误事件
const onError = (errorData) => {
if (errorData.requestId === this.requestId) {
clearTimeout(this.timeoutHandle);
this.removeListener('error', onError);
reject(new Error(errorData.message));
}
};
this.on('error', onError);
});
}
/**
* 释放当前持有的锁
* @returns {Promise<void>}
*/
async unlock() {
if (!this.isLocked || !this.socket) return;
this.sendMessage({
type: 'unlock',
resource: this.resource
});
this.cleanup();
console.log(`Lock released for resource: ${this.resource}`);
}
/**
* 处理从服务器收到的消息
* @param {Buffer} data - 接收的数据
@ -176,26 +88,6 @@ class NamedPipeRWLock extends EventEmitter {
}
}
/**
* 处理与服务器的连接断开
*/
handleDisconnect() {
console.log('Disconnected from lock server');
this.cleanup();
this.socket = null;
// 自动重连逻辑
if (this.retryCount < this.maxRetries) {
this.retryCount++;
console.log(`Attempting to reconnect... (${this.retryCount}/${this.maxRetries})`);
setTimeout(() => {
this.ensureConnected().catch(error => {
console.error('Reconnection failed:', error);
});
}, this.retryInterval);
}
}
/**
* 向服务器发送消息
@ -215,28 +107,9 @@ class NamedPipeRWLock extends EventEmitter {
}
}
/**
* 确保客户端已连接到服务器
* @returns {Promise<void>}
*/
async ensureConnected() {
if (!this.socket || this.socket.destroyed) {
await this.connect();
}
}
/**
* 清理内部状态
*/
cleanup() {
this.isLocked = false;
this.lockType = null;
this.requestId = null;
if (this.timeoutHandle) {
clearTimeout(this.timeoutHandle);
this.timeoutHandle = null;
}
}
/**
* 关闭连接

15
lock-client.tcp.js Normal file
View File

@ -0,0 +1,15 @@
const NamedPipeRWLock = require('./lock-client.namedpipe');
const net = require('net');
class TcpRwLock extends NamedPipeRWLock {
constructor(resource, options = {}) {
super(resource, options);
this.host = options.connect.host||'localhost';
this.port = options.connect.port||7301;
}
_createConnection() {
this.socket = net.createConnection({host:this.host, port:this.port})
}
}
module.exports = TcpRwLock;

264
lock.js Normal file
View File

@ -0,0 +1,264 @@
'use strict'
/**
* @typedef {Object} LockRequest
* @property {string} type - 锁类型可以是 "read" "write"
* @property {string} resource - 资源名称
* @property {string} requestId - 请求ID
*/
class LockServer {
/**
*
* @param {Object} lockOptions
* @param {string} lockOptions.pipePath - 命名管道路径
* @param {number} lockOptions.port - port // 端口 for tcp
* @param {object} lockOptions.logger // logger
*/
constructor(lockOptions) {
this.options = lockOptions;
this._logger = lockOptions.logger||console;
this.server = null;
this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] }
this.clients = new Map(); // clientId -> socket
}
/**
*
* @param {string} clientId
* @param {*} clientObject socket or other stands for client
*/
_addClient(clientId, clientObject){
this.clients.set(clientId, clientObject);
}
_removeClient(clientId){
this.clients.delete(clientId);
}
start() {
}
stop() {
}
/**
* 处理来自客户端的消息
* @param {string} clientId - 客户端标识符
* @param {LockRequest} message - 消息对象
*/
handleMessage(clientId, message) {
const { type, resource, requestId } = message;
if (!this.locks.has(resource)) {
this.locks.set(resource, {
readers: new Set(),
writer: null,
queue: []
});
}
const lock = this.locks.get(resource);
switch (type) {
case 'readLock':
this.handleReadLock(clientId, resource, requestId, lock);
break;
case 'writeLock':
this.handleWriteLock(clientId, resource, requestId, lock);
break;
case 'unlock':
this.handleUnlock(clientId, resource, lock);
break;
default:
this.sendError(clientId, `Unknown message type: ${type}`);
}
}
/**
* 处理读锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleReadLock(clientId, resource, requestId, lock) {
if (!lock.writer) {
// 可以立即获取读锁
lock.readers.add(clientId);
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'read'
});
this._logger.debug(`Read lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'read', requestId });
this._logger.debug(`Read lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理写锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleWriteLock(clientId, resource, requestId, lock) {
if (lock.readers.size === 0 && !lock.writer) {
// 可以立即获取写锁
lock.writer = clientId;
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'write'
});
this._logger.debug(`Write lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'write', requestId });
this._logger.debug(`Write lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理解锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
handleUnlock(clientId, resource, lock) {
let released = false;
// 移除读锁
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
this._logger.debug(`Read lock released by ${clientId} for ${resource}`);
}
// 移除写锁
if (lock.writer === clientId) {
lock.writer = null;
released = true;
this._logger.debug(`Write lock released by ${clientId} for ${resource}`);
}
if (released) {
// 处理等待队列
this.processQueue(resource, lock);
}
}
/**
* 处理等待队列中的锁请求
* 根据读写锁规则尽可能多地授权锁请求
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
processQueue(resource, lock) {
const granted = [];
for (let i = 0; i < lock.queue.length; i++) {
const request = lock.queue[i];
if (request.type === 'read') {
if (!lock.writer) {
// 可以授予读锁
lock.readers.add(request.clientId);
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'read'
});
granted.push(i);
this._logger.debug(`Queued read lock granted to ${request.clientId} for ${resource}`);
} else {
// 有写锁等待,读锁必须等待
break;
}
} else { // write
if (lock.readers.size === 0 && !lock.writer) {
// 可以授予写锁
lock.writer = request.clientId;
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'write'
});
granted.push(i);
this._logger.debug(`Queued write lock granted to ${request.clientId} for ${resource}`);
} else {
// 写锁必须等待前面的锁释放
break;
}
}
}
// 移除已处理的请求
if (granted.length > 0) {
lock.queue = lock.queue.filter((_, index) => !granted.includes(index));
}
}
/**
* 处理客户端断开连接
* 释放该客户端持有的所有锁并处理相关队列
* @param {string} clientId - 客户端标识符
*/
handleClientDisconnect(clientId) {
this._logger.debug(`Processing disconnect for client: ${clientId}`);
// 释放该客户端持有的所有锁
for (const [resource, lock] of this.locks) {
let released = false;
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
}
if (lock.writer === clientId) {
lock.writer = null;
released = true;
}
// 如果释放了锁,重新处理队列
if (released) {
this.processQueue(resource, lock);
}
}
}
/**
* 向指定客户端发送消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 要发送的消息对象
*/
sendToClient(clientId, message) {
const socket = this.clients.get(clientId);
if (socket && !socket.destroyed) {
try {
socket.write(JSON.stringify(message));
} catch (error) {
this._logger.error(`Error sending to client ${clientId}:`, error);
}
}
}
/**
* 向客户端发送错误消息
* @param {string} clientId - 客户端标识符
* @param {string} errorMessage - 错误信息
*/
sendError(clientId, errorMessage) {
this.sendToClient(clientId, {
type: 'error',
message: errorMessage
});
}
}
module.exports = LockServer;

View File

@ -2,8 +2,8 @@
const net = require('net');
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
const async_hooks = require('async_hooks');
const LockServer = require('./lock');
const { v4: uuidv4 } = require('uuid');
const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' : '/tmp/rwlock.sock';
@ -16,17 +16,15 @@ const DEFAULT_PIPE_PATH = process.platform === 'win32'? '\\\\.\\pipe\\rwlock' :
* 2. 写操作是独占的不允许同时进行读或其他写操作
* 3. 锁请求按先进先出(FIFO)方式处理
*/
class NamedPipeLockServer extends EventEmitter {
class NamedPipeLockServer extends LockServer {
/**
* 创建一个NamedPipeLockServer实例
* @param {string} pipePath - 命名管道路径默认值根据操作系统确定
*/
constructor(pipePath = DEFAULT_PIPE_PATH) {
super();
this.pipePath = pipePath;
this.server = null;
this.locks = new Map(); // resource -> { readers: Set, writer: null, queue: [] }
this.clients = new Map(); // clientId -> socket
constructor(lockOptions) {
super(lockOptions);
this.pipePath = this.options.pipePath || DEFAULT_PIPE_PATH;
}
/**
@ -50,7 +48,7 @@ class NamedPipeLockServer extends EventEmitter {
socket.clientId = clientId;
this.clients.set(clientId, socket);
console.log(`Client connected: ${clientId}`);
this._logger.debug(`Client connected: ${clientId}`);
socket.on('data', data => {
try {
@ -72,7 +70,7 @@ class NamedPipeLockServer extends EventEmitter {
socket.on('close', () => {
this.handleClientDisconnect(clientId);
this.clients.delete(clientId);
console.log(`Client disconnected: ${clientId}`);
this._logger.debug(`Client disconnected: ${clientId}`);
});
socket.on('error', error => {
@ -83,8 +81,11 @@ class NamedPipeLockServer extends EventEmitter {
});
// 启动命名管道服务器
this._start()
}
_start(){
this.server.listen(this.pipePath, () => {
console.log(`Lock server listening on named pipe: ${this.pipePath}`);
this._logger.debug(`Lock server listening on named pipe: ${this.pipePath}`);
});
this.server.on('error', error => {
@ -92,235 +93,13 @@ class NamedPipeLockServer extends EventEmitter {
});
}
/**
* 处理来自客户端的消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 消息对象
*/
handleMessage(clientId, message) {
const { type, resource, requestId } = message;
if (!this.locks.has(resource)) {
this.locks.set(resource, {
readers: new Set(),
writer: null,
queue: []
});
}
const lock = this.locks.get(resource);
switch (type) {
case 'readLock':
this.handleReadLock(clientId, resource, requestId, lock);
break;
case 'writeLock':
this.handleWriteLock(clientId, resource, requestId, lock);
break;
case 'unlock':
this.handleUnlock(clientId, resource, lock);
break;
default:
this.sendError(clientId, `Unknown message type: ${type}`);
}
}
/**
* 处理读锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleReadLock(clientId, resource, requestId, lock) {
if (!lock.writer) {
// 可以立即获取读锁
lock.readers.add(clientId);
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'read'
});
console.log(`Read lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'read', requestId });
console.log(`Read lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理写锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {string} requestId - 请求ID
* @param {Object} lock - 锁对象
*/
handleWriteLock(clientId, resource, requestId, lock) {
if (lock.readers.size === 0 && !lock.writer) {
// 可以立即获取写锁
lock.writer = clientId;
this.sendToClient(clientId, {
type: 'lockGranted',
requestId,
resource,
lockType: 'write'
});
console.log(`Write lock granted to ${clientId} for ${resource}`);
} else {
// 加入等待队列
lock.queue.push({ clientId, type: 'write', requestId });
console.log(`Write lock queued for ${clientId} for ${resource}`);
}
}
/**
* 处理解锁请求
* @param {string} clientId - 客户端标识符
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
handleUnlock(clientId, resource, lock) {
let released = false;
// 移除读锁
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
console.log(`Read lock released by ${clientId} for ${resource}`);
}
// 移除写锁
if (lock.writer === clientId) {
lock.writer = null;
released = true;
console.log(`Write lock released by ${clientId} for ${resource}`);
}
if (released) {
// 处理等待队列
this.processQueue(resource, lock);
}
}
/**
* 处理等待队列中的锁请求
* 根据读写锁规则尽可能多地授权锁请求
* @param {string} resource - 资源名称
* @param {Object} lock - 锁对象
*/
processQueue(resource, lock) {
const granted = [];
for (let i = 0; i < lock.queue.length; i++) {
const request = lock.queue[i];
if (request.type === 'read') {
if (!lock.writer) {
// 可以授予读锁
lock.readers.add(request.clientId);
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'read'
});
granted.push(i);
console.log(`Queued read lock granted to ${request.clientId} for ${resource}`);
} else {
// 有写锁等待,读锁必须等待
break;
}
} else { // write
if (lock.readers.size === 0 && !lock.writer) {
// 可以授予写锁
lock.writer = request.clientId;
this.sendToClient(request.clientId, {
type: 'lockGranted',
requestId: request.requestId,
resource,
lockType: 'write'
});
granted.push(i);
console.log(`Queued write lock granted to ${request.clientId} for ${resource}`);
} else {
// 写锁必须等待前面的锁释放
break;
}
}
}
// 移除已处理的请求
if (granted.length > 0) {
lock.queue = lock.queue.filter((_, index) => !granted.includes(index));
}
}
/**
* 处理客户端断开连接
* 释放该客户端持有的所有锁并处理相关队列
* @param {string} clientId - 客户端标识符
*/
handleClientDisconnect(clientId) {
console.log(`Processing disconnect for client: ${clientId}`);
// 释放该客户端持有的所有锁
for (const [resource, lock] of this.locks) {
let released = false;
if (lock.readers.has(clientId)) {
lock.readers.delete(clientId);
released = true;
}
if (lock.writer === clientId) {
lock.writer = null;
released = true;
}
// 如果释放了锁,重新处理队列
if (released) {
this.processQueue(resource, lock);
}
}
}
/**
* 向指定客户端发送消息
* @param {string} clientId - 客户端标识符
* @param {Object} message - 要发送的消息对象
*/
sendToClient(clientId, message) {
const socket = this.clients.get(clientId);
if (socket && !socket.destroyed) {
try {
socket.write(JSON.stringify(message));
} catch (error) {
console.error(`Error sending to client ${clientId}:`, error);
}
}
}
/**
* 向客户端发送错误消息
* @param {string} clientId - 客户端标识符
* @param {string} errorMessage - 错误信息
*/
sendError(clientId, errorMessage) {
this.sendToClient(clientId, {
type: 'error',
message: errorMessage
});
}
/**
* 停止服务器
*/
stop() {
if (this.server) {
this.server.close();
console.log('Lock server stopped');
this._logger.debug('Lock server stopped');
}
}
}
@ -335,13 +114,13 @@ class NamedPipeLockServer extends EventEmitter {
// // 优雅关闭
// process.on('SIGINT', () => {
// console.log('Shutting down lock server...');
// this._logger.debug('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });
// process.on('SIGTERM', () => {
// console.log('Shutting down lock server...');
// this._logger.debug('Shutting down lock server...');
// server.stop();
// process.exit(0);
// });

19
lock.tcp.js Normal file
View File

@ -0,0 +1,19 @@
'use strict'
const NamedPipeLockServer = require('./lock.namedpipe');
class TCPLockServer extends NamedPipeLockServer {
constructor(lockOptions) {
super(lockOptions);
this.port = lockOptions.port || 7301;
this.host = lockOptions.host || '0.0.0.0';
}
_start(){
this.server.listen(this.port, this.host,()=>{
this._logger.debug(`Lock server listening on ${this.host}:${this.port}`);
});
this.server.on('error', (err) => {
this._logger.error(`Lock server error: ${err}`);
});
}
}
module.exports = TCPLockServer;