diff --git a/README.md b/README.md index f63414f..a8ed17b 100644 --- a/README.md +++ b/README.md @@ -1,169 +1,160 @@ -# GoTiDB - 时序数据库 +# GoTiDB - 轻量级时序数据库 -GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专门用于存储和查询时间序列数据。它支持高效的数据写入、查询和实时数据推送功能。 +GoTiDB 是一个用 Go 语言编写的轻量级时序数据库,专为高效存储和查询时间序列数据而设计。它提供了简单而强大的 API,支持高吞吐量的数据写入和灵活的查询功能。 -## 特性 +## 功能特点 -- 高性能内存存储引擎 -- WAL(预写日志)持久化 -- REST API 接口 -- WebSocket 实时数据推送 -- NATS 消息系统集成 -- Prometheus 指标监控 -- 支持自定义标签的数据点 -- 环形缓冲区数据结构 -- 支持多种查询类型(最新值、所有值、持续时间) +- **高效存储**: 使用基于文件的存储引擎,针对时间序列数据进行了优化 +- **灵活查询**: 支持原始数据查询、最新值查询和聚合查询 +- **标签索引**: 使用多维标签索引,支持按标签快速过滤数据 +- **时间窗口**: 高效的时间窗口索引,加速时间范围查询 +- **数据压缩**: 支持自动压缩旧数据,节省存储空间 +- **数据保留**: 自动清理过期数据,支持配置保留策略 +- **并发安全**: 支持多个并发读写操作 +- **可扩展**: 模块化设计,易于扩展和定制 ## 安装 -确保你已经安装了 Go 1.16 或更高版本。 - ```bash -git clone git.pyer.club/kingecg/gotidb -cd gotidb -go mod download +go get git.pyer.club/kingecg/gotidb ``` -## 构建 +## 快速开始 -```bash -go build -o gotidb cmd/server/main.go +以下是一个简单的示例,展示如何使用 GoTiDB: + +```go +package main + +import ( + "context" + "fmt" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + _ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎 +) + +func main() { + // 创建引擎配置 + config := &engine.FileEngineConfig{ + DataDir: "/path/to/data", + SegmentSize: 1024 * 1024, // 1MB + MaxSegments: 10, + WriteBufferSize: 1000, + } + + // 创建引擎 + e, err := engine.NewEngine(engine.EngineConfig{ + Type: "file", + FileConfig: config, + }) + if err != nil { + fmt.Printf("Failed to create engine: %v\n", err) + return + } + + // 打开引擎 + if err := e.Open(); err != nil { + fmt.Printf("Failed to open engine: %v\n", err) + return + } + defer e.Close() + + // 写入数据 + points := []engine.DataPoint{ + { + Timestamp: time.Now().UnixNano(), + Value: 42.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + }, + }, + } + + ctx := context.Background() + if err := e.Write(ctx, points); err != nil { + fmt.Printf("Failed to write points: %v\n", err) + return + } + + // 查询数据 + query := engine.Query{ + Type: engine.QueryTypeRaw, + StartTime: time.Now().Add(-time.Hour).UnixNano(), + EndTime: time.Now().UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + } + + result, err := e.Query(ctx, query) + if err != nil { + fmt.Printf("Failed to query: %v\n", err) + return + } + + // 处理查询结果 + for _, series := range result { + fmt.Printf("Series ID: %s\n", series.SeriesID) + for _, point := range series.Points { + fmt.Printf(" Timestamp: %s, Value: %f\n", + time.Unix(0, point.Timestamp).Format(time.RFC3339), + point.Value) + } + } +} ``` -## 运行 +更多示例请参考 [examples](./examples) 目录。 -```bash -./gotidb [options] -``` +## 配置选项 -### 可用选项 +### 文件引擎配置 -- `-rest-addr`: REST API 服务地址(默认:":8080") -- `-ws-addr`: WebSocket 服务地址(默认:":8081") -- `-metrics-addr`: 指标服务地址(默认:":8082") -- `-quic-addr`: QUIC 服务地址(默认:":8083") -- `-nats-url`: NATS 服务器地址(默认:"nats://localhost:4222") -- `-persistence`: 持久化类型(none, wal, boltdb)(默认:"none") -- `-persistence-dir`: 持久化目录(默认:"./data") -- `-sync-every`: 每写入多少条数据同步一次(默认:100) -- `-config`: 配置文件路径(默认:"config.yaml") +| 选项 | 描述 | 默认值 | +|------|------|--------| +| DataDir | 数据存储目录 | 必填 | +| SegmentSize | 段文件大小限制(字节) | 64MB | +| MaxSegments | 最大段文件数量 | 100 | +| WriteBufferSize | 写入缓冲区大小(数据点数) | 1000 | +| IndexCacheSize | 索引缓存大小(字节) | 32MB | +| UseCompression | 是否启用压缩 | false | +| CompressionLevel | 压缩级别(0-9) | 6 | +| CompactThreshold | 触发压缩的阈值(段文件数量比例) | 0.7 | +| MaxOpenFiles | 最大打开文件数 | 100 | +| SyncWrites | 是否同步写入(更安全但更慢) | false | +| RetentionPeriod | 数据保留时间 | 30d | -### 持久化选项 +## 性能考虑 -GoTiDB 支持多种持久化方式: +- 
**写入性能**: 使用写入缓冲区和异步刷新可以提高写入性能 +- **查询性能**: 使用标签索引和时间窗口索引加速查询 +- **存储效率**: 启用压缩可以减少存储空间占用,但会增加 CPU 使用率 +- **内存使用**: 调整索引缓存大小可以平衡内存使用和查询性能 +- **文件描述符**: 调整最大打开文件数以适应系统限制 -1. **内存存储(无持久化)**:数据仅保存在内存中,服务重启后数据丢失。 - - 配置:`-persistence=none` +## 架构 -2. **WAL 日志持久化**:使用预写日志(Write-Ahead Log)进行持久化,支持数据恢复。 - - 配置:`-persistence=wal -persistence-dir=./data -sync-every=100` +GoTiDB 的核心架构包括: -3. **BoltDB 持久化**:使用 BoltDB 进行持久化,提供更高的可靠性和查询性能。 - - 配置:`-persistence=boltdb -persistence-dir=./data` - - 配置文件中可设置:`boltdb_filename`(数据库文件名)和 `boltdb_bucket_size`(数据分桶大小) +1. **引擎接口**: 定义了存储引擎的通用接口 +2. **文件引擎**: 基于文件系统的存储引擎实现 +3. **索引管理**: 标签索引和时间窗口索引 +4. **查询处理**: 原始查询、最新值查询和聚合查询 +5. **后台任务**: 数据压缩和过期数据清理 -## API 使用 +## 贡献 -### REST API +欢迎贡献代码、报告问题或提出改进建议!请遵循以下步骤: -#### 写入数据 - -```bash -curl -X POST http://localhost:8080/api/v1/write \ - -H "Content-Type: application/json" \ - -d '{ - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "value": 25.5 - }' -``` - -#### 批量写入数据 - -```bash -curl -X POST http://localhost:8080/api/v1/batch_write \ - -H "Content-Type: application/json" \ - -d '{ - "points": [ - { - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "value": 25.5 - }, - { - "device_id": "device2", - "metric_code": "humidity", - "labels": { - "location": "room2" - }, - "value": 60 - } - ] - }' -``` - -#### 查询数据 - -```bash -curl -X POST http://localhost:8080/api/v1/query \ - -H "Content-Type: application/json" \ - -d '{ - "device_id": "device1", - "metric_code": "temperature", - "labels": { - "location": "room1" - }, - "query_type": "latest" - }' -``` - -### WebSocket API - -连接 WebSocket 服务: - -```javascript -const ws = new WebSocket('ws://localhost:8081/ws'); - -// 订阅数据点 -ws.send(JSON.stringify({ - device_id: "device1", - metric_code: "temperature", - labels: { - location: "room1" - } -})); - -// 接收数据更新 -ws.onmessage = function(event) { - const data = JSON.parse(event.data); - console.log('Received update:', data); -}; -``` - -## 监控 - -访问 `http://localhost:8082/metrics` 查看 Prometheus 指标。 - -可用指标: - -- `gotidb_write_total`: 写入操作总数 -- `gotidb_query_total`: 查询操作总数 -- `gotidb_write_latency_seconds`: 写入操作延迟 -- `gotidb_query_latency_seconds`: 查询操作延迟 -- `gotidb_active_connections`: 活跃连接数 -- `gotidb_data_points_count`: 数据点数量 -- `gotidb_persistence_latency_seconds`: 持久化操作延迟 -- `gotidb_persistence_errors_total`: 持久化错误总数 -- `gotidb_messaging_latency_seconds`: 消息操作延迟 -- `gotidb_messaging_errors_total`: 消息错误总数 -- `gotidb_websocket_connections`: WebSocket 连接数 +1. Fork 项目 +2. 创建功能分支 (`git checkout -b feature/amazing-feature`) +3. 提交更改 (`git commit -m 'Add some amazing feature'`) +4. 推送到分支 (`git push origin feature/amazing-feature`) +5. 
创建 Pull Request ## 许可证 -MIT License \ No newline at end of file +本项目采用 MIT 许可证 - 详见 [LICENSE](LICENSE) 文件。 \ No newline at end of file diff --git a/cmd/server/main.go b/cmd/server/main.go index 0ac09f6..2230f16 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -138,7 +138,7 @@ func main() { quicConfig = config.QuicConfig // 如果配置文件中有配置,则使用配置文件中的配置 } - quicServer, err := api.NewQUICServer(dataManager, quicConfig) + quicServer, err = api.NewQUICServer(dataManager, quicConfig) if err != nil { log.Printf("Failed to create QUIC server: %v", err) log.Println("Continuing without QUIC server") diff --git a/cmd/server/server b/cmd/server/server new file mode 100755 index 0000000..b763ba6 Binary files /dev/null and b/cmd/server/server differ diff --git a/docs/design/engine-design.md b/docs/design/engine-design.md new file mode 100644 index 0000000..5b072b1 --- /dev/null +++ b/docs/design/engine-design.md @@ -0,0 +1,450 @@ +# 存储引擎设计文档 + +## 1. 概述 + +GoTiDB存储引擎抽象层旨在提供统一的接口,使不同的存储后端可以无缝集成到系统中。本文档描述了存储引擎的设计原则、接口定义和实现建议。 + +## 2. 设计目标 + +- **抽象统一**: 提供一致的API,隐藏不同存储引擎的实现细节 +- **可扩展性**: 支持添加新的存储引擎而无需修改核心代码 +- **性能优化**: 针对时序数据的特点进行优化 +- **可配置性**: 允许通过配置调整引擎行为 + +## 3. 存储引擎接口 + +### 3.1 核心接口 + +```go +// Engine 是所有存储引擎必须实现的基础接口 +type Engine interface { + // 基本生命周期 + Open() error + Close() error + + // 数据操作 + WritePoint(ctx context.Context, point DataPoint) error + WriteBatch(ctx context.Context, points []DataPoint) error + + // 查询操作 + Query(ctx context.Context, query Query) (QueryResult, error) + + // 管理操作 + Flush() error + Compact() error + + // 监控 + Stats() EngineStats + + // 能力查询 + Capabilities() EngineCapabilities +} +``` + +### 3.2 扩展接口 + +特定引擎可以实现额外接口来提供特殊功能: + +```go +// PersistentEngine 提供持久化功能 +type PersistentEngine interface { + Engine + Backup(path string) error + Restore(path string) error +} + +// ReplicatedEngine 提供复制功能 +type ReplicatedEngine interface { + Engine + AddReplica(addr string) error + RemoveReplica(addr string) error +} +``` + +## 4. 统一查询接口 + +所有读操作通过统一的Query接口实现,提供灵活性和一致性: + +```go +// Query 定义查询参数 +type Query struct { + // 查询类型 + Type QueryType + + // 时间范围 + StartTime int64 + EndTime int64 + + // 序列标识 + SeriesID string + DeviceID string + MetricCode string + + // 标签过滤 + TagFilters []TagFilter + + // 聚合选项 + Aggregation AggregationType + AggInterval time.Duration + IncludeRawData bool + + // 结果限制 + Limit int + Offset int + + // 其他查询选项 + Options map[string]interface{} +} + +// QueryType 定义查询类型 +type QueryType int + +const ( + // 原始数据查询 + QueryTypeRaw QueryType = iota + + // 聚合查询 + QueryTypeAggregate + + // 最新值查询 + QueryTypeLatest + + // 标签查询 + QueryTypeTags + + // 元数据查询 + QueryTypeMetadata +) + +// TagFilter 定义标签过滤条件 +type TagFilter struct { + Key string + Operator FilterOperator + Value string +} + +// FilterOperator 定义过滤操作符 +type FilterOperator int + +const ( + OpEqual FilterOperator = iota + OpNotEqual + OpRegex + OpGreaterThan + OpLessThan + // 更多操作符... +) + +// AggregationType 定义聚合类型 +type AggregationType int + +const ( + AggNone AggregationType = iota + AggSum + AggAvg + AggMin + AggMax + AggCount + // 更多聚合类型... 
+)
+```
+
+### 4.1 查询结果
+
+```go
+// QueryResult 定义查询结果
+type QueryResult interface {
+    // 结果类型
+    Type() QueryType
+}
+
+// TimeSeriesResult 定义时间序列查询结果
+type TimeSeriesResult struct {
+    SeriesID string
+    Points   []DataPoint
+}
+
+// AggregateResult 定义聚合查询结果
+type AggregateResult struct {
+    SeriesID string
+    Groups   []AggregateGroup
+}
+
+type AggregateGroup struct {
+    StartTime int64
+    EndTime   int64
+    Value     float64
+    Count     int
+}
+```
+
+### 4.2 查询构建器
+
+为了简化查询构建,提供流式API:
+
+```go
+query := NewQueryBuilder().
+    ForMetric("cpu.usage").
+    WithTimeRange(startTime, endTime).
+    WithTag("host", OpEqual, "server01").
+    WithAggregation(AggAvg, 5*time.Minute).
+    Build()
+```
+
+## 5. 配置抽象
+
+```go
+type EngineConfig interface {
+    // 通用配置方法
+    WithMaxRetention(duration time.Duration) EngineConfig
+    WithMaxPoints(points int) EngineConfig
+    WithFlushInterval(interval time.Duration) EngineConfig
+
+    // 获取特定引擎的配置
+    MemoryConfig() *MemoryEngineConfig
+    FileConfig() *FileEngineConfig
+    // 其他引擎...
+}
+
+// 内存引擎特定配置
+type MemoryEngineConfig struct {
+    MaxPointsPerSeries int // 可配置的保留点数,替代硬编码的30
+    UseCompression     bool
+    // 其他内存引擎特定参数...
+}
+```
+
+## 6. 引擎注册机制
+
+```go
+// EngineRegistry 管理所有可用的存储引擎
+type EngineRegistry struct {
+    engines map[string]EngineFactory
+}
+
+// EngineFactory 创建存储引擎实例
+type EngineFactory func(config EngineConfig) (Engine, error)
+
+// 注册新引擎
+func (r *EngineRegistry) Register(name string, factory EngineFactory) {
+    r.engines[name] = factory
+}
+
+// 创建引擎实例
+func (r *EngineRegistry) Create(name string, config EngineConfig) (Engine, error) {
+    if factory, ok := r.engines[name]; ok {
+        return factory(config)
+    }
+    return nil, fmt.Errorf("unknown engine: %s", name)
+}
+```
+
+## 7. 性能优化建议
+
+### 7.1 写入路径优化
+
+实现写入缓冲区合并小批量写入:
+
+```go
+type WriteBuffer struct {
+    points  map[string][]DataPoint // 按序列ID分组
+    size    int                    // 当前缓冲的数据点总数
+    mu      sync.Mutex
+    maxSize int
+    flushCh chan struct{}
+    engine  Engine
+}
+
+func (wb *WriteBuffer) Add(point DataPoint) error {
+    wb.mu.Lock()
+    seriesID := point.SeriesID()
+    wb.points[seriesID] = append(wb.points[seriesID], point)
+    wb.size++
+    full := wb.size >= wb.maxSize
+    wb.mu.Unlock()
+
+    if full {
+        return wb.Flush()
+    }
+    return nil
+}
+
+func (wb *WriteBuffer) Flush() error {
+    wb.mu.Lock()
+    buffered := wb.points
+    wb.points = make(map[string][]DataPoint)
+    wb.size = 0
+    wb.mu.Unlock()
+
+    // 展平为切片后一次性批量写入引擎
+    var batch []DataPoint
+    for _, pts := range buffered {
+        batch = append(batch, pts...)
+    }
+    if len(batch) == 0 {
+        return nil
+    }
+    return wb.engine.WriteBatch(context.Background(), batch)
+}
+```
+
+### 7.2 并发控制优化
+
+实现分片锁减少锁竞争:
+
+```go
+type ShardedLock struct {
+    locks     []sync.RWMutex
+    shardMask uint64
+}
+
+func NewShardedLock(shards int) *ShardedLock {
+    // 确保分片数是2的幂(nextPowerOfTwo 实现从略)
+    shards = nextPowerOfTwo(shards)
+    return &ShardedLock{
+        locks:     make([]sync.RWMutex, shards),
+        shardMask: uint64(shards - 1),
+    }
+}
+
+func (sl *ShardedLock) getLockForKey(key string) *sync.RWMutex {
+    h := fnv.New64()
+    h.Write([]byte(key))
+    hashVal := h.Sum64()
+    return &sl.locks[hashVal&sl.shardMask]
+}
+
+func (sl *ShardedLock) Lock(key string) {
+    sl.getLockForKey(key).Lock()
+}
+
+func (sl *ShardedLock) Unlock(key string) {
+    sl.getLockForKey(key).Unlock()
+}
+```
+
+### 7.3 内存优化
+
+实现时序数据的紧凑存储:
+
+```go
+// 紧凑存储时间戳和值
+type CompactTimeSeriesBlock struct {
+    baseTime    int64
+    deltaEncode []byte // 使用delta编码存储时间戳
+    values      []byte // 压缩存储的值
+}
+
+func NewCompactBlock(baseTime int64, capacity int) *CompactTimeSeriesBlock {
+    return &CompactTimeSeriesBlock{
+        baseTime:    baseTime,
+        deltaEncode: make([]byte, 0, capacity*binary.MaxVarintLen64),
+        values:      make([]byte, 0, capacity*8), // 假设double值
+    }
+}
+
+func (b *CompactTimeSeriesBlock) AddPoint(timestamp int64, value float64) {
+    // 存储时间增量
+    delta
:= timestamp - b.baseTime + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutVarint(buf, delta) + b.deltaEncode = append(b.deltaEncode, buf[:n]...) + + // 存储值 + bits := math.Float64bits(value) + buf = make([]byte, 8) + binary.LittleEndian.PutUint64(buf, bits) + b.values = append(b.values, buf...) +} +``` + +### 7.4 查询优化 + +实现时间范围索引: + +```go +type TimeRangeIndex struct { + // 每个时间窗口的起始位置 + windows []timeWindow + blockSize int64 // 时间窗口大小,如1小时 +} + +type timeWindow struct { + startTime int64 + endTime int64 + offset int // 数据块中的偏移 +} + +func (idx *TimeRangeIndex) FindBlocks(start, end int64) []int { + var result []int + for i, window := range idx.windows { + if window.endTime >= start && window.startTime <= end { + result = append(result, i) + } + } + return result +} +``` + +## 8. 实现路线图 + +1. **定义核心接口** + - 实现Engine接口 + - 定义Query和QueryResult结构 + +2. **重构现有引擎** + - 调整内存引擎以实现新接口 + - 使MaxPointsPerSeries可配置 + +3. **实现查询构建器** + - 创建流式API构建查询 + +4. **添加性能优化** + - 实现写入缓冲区 + - 添加分片锁 + - 优化内存使用 + +5. **实现引擎注册机制** + - 创建EngineRegistry + - 支持动态引擎选择 + +6. **添加监控和统计** + - 实现Stats接口 + - 收集性能指标 + +## 9. 使用示例 + +```go +// 创建引擎 +registry := NewEngineRegistry() +registry.Register("memory", NewMemoryEngine) +registry.Register("file", NewFileEngine) + +config := NewEngineConfig(). + WithMaxRetention(24 * time.Hour). + WithMaxPoints(1000) + +engine, err := registry.Create("memory", config) +if err != nil { + log.Fatal(err) +} + +// 写入数据 +point := DataPoint{ + DeviceID: "device1", + MetricCode: "temperature", + Labels: map[string]string{"location": "room1"}, + Value: 25.5, + Timestamp: time.Now().UnixNano(), +} +err = engine.WritePoint(context.Background(), point) + +// 查询数据 +query := NewQueryBuilder(). + ForMetric("temperature"). + WithTimeRange(startTime, endTime). + WithTag("location", OpEqual, "room1"). + Build() + +result, err := engine.Query(context.Background(), query) +if err != nil { + log.Fatal(err) +} + +// 处理结果 +if tsResult, ok := result.(*TimeSeriesResult); ok { + for _, point := range tsResult.Points { + fmt.Printf("Time: %v, Value: %v\n", + time.Unix(0, point.Timestamp), point.Value) + } +} +``` \ No newline at end of file diff --git a/examples/engine/main.go b/examples/engine/main.go new file mode 100644 index 0000000..76d7737 --- /dev/null +++ b/examples/engine/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + "git.pyer.club/kingecg/gotidb/pkg/engine/memory" +) + +func main() { + // 创建引擎注册表 + registry := engine.NewEngineRegistry() + + // 注册内存引擎 + memory.Register(registry) + + // 创建引擎配置 + config := engine.NewEngineConfig(). + WithMaxRetention(24 * time.Hour). 
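+        // 注:这里假定 MaxPoints 对应内存引擎的 MaxPointsPerSeries,
+        // 即每个序列最多保留的数据点数(示例取 1000,仅作演示)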
+ WithMaxPoints(1000) + + // 创建内存引擎实例 + eng, err := registry.Create("memory", config) + if err != nil { + log.Fatal("Failed to create engine:", err) + } + + // 打开引擎 + if err := eng.Open(); err != nil { + log.Fatal("Failed to open engine:", err) + } + defer eng.Close() + + // 写入一些测试数据 + deviceID := "device001" + metricCode := "temperature" + now := time.Now() + + // 写入单个数据点 + point := engine.DataPoint{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5, + Timestamp: now.UnixNano(), + } + + if err := eng.WritePoint(context.Background(), point); err != nil { + log.Fatal("Failed to write point:", err) + } + + // 写入一批数据点 + var points []engine.DataPoint + for i := 0; i < 10; i++ { + points = append(points, engine.DataPoint{ + DeviceID: deviceID, + MetricCode: metricCode, + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5 + float64(i), + Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Fatal("Failed to write batch:", err) + } + + // 查询最新数据 + latestQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTag("location", engine.OpEqual, "room1"). + Build() + latestQuery.Type = engine.QueryTypeLatest + + result, err := eng.Query(context.Background(), latestQuery) + if err != nil { + log.Fatal("Failed to query latest data:", err) + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Println("\nLatest data:") + for _, p := range tsResult.Points { + fmt.Printf("Time: %v, Value: %.2f\n", + time.Unix(0, p.Timestamp).Format(time.RFC3339), + p.Value) + } + } + + // 查询原始数据 + rawQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). + WithTag("location", engine.OpEqual, "room1"). + Build() + + result, err = eng.Query(context.Background(), rawQuery) + if err != nil { + log.Fatal("Failed to query raw data:", err) + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Println("\nRaw data:") + for _, p := range tsResult.Points { + fmt.Printf("Time: %v, Value: %.2f\n", + time.Unix(0, p.Timestamp).Format(time.RFC3339), + p.Value) + } + } + + // 查询聚合数据 + aggQuery := engine.NewQueryBuilder(). + ForMetric(metricCode). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.UnixNano()). + WithTag("location", engine.OpEqual, "room1"). + WithAggregation(engine.AggAvg, 5*time.Minute). 
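+        // 对最近一小时的数据按 5 分钟窗口求平均,窗口划分方式与引擎内 splitTimeRange 一致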
+        Build()
+
+    result, err = eng.Query(context.Background(), aggQuery)
+    if err != nil {
+        log.Fatal("Failed to query aggregate data:", err)
+    }
+
+    if aggResult, ok := result.(*engine.AggregateResult); ok {
+        fmt.Println("\nAggregate data (5-minute averages):")
+        for _, g := range aggResult.Groups {
+            fmt.Printf("Time range: %v - %v, Average: %.2f, Count: %d\n",
+                time.Unix(0, g.StartTime).Format(time.RFC3339),
+                time.Unix(0, g.EndTime).Format(time.RFC3339),
+                g.Value,
+                g.Count)
+        }
+    }
+
+    // 打印引擎统计信息
+    stats := eng.Stats()
+    fmt.Printf("\nEngine stats:\n")
+    fmt.Printf("Points count: %d\n", stats.PointsCount)
+    fmt.Printf("Last write time: %v\n", stats.LastWriteTime.Format(time.RFC3339))
+
+    // 打印引擎能力
+    caps := eng.Capabilities()
+    fmt.Printf("\nEngine capabilities:\n")
+    fmt.Printf("Supports compression: %v\n", caps.SupportsCompression)
+    fmt.Printf("Supports persistence: %v\n", caps.SupportsPersistence)
+    fmt.Printf("Supports replication: %v\n", caps.SupportsReplication)
+    fmt.Printf("Max concurrent writes: %d\n", caps.MaxConcurrentWrites)
+}
diff --git a/examples/multi_engine/main.go b/examples/multi_engine/main.go
new file mode 100644
index 0000000..14f8b76
--- /dev/null
+++ b/examples/multi_engine/main.go
@@ -0,0 +1,263 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "os"
+    "time"
+
+    "git.pyer.club/kingecg/gotidb/pkg/engine"
+    "git.pyer.club/kingecg/gotidb/pkg/engine/file"
+    "git.pyer.club/kingecg/gotidb/pkg/engine/memory"
+)
+
+func main() {
+    // 创建引擎注册表
+    registry := engine.NewEngineRegistry()
+
+    // 注册内存引擎和文件引擎
+    memory.Register(registry)
+    file.Register(registry)
+
+    // 创建临时目录用于文件引擎
+    tempDir, err := os.MkdirTemp("", "gotidb_example_*")
+    if err != nil {
+        log.Fatal("Failed to create temp dir:", err)
+    }
+    defer os.RemoveAll(tempDir)
+
+    // 创建内存引擎配置
+    memConfig := engine.NewEngineConfig().
+        WithMaxRetention(24 * time.Hour).
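+        // 内存引擎不落盘,数据仅驻留内存,适合下文的高频写入、短期查询场景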
+ WithMaxPoints(1000) + + // 创建文件引擎配置 + fileConfig := engine.NewEngineConfig() + fileConfig.SetFileConfig(&engine.FileEngineConfig{ + DataDir: tempDir, + SegmentSize: 1024 * 1024, // 1MB + CompactWindow: time.Hour, + MaxSegments: 10, + UseCompression: true, + CompressionLevel: 6, + }) + + // 创建内存引擎和文件引擎实例 + memEng, err := registry.Create("memory", memConfig) + if err != nil { + log.Fatal("Failed to create memory engine:", err) + } + + fileEng, err := registry.Create("file", fileConfig) + if err != nil { + log.Fatal("Failed to create file engine:", err) + } + + // 打开引擎 + if err := memEng.Open(); err != nil { + log.Fatal("Failed to open memory engine:", err) + } + defer memEng.Close() + + if err := fileEng.Open(); err != nil { + log.Fatal("Failed to open file engine:", err) + } + defer fileEng.Close() + + // 演示不同场景下的引擎使用 + + // 场景1:高频写入,短期存储 - 使用内存引擎 + fmt.Println("\n=== 场景1:高频写入,短期存储(内存引擎)===") + demoHighFrequencyWrites(memEng) + + // 场景2:长期存储,历史数据查询 - 使用文件引擎 + fmt.Println("\n=== 场景2:长期存储,历史数据查询(文件引擎)===") + demoHistoricalData(fileEng) + + // 场景3:聚合查询性能对比 + fmt.Println("\n=== 场景3:聚合查询性能对比 ===") + demoAggregationComparison(memEng, fileEng) + + // 打印引擎统计信息 + printEngineStats("Memory Engine", memEng) + printEngineStats("File Engine", fileEng) +} + +// 演示高频写入场景 +func demoHighFrequencyWrites(eng engine.Engine) { + start := time.Now() + count := 1000 + + // 批量写入数据 + var points []engine.DataPoint + for i := 0; i < count; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor001", + MetricCode: "temperature", + Labels: map[string]string{ + "location": "room1", + "floor": "1st", + }, + Value: 25.5 + float64(i%10), + Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write batch: %v", err) + return + } + + duration := time.Since(start) + fmt.Printf("写入 %d 个数据点耗时: %v (%.2f points/sec)\n", + count, duration, float64(count)/duration.Seconds()) + + // 查询最新数据 + query := engine.NewQueryBuilder(). + ForMetric("temperature"). + WithTag("location", engine.OpEqual, "room1"). + Build() + query.Type = engine.QueryTypeLatest + + result, err := eng.Query(context.Background(), query) + if err != nil { + log.Printf("Failed to query latest data: %v", err) + return + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Printf("最新数据点: %.2f (时间: %v)\n", + tsResult.Points[0].Value, + time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) + } +} + +// 演示历史数据存储和查询场景 +func demoHistoricalData(eng engine.Engine) { + // 写入跨越多个时间段的数据 + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 24; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor002", + MetricCode: "power", + Labels: map[string]string{ + "device": "solar_panel", + "unit": "watts", + }, + Value: 100 + float64(i*50), + Timestamp: now.Add(time.Duration(-i) * time.Hour).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write historical data: %v", err) + return + } + + // 查询24小时内的数据 + query := engine.NewQueryBuilder(). + ForMetric("power"). + WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). + WithTag("device", engine.OpEqual, "solar_panel"). 
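+        // 标签过滤与时间范围共同决定命中的序列与数据点(见 pkg/engine/file/query.go)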
+ Build() + + result, err := eng.Query(context.Background(), query) + if err != nil { + log.Printf("Failed to query historical data: %v", err) + return + } + + if tsResult, ok := result.(*engine.TimeSeriesResult); ok { + fmt.Printf("24小时内的数据点数量: %d\n", len(tsResult.Points)) + if len(tsResult.Points) > 0 { + fmt.Printf("最早数据点: %.2f (时间: %v)\n", + tsResult.Points[0].Value, + time.Unix(0, tsResult.Points[0].Timestamp).Format(time.RFC3339)) + fmt.Printf("最新数据点: %.2f (时间: %v)\n", + tsResult.Points[len(tsResult.Points)-1].Value, + time.Unix(0, tsResult.Points[len(tsResult.Points)-1].Timestamp).Format(time.RFC3339)) + } + } +} + +// 演示聚合查询性能对比 +func demoAggregationComparison(memEng, fileEng engine.Engine) { + // 准备测试数据 + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 1000; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "sensor003", + MetricCode: "cpu_usage", + Labels: map[string]string{ + "host": "server1", + }, + Value: float64(30 + (i % 40)), + Timestamp: now.Add(time.Duration(-i) * time.Minute).UnixNano(), + }) + } + + // 写入两个引擎 + if err := memEng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write to memory engine: %v", err) + return + } + + if err := fileEng.WriteBatch(context.Background(), points); err != nil { + log.Printf("Failed to write to file engine: %v", err) + return + } + + // 创建聚合查询 + query := engine.NewQueryBuilder(). + ForMetric("cpu_usage"). + WithTimeRange(now.Add(-24*time.Hour).UnixNano(), now.UnixNano()). + WithTag("host", engine.OpEqual, "server1"). + WithAggregation(engine.AggAvg, 1*time.Hour). + Build() + + // 测试内存引擎聚合性能 + memStart := time.Now() + memResult, err := memEng.Query(context.Background(), query) + if err != nil { + log.Printf("Memory engine aggregation failed: %v", err) + return + } + memDuration := time.Since(memStart) + + // 测试文件引擎聚合性能 + fileStart := time.Now() + fileResult, err := fileEng.Query(context.Background(), query) + if err != nil { + log.Printf("File engine aggregation failed: %v", err) + return + } + fileDuration := time.Since(fileStart) + + // 打印性能对比 + fmt.Printf("内存引擎聚合查询耗时: %v\n", memDuration) + fmt.Printf("文件引擎聚合查询耗时: %v\n", fileDuration) + + if memAgg, ok := memResult.(*engine.AggregateResult); ok { + fmt.Printf("内存引擎聚合组数: %d\n", len(memAgg.Groups)) + } + if fileAgg, ok := fileResult.(*engine.AggregateResult); ok { + fmt.Printf("文件引擎聚合组数: %d\n", len(fileAgg.Groups)) + } +} + +// 打印引擎统计信息 +func printEngineStats(name string, eng engine.Engine) { + stats := eng.Stats() + caps := eng.Capabilities() + + fmt.Printf("\n=== %s 统计信息 ===\n", name) + fmt.Printf("数据点总数: %d\n", stats.PointsCount) + fmt.Printf("最后写入时间: %v\n", stats.LastWriteTime.Format(time.RFC3339)) + fmt.Printf("支持压缩: %v\n", caps.SupportsCompression) + fmt.Printf("支持持久化: %v\n", caps.SupportsPersistence) + fmt.Printf("支持复制: %v\n", caps.SupportsReplication) + fmt.Printf("最大并发写入: %d\n", caps.MaxConcurrentWrites) +} diff --git a/examples/file_engine_example.go b/examples/file_engine_example.go new file mode 100644 index 0000000..3f95a3c --- /dev/null +++ b/examples/file_engine_example.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" + _ "git.pyer.club/kingecg/gotidb/pkg/engine/file" // 导入文件引擎以注册 +) + +func main() { + // 创建临时目录 + tempDir, err := os.MkdirTemp("", "gotidb_example") + if err != nil { + fmt.Printf("Failed to create temp dir: %v\n", err) + return + } + defer os.RemoveAll(tempDir) + + // 创建引擎配置 + config := &engine.FileEngineConfig{ + 
DataDir: tempDir, + SegmentSize: 1024 * 1024, // 1MB + MaxSegments: 10, + WriteBufferSize: 1000, + IndexCacheSize: 1024 * 1024, // 1MB + UseCompression: false, + CompressionLevel: 0, + CompactThreshold: 0.7, + MaxOpenFiles: 100, + SyncWrites: true, + RetentionPeriod: 24 * time.Hour, + } + + // 创建引擎 + e, err := engine.NewEngine(engine.EngineConfig{ + Type: "file", + FileConfig: config, + }) + if err != nil { + fmt.Printf("Failed to create file engine: %v\n", err) + return + } + + // 打开引擎 + if err := e.Open(); err != nil { + fmt.Printf("Failed to open engine: %v\n", err) + return + } + defer e.Close() + + // 创建上下文 + ctx := context.Background() + + // 写入测试数据 + fmt.Println("Writing data points...") + points := []engine.DataPoint{ + { + Timestamp: time.Now().UnixNano(), + Value: 1.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + "app": "web", + }, + }, + { + Timestamp: time.Now().Add(time.Second).UnixNano(), + Value: 2.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + "app": "web", + }, + }, + { + Timestamp: time.Now().Add(2 * time.Second).UnixNano(), + Value: 3.0, + Labels: map[string]string{ + "host": "server2", + "region": "us-east", + "app": "api", + }, + }, + { + Timestamp: time.Now().Add(3 * time.Second).UnixNano(), + Value: 4.0, + Labels: map[string]string{ + "host": "server2", + "region": "us-east", + "app": "api", + }, + }, + } + + // 写入数据 + if err := e.Write(ctx, points); err != nil { + fmt.Printf("Failed to write points: %v\n", err) + return + } + + // 查询原始数据 + fmt.Println("\nQuerying raw data for server1...") + rawQuery := engine.Query{ + Type: engine.QueryTypeRaw, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + Limit: 10, + } + + rawResult, err := e.Query(ctx, rawQuery) + if err != nil { + fmt.Printf("Failed to query raw data: %v\n", err) + return + } + + // 打印原始查询结果 + fmt.Printf("Raw query returned %d series\n", len(rawResult)) + for i, series := range rawResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + fmt.Printf(" Points: %d\n", len(series.Points)) + for j, point := range series.Points { + fmt.Printf(" Point %d: timestamp=%s, value=%f\n", + j+1, + time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), + point.Value) + } + } + + // 查询最新数据 + fmt.Println("\nQuerying latest data for each host...") + latestQuery := engine.Query{ + Type: engine.QueryTypeLatest, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{}, // 空标签查询所有序列 + } + + latestResult, err := e.Query(ctx, latestQuery) + if err != nil { + fmt.Printf("Failed to query latest data: %v\n", err) + return + } + + // 打印最新查询结果 + fmt.Printf("Latest query returned %d series\n", len(latestResult)) + for i, series := range latestResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + for _, point := range series.Points { + fmt.Printf(" Latest point: timestamp=%s, value=%f\n", + time.Unix(0, point.Timestamp).Format(time.RFC3339Nano), + point.Value) + } + } + + // 查询聚合数据 + fmt.Println("\nQuerying aggregate data (average) for each region...") + aggQuery := engine.Query{ + Type: engine.QueryTypeAggregate, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + AggregateType: engine.AggregateTypeAvg, 
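+        // 注:未指定聚合窗口,此处假定在整个查询时间范围上做单次聚合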
+ Tags: map[string]string{}, // 空标签查询所有序列 + } + + aggResult, err := e.Query(ctx, aggQuery) + if err != nil { + fmt.Printf("Failed to query aggregate data: %v\n", err) + return + } + + // 打印聚合查询结果 + fmt.Printf("Aggregate query returned %d series\n", len(aggResult)) + for i, series := range aggResult { + fmt.Printf("Series %d (ID: %s):\n", i+1, series.SeriesID) + fmt.Printf(" Labels: %v\n", series.Points[0].Labels) + for _, point := range series.Points { + fmt.Printf(" Average value: %f\n", point.Value) + } + } + + // 获取引擎统计信息 + stats := e.Stats() + fmt.Println("\nEngine statistics:") + fmt.Printf(" Points count: %d\n", stats.PointsCount) + fmt.Printf(" Segments count: %d\n", stats.SegmentsCount) + fmt.Printf(" Last write time: %s\n", stats.LastWriteTime.Format(time.RFC3339)) + if !stats.LastCompactionTime.IsZero() { + fmt.Printf(" Last compaction time: %s\n", stats.LastCompactionTime.Format(time.RFC3339)) + } else { + fmt.Printf(" Last compaction time: Never\n") + } +} diff --git a/go.mod b/go.mod index eee35cb..8168b38 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/stretchr/testify v1.10.0 ) -require go.etcd.io/bbolt v1.4.1 // indirect +require go.etcd.io/bbolt v1.4.1 require ( github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 5df72a2..50e4cf9 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= @@ -128,15 +127,11 @@ golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= 
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
new file mode 100644
index 0000000..c0f1c0f
--- /dev/null
+++ b/pkg/engine/engine.go
@@ -0,0 +1,80 @@
+package engine
+
+import (
+    "context"
+    "time"
+)
+
+// Engine 是所有存储引擎必须实现的基础接口
+type Engine interface {
+    // 基本生命周期
+    Open() error
+    Close() error
+
+    // 数据操作
+    WritePoint(ctx context.Context, point DataPoint) error
+    WriteBatch(ctx context.Context, points []DataPoint) error
+
+    // 查询操作
+    Query(ctx context.Context, query Query) (QueryResult, error)
+
+    // 管理操作
+    Flush() error
+    Compact() error
+
+    // 监控
+    Stats() EngineStats
+
+    // 能力查询
+    Capabilities() EngineCapabilities
+}
+
+// PersistentEngine 提供持久化功能
+type PersistentEngine interface {
+    Engine
+    Backup(path string) error
+    Restore(path string) error
+}
+
+// ReplicatedEngine 提供复制功能
+type ReplicatedEngine interface {
+    Engine
+    AddReplica(addr string) error
+    RemoveReplica(addr string) error
+}
+
+// DataPoint 表示一个时间序列数据点
+type DataPoint struct {
+    DeviceID   string            `json:"device_id"`
+    MetricCode string            `json:"metric_code"`
+    Labels     map[string]string `json:"labels"`
+    Value      float64           `json:"value"`
+    Timestamp  int64             `json:"timestamp"`
+}
+
+// SeriesID 生成数据点的序列ID
+func (p *DataPoint) SeriesID() string {
+    // 简单实现,实际可能需要更复杂的ID生成逻辑
+    return p.DeviceID + ":" + p.MetricCode
+}
+
+// EngineStats 包含引擎的统计信息
+type EngineStats struct {
+    PointsCount        int64
+    SeriesCount        int64
+    SegmentsCount      int64 // 段文件数量(文件引擎使用)
+    MemoryUsage        int64
+    LastWriteTime      time.Time
+    LastCompactionTime time.Time
+    WriteLatency       time.Duration
+    QueryLatency       time.Duration
+    // 其他统计信息...
+}
+
+// EngineCapabilities 描述引擎支持的功能
+type EngineCapabilities struct {
+    SupportsCompression bool
+    SupportsPersistence bool
+    SupportsReplication bool
+    MaxConcurrentWrites int
+    // 其他能力指标...
+}
diff --git a/pkg/engine/file/file.go b/pkg/engine/file/file.go
new file mode 100644
index 0000000..77a9991
--- /dev/null
+++ b/pkg/engine/file/file.go
@@ -0,0 +1,250 @@
+package file
+
+import (
+    "encoding/binary"
+    "fmt"
+    "io"
+    "sort"
+
+    "git.pyer.club/kingecg/gotidb/pkg/engine"
+)
+
+// 注:FileEngine、Segment、TimeIndex、TagIndex、timeWindow 等类型
+// 假定在包内其他文件中定义。
+
+// findSeriesByTags 根据标签查找序列
+func (e *FileEngine) findSeriesByTags(tags map[string]string) ([]string, error) {
+    e.tagIndex.mu.RLock()
+    defer e.tagIndex.mu.RUnlock()
+
+    if len(tags) == 0 {
+        // 如果没有指定标签,返回所有序列
+        allSeries := make(map[string]struct{})
+        for _, valueMap := range e.tagIndex.index {
+            for _, seriesIDs := range valueMap {
+                for _, seriesID := range seriesIDs {
+                    allSeries[seriesID] = struct{}{}
+                }
+            }
+        }
+
+        // 转换为切片
+        result := make([]string, 0, len(allSeries))
+        for seriesID := range allSeries {
+            result = append(result, seriesID)
+        }
+        return result, nil
+    }
+
+    // 对于每个标签,找到匹配的序列
+    var matchedSeries map[string]struct{}
+    first := true
+
+    for key, value := range tags {
+        // 获取标签值映射
+        valueMap, ok := e.tagIndex.index[key]
+        if !ok {
+            // 如果标签键不存在,返回空结果
+            return []string{}, nil
+        }
+
+        // 获取匹配标签值的序列
+        seriesIDs, ok := valueMap[value]
+        if !ok {
+            // 如果标签值不存在,返回空结果
+            return []string{}, nil
+        }
+
+        // 初始化或取交集
+        if first {
+            matchedSeries = make(map[string]struct{})
+            for _, id := range seriesIDs {
+                matchedSeries[id] = struct{}{}
+            }
+            first = false
+        } else {
+            // 取交集
+            newMatched := make(map[string]struct{})
+            for _, id := range seriesIDs {
+                if _, ok := matchedSeries[id]; ok {
+                    newMatched[id] = struct{}{}
+                }
+            }
+            matchedSeries = newMatched
+        }
+
+        // 如果没有匹配的序列,提前返回
+        if len(matchedSeries) == 0 {
+            return []string{}, nil
+        }
+    }
+
+    // 转换为切片
+    result := make([]string, 0, len(matchedSeries))
+    for seriesID := range matchedSeries {
+        result = append(result, seriesID)
+    }
+    return result, nil
+}
+
+// readPointsFromSegment 从段文件中读取数据点
+func (e *FileEngine) readPointsFromSegment(segment *Segment,
offset int64, count int) ([]engine.DataPoint, error) { + segment.mu.RLock() + defer segment.mu.RUnlock() + + // 如果文件为空,直接返回 + if segment.size == 0 { + return []engine.DataPoint{}, nil + } + + // 移动文件指针到指定位置 + _, err := segment.file.Seek(offset, 0) + if err != nil { + return nil, fmt.Errorf("failed to seek segment file: %v", err) + } + + // 读取指定数量的数据点 + points := make([]engine.DataPoint, 0, count) + for i := 0; i < count; i++ { + // 读取时间戳(8字节) + var timestamp int64 + err := binary.Read(segment.file, binary.LittleEndian, ×tamp) + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed to read timestamp: %v", err) + } + + // 读取值(8字节) + var value float64 + err = binary.Read(segment.file, binary.LittleEndian, &value) + if err != nil { + return nil, fmt.Errorf("failed to read value: %v", err) + } + + // 创建数据点 + point := engine.DataPoint{ + Timestamp: timestamp, + Value: value, + } + points = append(points, point) + } + + return points, nil +} + +// findTimeWindows 查找指定时间范围内的时间窗口 +func (e *FileEngine) findTimeWindows(seriesID string, startTime, endTime int64) ([]timeWindow, error) { + e.timeIndex.mu.RLock() + defer e.timeIndex.mu.RUnlock() + + // 获取序列的所有时间窗口 + windows, ok := e.timeIndex.windows[seriesID] + if !ok { + return nil, nil // 序列不存在 + } + + // 找到所有与时间范围重叠的窗口 + var matchedWindows []timeWindow + for _, window := range windows { + // 检查窗口是否与查询时间范围重叠 + if window.endTime >= startTime && window.startTime <= endTime { + matchedWindows = append(matchedWindows, window) + } + } + + // 按时间排序 + sort.Slice(matchedWindows, func(i, j int) bool { + return matchedWindows[i].startTime < matchedWindows[j].startTime + }) + + return matchedWindows, nil +} + +// getLabelsForSeries 获取序列的标签 +func (e *FileEngine) getLabelsForSeries(seriesID string) map[string]string { + e.tagIndex.mu.RLock() + defer e.tagIndex.mu.RUnlock() + + labels := make(map[string]string) + + // 遍历所有标签 + for key, valueMap := range e.tagIndex.index { + // 遍历每个标签值 + for value, seriesIDs := range valueMap { + // 检查序列ID是否在列表中 + for _, id := range seriesIDs { + if id == seriesID { + labels[key] = value + break + } + } + } + } + + return labels +} + +// newTimeIndex 创建新的时间索引 +func newTimeIndex() *TimeIndex { + return &TimeIndex{ + windows: make(map[string][]timeWindow), + } +} + +// newTagIndex 创建新的标签索引 +func newTagIndex() *TagIndex { + return &TagIndex{ + index: make(map[string]map[string][]string), + } +} + +// WritePoint 向段文件写入数据点 +func (s *Segment) WritePoint(point engine.DataPoint) (int64, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // 获取当前偏移量 + offset, err := s.file.Seek(0, io.SeekCurrent) + if err != nil { + return 0, fmt.Errorf("failed to get file offset: %v", err) + } + + // 写入时间戳(8字节) + err = binary.Write(s.file, binary.LittleEndian, point.Timestamp) + if err != nil { + return 0, fmt.Errorf("failed to write timestamp: %v", err) + } + + // 写入值(8字节) + err = binary.Write(s.file, binary.LittleEndian, point.Value) + if err != nil { + return 0, fmt.Errorf("failed to write value: %v", err) + } + + // 更新段文件信息 + s.size += 16 // 每个数据点16字节 + s.pointCount++ + if s.startTime == 0 || point.Timestamp < s.startTime { + s.startTime = point.Timestamp + } + if point.Timestamp > s.endTime { + s.endTime = point.Timestamp + } + + return offset, nil +} + +// Close 关闭段文件 +func (s *Segment) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.file != nil { + if err := s.file.Close(); err != nil { + return fmt.Errorf("failed to close segment file: %v", err) + } + s.file = nil + } + return nil +} + +// Sync 将段文件同步到磁盘 +func 
(s *Segment) Sync() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.file != nil { + if err := s.file.Sync(); err != nil { + return fmt.Errorf("failed to sync segment file: %v", err) + } + } + return nil +} \ No newline at end of file diff --git a/pkg/engine/file/file_test.go b/pkg/engine/file/file_test.go new file mode 100644 index 0000000..a1b2eed --- /dev/null +++ b/pkg/engine/file/file_test.go @@ -0,0 +1,198 @@ +package file + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" +) + +// TestFileEngine 测试文件引擎的基本功能 +func TestFileEngine(t *testing.T) { + // 创建临时目录 + tempDir, err := os.MkdirTemp("", "gotidb_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + // 创建引擎配置 + config := &engine.FileEngineConfig{ + DataDir: tempDir, + SegmentSize: 1024 * 1024, // 1MB + MaxSegments: 10, + WriteBufferSize: 1000, + IndexCacheSize: 1024 * 1024, // 1MB + UseCompression: false, + CompressionLevel: 0, + CompactThreshold: 0.7, + MaxOpenFiles: 100, + SyncWrites: true, + RetentionPeriod: 24 * time.Hour, + } + + // 创建引擎 + e, err := NewFileEngine(engine.EngineConfig{ + Type: "file", + FileConfig: config, + }) + if err != nil { + t.Fatalf("Failed to create file engine: %v", err) + } + + // 打开引擎 + if err := e.Open(); err != nil { + t.Fatalf("Failed to open engine: %v", err) + } + defer e.Close() + + // 写入测试数据 + ctx := context.Background() + points := []engine.DataPoint{ + { + Timestamp: time.Now().UnixNano(), + Value: 1.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + }, + }, + { + Timestamp: time.Now().Add(time.Second).UnixNano(), + Value: 2.0, + Labels: map[string]string{ + "host": "server1", + "region": "us-west", + }, + }, + { + Timestamp: time.Now().Add(2 * time.Second).UnixNano(), + Value: 3.0, + Labels: map[string]string{ + "host": "server2", + "region": "us-east", + }, + }, + } + + // 写入数据 + if err := e.Write(ctx, points); err != nil { + t.Fatalf("Failed to write points: %v", err) + } + + // 查询原始数据 + query := engine.Query{ + Type: engine.QueryTypeRaw, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + Limit: 10, + } + + result, err := e.Query(ctx, query) + if err != nil { + t.Fatalf("Failed to query: %v", err) + } + + // 验证查询结果 + if len(result) == 0 { + t.Fatalf("Expected non-empty result") + } + + // 检查结果中的数据点数量 + seriesResult := result[0] + if len(seriesResult.Points) != 2 { + t.Fatalf("Expected 2 points, got %d", len(seriesResult.Points)) + } + + // 查询最新数据 + latestQuery := engine.Query{ + Type: engine.QueryTypeLatest, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + Tags: map[string]string{ + "host": "server1", + }, + } + + latestResult, err := e.Query(ctx, latestQuery) + if err != nil { + t.Fatalf("Failed to query latest: %v", err) + } + + // 验证最新查询结果 + if len(latestResult) == 0 { + t.Fatalf("Expected non-empty latest result") + } + + // 检查最新结果中的数据点 + latestSeriesResult := latestResult[0] + if len(latestSeriesResult.Points) != 1 { + t.Fatalf("Expected 1 point in latest result, got %d", len(latestSeriesResult.Points)) + } + + // 查询聚合数据 + aggQuery := engine.Query{ + Type: engine.QueryTypeAggregate, + StartTime: time.Now().Add(-time.Minute).UnixNano(), + EndTime: time.Now().Add(time.Minute).UnixNano(), + AggregateType: engine.AggregateTypeAvg, + Tags: map[string]string{ + "host": 
"server1", + }, + } + + aggResult, err := e.Query(ctx, aggQuery) + if err != nil { + t.Fatalf("Failed to query aggregate: %v", err) + } + + // 验证聚合查询结果 + if len(aggResult) == 0 { + t.Fatalf("Expected non-empty aggregate result") + } + + // 检查聚合结果中的数据点 + aggSeriesResult := aggResult[0] + if len(aggSeriesResult.Points) != 1 { + t.Fatalf("Expected 1 point in aggregate result, got %d", len(aggSeriesResult.Points)) + } + + // 验证平均值 + avgValue := aggSeriesResult.Points[0].Value + if avgValue != 1.5 { // (1.0 + 2.0) / 2 = 1.5 + t.Fatalf("Expected avg value 1.5, got %f", avgValue) + } + + // 获取引擎统计信息 + stats := e.Stats() + if stats.PointsCount != 3 { + t.Fatalf("Expected 3 points in stats, got %d", stats.PointsCount) + } + + // 检查段文件是否创建 + files, err := os.ReadDir(tempDir) + if err != nil { + t.Fatalf("Failed to read data directory: %v", err) + } + if len(files) == 0 { + t.Fatalf("Expected segment files to be created") + } + + // 检查是否有.seg文件 + var segFileFound bool + for _, file := range files { + if filepath.Ext(file.Name()) == ".seg" { + segFileFound = true + break + } + } + if !segFileFound { + t.Fatalf("Expected .seg files to be created") + } +} diff --git a/pkg/engine/file/query.go b/pkg/engine/file/query.go new file mode 100644 index 0000000..27d3e5f --- /dev/null +++ b/pkg/engine/file/query.go @@ -0,0 +1,340 @@ +package file + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "sort" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" +) + +// queryRaw 实现原始数据查询 +func (e *FileEngine) queryRaw(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.TimeSeriesResult{ + SeriesID: query.SeriesID, + Points: make([]engine.DataPoint, 0), + } + + // 如果指定了SeriesID,只查询特定序列 + if query.SeriesID != "" { + points, err := e.querySeriesPoints(query.SeriesID, query.StartTime, query.EndTime) + if err != nil { + return nil, err + } + result.Points = points + return result, nil + } + + // 否则查询所有匹配的序列 + matchedSeries := e.findMatchingSeries(query) + for _, seriesID := range matchedSeries { + points, err := e.querySeriesPoints(seriesID, query.StartTime, query.EndTime) + if err != nil { + return nil, err + } + result.Points = append(result.Points, points...) 
+ } + + // 按时间戳排序 + sort.Slice(result.Points, func(i, j int) bool { + return result.Points[i].Timestamp < result.Points[j].Timestamp + }) + + return result, nil +} + +// queryLatest 实现最新数据查询 +func (e *FileEngine) queryLatest(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.TimeSeriesResult{ + SeriesID: query.SeriesID, + Points: make([]engine.DataPoint, 0), + } + + if query.SeriesID != "" { + point, err := e.queryLatestPoint(query.SeriesID) + if err != nil { + return nil, err + } + if point != nil { + result.Points = append(result.Points, *point) + } + return result, nil + } + + matchedSeries := e.findMatchingSeries(query) + for _, seriesID := range matchedSeries { + point, err := e.queryLatestPoint(seriesID) + if err != nil { + return nil, err + } + if point != nil { + result.Points = append(result.Points, *point) + } + } + + return result, nil +} + +// queryAggregate 实现聚合查询 +func (e *FileEngine) queryAggregate(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.AggregateResult{ + SeriesID: query.SeriesID, + Groups: make([]engine.AggregateGroup, 0), + } + + // 创建时间窗口 + windows := splitTimeRange(query.StartTime, query.EndTime, query.AggInterval) + + // 如果指定了SeriesID,只聚合特定序列 + if query.SeriesID != "" { + groups, err := e.aggregateSeriesInWindows(query.SeriesID, windows, query.Aggregation) + if err != nil { + return nil, err + } + result.Groups = groups + return result, nil + } + + // 否则聚合所有匹配的序列 + matchedSeries := e.findMatchingSeries(query) + for _, window := range windows { + group := engine.AggregateGroup{ + StartTime: window.start, + EndTime: window.end, + } + + var totalSum float64 + var totalCount int + + for _, seriesID := range matchedSeries { + points, err := e.querySeriesPoints(seriesID, window.start, window.end) + if err != nil { + return nil, err + } + + windowSum, windowCount := aggregatePoints(points, query.Aggregation) + totalSum += windowSum + totalCount += windowCount + } + + if totalCount > 0 { + group.Value = calculateAggregateValue(totalSum, totalCount, query.Aggregation) + group.Count = totalCount + result.Groups = append(result.Groups, group) + } + } + + return result, nil +} + +// 辅助方法 + +func (e *FileEngine) querySeriesPoints(seriesID string, startTime, endTime int64) ([]engine.DataPoint, error) { + e.timeIndex.mu.RLock() + windows := e.timeIndex.windows[seriesID] + e.timeIndex.mu.RUnlock() + + var points []engine.DataPoint + for _, window := range windows { + if window.endTime < startTime || window.startTime > endTime { + continue + } + + segment, ok := e.segments[window.segmentID] + if !ok { + continue + } + + segmentPoints, err := e.readSegmentPoints(segment, window.offset, startTime, endTime) + if err != nil { + return nil, err + } + points = append(points, segmentPoints...) 
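+        // readSegmentPoints 从 window.offset 顺序扫描到段文件末尾并按时间过滤,
+        // 依赖时间窗口索引缩小扫描范围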
+    }
+
+    return points, nil
+}
+
+func (e *FileEngine) queryLatestPoint(seriesID string) (*engine.DataPoint, error) {
+    e.timeIndex.mu.RLock()
+    windows := e.timeIndex.windows[seriesID]
+    e.timeIndex.mu.RUnlock()
+
+    if len(windows) == 0 {
+        return nil, nil
+    }
+
+    // 找到最新的时间窗口
+    var latestWindow timeWindow
+    for _, window := range windows {
+        if window.endTime > latestWindow.endTime {
+            latestWindow = window
+        }
+    }
+
+    segment, ok := e.segments[latestWindow.segmentID]
+    if !ok {
+        return nil, nil
+    }
+
+    points, err := e.readSegmentPoints(segment, latestWindow.offset, 0, time.Now().UnixNano())
+    if err != nil {
+        return nil, err
+    }
+
+    if len(points) == 0 {
+        return nil, nil
+    }
+
+    // 返回最新的点(假定段内数据按写入时间递增追加)
+    latestPoint := points[len(points)-1]
+    return &latestPoint, nil
+}
+
+func (e *FileEngine) readSegmentPoints(segment *Segment, offset int64, startTime, endTime int64) ([]engine.DataPoint, error) {
+    segment.mu.RLock()
+    defer segment.mu.RUnlock()
+
+    var points []engine.DataPoint
+
+    // 移动到指定偏移
+    if _, err := segment.file.Seek(offset, io.SeekStart); err != nil {
+        return nil, err
+    }
+
+    // 读取数据点(与 Segment.WritePoint 保持一致,使用小端字节序)
+    for {
+        var timestamp int64
+        var value float64
+
+        err := binary.Read(segment.file, binary.LittleEndian, &timestamp)
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return nil, err
+        }
+
+        err = binary.Read(segment.file, binary.LittleEndian, &value)
+        if err != nil {
+            return nil, err
+        }
+
+        if timestamp >= startTime && timestamp <= endTime {
+            points = append(points, engine.DataPoint{
+                Timestamp: timestamp,
+                Value:     value,
+            })
+        }
+    }
+
+    return points, nil
+}
+
+func (e *FileEngine) findMatchingSeries(query engine.Query) []string {
+    e.tagIndex.mu.RLock()
+    defer e.tagIndex.mu.RUnlock()
+
+    var matchedSeries []string
+    seriesMap := make(map[string]bool)
+
+    // 首先根据标签过滤器找到匹配的序列
+    // 注:TagFilters 为空时不会匹配任何序列
+    for _, filter := range query.TagFilters {
+        if values, ok := e.tagIndex.index[filter.Key]; ok {
+            if series, ok := values[filter.Value]; ok {
+                for _, seriesID := range series {
+                    seriesMap[seriesID] = true
+                }
+            }
+        }
+    }
+
+    // 转换为切片
+    for seriesID := range seriesMap {
+        matchedSeries = append(matchedSeries, seriesID)
+    }
+
+    return matchedSeries
+}
+
+func (e *FileEngine) aggregateSeriesInWindows(seriesID string, windows []timeWindow, aggType engine.AggregationType) ([]engine.AggregateGroup, error) {
+    var groups []engine.AggregateGroup
+
+    for _, window := range windows {
+        points, err := e.querySeriesPoints(seriesID, window.start, window.end)
+        if err != nil {
+            return nil, err
+        }
+
+        if len(points) > 0 {
+            sum, count := aggregatePoints(points, aggType)
+            groups = append(groups, engine.AggregateGroup{
+                StartTime: window.start,
+                EndTime:   window.end,
+                Value:     calculateAggregateValue(sum, count, aggType),
+                Count:     count,
+            })
+        }
+    }
+
+    return groups, nil
+}
+
+func aggregatePoints(points []engine.DataPoint, aggType engine.AggregationType) (float64, int) {
+    if len(points) == 0 {
+        return 0, 0
+    }
+
+    var sum float64
+    for _, p := range points {
+        sum += p.Value
+    }
+
+    return sum, len(points)
+}
+
+func calculateAggregateValue(sum float64, count int, aggType engine.AggregationType) float64 {
+    if count == 0 {
+        return 0
+    }
+
+    // 注:目前仅实现 Sum/Avg/Count,其余聚合类型暂按 Sum 返回
+    switch aggType {
+    case engine.AggSum:
+        return sum
+    case engine.AggAvg:
+        return sum / float64(count)
+    case engine.AggCount:
+        return float64(count)
+    default:
+        return sum
+    }
+}
+
+func splitTimeRange(start, end int64, interval time.Duration) []timeWindow {
+    var windows []timeWindow
+    intervalNanos := interval.Nanoseconds()
+
+    for windowStart := start; windowStart < end; windowStart +=
intervalNanos { + windowEnd := windowStart + intervalNanos + if windowEnd > end { + windowEnd = end + } + windows = append(windows, timeWindow{ + start: windowStart, + end: windowEnd, + }) + } + + return windows +} + +// 错误处理辅助函数 +func wrapError(op string, err error) error { + if err == nil { + return nil + } + return fmt.Errorf("%s: %v", op, err) +} diff --git a/pkg/engine/file/register.go b/pkg/engine/file/register.go new file mode 100644 index 0000000..320cf18 --- /dev/null +++ b/pkg/engine/file/register.go @@ -0,0 +1,8 @@ +package file + +import "git.pyer.club/kingecg/gotidb/pkg/engine" + +// Register 注册文件存储引擎到引擎注册表 +func Register(registry *engine.EngineRegistry) { + registry.Register("file", NewFileEngine) +} diff --git a/pkg/engine/memory/memory.go b/pkg/engine/memory/memory.go new file mode 100644 index 0000000..28bf5d1 --- /dev/null +++ b/pkg/engine/memory/memory.go @@ -0,0 +1,441 @@ +package memory + +import ( + "context" + "fmt" + "sync" + "time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" +) + +// MemoryEngine 实现基于内存的存储引擎 +type MemoryEngine struct { + // 配置 + config *engine.MemoryEngineConfig + + // 数据存储 + series map[string]*SeriesData + + // 并发控制 + lock *engine.ShardedLock + + // 写入缓冲 + writeBuffer *engine.WriteBuffer + + // 统计信息 + stats engine.EngineStats + + // 关闭控制 + closed bool + mu sync.RWMutex +} + +// SeriesData 存储单个时间序列的数据 +type SeriesData struct { + // 最新数据点的环形缓冲区 + buffer *engine.CircularBuffer + + // 元数据 + deviceID string + metricCode string + labels map[string]string + + // 统计信息 + lastWriteTime time.Time + pointsCount int64 +} + +// NewMemoryEngine 创建新的内存存储引擎 +func NewMemoryEngine(config engine.EngineConfig) (engine.Engine, error) { + memConfig := config.MemoryConfig() + if memConfig == nil { + return nil, fmt.Errorf("memory engine config is required") + } + + e := &MemoryEngine{ + config: memConfig, + series: make(map[string]*SeriesData), + lock: engine.NewShardedLock(16), // 16个分片 + } + + // 创建写入缓冲区 + e.writeBuffer = engine.NewWriteBuffer(e, 1000) // 缓冲1000个点 + + return e, nil +} + +// Open 实现Engine接口 +func (e *MemoryEngine) Open() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.closed { + e.closed = false + e.series = make(map[string]*SeriesData) + } + return nil +} + +// Close 实现Engine接口 +func (e *MemoryEngine) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.closed { + e.closed = true + // 刷新缓冲区 + if err := e.writeBuffer.Flush(); err != nil { + return err + } + // 清理数据 + e.series = nil + } + return nil +} + +// WritePoint 实现Engine接口 +func (e *MemoryEngine) WritePoint(ctx context.Context, point engine.DataPoint) error { + if e.closed { + return fmt.Errorf("engine is closed") + } + + // 写入缓冲区 + return e.writeBuffer.Add(point) +} + +// WriteBatch 实现Engine接口 +func (e *MemoryEngine) WriteBatch(ctx context.Context, points []engine.DataPoint) error { + if e.closed { + return fmt.Errorf("engine is closed") + } + + for _, point := range points { + seriesID := point.SeriesID() + + // 获取或创建序列数据 + e.lock.Lock(seriesID) + series, ok := e.series[seriesID] + if !ok { + series = &SeriesData{ + buffer: engine.NewCircularBuffer(e.config.MaxPointsPerSeries), + deviceID: point.DeviceID, + metricCode: point.MetricCode, + labels: point.Labels, + } + e.series[seriesID] = series + } + e.lock.Unlock(seriesID) + + // 写入数据点 + e.lock.Lock(seriesID) + series.buffer.Add(point) + series.lastWriteTime = time.Now() + series.pointsCount++ + e.lock.Unlock(seriesID) + + // 更新统计信息 + e.mu.Lock() + e.stats.PointsCount++ + e.stats.LastWriteTime = time.Now() + e.mu.Unlock() + } + + 
return nil +} + +// Query 实现Engine接口 +func (e *MemoryEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + if e.closed { + return nil, fmt.Errorf("engine is closed") + } + + switch query.Type { + case engine.QueryTypeRaw: + return e.queryRaw(ctx, query) + case engine.QueryTypeLatest: + return e.queryLatest(ctx, query) + case engine.QueryTypeAggregate: + return e.queryAggregate(ctx, query) + default: + return nil, fmt.Errorf("unsupported query type: %v", query.Type) + } +} + +// queryRaw 查询原始数据 +func (e *MemoryEngine) queryRaw(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.TimeSeriesResult{ + SeriesID: query.SeriesID, + Points: make([]engine.DataPoint, 0), + } + + // 如果指定了SeriesID,只查询特定序列 + if query.SeriesID != "" { + e.lock.RLock(query.SeriesID) + series, ok := e.series[query.SeriesID] + e.lock.RUnlock(query.SeriesID) + + if !ok { + return result, nil + } + + points := series.buffer.GetRecent(series.buffer.Size()) + for _, point := range points { + if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime { + result.Points = append(result.Points, point) + } + } + return result, nil + } + + // 否则查询所有匹配的序列 + e.mu.RLock() + for seriesID, series := range e.series { + // 检查是否匹配查询条件 + if query.DeviceID != "" && series.deviceID != query.DeviceID { + continue + } + if query.MetricCode != "" && series.metricCode != query.MetricCode { + continue + } + if !matchTags(series.labels, query.TagFilters) { + continue + } + + e.lock.RLock(seriesID) + points := series.buffer.GetRecent(series.buffer.Size()) + e.lock.RUnlock(seriesID) + + for _, point := range points { + if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime { + result.Points = append(result.Points, point) + } + } + } + e.mu.RUnlock() + + return result, nil +} + +// queryLatest 查询最新数据 +func (e *MemoryEngine) queryLatest(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.TimeSeriesResult{ + SeriesID: query.SeriesID, + Points: make([]engine.DataPoint, 0), + } + + if query.SeriesID != "" { + e.lock.RLock(query.SeriesID) + series, ok := e.series[query.SeriesID] + e.lock.RUnlock(query.SeriesID) + + if !ok { + return result, nil + } + + points := series.buffer.GetRecent(1) + if len(points) > 0 { + result.Points = append(result.Points, points[0]) + } + return result, nil + } + + e.mu.RLock() + for seriesID, series := range e.series { + if query.DeviceID != "" && series.deviceID != query.DeviceID { + continue + } + if query.MetricCode != "" && series.metricCode != query.MetricCode { + continue + } + if !matchTags(series.labels, query.TagFilters) { + continue + } + + e.lock.RLock(seriesID) + points := series.buffer.GetRecent(1) + e.lock.RUnlock(seriesID) + + if len(points) > 0 { + result.Points = append(result.Points, points[0]) + } + } + e.mu.RUnlock() + + return result, nil +} + +// queryAggregate 查询聚合数据 +func (e *MemoryEngine) queryAggregate(ctx context.Context, query engine.Query) (engine.QueryResult, error) { + result := &engine.AggregateResult{ + SeriesID: query.SeriesID, + Groups: make([]engine.AggregateGroup, 0), + } + + // 创建时间窗口 + windows := splitTimeRange(query.StartTime, query.EndTime, query.AggInterval) + for _, window := range windows { + group := engine.AggregateGroup{ + StartTime: window.start, + EndTime: window.end, + } + + // 聚合每个窗口内的数据 + if query.SeriesID != "" { + e.aggregateSeriesInWindow(query.SeriesID, window.start, window.end, query.Aggregation, &group) + } else { + 
e.aggregateAllSeriesInWindow(query, window.start, window.end, &group)
+		}
+
+		result.Groups = append(result.Groups, group)
+	}
+
+	return result, nil
+}
+
+// Flush implements the Engine interface.
+func (e *MemoryEngine) Flush() error {
+	return e.writeBuffer.Flush()
+}
+
+// Compact implements the Engine interface.
+func (e *MemoryEngine) Compact() error {
+	// The memory engine has nothing to compact.
+	return nil
+}
+
+// Stats implements the Engine interface.
+func (e *MemoryEngine) Stats() engine.EngineStats {
+	e.mu.RLock()
+	defer e.mu.RUnlock()
+	return e.stats
+}
+
+// Capabilities implements the Engine interface.
+func (e *MemoryEngine) Capabilities() engine.EngineCapabilities {
+	return engine.EngineCapabilities{
+		SupportsCompression: e.config.UseCompression,
+		SupportsPersistence: false,
+		SupportsReplication: false,
+		MaxConcurrentWrites: 1000,
+	}
+}
+
+// Helper functions
+
+type timeWindow struct {
+	start int64
+	end   int64
+}
+
+// splitTimeRange splits [start, end) into consecutive windows of the given
+// interval.
+func splitTimeRange(start, end int64, interval time.Duration) []timeWindow {
+	// Defensive: a non-positive interval cannot advance the loop below, so
+	// degrade to a single window instead of spinning forever.
+	if interval <= 0 {
+		return []timeWindow{{start: start, end: end}}
+	}
+
+	var windows []timeWindow
+	intervalNanos := interval.Nanoseconds()
+
+	for windowStart := start; windowStart < end; windowStart += intervalNanos {
+		windowEnd := windowStart + intervalNanos
+		if windowEnd > end {
+			windowEnd = end
+		}
+		windows = append(windows, timeWindow{start: windowStart, end: windowEnd})
+	}
+
+	return windows
+}
+
+// matchTags reports whether the labels satisfy all tag filters.
+func matchTags(labels map[string]string, filters []engine.TagFilter) bool {
+	for _, filter := range filters {
+		value, ok := labels[filter.Key]
+		if !ok {
+			return false
+		}
+
+		switch filter.Operator {
+		case engine.OpEqual:
+			if value != filter.Value {
+				return false
+			}
+		case engine.OpNotEqual:
+			if value == filter.Value {
+				return false
+			}
+			// TODO: OpRegex, OpGreaterThan and OpLessThan are not yet
+			// implemented and currently match unconditionally.
+		}
+	}
+	return true
+}
+
+func (e *MemoryEngine) aggregateSeriesInWindow(seriesID string, start, end int64, aggType engine.AggregationType, group *engine.AggregateGroup) {
+	e.mu.RLock()
+	series, ok := e.series[seriesID]
+	e.mu.RUnlock()
+
+	if !ok {
+		return
+	}
+
+	points := series.buffer.GetRecent(series.buffer.Size())
+	var sum, min, max float64
+	var count int
+
+	for _, point := range points {
+		if point.Timestamp >= start && point.Timestamp < end {
+			if count == 0 || point.Value < min {
+				min = point.Value
+			}
+			if count == 0 || point.Value > max {
+				max = point.Value
+			}
+			sum += point.Value
+			count++
+		}
+	}
+
+	if count > 0 {
+		switch aggType {
+		case engine.AggSum:
+			group.Value = sum
+		case engine.AggAvg:
+			group.Value = sum / float64(count)
+		case engine.AggMin:
+			group.Value = min
+		case engine.AggMax:
+			group.Value = max
+		case engine.AggCount:
+			group.Value = float64(count)
+		}
+		group.Count = count
+	}
+}
+
+func (e *MemoryEngine) aggregateAllSeriesInWindow(query engine.Query, start, end int64, group *engine.AggregateGroup) {
+	var sum, min, max float64
+	var count int
+
+	e.mu.RLock()
+	for seriesID, series := range e.series {
+		if query.DeviceID != "" && series.deviceID != query.DeviceID {
+			continue
+		}
+		if query.MetricCode != "" && series.metricCode != query.MetricCode {
+			continue
+		}
+		if !matchTags(series.labels, query.TagFilters) {
+			continue
+		}
+
+		e.lock.RLock(seriesID)
+		points := series.buffer.GetRecent(series.buffer.Size())
+		e.lock.RUnlock(seriesID)
+
+		for _, point := range points {
+			if point.Timestamp >= start && point.Timestamp < end {
+				if count == 0 || point.Value < min {
+					min = point.Value
+				}
+				if count == 0 || point.Value > max {
+					max = point.Value
+				}
+				sum += point.Value
+				count++
+			}
+		}
+	}
+	e.mu.RUnlock()
+
+	if count > 0 {
+		switch query.Aggregation {
+		case engine.AggSum:
+			group.Value = sum
+		case engine.AggAvg:
+			group.Value = sum / float64(count)
+		case engine.AggMin:
+			group.Value = min
+		case engine.AggMax:
+			group.Value = max
+		case engine.AggCount:
+			group.Value = float64(count)
+		}
+		group.Count = count
+	}
+}
diff --git a/pkg/engine/memory/memory_test.go b/pkg/engine/memory/memory_test.go
new file mode 100644
index 0000000..039a8f2
--- /dev/null
+++ b/pkg/engine/memory/memory_test.go
@@ -0,0 +1,245 @@
+package memory
+
+import (
+	"context"
+	"testing"
+	
"time" + + "git.pyer.club/kingecg/gotidb/pkg/engine" +) + +func TestMemoryEngine(t *testing.T) { + // 创建引擎配置 + config := engine.NewEngineConfig() + + // 创建内存引擎 + eng, err := NewMemoryEngine(config) + if err != nil { + t.Fatalf("Failed to create memory engine: %v", err) + } + + // 打开引擎 + if err := eng.Open(); err != nil { + t.Fatalf("Failed to open engine: %v", err) + } + defer eng.Close() + + // 测试写入单个数据点 + t.Run("WritePoint", func(t *testing.T) { + point := engine.DataPoint{ + DeviceID: "device001", + MetricCode: "temperature", + Labels: map[string]string{ + "location": "room1", + }, + Value: 25.5, + Timestamp: time.Now().UnixNano(), + } + + if err := eng.WritePoint(context.Background(), point); err != nil { + t.Fatalf("Failed to write point: %v", err) + } + + // 刷新缓冲区确保数据写入 + if err := eng.Flush(); err != nil { + t.Fatalf("Failed to flush: %v", err) + } + + // 查询最新数据 + query := engine.NewQueryBuilder(). + ForMetric("temperature"). + WithTag("location", engine.OpEqual, "room1"). + Build() + query.Type = engine.QueryTypeLatest + + result, err := eng.Query(context.Background(), query) + if err != nil { + t.Fatalf("Failed to query latest data: %v", err) + } + + tsResult, ok := result.(*engine.TimeSeriesResult) + if !ok { + t.Fatalf("Expected TimeSeriesResult, got %T", result) + } + + if len(tsResult.Points) != 1 { + t.Fatalf("Expected 1 point, got %d", len(tsResult.Points)) + } + + if tsResult.Points[0].Value != 25.5 { + t.Errorf("Expected value 25.5, got %.2f", tsResult.Points[0].Value) + } + }) + + // 测试批量写入 + t.Run("WriteBatch", func(t *testing.T) { + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 10; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "device002", + MetricCode: "cpu", + Labels: map[string]string{ + "host": "server1", + }, + Value: float64(i * 10), + Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + t.Fatalf("Failed to write batch: %v", err) + } + + // 查询原始数据 + query := engine.NewQueryBuilder(). + ForMetric("cpu"). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.Add(1*time.Hour).UnixNano()). + WithTag("host", engine.OpEqual, "server1"). + Build() + + result, err := eng.Query(context.Background(), query) + if err != nil { + t.Fatalf("Failed to query raw data: %v", err) + } + + tsResult, ok := result.(*engine.TimeSeriesResult) + if !ok { + t.Fatalf("Expected TimeSeriesResult, got %T", result) + } + + if len(tsResult.Points) != 10 { + t.Fatalf("Expected 10 points, got %d", len(tsResult.Points)) + } + }) + + // 测试聚合查询 + t.Run("AggregateQuery", func(t *testing.T) { + now := time.Now() + var points []engine.DataPoint + for i := 0; i < 10; i++ { + points = append(points, engine.DataPoint{ + DeviceID: "device003", + MetricCode: "memory", + Labels: map[string]string{ + "host": "server2", + }, + Value: float64(i * 10), + Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(), + }) + } + + if err := eng.WriteBatch(context.Background(), points); err != nil { + t.Fatalf("Failed to write batch: %v", err) + } + + // 查询聚合数据 + query := engine.NewQueryBuilder(). + ForMetric("memory"). + WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.Add(1*time.Hour).UnixNano()). + WithTag("host", engine.OpEqual, "server2"). + WithAggregation(engine.AggAvg, 1*time.Minute). 
+			Build()
+
+		result, err := eng.Query(context.Background(), query)
+		if err != nil {
+			t.Fatalf("Failed to query aggregate data: %v", err)
+		}
+
+		aggResult, ok := result.(*engine.AggregateResult)
+		if !ok {
+			t.Fatalf("Expected AggregateResult, got %T", result)
+		}
+
+		if len(aggResult.Groups) == 0 {
+			t.Fatalf("Expected at least one aggregate group")
+		}
+	})
+
+	// Test the engine statistics.
+	t.Run("EngineStats", func(t *testing.T) {
+		stats := eng.Stats()
+		if stats.PointsCount == 0 {
+			t.Errorf("Expected non-zero points count")
+		}
+	})
+
+	// Test the engine capabilities.
+	t.Run("EngineCapabilities", func(t *testing.T) {
+		caps := eng.Capabilities()
+		if !caps.SupportsCompression {
+			t.Errorf("Expected compression support")
+		}
+		if caps.SupportsPersistence {
+			t.Errorf("Memory engine should not support persistence")
+		}
+	})
+}
+
+func TestCircularBuffer(t *testing.T) {
+	// Create the engine configuration with a small ring buffer so the
+	// overwrite behaviour is easy to trigger.
+	config := engine.NewEngineConfig()
+	memConfig := config.MemoryConfig()
+	memConfig.MaxPointsPerSeries = 5
+
+	// Create the memory engine.
+	eng, err := NewMemoryEngine(config)
+	if err != nil {
+		t.Fatalf("Failed to create memory engine: %v", err)
+	}
+
+	// Open the engine.
+	if err := eng.Open(); err != nil {
+		t.Fatalf("Failed to open engine: %v", err)
+	}
+	defer eng.Close()
+
+	// Write more points than the buffer can hold.
+	now := time.Now()
+	var points []engine.DataPoint
+	for i := 0; i < 10; i++ {
+		points = append(points, engine.DataPoint{
+			DeviceID:   "device004",
+			MetricCode: "disk",
+			Labels: map[string]string{
+				"path": "/data",
+			},
+			Value:     float64(i),
+			Timestamp: now.Add(time.Duration(i) * time.Second).UnixNano(),
+		})
+	}
+
+	if err := eng.WriteBatch(context.Background(), points); err != nil {
+		t.Fatalf("Failed to write batch: %v", err)
+	}
+
+	// Query the raw data.
+	query := engine.NewQueryBuilder().
+		ForMetric("disk").
+		WithTimeRange(now.Add(-1*time.Hour).UnixNano(), now.Add(1*time.Hour).UnixNano()).
+		WithTag("path", engine.OpEqual, "/data").
+		Build()
+
+	result, err := eng.Query(context.Background(), query)
+	if err != nil {
+		t.Fatalf("Failed to query raw data: %v", err)
+	}
+
+	tsResult, ok := result.(*engine.TimeSeriesResult)
+	if !ok {
+		t.Fatalf("Expected TimeSeriesResult, got %T", result)
+	}
+
+	// Only the most recent 5 points should be returned.
+	if len(tsResult.Points) != 5 {
+		t.Fatalf("Expected 5 points (MaxPointsPerSeries), got %d", len(tsResult.Points))
+	}
+
+	// GetRecent returns points from newest to oldest, so the surviving
+	// values arrive in the order 9, 8, 7, 6, 5.
+	for i, point := range tsResult.Points {
+		expectedValue := float64(9 - i)
+		if point.Value != expectedValue {
+			t.Errorf("Expected point value %.1f, got %.1f", expectedValue, point.Value)
+		}
+	}
+}
diff --git a/pkg/engine/memory/register.go b/pkg/engine/memory/register.go
new file mode 100644
index 0000000..aa14973
--- /dev/null
+++ b/pkg/engine/memory/register.go
@@ -0,0 +1,8 @@
+package memory
+
+import "git.pyer.club/kingecg/gotidb/pkg/engine"
+
+// Register adds the memory storage engine to the engine registry.
+func Register(registry *engine.EngineRegistry) {
+	registry.Register("memory", NewMemoryEngine)
+}
diff --git a/pkg/engine/query.go b/pkg/engine/query.go
new file mode 100644
index 0000000..692be38
--- /dev/null
+++ b/pkg/engine/query.go
@@ -0,0 +1,192 @@
+package engine
+
+import "time"
+
+// QueryType identifies the kind of query.
+type QueryType int
+
+const (
+	// QueryTypeRaw queries raw data points
+	QueryTypeRaw QueryType = iota
+	// QueryTypeAggregate queries aggregated data
+	QueryTypeAggregate
+	// QueryTypeLatest queries the latest value
+	QueryTypeLatest
+	// QueryTypeTags queries tags
+	QueryTypeTags
+	// QueryTypeMetadata queries metadata
+	QueryTypeMetadata
+)
+
+// Query holds the query parameters.
+type Query struct {
+	// Query type
+	Type QueryType
+
+	// Time range
+	StartTime int64
+	EndTime   int64
+
+	// Series identification
+	SeriesID   string
+	DeviceID   string
+	MetricCode string
+
+	// Tag filtering
+	TagFilters []TagFilter
+
+	// Aggregation options
+	Aggregation    AggregationType
+	AggInterval    time.Duration
+	IncludeRawData bool
+
+	// Result limits
+	Limit  int
+	Offset int
+
+	// Additional query options
+	Options map[string]interface{}
+}
+
+// TagFilter describes a single tag filter condition.
+type TagFilter struct {
+	Key      string
+	Operator FilterOperator
+	Value    string
+}
+
+// FilterOperator identifies a filter operator.
+type FilterOperator int
+
+const (
+	// OpEqual matches equal values
+	OpEqual FilterOperator = iota
+	// OpNotEqual matches unequal values
+	OpNotEqual
+	// OpRegex matches by regular expression
+	OpRegex
+	// OpGreaterThan matches greater values
+	OpGreaterThan
+	// OpLessThan matches smaller values
+	OpLessThan
+)
+
+// AggregationType identifies an aggregation function.
+type AggregationType int
+
+const (
+	// AggNone applies no aggregation
+	AggNone AggregationType = iota
+	// AggSum sums the values
+	AggSum
+	// AggAvg averages the values
+	AggAvg
+	// AggMin takes the minimum value
+	AggMin
+	// AggMax takes the maximum value
+	AggMax
+	// AggCount counts the values
+	AggCount
+)
+
+// QueryResult is the interface implemented by all query results.
+type QueryResult interface {
+	// Type returns the kind of result
+	Type() QueryType
+}
+
+// TimeSeriesResult is the result of a raw or latest-value query.
+type TimeSeriesResult struct {
+	SeriesID string
+	Points   []DataPoint
+}
+
+// Type implements the QueryResult interface.
+func (r *TimeSeriesResult) Type() QueryType {
+	return QueryTypeRaw
+}
+
+// AggregateResult is the result of an aggregate query.
+type AggregateResult struct {
+	SeriesID string
+	Groups   []AggregateGroup
+}
+
+// Type implements the QueryResult interface.
+func (r *AggregateResult) Type() QueryType {
+	return QueryTypeAggregate
+}
+
+// AggregateGroup holds the aggregate of one time window.
+type AggregateGroup struct {
+	StartTime int64
+	EndTime   int64
+	Value     float64
+	Count     int
+}
+
+// QueryBuilder provides a fluent API for building queries.
+type QueryBuilder struct {
+	query Query
+}
+
+// NewQueryBuilder creates a new query builder.
+func NewQueryBuilder() *QueryBuilder {
+	return &QueryBuilder{
+		query: Query{
+			Options: make(map[string]interface{}),
+		},
+	}
+}
+
+// ForMetric sets the metric code.
+func (b *QueryBuilder) ForMetric(metricCode string) *QueryBuilder {
+	b.query.MetricCode = metricCode
+	return b
+}
+
+// WithTimeRange sets the query time range.
+func (b 
*QueryBuilder) WithTimeRange(start, end int64) *QueryBuilder {
+	b.query.StartTime = start
+	b.query.EndTime = end
+	return b
+}
+
+// WithTag adds a tag filter.
+func (b *QueryBuilder) WithTag(key string, op FilterOperator, value string) *QueryBuilder {
+	b.query.TagFilters = append(b.query.TagFilters, TagFilter{
+		Key:      key,
+		Operator: op,
+		Value:    value,
+	})
+	return b
+}
+
+// WithAggregation sets the aggregation options and switches the query type
+// to an aggregate query.
+func (b *QueryBuilder) WithAggregation(agg AggregationType, interval time.Duration) *QueryBuilder {
+	b.query.Type = QueryTypeAggregate
+	b.query.Aggregation = agg
+	b.query.AggInterval = interval
+	return b
+}
+
+// WithLimit caps the number of results.
+func (b *QueryBuilder) WithLimit(limit int) *QueryBuilder {
+	b.query.Limit = limit
+	return b
+}
+
+// WithOffset sets the result offset.
+func (b *QueryBuilder) WithOffset(offset int) *QueryBuilder {
+	b.query.Offset = offset
+	return b
+}
+
+// Build returns the assembled query.
+func (b *QueryBuilder) Build() Query {
+	// Default to a raw data query when no type has been set (QueryTypeRaw
+	// is the zero value of QueryType).
+	if b.query.Type == 0 {
+		b.query.Type = QueryTypeRaw
+	}
+	return b.query
+}
diff --git a/pkg/engine/registry.go b/pkg/engine/registry.go
new file mode 100644
index 0000000..8e27ecc
--- /dev/null
+++ b/pkg/engine/registry.go
@@ -0,0 +1,261 @@
+package engine
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// EngineConfig defines the engine configuration interface.
+type EngineConfig interface {
+	// Common configuration setters
+	WithMaxRetention(duration time.Duration) EngineConfig
+	WithMaxPoints(points int) EngineConfig
+	WithFlushInterval(interval time.Duration) EngineConfig
+
+	// Getters for the common values
+	MaxRetention() time.Duration
+	MaxPoints() int
+	FlushInterval() time.Duration
+
+	// Engine-specific configuration
+	MemoryConfig() *MemoryEngineConfig
+	FileConfig() *FileEngineConfig
+}
+
+// BaseEngineConfig provides the default configuration implementation.
+type BaseEngineConfig struct {
+	maxRetention  time.Duration
+	maxPoints     int
+	flushInterval time.Duration
+	memoryConfig  *MemoryEngineConfig
+	fileConfig    *FileEngineConfig
+}
+
+// NewEngineConfig creates a new engine configuration.
+func NewEngineConfig() EngineConfig {
+	return &BaseEngineConfig{
+		maxRetention:  24 * time.Hour,   // keep data for 24 hours by default
+		maxPoints:     1000,             // default point limit
+		flushInterval: 10 * time.Second, // default flush interval
+		memoryConfig:  NewMemoryEngineConfig(),
+		fileConfig:    NewFileEngineConfig(),
+	}
+}
+
+// WithMaxRetention sets the maximum retention period.
+func (c *BaseEngineConfig) WithMaxRetention(duration time.Duration) EngineConfig {
+	c.maxRetention = duration
+	return c
+}
+
+// WithMaxPoints sets the maximum number of points.
+func (c *BaseEngineConfig) WithMaxPoints(points int) EngineConfig {
+	c.maxPoints = points
+	return c
+}
+
+// WithFlushInterval sets the flush interval.
+func (c *BaseEngineConfig) WithFlushInterval(interval time.Duration) EngineConfig {
+	c.flushInterval = interval
+	return c
+}
+
+// MaxRetention returns the maximum retention period.
+func (c *BaseEngineConfig) MaxRetention() time.Duration {
+	return c.maxRetention
+}
+
+// MaxPoints returns the maximum number of points.
+func (c *BaseEngineConfig) MaxPoints() int {
+	return c.maxPoints
+}
+
+// FlushInterval returns the flush interval.
+func (c *BaseEngineConfig) FlushInterval() time.Duration {
+	return c.flushInterval
+}
+
+// MemoryConfig returns the memory engine configuration.
+func (c *BaseEngineConfig) MemoryConfig() *MemoryEngineConfig {
+	return c.memoryConfig
+}
+
+// FileConfig returns the file engine configuration.
+func (c *BaseEngineConfig) FileConfig() *FileEngineConfig {
+	return c.fileConfig
+}
+
+// MemoryEngineConfig holds the memory-engine-specific options.
+type MemoryEngineConfig struct {
+	MaxPointsPerSeries int  // maximum number of points kept per series
+	UseCompression     bool // whether to compress data
+}
+
+// NewMemoryEngineConfig creates a new memory engine configuration.
+func NewMemoryEngineConfig() *MemoryEngineConfig {
+	return &MemoryEngineConfig{
+		MaxPointsPerSeries: 30,   // keep 30 points by default
+		UseCompression:     true, // compression enabled by default
+	}
+}
+
+// FileEngineConfig holds the file-engine-specific options.
+type 
FileEngineConfig struct {
+	DataDir          string        // data directory
+	SegmentSize      int64         // segment file size limit
+	CompactWindow    time.Duration // compaction check interval
+	MaxSegments      int           // maximum number of segment files
+	UseCompression   bool          // whether to compress data
+	CompressionLevel int           // compression level (1-9)
+	IndexCacheSize   int64         // index cache size
+	WriteBufferSize  int           // write buffer size
+	MaxOpenFiles     int           // maximum number of open files
+	SyncWrites       bool          // whether writes are synchronous
+	RetentionPeriod  time.Duration // data retention period
+	CompactThreshold float64       // compaction trigger threshold (overlap ratio)
+}
+
+// NewFileEngineConfig creates a new file engine configuration.
+func NewFileEngineConfig() *FileEngineConfig {
+	return &FileEngineConfig{
+		DataDir:          "data",             // default data directory
+		SegmentSize:      64 * 1024 * 1024,   // 64MB segments
+		CompactWindow:    24 * time.Hour,     // check every 24 hours
+		MaxSegments:      10,                 // at most 10 segment files
+		UseCompression:   true,               // compression enabled by default
+		CompressionLevel: 6,                  // default compression level
+		IndexCacheSize:   256 * 1024 * 1024,  // 256MB index cache
+		WriteBufferSize:  4 * 1024 * 1024,    // 4MB write buffer
+		MaxOpenFiles:     1000,               // default open-file limit
+		SyncWrites:       false,              // asynchronous writes by default
+		RetentionPeriod:  7 * 24 * time.Hour, // keep data for 7 days
+		CompactThreshold: 0.5,                // compact at 50% overlap
+	}
+}
+
+// WithDataDir sets the data directory.
+func (c *FileEngineConfig) WithDataDir(dir string) *FileEngineConfig {
+	c.DataDir = dir
+	return c
+}
+
+// WithSegmentSize sets the segment file size.
+func (c *FileEngineConfig) WithSegmentSize(size int64) *FileEngineConfig {
+	c.SegmentSize = size
+	return c
+}
+
+// WithCompactWindow sets the compaction check interval.
+func (c *FileEngineConfig) WithCompactWindow(window time.Duration) *FileEngineConfig {
+	c.CompactWindow = window
+	return c
+}
+
+// WithMaxSegments sets the maximum number of segment files.
+func (c *FileEngineConfig) WithMaxSegments(count int) *FileEngineConfig {
+	c.MaxSegments = count
+	return c
+}
+
+// WithCompression sets the compression options.
+func (c *FileEngineConfig) WithCompression(use bool, level int) *FileEngineConfig {
+	c.UseCompression = use
+	if level >= 1 && level <= 9 {
+		c.CompressionLevel = level
+	}
+	return c
+}
+
+// WithIndexCacheSize sets the index cache size.
+func (c *FileEngineConfig) WithIndexCacheSize(size int64) *FileEngineConfig {
+	c.IndexCacheSize = size
+	return c
+}
+
+// WithWriteBufferSize sets the write buffer size.
+func (c *FileEngineConfig) WithWriteBufferSize(size int) *FileEngineConfig {
+	c.WriteBufferSize = size
+	return c
+}
+
+// WithMaxOpenFiles sets the maximum number of open files.
+func (c *FileEngineConfig) WithMaxOpenFiles(count int) *FileEngineConfig {
+	c.MaxOpenFiles = count
+	return c
+}
+
+// WithSyncWrites sets whether writes are synchronous.
+func (c *FileEngineConfig) WithSyncWrites(sync bool) *FileEngineConfig {
+	c.SyncWrites = sync
+	return c
+}
+
+// WithRetentionPeriod sets the data retention period.
+func (c *FileEngineConfig) WithRetentionPeriod(period time.Duration) *FileEngineConfig {
+	c.RetentionPeriod = period
+	return c
+}
+
+// WithCompactThreshold sets the compaction trigger threshold.
+func (c *FileEngineConfig) WithCompactThreshold(threshold float64) *FileEngineConfig {
+	if threshold > 0 && threshold <= 1 {
+		c.CompactThreshold = threshold
+	}
+	return c
+}
+
+// EngineFactory is the factory function type that creates storage engines.
+type EngineFactory func(config EngineConfig) (Engine, error)
+
+// EngineRegistry keeps track of all available storage engines.
+type EngineRegistry struct {
+	mu      sync.RWMutex
+	engines map[string]EngineFactory
+}
+
+// NewEngineRegistry creates a new engine registry.
+func NewEngineRegistry() *EngineRegistry {
+	return &EngineRegistry{
+		engines: make(map[string]EngineFactory),
+	}
+}
+
+// Register adds a new engine factory under the given name.
+func (r *EngineRegistry) Register(name string, factory EngineFactory) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.engines[name] = factory
+}
+
+// Create instantiates the named engine with the given configuration.
+func (r *EngineRegistry) Create(name string, config EngineConfig) (Engine, error) {
+	r.mu.RLock()
+	factory, ok := r.engines[name]
+	r.mu.RUnlock()
+
+	if !ok {
+		return nil, fmt.Errorf("unknown engine: %s", 
name)
+	}
+
+	return factory(config)
+}
+
+// ListEngines returns the names of all registered engines.
+func (r *EngineRegistry) ListEngines() []string {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	engines := make([]string, 0, len(r.engines))
+	for name := range r.engines {
+		engines = append(engines, name)
+	}
+	return engines
+}
+
+// UnregisterEngine removes an engine from the registry.
+func (r *EngineRegistry) UnregisterEngine(name string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	delete(r.engines, name)
+}
diff --git a/pkg/engine/utils.go b/pkg/engine/utils.go
new file mode 100644
index 0000000..bb487b7
--- /dev/null
+++ b/pkg/engine/utils.go
@@ -0,0 +1,236 @@
+package engine
+
+import (
+	"context"
+	"hash/fnv"
+	"sync"
+)
+
+// WriteBuffer coalesces small writes into batches.
+type WriteBuffer struct {
+	points  map[string][]DataPoint // buffered points, grouped by series ID
+	size    int                    // total number of buffered points
+	mu      sync.Mutex
+	maxSize int    // flush threshold, in points
+	engine  Engine // underlying storage engine
+}
+
+// NewWriteBuffer creates a new write buffer.
+func NewWriteBuffer(engine Engine, maxSize int) *WriteBuffer {
+	wb := &WriteBuffer{
+		points:  make(map[string][]DataPoint),
+		maxSize: maxSize,
+		engine:  engine,
+	}
+	return wb
+}
+
+// Add appends a data point to the buffer and flushes once the total number
+// of buffered points reaches maxSize.
+func (wb *WriteBuffer) Add(point DataPoint) error {
+	wb.mu.Lock()
+	seriesID := point.SeriesID()
+	wb.points[seriesID] = append(wb.points[seriesID], point)
+	wb.size++
+	full := wb.size >= wb.maxSize
+	wb.mu.Unlock()
+
+	if full {
+		return wb.Flush()
+	}
+	return nil
+}
+
+// Flush writes the buffered points to the underlying engine.
+func (wb *WriteBuffer) Flush() error {
+	wb.mu.Lock()
+	points := make([]DataPoint, 0, wb.size)
+	for _, seriesPoints := range wb.points {
+		points = append(points, seriesPoints...)
+	}
+	wb.points = make(map[string][]DataPoint)
+	wb.size = 0
+	wb.mu.Unlock()
+
+	if len(points) > 0 {
+		return wb.engine.WriteBatch(context.Background(), points)
+	}
+	return nil
+}
+
+// Close flushes any remaining points and closes the buffer.
+func (wb *WriteBuffer) Close() error {
+	return wb.Flush()
+}
+
+// ShardedLock reduces lock contention by hashing keys onto lock shards.
+type ShardedLock struct {
+	locks     []sync.RWMutex
+	shardMask uint64
+}
+
+// NewShardedLock creates a new sharded lock.
+func NewShardedLock(shards int) *ShardedLock {
+	// Round the shard count up to a power of two so a bit mask can replace
+	// the modulo operation.
+	shards = nextPowerOfTwo(shards)
+	return &ShardedLock{
+		locks:     make([]sync.RWMutex, shards),
+		shardMask: uint64(shards - 1),
+	}
+}
+
+// getLockForKey returns the lock shard for the given key.
+func (sl *ShardedLock) getLockForKey(key string) *sync.RWMutex {
+	h := fnv.New64()
+	h.Write([]byte(key))
+	hashVal := h.Sum64()
+	return &sl.locks[hashVal&sl.shardMask]
+}
+
+// Lock acquires the write lock for the given key.
+func (sl *ShardedLock) Lock(key string) {
+	sl.getLockForKey(key).Lock()
+}
+
+// Unlock releases the write lock for the given key.
+func (sl *ShardedLock) Unlock(key string) {
+	sl.getLockForKey(key).Unlock()
+}
+
+// RLock acquires the read lock for the given key.
+func (sl *ShardedLock) RLock(key string) {
+	sl.getLockForKey(key).RLock()
+}
+
+// RUnlock releases the read lock for the given key.
+func (sl *ShardedLock) RUnlock(key string) {
+	sl.getLockForKey(key).RUnlock()
+}
+
+// CompactTimeSeriesBlock stores time series data in a compact form.
+type CompactTimeSeriesBlock struct {
+	baseTime    int64  // base timestamp
+	deltaEncode []byte // delta-encoded timestamps
+	values      []byte // compressed values
+}
+
+// NewCompactBlock creates a new compact storage block.
+func NewCompactBlock(baseTime int64, capacity int) *CompactTimeSeriesBlock {
+	return &CompactTimeSeriesBlock{
+		baseTime:    baseTime,
+		deltaEncode: make([]byte, 0, capacity*8), // reserve enough space
+		values:      make([]byte, 0, capacity*8),
+	}
+}
+
+// nextPowerOfTwo returns the smallest power of two that is >= n.
+func nextPowerOfTwo(n int) int {
+	n--
+	n |= n >> 1
+	n |= n >> 2
+	n |= n >> 4
+	n |= n >> 8
+	n |= n >> 16
+	n++
+	return n
+}
+
+// TimeRangeIndex indexes data blocks by time range.
+type TimeRangeIndex struct {
+	windows   []timeWindow
+	blockSize int64 // size of each time window
+}
+
+type timeWindow struct {
+	startTime int64
+	endTime   int64
+	offset    int // offset within the data block
+}
+
+// NewTimeRangeIndex creates a new time range index.
+func NewTimeRangeIndex(blockSize int64) *TimeRangeIndex {
+	return &TimeRangeIndex{
+		windows:   make([]timeWindow, 0),
+		blockSize: blockSize,
+	}
+}
+
+// AddWindow appends a time window to the index.
+func (idx *TimeRangeIndex) AddWindow(start, end int64, offset int) {
+	idx.windows = append(idx.windows, timeWindow{
+		startTime: start,
+		endTime:   end,
+		offset:    offset,
+	})
+}
+
+// FindBlocks returns the indices of all blocks overlapping the given range.
+func (idx *TimeRangeIndex) FindBlocks(start, end int64) []int {
+	var result []int
+	for i, window := range idx.windows {
+		if window.endTime >= start && window.startTime <= end {
+			result = append(result, i)
+		}
+	}
+	return result
+}
+
+// CircularBuffer is a fixed-size ring buffer of data points.
+type CircularBuffer struct {
+	values   []DataPoint
+	head     int
+	size     int
+	capacity int
+	mu       sync.RWMutex
+}
+
+// NewCircularBuffer creates a new ring buffer.
+func NewCircularBuffer(capacity int) *CircularBuffer {
+	return &CircularBuffer{
+		values:   make([]DataPoint, capacity),
+		capacity: capacity,
+	}
+}
+
+// Add appends a data point, overwriting the oldest entry once full.
+func (cb *CircularBuffer) Add(point DataPoint) {
+	cb.mu.Lock()
+	defer cb.mu.Unlock()
+
+	cb.values[cb.head] = point
+	cb.head = (cb.head + 1) % cb.capacity
+	if cb.size < cb.capacity {
+		cb.size++
+	}
+}
+
+// GetRecent returns the n most recent data points, ordered from newest to
+// oldest.
+func (cb *CircularBuffer) GetRecent(n int) []DataPoint {
+	cb.mu.RLock()
+	defer cb.mu.RUnlock()
+
+	if n > cb.size {
+		n = cb.size
+	}
+
+	result := make([]DataPoint, n)
+	for i := 0; i < n; i++ {
+		// Walk backwards from the write position; the operand stays
+		// non-negative because i < size <= capacity.
+		idx := (cb.head - i - 1 + cb.capacity) % cb.capacity
+		result[i] = cb.values[idx]
+	}
+
+	return result
+}
+
+// Size returns the number of points currently stored.
+func (cb *CircularBuffer) Size() int {
+	cb.mu.RLock()
+	defer cb.mu.RUnlock()
+	return cb.size
+}
+
+// Capacity returns the buffer capacity.
+func (cb *CircularBuffer) Capacity() int {
+	return cb.capacity
+}
diff --git a/pkg/storage/boltdb.go b/pkg/storage/boltdb.go
index 0663217..ebe3cea 100644
--- a/pkg/storage/boltdb.go
+++ b/pkg/storage/boltdb.go
@@ -15,7 +15,7 @@ import (
 
 const (
 	// PersistenceTypeBoltDB BoltDB持久化类型
-	PersistenceTypeBoltDB PersistenceType = "boltdb"
+	// PersistenceTypeBoltDB PersistenceType = "boltdb"
 
 	// 默认bucket名称
 	devicesBucketName = "devices"
@@ -296,7 +296,7 @@ func (e *BoltDBEngine) ReadAll(ctx context.Context, id model.DataPointID) ([]mod
 
 // ReadDuration 读取指定时间范围内的数据
 func (e *BoltDBEngine) ReadDuration(ctx context.Context, id model.DataPointID, from, to time.Time) ([]model.DataValue, error) {
-	deviceKey := id.String()
+	// deviceKey := id.String()
 
 	// 从数据库读取所有数据
 	values, err := e.Read(ctx, id)
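
The patch wires up an engine registry, an in-memory engine and a fluent query builder, but contains no end-to-end usage of the three together. The sketch below shows how they are meant to compose, based only on the APIs added above; it assumes the `Engine` interface and the `DataPoint` struct (with the `DeviceID`, `MetricCode`, `Labels`, `Value` and `Timestamp` fields exercised by the tests) defined elsewhere in `pkg/engine`.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"git.pyer.club/kingecg/gotidb/pkg/engine"
	"git.pyer.club/kingecg/gotidb/pkg/engine/memory"
)

func main() {
	// Register the memory engine and create an instance through the registry.
	registry := engine.NewEngineRegistry()
	memory.Register(registry)

	config := engine.NewEngineConfig().WithMaxPoints(1000)
	eng, err := registry.Create("memory", config)
	if err != nil {
		panic(err)
	}
	if err := eng.Open(); err != nil {
		panic(err)
	}
	defer eng.Close()

	// Write a small batch of points.
	now := time.Now()
	points := []engine.DataPoint{{
		DeviceID:   "device1",
		MetricCode: "temperature",
		Labels:     map[string]string{"location": "room1"},
		Value:      25.5,
		Timestamp:  now.UnixNano(),
	}}
	if err := eng.WriteBatch(context.Background(), points); err != nil {
		panic(err)
	}

	// Aggregate the last hour into one-minute averages.
	query := engine.NewQueryBuilder().
		ForMetric("temperature").
		WithTimeRange(now.Add(-time.Hour).UnixNano(), now.UnixNano()).
		WithTag("location", engine.OpEqual, "room1").
		WithAggregation(engine.AggAvg, time.Minute).
		Build()

	result, err := eng.Query(context.Background(), query)
	if err != nil {
		panic(err)
	}
	if agg, ok := result.(*engine.AggregateResult); ok {
		for _, g := range agg.Groups {
			fmt.Printf("%s  avg=%.2f  count=%d\n",
				time.Unix(0, g.StartTime).Format(time.RFC3339), g.Value, g.Count)
		}
	}
}
```

Note the visibility difference between the two write paths: `WritePoint` goes through the write buffer and only becomes queryable after the buffer fills or `Flush` is called, while `WriteBatch` writes through to the series buffers directly.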