gomog/internal/engine/memory_store.go

551 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
"git.kingecg.top/kingecg/gomog/internal/database"
"git.kingecg.top/kingecg/gomog/pkg/errors"
"git.kingecg.top/kingecg/gomog/pkg/types"
)
// MemoryStore is an in-memory document store keyed by collection name,
// optionally backed by a database adapter used to load and persist data.
type MemoryStore struct {
	mu          sync.RWMutex             // guards the collections map itself; each Collection guards its own documents
	collections map[string]*Collection   // collection name -> collection
	adapter     database.DatabaseAdapter // may be nil (memory-only mode); Initialize checks for this
}
// Collection is a single in-memory collection: its documents plus the
// bookkeeping used to page data in lazily from the database.
type Collection struct {
	name       string
	documents  map[string]types.Document // document ID -> Document
	mu         sync.RWMutex              // guards documents, pageSize and loadedAll
	pageSize   int                       // number of documents LoadCollectionPage requests per page
	loadedAll  bool                      // true once every document is in memory (or the collection was created in memory)
	totalCount int                       // total document count, if known; NOTE(review): not read or written in this file
}
// DocumentIterator walks a snapshot of a collection's document IDs in batches.
// Documents are looked up again when each batch is fetched, so IDs whose
// documents were deleted after the snapshot are skipped.
type DocumentIterator struct {
	store      *MemoryStore
	collection string     // exact key looked up in store.collections on every NextBatch
	keys       []string   // document IDs captured when the iterator was created
	currentIdx int        // index into keys of the next document to return
	batchSize  int        // number of IDs consumed per NextBatch call
	mutex      sync.Mutex // serializes HasNext/NextBatch access to currentIdx
}
// GetDocumentIterator returns an iterator over the documents currently stored in
// collection, consuming batchSize document IDs per NextBatch call.
//
// The set of document IDs is captured here, so documents inserted afterwards are
// not visited. The lookup uses the exact collection key (there is no "dbName."
// fallback as in GetCollection). Iteration order follows Go map iteration and is
// therefore unspecified.
//
// Returns an error if the collection does not exist.
func (ms *MemoryStore) GetDocumentIterator(collection string, batchSize int) (*DocumentIterator, error) {
	ms.mu.RLock()
	coll, exists := ms.collections[collection]
	ms.mu.RUnlock()
	if !exists {
		return nil, fmt.Errorf("collection %s does not exist", collection)
	}

	// The documents map is guarded by coll.mu, not ms.mu: writers such as Insert
	// hold only coll.mu, so ranging without it can race with a concurrent write.
	coll.mu.RLock()
	keys := make([]string, 0, len(coll.documents))
	for k := range coll.documents {
		keys = append(keys, k)
	}
	coll.mu.RUnlock()

	return &DocumentIterator{
		store:      ms,
		collection: collection,
		keys:       keys,
		currentIdx: 0,
		batchSize:  batchSize,
	}, nil
}
// HasNext reports whether any document IDs remain in the iterator's snapshot.
func (iter *DocumentIterator) HasNext() bool {
	iter.mutex.Lock()
	remaining := len(iter.keys) - iter.currentIdx
	iter.mutex.Unlock()
	return remaining > 0
}
// NextBatch returns the documents for the next batchSize IDs of the iterator's
// snapshot (all remaining IDs when batchSize is not positive). IDs whose
// documents were deleted since the snapshot are skipped, so a batch may be
// shorter than batchSize. Once the snapshot is exhausted it returns an empty,
// non-nil slice.
//
// If the collection has been removed from the store in the meantime, the
// iterator is marked exhausted and an error is returned.
func (iter *DocumentIterator) NextBatch() ([]types.Document, error) {
	iter.mutex.Lock()
	defer iter.mutex.Unlock()

	remaining := len(iter.keys) - iter.currentIdx
	if remaining <= 0 {
		return []types.Document{}, nil
	}

	// A non-positive batch size would never advance currentIdx, making a
	// HasNext/NextBatch loop spin forever; treat it as "everything left".
	size := iter.batchSize
	if size <= 0 || size > remaining {
		size = remaining
	}
	endIdx := iter.currentIdx + size

	iter.store.mu.RLock()
	coll := iter.store.collections[iter.collection]
	iter.store.mu.RUnlock()
	if coll == nil {
		// The collection was dropped after the iterator was created.
		iter.currentIdx = len(iter.keys)
		return nil, fmt.Errorf("collection %s does not exist", iter.collection)
	}

	batch := make([]types.Document, 0, size)
	// coll.mu guards the documents map against concurrent writers.
	coll.mu.RLock()
	for _, key := range iter.keys[iter.currentIdx:endIdx] {
		if doc, ok := coll.documents[key]; ok {
			batch = append(batch, doc)
		}
	}
	coll.mu.RUnlock()

	iter.currentIdx = endIdx
	return batch, nil
}
// Close releases the iterator. It holds nothing that needs releasing at present,
// so this is a no-op.
func (iter *DocumentIterator) Close() {}
// NewMemoryStore returns an empty MemoryStore that uses adapter for database
// access. adapter may be nil, in which case the store is memory-only.
func NewMemoryStore(adapter database.DatabaseAdapter) *MemoryStore {
	store := &MemoryStore{adapter: adapter}
	store.collections = make(map[string]*Collection)
	return store
}
// Initialize registers an empty, not-yet-loaded Collection for every collection
// the adapter lists, without reading any documents (data is loaded lazily later).
// Collections already present in memory are left alone.
//
// It is a no-op when there is no adapter, or when the adapter's ListCollections
// fails with the message "not implemented". Any other listing error is returned.
func (ms *MemoryStore) Initialize(ctx context.Context) error {
	if ms.adapter == nil {
		log.Println("[INFO] No database adapter, skipping initialization")
		return nil
	}

	tables, err := ms.adapter.ListCollections(ctx)
	switch {
	case err == nil:
		// proceed
	case err.Error() == "not implemented":
		log.Println("[WARN] ListCollections not implemented, skipping initialization")
		return nil
	default:
		return fmt.Errorf("failed to list collections: %w", err)
	}
	log.Printf("[INFO] Found %d collections in database", len(tables))

	// register adds a lazily-loaded placeholder for table and reports whether it
	// created one (false when the collection is already in memory).
	register := func(table string) bool {
		ms.mu.Lock()
		defer ms.mu.Unlock()
		if _, present := ms.collections[table]; present {
			return false
		}
		ms.collections[table] = &Collection{
			name:      table,
			documents: make(map[string]types.Document),
			pageSize:  1000, // default of 1000 documents per page
			loadedAll: false,
		}
		return true
	}

	created := 0
	for _, table := range tables {
		if !register(table) {
			continue
		}
		created++
		log.Printf("[DEBUG] Created collection structure for %s", table)
	}

	log.Printf("[INFO] Created %d collection structures (data will be loaded lazily)", created)
	return nil
}
// CreateTestCollectionForTesting registers a fully loaded collection named name
// holding documents (test use only). An existing collection with the same name
// is replaced. The documents map is stored as-is, not copied, so the caller and
// the collection share it. A nil map is replaced with an empty one so later
// inserts do not panic.
func CreateTestCollectionForTesting(store *MemoryStore, name string, documents map[string]types.Document) {
	if documents == nil {
		documents = make(map[string]types.Document)
	}
	// Every other mutation of store.collections holds store.mu; do the same so
	// tests that run goroutines against the store stay race-free.
	store.mu.Lock()
	defer store.mu.Unlock()
	store.collections[name] = &Collection{
		name:      name,
		documents: documents,
		pageSize:  1000,
		loadedAll: true,
	}
}
// LoadCollectionPage fetches page number page (0-based) of collection name from
// the database and merges its documents into memory, keyed by document ID.
// When the adapter reports no more data, the collection is marked fully loaded.
// After that, further calls return immediately.
//
// Returns errors.ErrCollectionNotFnd (via GetCollection) for an unknown
// collection. It also returns an error for a negative page, a missing adapter,
// or a failed FindPage call.
func (ms *MemoryStore) LoadCollectionPage(ctx context.Context, name string, page int) error {
	coll, err := ms.GetCollection(name)
	if err != nil {
		return err
	}
	coll.mu.Lock()
	defer coll.mu.Unlock()

	// Nothing left to fetch once the whole collection is in memory.
	if coll.loadedAll {
		return nil
	}
	if page < 0 {
		return fmt.Errorf("failed to load page %d of collection %s: page must not be negative", page, name)
	}
	if ms.adapter == nil {
		return fmt.Errorf("failed to load page %d of collection %s: no database adapter configured", page, name)
	}
	// Some collections are created without a page size; a zero size would request
	// zero documents on every call, so fall back to the default.
	if coll.pageSize <= 0 {
		coll.pageSize = 1000
	}

	skip := page * coll.pageSize
	result, err := ms.adapter.FindPage(ctx, name, skip, coll.pageSize)
	if err != nil {
		return fmt.Errorf("failed to load page %d of collection %s: %w", page, name, err)
	}

	for _, doc := range result.Documents {
		coll.documents[doc.ID] = doc
	}
	if !result.HasMore {
		coll.loadedAll = true
	}

	log.Printf("[INFO] Loaded page %d for collection %s (%d documents)", page, name, len(result.Documents))
	return nil
}
// LoadEntireCollection replaces the in-memory documents of collection name with
// everything the adapter's FindAll returns and marks the collection fully loaded.
// Use with care: the whole collection is held in memory at once.
//
// The collection's write lock is held across the database read, so writers that
// need coll.mu wait until the reload has finished.
//
// Returns errors.ErrCollectionNotFnd (via GetCollection) for an unknown
// collection. It also returns an error when no adapter is configured or
// FindAll fails. On error the existing in-memory documents are left untouched.
func (ms *MemoryStore) LoadEntireCollection(ctx context.Context, name string) error {
	coll, err := ms.GetCollection(name)
	if err != nil {
		return err
	}
	// A memory-only store (nil adapter) is supported elsewhere; calling through
	// the nil interface below would panic.
	if ms.adapter == nil {
		return fmt.Errorf("failed to load entire collection %s: no database adapter configured", name)
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	docs, err := ms.adapter.FindAll(ctx, name)
	if err != nil {
		return fmt.Errorf("failed to load entire collection %s: %w", name, err)
	}

	// Build the replacement map first, then swap it in.
	documents := make(map[string]types.Document, len(docs))
	for _, doc := range docs {
		documents[doc.ID] = doc
	}
	coll.documents = documents
	coll.loadedAll = true

	log.Printf("[INFO] Loaded entire collection %s (%d documents)", name, len(docs))
	return nil
}
// LazyLoadDocument returns document docID from collectionName if it is already
// in memory. The adapter interface has no lookup-by-ID, so a document that is
// not in memory is reported as errors.ErrDocumentNotFnd. Callers may then
// decide to load a whole page. ctx is currently unused.
func (ms *MemoryStore) LazyLoadDocument(ctx context.Context, collectionName, docID string) (types.Document, error) {
	coll, err := ms.GetCollection(collectionName)
	if err != nil {
		return types.Document{}, err
	}

	coll.mu.RLock()
	doc, cached := coll.documents[docID]
	coll.mu.RUnlock()

	if !cached {
		return types.Document{}, errors.ErrDocumentNotFnd
	}
	return doc, nil
}
// LoadCollection reads every document of collection name from the database into
// a fresh in-memory Collection. If the database does not have the collection
// yet, it is created there first. Any in-memory Collection already registered
// under name is replaced.
//
// The new Collection is marked fully loaded and given the default page size,
// because it already holds everything FindAll returned. This keeps
// LoadCollectionPage from re-querying it, or querying with a zero page size.
//
// Errors from the adapter are returned unchanged. An error is also returned
// when no adapter is configured.
func (ms *MemoryStore) LoadCollection(ctx context.Context, name string) error {
	if ms.adapter == nil {
		return fmt.Errorf("cannot load collection %s: no database adapter configured", name)
	}

	exists, err := ms.adapter.CollectionExists(ctx, name)
	if err != nil {
		return err
	}
	if !exists {
		if err := ms.adapter.CreateCollection(ctx, name); err != nil {
			return err
		}
	}

	docs, err := ms.adapter.FindAll(ctx, name)
	if err != nil {
		return err
	}

	// Build the collection outside the store lock; only the map swap needs it.
	coll := &Collection{
		name:      name,
		documents: make(map[string]types.Document, len(docs)),
		pageSize:  1000,
		loadedAll: true,
	}
	for _, doc := range docs {
		coll.documents[doc.ID] = doc
	}

	ms.mu.Lock()
	defer ms.mu.Unlock()
	ms.collections[name] = coll
	return nil
}
// GetCollection looks up a collection by name. The exact name is tried first
// (e.g. "testdb.users"). If that misses and name contains a '.' that is not its
// first character, the part after the first '.' is tried (e.g. "users").
// Returns errors.ErrCollectionNotFnd when neither key is present.
func (ms *MemoryStore) GetCollection(name string) (*Collection, error) {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	if found, ok := ms.collections[name]; ok {
		return found, nil
	}

	dot := strings.IndexByte(name, '.')
	if dot <= 0 {
		return nil, errors.ErrCollectionNotFnd
	}
	if found, ok := ms.collections[name[dot+1:]]; ok {
		return found, nil
	}
	return nil, errors.ErrCollectionNotFnd
}
// Insert stores doc in collection, replacing any document with the same ID.
// If the collection does not exist, it is created in memory (not in the
// database) with the default page size and marked fully loaded. Only new
// documents are added to it, so there is nothing to page in.
func (ms *MemoryStore) Insert(collection string, doc types.Document) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		ms.mu.Lock()
		// Re-check under the write lock: another goroutine may have created the
		// collection after GetCollection released its read lock. Overwriting it
		// unconditionally would silently discard that collection's documents.
		coll = ms.collections[collection]
		if coll == nil {
			coll = &Collection{
				name:      collection,
				documents: make(map[string]types.Document),
				pageSize:  1000,
				loadedAll: true,
			}
			ms.collections[collection] = coll
		}
		ms.mu.Unlock()
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()
	coll.documents[doc.ID] = doc
	return nil
}
// Find returns every document in collection whose data satisfies filter
// according to MatchFilter. The result is nil when nothing matches, and its
// order follows map iteration (unspecified).
func (ms *MemoryStore) Find(collection string, filter types.Filter) ([]types.Document, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return nil, err
	}

	coll.mu.RLock()
	defer coll.mu.RUnlock()

	var matches []types.Document
	for _, candidate := range coll.documents {
		if !MatchFilter(candidate.Data, filter) {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches, nil
}
// Update applies update (with arrayFilters) to every document in collection that
// matches filter. It returns (matched, modified, upsertedIDs, err).
//
// Matched documents keep their ID and CreatedAt; UpdatedAt is set to the time of
// this call. NOTE(review): every matched document is counted as modified, even
// if the update left its data unchanged.
//
// If nothing matched and upsert is true, a new document is produced by applying
// update to an empty document with the insert flag set (so $setOnInsert takes
// effect). It is stored under a freshly generated ID and reported as matched=1,
// modified=1, with that ID in upsertedIDs.
func (ms *MemoryStore) Update(collection string, filter types.Filter, update types.Update, upsert bool, arrayFilters []types.Filter) (int, int, []string, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return 0, 0, nil, err
	}
	coll.mu.Lock()
	defer coll.mu.Unlock()

	// Take one timestamp for the whole operation. All documents touched by this
	// call then share the same UpdatedAt, and an upserted document gets
	// CreatedAt == UpdatedAt.
	now := time.Now()

	matched := 0
	modified := 0
	var upsertedIDs []string

	// Replacing values of existing keys while ranging over a map is permitted.
	for id, doc := range coll.documents {
		if !MatchFilter(doc.Data, filter) {
			continue
		}
		matched++
		newData := applyUpdateWithFilters(doc.Data, update, false, arrayFilters)
		coll.documents[id] = types.Document{
			ID:        doc.ID,
			Data:      newData,
			CreatedAt: doc.CreatedAt,
			UpdatedAt: now,
		}
		modified++
	}

	if matched == 0 && upsert {
		newID := generateID()
		newDoc := make(map[string]interface{})
		newData := applyUpdateWithFilters(newDoc, update, true, arrayFilters)
		coll.documents[newID] = types.Document{
			ID:        newID,
			Data:      newData,
			CreatedAt: now,
			UpdatedAt: now,
		}
		matched = 1
		modified = 1
		upsertedIDs = append(upsertedIDs, newID)
	}

	return matched, modified, upsertedIDs, nil
}
// Delete removes every document in collection whose data satisfies filter and
// returns how many were removed.
func (ms *MemoryStore) Delete(collection string, filter types.Filter) (int, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return 0, err
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	// Deleting map entries during a range loop is permitted by the language spec.
	removed := 0
	for key, entry := range coll.documents {
		if !MatchFilter(entry.Data, filter) {
			continue
		}
		delete(coll.documents, key)
		removed++
	}
	return removed, nil
}
// SyncToDB writes every in-memory document of collection to the database.
//
// The table name is collection with any "dbName." prefix removed
// (e.g. "testdb.users" -> "users"); the original comment notes this is done for
// SQLite. The table is created first if it does not exist. If CollectionExists
// fails with the message "not implemented", creation is attempted anyway and
// its error ignored, since the table may already exist. The documents are then
// written with a single InsertMany call.
//
// The collection's read lock is held for the whole call, including database
// I/O, so the written set is one consistent snapshot.
//
// With no adapter configured (memory-only mode) there is nothing to persist and
// nil is returned. Adapter errors are returned unchanged.
func (ms *MemoryStore) SyncToDB(ctx context.Context, collection string) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return err
	}
	// Memory-only store: calling through the nil adapter below would panic.
	if ms.adapter == nil {
		return nil
	}

	coll.mu.RLock()
	defer coll.mu.RUnlock()

	docs := make([]types.Document, 0, len(coll.documents))
	for _, doc := range coll.documents {
		docs = append(docs, doc)
	}

	tableName := collection
	if idx := strings.Index(collection, "."); idx > 0 {
		tableName = collection[idx+1:]
	}

	exists, err := ms.adapter.CollectionExists(ctx, tableName)
	switch {
	case err != nil && err.Error() == "not implemented":
		// Best effort: the adapter cannot tell us whether the table exists, so try
		// to create it and ignore the error (typically "already exists").
		_ = ms.adapter.CreateCollection(ctx, tableName)
	case err != nil:
		return err
	case !exists:
		if err := ms.adapter.CreateCollection(ctx, tableName); err != nil {
			return err
		}
	}

	// Nothing to write; avoid depending on how each adapter handles an empty batch.
	if len(docs) == 0 {
		return nil
	}
	// NOTE: InsertMany is used for new and previously persisted documents alike;
	// whether existing rows are updated or rejected depends on the adapter.
	return ms.adapter.InsertMany(ctx, tableName, docs)
}
// GetAllDocuments returns a new slice holding every document in collection
// (used by aggregation). The slice is empty but non-nil for an empty
// collection, and its order is unspecified.
func (ms *MemoryStore) GetAllDocuments(collection string) ([]types.Document, error) {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return nil, err
	}

	coll.mu.RLock()
	defer coll.mu.RUnlock()

	snapshot := make([]types.Document, len(coll.documents))
	i := 0
	for _, doc := range coll.documents {
		snapshot[i] = doc
		i++
	}
	return snapshot, nil
}
// DropCollection removes a collection from memory and, when an adapter is
// configured, asks the adapter to drop the backing table too.
//
// As in GetCollection, name may be a bare collection name or "dbName.collection".
// The exact name is tried first, then the part after the first '.'. Returns
// errors.ErrCollectionNotFnd if neither is present.
//
// The database drop is best-effort: a failure is logged but not returned,
// because the in-memory collection has already been removed.
func (ms *MemoryStore) DropCollection(name string) error {
	ms.mu.Lock()
	defer ms.mu.Unlock()

	// Strip any "dbName." prefix once; used for the fallback lookup and the
	// database table name.
	bareName := name
	if idx := strings.Index(name, "."); idx > 0 {
		bareName = name[idx+1:]
	}

	key := name
	if _, exists := ms.collections[key]; !exists {
		if bareName == name {
			return errors.ErrCollectionNotFnd
		}
		key = bareName
		if _, exists := ms.collections[key]; !exists {
			return errors.ErrCollectionNotFnd
		}
	}
	delete(ms.collections, key)

	if ms.adapter != nil {
		// SyncToDB writes to the table name without the "dbName." prefix, so drop
		// that same table; otherwise it would be left behind in the database.
		if err := ms.adapter.DropCollection(context.Background(), bareName); err != nil {
			log.Printf("[WARN] Failed to drop collection %s in database: %v", bareName, err)
		}
	}
	return nil
}
// InsertDocument stores doc in collection, replacing any document with the same
// ID. If the collection does not exist, it is created in memory with the same
// settings Insert uses (page size 1000, marked fully loaded). That way
// LoadCollectionPage never queries the database with a zero page size.
func (ms *MemoryStore) InsertDocument(collection string, doc types.Document) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		ms.mu.Lock()
		// Another goroutine may have created the collection after GetCollection
		// released its read lock; reuse it rather than overwrite it and lose its
		// documents.
		coll = ms.collections[collection]
		if coll == nil {
			coll = &Collection{
				name:      collection,
				documents: make(map[string]types.Document),
				pageSize:  1000,
				loadedAll: true,
			}
			ms.collections[collection] = coll
		}
		ms.mu.Unlock()
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()
	coll.documents[doc.ID] = doc
	return nil
}
// UpdateDocument replaces the stored document that has doc.ID with doc. It
// returns errors.ErrDocumentNotFnd if no such document exists; it never inserts.
func (ms *MemoryStore) UpdateDocument(collection string, doc types.Document) error {
	coll, err := ms.GetCollection(collection)
	if err != nil {
		return err
	}

	coll.mu.Lock()
	defer coll.mu.Unlock()

	if _, found := coll.documents[doc.ID]; found {
		coll.documents[doc.ID] = doc
		return nil
	}
	return errors.ErrDocumentNotFnd
}