Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ func (provider *Badger) SetMultiLevel(baseKey, variedKey string, value []byte, v
now := time.Now()

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down
38 changes: 36 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ package core
import (
"bufio"
"bytes"
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/pierrec/lz4/v4"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
lz4ReaderPool = sync.Pool{New: func() any { return lz4.NewReader(nil) }}
bufReaderPool = sync.Pool{New: func() any { return bufio.NewReader(nil) }}
Lz4WriterPool = sync.Pool{New: func() any { return lz4.NewWriter(nil) }}
)

type Storer interface {
MapKeys(prefix string) map[string]string
ListKeys() []string
Expand Down Expand Up @@ -55,9 +63,35 @@ func DecodeMapping(item []byte) (*StorageMapper, error) {
}

func readResponse(data []byte, req *http.Request) (*http.Response, error) {
reader := lz4.NewReader(bytes.NewReader(data))
lz4r := lz4ReaderPool.Get().(*lz4.Reader)
lz4r.Reset(bytes.NewReader(data))

brp := bufReaderPool.Get().(*bufio.Reader)
brp.Reset(lz4r)

resp, err := http.ReadResponse(brp, req)

// Fully consume body before returning readers to the pool.
// The response.Body references br internally — returning br to the pool
// while the body is unread causes use-after-pool-return under concurrency.
if err == nil && resp.Body != nil {
bodyBytes, readErr := io.ReadAll(resp.Body)
_ = resp.Body.Close()

bufReaderPool.Put(brp)
lz4ReaderPool.Put(lz4r)

if readErr != nil {
return nil, readErr
}

resp.Body = io.NopCloser(bytes.NewReader(bodyBytes))
} else {
bufReaderPool.Put(brp)
lz4ReaderPool.Put(lz4r)
}

return http.ReadResponse(bufio.NewReader(reader), req)
return resp, err
}

func MappingElection(provider Storer, item []byte, req *http.Request, validator *Revalidator, logger Logger) (resultFresh *http.Response, resultStale *http.Response, e error) {
Expand Down
5 changes: 4 additions & 1 deletion etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ func (provider *Etcd) SetMultiLevel(baseKey, variedKey string, value []byte, var
}

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down
126 changes: 83 additions & 43 deletions go-redis/go-redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ import (
"net/http"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/darkweak/storages/core"
"github.com/pierrec/lz4/v4"
"github.com/redis/go-redis/v9"
)

const (
scanCount = 1000
mgetBatchSize = 500
)

// Redis provider type.
type Redis struct {
inClient redis.UniversalClient
Expand All @@ -25,7 +32,8 @@ type Redis struct {
logger core.Logger
configuration redis.UniversalOptions
close func() error
reconnecting bool
reconnecting atomic.Bool
reconnectMu sync.Mutex
hashtags string
}

Expand Down Expand Up @@ -113,15 +121,15 @@ func (provider *Redis) Uuid() string {

// ListKeys method returns the list of existing keys.
func (provider *Redis) ListKeys() []string {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to list the redis keys while reconnecting.")

return []string{}
}

keys := []string{}

// NOTE(review): this PR introduces a scanCount constant (= 1000) and uses it
// in MapKeys and DeleteMany, but hard-codes the literal 1000 here — replace
// it with scanCount for consistency.
iter := provider.inClient.Scan(provider.ctx, 0, provider.hashtags+core.MappingKeyPrefix+"*", 0).Iterator()
iter := provider.inClient.Scan(provider.ctx, 0, provider.hashtags+core.MappingKeyPrefix+"*", 1000).Iterator()
for iter.Next(provider.ctx) {
value := provider.Get(iter.Val())

Expand All @@ -140,7 +148,7 @@ func (provider *Redis) ListKeys() []string {
}

if err := iter.Err(); err != nil {
if !provider.reconnecting {
// A scan error suggests a broken connection: kick off a background
// reconnect unless one is already running.
if !provider.reconnecting.Load() {
go provider.Reconnect()
}

Expand All @@ -152,30 +160,44 @@ func (provider *Redis) ListKeys() []string {
return keys
}

// MapKeys method returns the list of existing keys.
// MapKeys method returns the map of existing keys to their values.
// It processes keys in batches to avoid materializing the entire keyspace at once,
// reducing peak heap usage and GC pressure.
func (provider *Redis) MapKeys(prefix string) map[string]string {
mapKeys := map[string]string{}
keys := []string{}
mapKeys := make(map[string]string)

iter := provider.inClient.Scan(provider.ctx, 0, prefix+"*", 0).Iterator()
for iter.Next(provider.ctx) {
keys = append(keys, iter.Val())
}
var cursor uint64
for {
keys, nextCursor, err := provider.inClient.Scan(provider.ctx, cursor, prefix+"*", scanCount).Result()
if err != nil {
return mapKeys
}

if err := iter.Err(); err != nil {
return mapKeys
}
// Batch MGET for this SCAN page to bound peak memory.
for iteration := 0; iteration < len(keys); iteration += mgetBatchSize {
end := iteration + mgetBatchSize
if end > len(keys) {
end = len(keys)
}

vals, err := provider.inClient.MGet(provider.ctx, keys...).Result()
if err != nil {
return mapKeys
}
batch := keys[iteration:end]

for idx, item := range keys {
k, _ := strings.CutPrefix(item, prefix)
vals, err := provider.inClient.MGet(provider.ctx, batch...).Result()
if err != nil {
continue
}

if vals[idx] != nil {
mapKeys[k] = vals[idx].(string)
for idx, item := range batch {
k, _ := strings.CutPrefix(item, prefix)
if vals[idx] != nil {
mapKeys[k] = vals[idx].(string)
}
}
}

cursor = nextCursor
if cursor == 0 {
break
}
}

Expand All @@ -199,7 +221,10 @@ func (provider *Redis) SetMultiLevel(baseKey, variedKey string, value []byte, va
now := time.Now()

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down Expand Up @@ -242,23 +267,21 @@ func (provider *Redis) SetMultiLevel(baseKey, variedKey string, value []byte, va

// Get method returns the populated response if exists, empty response then.
func (provider *Redis) Get(key string) (item []byte) {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to get the redis key while reconnecting.")

return
}

result, err := provider.inClient.Get(provider.ctx, key).Result()
item, err := provider.inClient.Get(provider.ctx, key).Bytes()
if err != nil {
if !errors.Is(err, redis.Nil) && !provider.reconnecting {
if !errors.Is(err, redis.Nil) && !provider.reconnecting.Load() {
go provider.Reconnect()
}

return
return nil
}

item = []byte(result)

return
}

Expand All @@ -270,7 +293,7 @@ func (provider *Redis) Prefix(key string) []string {

// Set method will store the response in Etcd provider.
func (provider *Redis) Set(key string, value []byte, duration time.Duration) error {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to set the redis value while reconnecting.")

return errors.New("reconnecting error")
Expand All @@ -284,7 +307,7 @@ func (provider *Redis) Set(key string, value []byte, duration time.Duration) err

err := provider.inClient.Set(provider.ctx, key, value, duration).Err()
if err != nil {
if !provider.reconnecting {
if !provider.reconnecting.Load() {
go provider.Reconnect()
}

Expand All @@ -296,7 +319,7 @@ func (provider *Redis) Set(key string, value []byte, duration time.Duration) err

// Delete method will delete the response in Etcd provider if exists corresponding to key param.
func (provider *Redis) Delete(key string) {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to delete the redis key while reconnecting.")

return
Expand All @@ -307,7 +330,7 @@ func (provider *Redis) Delete(key string) {

// DeleteMany method will delete the responses in Redis provider if exists corresponding to the regex key param.
func (provider *Redis) DeleteMany(key string) {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to delete the redis keys while reconnecting.")

return
Expand All @@ -318,22 +341,24 @@ func (provider *Redis) DeleteMany(key string) {
return
}

keys := []string{}
iter := provider.inClient.Scan(provider.ctx, 0, "*", 0).Iterator()
keys := make([]string, 0, 64)
iter := provider.inClient.Scan(provider.ctx, 0, "*", scanCount).Iterator()

for iter.Next(provider.ctx) {
if rgKey.MatchString(iter.Val()) {
keys = append(keys, iter.Val())
}
}

if iter.Err() != nil && !provider.reconnecting {
if iter.Err() != nil && !provider.reconnecting.Load() {
go provider.Reconnect()

return
}

provider.inClient.Del(provider.ctx, keys...)
if len(keys) > 0 {
provider.inClient.Del(provider.ctx, keys...)
}
}

// Init method will.
Expand All @@ -343,7 +368,7 @@ func (provider *Redis) Init() error {

// Reset method will reset or close provider.
func (provider *Redis) Reset() error {
if provider.reconnecting {
if provider.reconnecting.Load() {
provider.logger.Error("Impossible to reset the redis instance while reconnecting.")

return nil
Expand All @@ -353,12 +378,27 @@ func (provider *Redis) Reset() error {
}

func (provider *Redis) Reconnect() {
provider.reconnecting = true
if !provider.reconnecting.CompareAndSwap(false, true) {
return
}

provider.reconnectMu.Lock()
defer provider.reconnectMu.Unlock()
defer provider.reconnecting.Store(false)

for range 30 {
newClient := redis.NewUniversalClient(&provider.configuration)
if newClient != nil {
oldClient := provider.inClient
provider.inClient = newClient

if oldClient != nil {
_ = oldClient.Close()
}

return
}

if provider.inClient = redis.NewUniversalClient(&provider.configuration); provider.inClient != nil {
provider.reconnecting = false
} else {
time.Sleep(10 * time.Second)
provider.Reconnect()
}
}
5 changes: 4 additions & 1 deletion nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func (provider *Nats) SetMultiLevel(baseKey, variedKey string, value []byte, var
now := time.Now()

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down
5 changes: 4 additions & 1 deletion nuts/nuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func (provider *Nuts) SetMultiLevel(baseKey, variedKey string, value []byte, var
now := time.Now()

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down
5 changes: 4 additions & 1 deletion olric/olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ func (provider *Olric) SetMultiLevel(baseKey, variedKey string, value []byte, va
defer provider.dm.Put(dmap)

compressed := new(bytes.Buffer)
writer := lz4.NewWriter(compressed)
writer := core.Lz4WriterPool.Get().(*lz4.Writer)

writer.Reset(compressed)
defer core.Lz4WriterPool.Put(writer)

if _, err := writer.Write(value); err != nil {
_ = writer.Close()
Expand Down
Loading
Loading