gotidb/pkg/engine/memory/memory.go

442 lines
9.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package memory
import (
"context"
"fmt"
"sync"
"time"
"git.pyer.club/kingecg/gotidb/pkg/engine"
)
// MemoryEngine 实现基于内存的存储引擎
type MemoryEngine struct {
// 配置
config *engine.MemoryEngineConfig
// 数据存储
series map[string]*SeriesData
// 并发控制
lock *engine.ShardedLock
// 写入缓冲
writeBuffer *engine.WriteBuffer
// 统计信息
stats engine.EngineStats
// 关闭控制
closed bool
mu sync.RWMutex
}
// SeriesData 存储单个时间序列的数据
type SeriesData struct {
// 最新数据点的环形缓冲区
buffer *engine.CircularBuffer
// 元数据
deviceID string
metricCode string
labels map[string]string
// 统计信息
lastWriteTime time.Time
pointsCount int64
}
// NewMemoryEngine 创建新的内存存储引擎
func NewMemoryEngine(config engine.EngineConfig) (engine.Engine, error) {
memConfig := config.MemoryConfig()
if memConfig == nil {
return nil, fmt.Errorf("memory engine config is required")
}
e := &MemoryEngine{
config: memConfig,
series: make(map[string]*SeriesData),
lock: engine.NewShardedLock(16), // 16个分片
}
// 创建写入缓冲区
e.writeBuffer = engine.NewWriteBuffer(e, 1000) // 缓冲1000个点
return e, nil
}
// Open 实现Engine接口
func (e *MemoryEngine) Open() error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
e.closed = false
e.series = make(map[string]*SeriesData)
}
return nil
}
// Close 实现Engine接口
func (e *MemoryEngine) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
if !e.closed {
e.closed = true
// 刷新缓冲区
if err := e.writeBuffer.Flush(); err != nil {
return err
}
// 清理数据
e.series = nil
}
return nil
}
// WritePoint 实现Engine接口
func (e *MemoryEngine) WritePoint(ctx context.Context, point engine.DataPoint) error {
if e.closed {
return fmt.Errorf("engine is closed")
}
// 写入缓冲区
return e.writeBuffer.Add(point)
}
// WriteBatch 实现Engine接口
func (e *MemoryEngine) WriteBatch(ctx context.Context, points []engine.DataPoint) error {
if e.closed {
return fmt.Errorf("engine is closed")
}
for _, point := range points {
seriesID := point.SeriesID()
// 获取或创建序列数据
e.lock.Lock(seriesID)
series, ok := e.series[seriesID]
if !ok {
series = &SeriesData{
buffer: engine.NewCircularBuffer(e.config.MaxPointsPerSeries),
deviceID: point.DeviceID,
metricCode: point.MetricCode,
labels: point.Labels,
}
e.series[seriesID] = series
}
e.lock.Unlock(seriesID)
// 写入数据点
e.lock.Lock(seriesID)
series.buffer.Add(point)
series.lastWriteTime = time.Now()
series.pointsCount++
e.lock.Unlock(seriesID)
// 更新统计信息
e.mu.Lock()
e.stats.PointsCount++
e.stats.LastWriteTime = time.Now()
e.mu.Unlock()
}
return nil
}
// Query 实现Engine接口
func (e *MemoryEngine) Query(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
if e.closed {
return nil, fmt.Errorf("engine is closed")
}
switch query.Type {
case engine.QueryTypeRaw:
return e.queryRaw(ctx, query)
case engine.QueryTypeLatest:
return e.queryLatest(ctx, query)
case engine.QueryTypeAggregate:
return e.queryAggregate(ctx, query)
default:
return nil, fmt.Errorf("unsupported query type: %v", query.Type)
}
}
// queryRaw 查询原始数据
func (e *MemoryEngine) queryRaw(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
result := &engine.TimeSeriesResult{
SeriesID: query.SeriesID,
Points: make([]engine.DataPoint, 0),
}
// 如果指定了SeriesID只查询特定序列
if query.SeriesID != "" {
e.lock.RLock(query.SeriesID)
series, ok := e.series[query.SeriesID]
e.lock.RUnlock(query.SeriesID)
if !ok {
return result, nil
}
points := series.buffer.GetRecent(series.buffer.Size())
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
result.Points = append(result.Points, point)
}
}
return result, nil
}
// 否则查询所有匹配的序列
e.mu.RLock()
for seriesID, series := range e.series {
// 检查是否匹配查询条件
if query.DeviceID != "" && series.deviceID != query.DeviceID {
continue
}
if query.MetricCode != "" && series.metricCode != query.MetricCode {
continue
}
if !matchTags(series.labels, query.TagFilters) {
continue
}
e.lock.RLock(seriesID)
points := series.buffer.GetRecent(series.buffer.Size())
e.lock.RUnlock(seriesID)
for _, point := range points {
if point.Timestamp >= query.StartTime && point.Timestamp <= query.EndTime {
result.Points = append(result.Points, point)
}
}
}
e.mu.RUnlock()
return result, nil
}
// queryLatest 查询最新数据
func (e *MemoryEngine) queryLatest(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
result := &engine.TimeSeriesResult{
SeriesID: query.SeriesID,
Points: make([]engine.DataPoint, 0),
}
if query.SeriesID != "" {
e.lock.RLock(query.SeriesID)
series, ok := e.series[query.SeriesID]
e.lock.RUnlock(query.SeriesID)
if !ok {
return result, nil
}
points := series.buffer.GetRecent(1)
if len(points) > 0 {
result.Points = append(result.Points, points[0])
}
return result, nil
}
e.mu.RLock()
for seriesID, series := range e.series {
if query.DeviceID != "" && series.deviceID != query.DeviceID {
continue
}
if query.MetricCode != "" && series.metricCode != query.MetricCode {
continue
}
if !matchTags(series.labels, query.TagFilters) {
continue
}
e.lock.RLock(seriesID)
points := series.buffer.GetRecent(1)
e.lock.RUnlock(seriesID)
if len(points) > 0 {
result.Points = append(result.Points, points[0])
}
}
e.mu.RUnlock()
return result, nil
}
// queryAggregate 查询聚合数据
func (e *MemoryEngine) queryAggregate(ctx context.Context, query engine.Query) (engine.QueryResult, error) {
result := &engine.AggregateResult{
SeriesID: query.SeriesID,
Groups: make([]engine.AggregateGroup, 0),
}
// 创建时间窗口
windows := splitTimeRange(query.StartTime, query.EndTime, query.AggInterval)
for _, window := range windows {
group := engine.AggregateGroup{
StartTime: window.start,
EndTime: window.end,
}
// 聚合每个窗口内的数据
if query.SeriesID != "" {
e.aggregateSeriesInWindow(query.SeriesID, window.start, window.end, query.Aggregation, &group)
} else {
e.aggregateAllSeriesInWindow(query, window.start, window.end, &group)
}
result.Groups = append(result.Groups, group)
}
return result, nil
}
// Flush 实现Engine接口
func (e *MemoryEngine) Flush() error {
return e.writeBuffer.Flush()
}
// Compact 实现Engine接口
func (e *MemoryEngine) Compact() error {
// 内存引擎不需要压缩
return nil
}
// Stats 实现Engine接口
func (e *MemoryEngine) Stats() engine.EngineStats {
e.mu.RLock()
defer e.mu.RUnlock()
return e.stats
}
// Capabilities 实现Engine接口
func (e *MemoryEngine) Capabilities() engine.EngineCapabilities {
return engine.EngineCapabilities{
SupportsCompression: e.config.UseCompression,
SupportsPersistence: false,
SupportsReplication: false,
MaxConcurrentWrites: 1000,
}
}
// 辅助函数
type timeWindow struct {
start int64
end int64
}
func splitTimeRange(start, end int64, interval time.Duration) []timeWindow {
var windows []timeWindow
intervalNanos := interval.Nanoseconds()
for windowStart := start; windowStart < end; windowStart += intervalNanos {
windowEnd := windowStart + intervalNanos
if windowEnd > end {
windowEnd = end
}
windows = append(windows, timeWindow{start: windowStart, end: windowEnd})
}
return windows
}
func matchTags(labels map[string]string, filters []engine.TagFilter) bool {
for _, filter := range filters {
value, ok := labels[filter.Key]
if !ok {
return false
}
switch filter.Operator {
case engine.OpEqual:
if value != filter.Value {
return false
}
case engine.OpNotEqual:
if value == filter.Value {
return false
}
// 其他操作符...
}
}
return true
}
func (e *MemoryEngine) aggregateSeriesInWindow(seriesID string, start, end int64, aggType engine.AggregationType, group *engine.AggregateGroup) {
e.lock.RLock(seriesID)
series, ok := e.series[seriesID]
e.lock.RUnlock(seriesID)
if !ok {
return
}
points := series.buffer.GetRecent(series.buffer.Size())
var sum float64
var count int
for _, point := range points {
if point.Timestamp >= start && point.Timestamp < end {
sum += point.Value
count++
}
}
if count > 0 {
switch aggType {
case engine.AggSum:
group.Value = sum
case engine.AggAvg:
group.Value = sum / float64(count)
case engine.AggCount:
group.Value = float64(count)
}
group.Count = count
}
}
func (e *MemoryEngine) aggregateAllSeriesInWindow(query engine.Query, start, end int64, group *engine.AggregateGroup) {
var sum float64
var count int
e.mu.RLock()
for seriesID, series := range e.series {
if query.DeviceID != "" && series.deviceID != query.DeviceID {
continue
}
if query.MetricCode != "" && series.metricCode != query.MetricCode {
continue
}
if !matchTags(series.labels, query.TagFilters) {
continue
}
e.lock.RLock(seriesID)
points := series.buffer.GetRecent(series.buffer.Size())
e.lock.RUnlock(seriesID)
for _, point := range points {
if point.Timestamp >= start && point.Timestamp < end {
sum += point.Value
count++
}
}
}
e.mu.RUnlock()
if count > 0 {
switch query.Aggregation {
case engine.AggSum:
group.Value = sum
case engine.AggAvg:
group.Value = sum / float64(count)
case engine.AggCount:
group.Value = float64(count)
}
group.Count = count
}
}