Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug][storage] Make ES-Rollover idempotent by checking if the index or alias already exist #6638

Merged
merged 3 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 12 additions & 24 deletions cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package init

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"

"github.com/jaegertracing/jaeger/cmd/es-rollover/app"
"github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/mappings"
Expand Down Expand Up @@ -69,30 +66,21 @@ func (c Action) Do() error {
}

func createIndexIfNotExist(c client.IndexAPI, index string) error {
err := c.CreateIndex(index)
exists, err := c.IndexExists(index)
if err != nil {
var esErr client.ResponseError
if errors.As(err, &esErr) {
if esErr.StatusCode != http.StatusBadRequest || esErr.Body == nil {
return esErr.Err
}
// check for the reason of the error
jsonError := map[string]any{}
err := json.Unmarshal(esErr.Body, &jsonError)
if err != nil {
// return unmarshal error
return err
}
errorMap := jsonError["error"].(map[string]any)
// check for reason, ignore already exist error
if strings.Contains(errorMap["type"].(string), "resource_already_exists_exception") {
return nil
}
}
// Return any other error unrelated to the response
return err
}
return nil
if exists {
return nil
}
aliasExists, err := c.AliasExists(index)
if err != nil {
return err
}
if aliasExists {
return nil
}
return c.CreateIndex(index)
}

func (c Action) init(version uint, indexopt app.IndexOption) error {
Expand Down
72 changes: 36 additions & 36 deletions cmd/es-rollover/app/init/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package init

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -18,59 +17,52 @@ import (
)

func TestIndexCreateIfNotExist(t *testing.T) {
const esErrResponse = `{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"]"}],"type":"resource_already_exists_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}`

tests := []struct {
name string
returnErr error
expectedErr error
containsError string
name string
indexExists bool
indexExistsErr error
aliasExists bool
aliasExistsErr error
createIndexErr error
expectedError string
}{
{
name: "success",
name: "success when index exists",
indexExists: true,
},
{
name: "generic error",
returnErr: errors.New("may be an http error?"),
expectedErr: errors.New("may be an http error?"),
name: "generic error from IndexExists",
indexExistsErr: errors.New("may be an http error from index exists"),
expectedError: "may be an http error from index exists",
},
{
name: "response error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusForbidden,
},
expectedErr: errors.New("x"),
name: "success when alias exists",
aliasExists: true,
},
{
name: "unmarshal error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusBadRequest,
Body: []byte("blablabla"),
},
containsError: "invalid character",
name: "generic error from AliasExists",
aliasExistsErr: errors.New("may be an http error from alias exists"),
expectedError: "may be an http error from alias exists",
},
{
name: "existing error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusBadRequest,
Body: []byte(esErrResponse),
},
expectedErr: nil,
name: "generic error from create index",
createIndexErr: errors.New("may be an http error from create index"),
expectedError: "may be an http error from create index",
},
{
name: "success when index and alias does not exist",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexClient := &mocks.IndexAPI{}
indexClient.On("CreateIndex", "jaeger-span").Return(test.returnErr)
indexClient.On("IndexExists", "jaeger-span").Return(test.indexExists, test.indexExistsErr)
indexClient.On("AliasExists", "jaeger-span").Return(test.aliasExists, test.aliasExistsErr)
indexClient.On("CreateIndex", "jaeger-span").Return(test.createIndexErr)
err := createIndexIfNotExist(indexClient, "jaeger-span")
if test.containsError != "" {
assert.ErrorContains(t, err, test.containsError)
} else {
assert.Equal(t, test.expectedErr, err)
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
}
})
}
Expand Down Expand Up @@ -157,6 +149,8 @@ func TestRolloverAction(t *testing.T) {
name: "fail to get jaeger indices",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("AliasExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, errors.New("error getting jaeger indices"))
Expand All @@ -173,6 +167,8 @@ func TestRolloverAction(t *testing.T) {
name: "fail to create alias",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("AliasExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand All @@ -193,6 +189,8 @@ func TestRolloverAction(t *testing.T) {
name: "create rollover index",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("AliasExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand All @@ -213,6 +211,8 @@ func TestRolloverAction(t *testing.T) {
name: "create rollover index with ilm",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, ilmClient *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("AliasExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand Down
17 changes: 17 additions & 0 deletions internal/storage/integration/es_index_rollover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ func TestIndexRollover_FailIfILMNotPresent(t *testing.T) {
assert.Empty(t, indices)
}

func TestIndexRollover_Idempotency(t *testing.T) {
SkipUnlessEnv(t, "elasticsearch", "opensearch")
t.Cleanup(func() {
testutils.VerifyGoLeaksOnceForES(t)
})
client, err := createESClient(t, getESHttpClient(t))
require.NoError(t, err)
// Make sure that es is clean before the test!
cleanES(t, client, defaultILMPolicyName)
err = runEsRollover("init", []string{}, false)
require.NoError(t, err)
// Run again and it should return without any error
err = runEsRollover("init", []string{}, false)
require.NoError(t, err)
cleanES(t, client, defaultILMPolicyName)
}

func TestIndexRollover_CreateIndicesWithILM(t *testing.T) {
SkipUnlessEnv(t, "elasticsearch", "opensearch")
t.Cleanup(func() {
Expand Down
36 changes: 36 additions & 0 deletions pkg/es/client/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,42 @@ func (i *IndicesClient) DeleteAlias(aliases []Alias) error {
return nil
}

// AliasExists check whether an alias exists or not
func (i *IndicesClient) AliasExists(alias string) (bool, error) {
_, err := i.request(elasticRequest{
endpoint: "_alias/" + alias,
method: http.MethodHead,
})
if err != nil {
var responseError ResponseError
if errors.As(err, &responseError) {
if responseError.StatusCode == http.StatusNotFound {
return false, nil
}
}
return false, fmt.Errorf("failed to check if alias exists: %w", err)
}
return true, nil
}

// IndexExists check whether an index exists or not
func (i *IndicesClient) IndexExists(index string) (bool, error) {
_, err := i.request(elasticRequest{
endpoint: index,
method: http.MethodHead,
})
if err != nil {
var responseError ResponseError
if errors.As(err, &responseError) {
if responseError.StatusCode == http.StatusNotFound {
return false, nil
}
}
return false, fmt.Errorf("failed to check if index exists: %w", err)
}
return true, nil
}

func (*IndicesClient) aliasesString(aliases []Alias) string {
concatAliases := ""
for _, alias := range aliases {
Expand Down
83 changes: 83 additions & 0 deletions pkg/es/client/index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,89 @@ func TestClientDeleteIndices(t *testing.T) {
}
}

func TestIndexExists(t *testing.T) {
t.Run("index exists", func(t *testing.T) {
testIndexOrAliasExistence(t, "index")
})
}

func TestAliasExists(t *testing.T) {
t.Run("alias exists", func(t *testing.T) {
testIndexOrAliasExistence(t, "alias")
})
}

func testIndexOrAliasExistence(t *testing.T, existence string) {
maxURLPathLength := 4000
type indexOrAliasExistence struct {
name string
exists bool
responseCode int
expectedErr string
}
tests := []indexOrAliasExistence{
{
name: "exists",
responseCode: http.StatusOK,
exists: true,
},
{
name: "not exists",
responseCode: http.StatusNotFound,
exists: false,
},
}
if existence == "index" {
test := indexOrAliasExistence{
name: "generic error",
responseCode: http.StatusBadRequest,
expectedErr: "failed to check if index exists: request failed, status code: 400",
}
tests = append(tests, test)
} else if existence == "alias" {
test := indexOrAliasExistence{
name: "generic error",
responseCode: http.StatusBadRequest,
expectedErr: "failed to check if alias exists: request failed, status code: 400",
}
tests = append(tests, test)
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
apiTriggered := false
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
apiTriggered = true
assert.Equal(t, http.MethodHead, req.Method)
assert.Equal(t, "Basic foobar", req.Header.Get("Authorization"))
assert.LessOrEqual(t, len(req.URL.Path), maxURLPathLength)
res.WriteHeader(test.responseCode)
}))
defer testServer.Close()
c := &IndicesClient{
Client: Client{
Client: testServer.Client(),
Endpoint: testServer.URL,
BasicAuth: "foobar",
},
}
var exists bool
var err error
if existence == "index" {
exists, err = c.IndexExists("jaeger-span")
} else if existence == "alias" {
exists, err = c.AliasExists("jaeger-span")
}
if test.expectedErr != "" {
require.ErrorContains(t, err, test.expectedErr)
} else {
require.NoError(t, err)
}
assert.True(t, apiTriggered)
assert.Equal(t, test.exists, exists)
})
}
}

func TestClientRequestError(t *testing.T) {
c := &IndicesClient{
Client: Client{
Expand Down
2 changes: 2 additions & 0 deletions pkg/es/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package client

type IndexAPI interface {
GetJaegerIndices(prefix string) ([]Index, error)
IndexExists(index string) (bool, error)
AliasExists(alias string) (bool, error)
DeleteIndices(indices []Index) error
CreateIndex(index string) error
CreateAlias(aliases []Alias) error
Expand Down
Loading
Loading