Skip to content

Commit 981fdb2

Browse files
update storage-api to only build index if it owns the namespace (#108418)
* update storage-api to only build index if it owns the namespace --------- Co-authored-by: Mustafa Sencer Özcan <[email protected]>
1 parent 4df4f9c commit 981fdb2

File tree

6 files changed

+73
-16
lines changed

6 files changed

+73
-16
lines changed

pkg/storage/unified/resource/search.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"cmp"
55
"context"
66
"fmt"
7+
"hash/fnv"
78
"log/slog"
89
"slices"
910
"strings"
@@ -19,6 +20,7 @@ import (
1920
"k8s.io/apimachinery/pkg/runtime/schema"
2021

2122
"github.com/grafana/authlib/types"
23+
"github.com/grafana/dskit/ring"
2224

2325
dashboardv1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1beta1"
2426
folders "github.com/grafana/grafana/apps/folder/pkg/apis/folder/v1beta1"
@@ -109,6 +111,9 @@ type searchSupport struct {
109111
initMinSize int
110112
initMaxSize int
111113

114+
ring *ring.Ring
115+
ringLifecycler *ring.BasicLifecycler
116+
112117
buildIndex singleflight.Group
113118

114119
// Index queue processors
@@ -128,7 +133,7 @@ var (
128133
_ resourcepb.ManagedObjectIndexServer = (*searchSupport)(nil)
129134
)
130135

131-
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics) (support *searchSupport, err error) {
136+
func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.AccessClient, blob BlobSupport, tracer trace.Tracer, indexMetrics *BleveIndexMetrics, ring *ring.Ring, ringLifecycler *ring.BasicLifecycler) (support *searchSupport, err error) {
132137
// No backend search support
133138
if opts.Backend == nil {
134139
return nil, nil
@@ -155,6 +160,8 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A
155160
indexEventsChan: make(chan *IndexEvent),
156161
indexQueueProcessors: make(map[string]*indexQueueProcessor),
157162
rebuildInterval: opts.RebuildInterval,
163+
ring: ring,
164+
ringLifecycler: ringLifecycler,
158165
}
159166

160167
info, err := opts.Resources.GetDocumentBuilders()
@@ -379,6 +386,33 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt
379386
return rsp, nil
380387
}
381388

389+
func (s *searchSupport) shouldBuildIndex(info ResourceStats) bool {
390+
if s.ring == nil {
391+
s.log.Debug("ring is not setup. Will proceed to build index")
392+
return true
393+
}
394+
395+
if s.ringLifecycler == nil {
396+
s.log.Error("missing ring lifecycler")
397+
return true
398+
}
399+
400+
ringHasher := fnv.New32a()
401+
_, err := ringHasher.Write([]byte(info.Namespace))
402+
if err != nil {
403+
s.log.Error("error hashing namespace", "namespace", info.Namespace, "err", err)
404+
return true
405+
}
406+
407+
rs, err := s.ring.GetWithOptions(ringHasher.Sum32(), searchOwnerRead, ring.WithReplicationFactor(s.ring.ReplicationFactor()))
408+
if err != nil {
409+
s.log.Error("error getting replicaset from ring", "namespace", info.Namespace, "err", err)
410+
return true
411+
}
412+
413+
return rs.Includes(s.ringLifecycler.GetInstanceAddr())
414+
}
415+
382416
func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) {
383417
totalBatchesIndexed := 0
384418
group := errgroup.Group{}
@@ -395,6 +429,11 @@ func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, er
395429
continue
396430
}
397431

432+
if !s.shouldBuildIndex(info) {
433+
s.log.Debug("skip building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource)
434+
continue
435+
}
436+
398437
group.Go(func() error {
399438
if rebuild {
400439
// we need to clear the cache to make sure we get the latest usage insights data

pkg/storage/unified/resource/search_server_distributor.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,14 @@ type distributorServer struct {
8181
log log.Logger
8282
}
8383

84-
var activeRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
85-
return s != ring.ACTIVE
86-
})
84+
var (
85+
// operation used by the distributor to select only ACTIVE instances to handle search-related requests
86+
searchRingRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
87+
return s != ring.ACTIVE
88+
})
89+
// operation used by the search-servers to check if they own the namespace
90+
searchOwnerRead = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
91+
)
8792

8893
func (ds *distributorServer) Search(ctx context.Context, r *resourcepb.ResourceSearchRequest) (*resourcepb.ResourceSearchResponse, error) {
8994
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Search")
@@ -128,7 +133,7 @@ func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, n
128133
return ctx, nil, err
129134
}
130135

131-
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), activeRingOp, ring.WithReplicationFactor(ds.ring.ReplicationFactor()))
136+
rs, err := ds.ring.GetWithOptions(ringHasher.Sum32(), searchRingRead, ring.WithReplicationFactor(ds.ring.ReplicationFactor()))
132137
if err != nil {
133138
return ctx, nil, err
134139
}

pkg/storage/unified/resource/search_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestBuildIndexes_MaxCountThreshold(t *testing.T) {
246246
InitMaxCount: tt.initMaxSize,
247247
}
248248

249-
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil)
249+
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil)
250250
require.NoError(t, err)
251251
require.NotNil(t, support)
252252

@@ -304,7 +304,7 @@ func TestSearchGetOrCreateIndex(t *testing.T) {
304304
InitMaxCount: 0,
305305
}
306306

307-
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil)
307+
support, err := newSearchSupport(opts, storage, nil, nil, noop.NewTracerProvider().Tracer("test"), nil, nil, nil)
308308
require.NoError(t, err)
309309
require.NotNil(t, support)
310310

pkg/storage/unified/resource/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
claims "github.com/grafana/authlib/types"
2222
"github.com/grafana/dskit/backoff"
23+
"github.com/grafana/dskit/ring"
2324

2425
"github.com/grafana/grafana/pkg/apimachinery/utils"
2526
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
@@ -180,6 +181,8 @@ type SearchOptions struct {
180181

181182
// Interval for periodic index rebuilds (0 disables periodic rebuilds)
182183
RebuildInterval time.Duration
184+
185+
Ring *ring.Ring
183186
}
184187

185188
type ResourceServerOptions struct {
@@ -223,6 +226,9 @@ type ResourceServerOptions struct {
223226

224227
// QOSQueue is the quality of service queue used to enqueue
225228
QOSQueue QOSEnqueuer
229+
230+
Ring *ring.Ring
231+
RingLifecycler *ring.BasicLifecycler
226232
}
227233

228234
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
@@ -305,7 +311,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
305311

306312
if opts.Search.Resources != nil {
307313
var err error
308-
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics)
314+
s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer, opts.IndexMetrics, opts.Ring, opts.RingLifecycler)
309315
if err != nil {
310316
return nil, err
311317
}

pkg/storage/unified/sql/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"go.opentelemetry.io/otel/trace"
1010

1111
"github.com/grafana/authlib/types"
12+
"github.com/grafana/dskit/ring"
1213
"github.com/grafana/dskit/services"
1314

1415
infraDB "github.com/grafana/grafana/pkg/infra/db"
@@ -37,6 +38,8 @@ type ServerOptions struct {
3738
IndexMetrics *resource.BleveIndexMetrics
3839
Features featuremgmt.FeatureToggles
3940
QOSQueue QOSEnqueueDequeuer
41+
Ring *ring.Ring
42+
RingLifecycler *ring.BasicLifecycler
4043
}
4144

4245
// Creates a new ResourceServer
@@ -96,6 +99,8 @@ func NewResourceServer(
9699
serverOptions.Search = opts.SearchOptions
97100
serverOptions.IndexMetrics = opts.IndexMetrics
98101
serverOptions.QOSQueue = opts.QOSQueue
102+
serverOptions.Ring = opts.Ring
103+
serverOptions.RingLifecycler = opts.RingLifecycler
99104

100105
return resource.NewResourceServer(serverOptions)
101106
}

pkg/storage/unified/sql/service.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ type service struct {
7575

7676
docBuilders resource.DocumentBuilderSupplier
7777

78-
storageRing *ring.Ring
79-
lifecycler *ring.BasicLifecycler
78+
searchRing *ring.Ring
79+
ringLifecycler *ring.BasicLifecycler
8080

8181
queue QOSEnqueueDequeuer
8282
scheduler *scheduler.Scheduler
@@ -91,7 +91,7 @@ func ProvideUnifiedStorageGrpcService(
9191
docBuilders resource.DocumentBuilderSupplier,
9292
storageMetrics *resource.StorageMetrics,
9393
indexMetrics *resource.BleveIndexMetrics,
94-
storageRing *ring.Ring,
94+
searchRing *ring.Ring,
9595
memberlistKVConfig kv.Config,
9696
) (UnifiedStorageGrpcService, error) {
9797
var err error
@@ -116,7 +116,7 @@ func ProvideUnifiedStorageGrpcService(
116116
docBuilders: docBuilders,
117117
storageMetrics: storageMetrics,
118118
indexMetrics: indexMetrics,
119-
storageRing: storageRing,
119+
searchRing: searchRing,
120120
subservicesWatcher: services.NewFailureWatcher(),
121121
}
122122

@@ -143,7 +143,7 @@ func ProvideUnifiedStorageGrpcService(
143143
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
144144
delegate = ring.NewAutoForgetDelegate(resource.RingHeartbeatTimeout*2, delegate, log)
145145

146-
s.lifecycler, err = ring.NewBasicLifecycler(
146+
s.ringLifecycler, err = ring.NewBasicLifecycler(
147147
lifecyclerCfg,
148148
resource.RingName,
149149
resource.RingKey,
@@ -155,7 +155,7 @@ func ProvideUnifiedStorageGrpcService(
155155
if err != nil {
156156
return nil, fmt.Errorf("failed to initialize storage-ring lifecycler: %s", err)
157157
}
158-
subservices = append(subservices, s.lifecycler)
158+
subservices = append(subservices, s.ringLifecycler)
159159
}
160160

161161
if cfg.QOSEnabled {
@@ -220,6 +220,8 @@ func (s *service) starting(ctx context.Context) error {
220220
IndexMetrics: s.indexMetrics,
221221
Features: s.features,
222222
QOSQueue: s.queue,
223+
Ring: s.searchRing,
224+
RingLifecycler: s.ringLifecycler,
223225
}
224226
server, err := NewResourceServer(serverOptions)
225227
if err != nil {
@@ -254,12 +256,12 @@ func (s *service) starting(ctx context.Context) error {
254256
s.log.Info("waiting until resource server is JOINING in the ring")
255257
lfcCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
256258
defer cancel()
257-
if err := ring.WaitInstanceState(lfcCtx, s.storageRing, s.lifecycler.GetInstanceID(), ring.JOINING); err != nil {
259+
if err := ring.WaitInstanceState(lfcCtx, s.searchRing, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
258260
return fmt.Errorf("error switching to JOINING in the ring: %s", err)
259261
}
260262
s.log.Info("resource server is JOINING in the ring")
261263

262-
if err := s.lifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
264+
if err := s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
263265
return fmt.Errorf("error switching to ACTIVE in the ring: %s", err)
264266
}
265267
s.log.Info("resource server is ACTIVE in the ring")

0 commit comments

Comments
 (0)