Commit 09ab4ddd authored by nitikag's avatar nitikag Committed by Daniel Robinson
Browse files

Update datachannel retry strategy to not retry for a specific error scenario

cr https://code.amazon.com/reviews/CR-54790048
parent 5742704c
...@@ -65,6 +65,9 @@ const ( ...@@ -65,6 +65,9 @@ const (
DataChannelRetryInitialDelayMillis = 100 DataChannelRetryInitialDelayMillis = 100
DataChannelRetryMaxIntervalMillis = 5000 DataChannelRetryMaxIntervalMillis = 5000
// MGS Errors
SessionAlreadyTerminatedError = "Session is already terminated"
IpcFileName = "ipcTempFile" IpcFileName = "ipcTempFile"
ExecOutputFileName = "output" ExecOutputFileName = "output"
LogFileExtension = ".log" LogFileExtension = ".log"
......
...@@ -123,6 +123,7 @@ func (controlChannel *ControlChannel) SetWebSocket(context context.T, ...@@ -123,6 +123,7 @@ func (controlChannel *ControlChannel) SetWebSocket(context context.T,
InitialDelayInMilli: rand.Intn(mgsConfig.ControlChannelRetryInitialDelayMillis) + mgsConfig.ControlChannelRetryInitialDelayMillis, InitialDelayInMilli: rand.Intn(mgsConfig.ControlChannelRetryInitialDelayMillis) + mgsConfig.ControlChannelRetryInitialDelayMillis,
MaxDelayInMilli: mgsConfig.ControlChannelRetryMaxIntervalMillis, MaxDelayInMilli: mgsConfig.ControlChannelRetryMaxIntervalMillis,
MaxAttempts: mgsConfig.ControlChannelNumMaxRetries, MaxAttempts: mgsConfig.ControlChannelNumMaxRetries,
NonRetryableErrors: getNonRetryableControlChannelErrors(),
} }
// add a jitter to the first control-channel call // add a jitter to the first control-channel call
...@@ -364,3 +365,8 @@ func delayWithJitter(maxDelayMillis int64) { ...@@ -364,3 +365,8 @@ func delayWithJitter(maxDelayMillis int64) {
jitter := rand.Int63n(maxDelayMillis) jitter := rand.Int63n(maxDelayMillis)
time.Sleep(time.Duration(jitter) * time.Millisecond) time.Sleep(time.Duration(jitter) * time.Millisecond)
} }
// getNonRetryableControlChannelErrors returns list of non retryable errors for control channel retry strategy
func getNonRetryableControlChannelErrors() []string {
return []string{}
}
...@@ -299,6 +299,7 @@ func (dataChannel *DataChannel) SetWebSocket(context context.T, ...@@ -299,6 +299,7 @@ func (dataChannel *DataChannel) SetWebSocket(context context.T,
InitialDelayInMilli: rand.Intn(mgsConfig.DataChannelRetryInitialDelayMillis) + mgsConfig.DataChannelRetryInitialDelayMillis, InitialDelayInMilli: rand.Intn(mgsConfig.DataChannelRetryInitialDelayMillis) + mgsConfig.DataChannelRetryInitialDelayMillis,
MaxDelayInMilli: mgsConfig.DataChannelRetryMaxIntervalMillis, MaxDelayInMilli: mgsConfig.DataChannelRetryMaxIntervalMillis,
MaxAttempts: mgsConfig.DataChannelNumMaxAttempts, MaxAttempts: mgsConfig.DataChannelNumMaxAttempts,
NonRetryableErrors: getNonRetryableDataChannelErrors(),
} }
if _, err := retryer.Call(); err != nil { if _, err := retryer.Call(); err != nil {
log.Error(err) log.Error(err)
...@@ -1142,3 +1143,8 @@ func getMgsEndpoint(context context.T, region string) (string, error) { ...@@ -1142,3 +1143,8 @@ func getMgsEndpoint(context context.T, region string) (string, error) {
endpointBuilder.WriteString(hostName) endpointBuilder.WriteString(hostName)
return endpointBuilder.String(), nil return endpointBuilder.String(), nil
} }
// getNonRetryableDataChannelErrors returns list of non retryable errors for data channel retry strategy
func getNonRetryableDataChannelErrors() []string {
return []string{mgsConfig.SessionAlreadyTerminatedError}
}
...@@ -17,6 +17,7 @@ package retry ...@@ -17,6 +17,7 @@ package retry
import ( import (
"math" "math"
"math/rand" "math/rand"
"strings"
"time" "time"
) )
...@@ -34,6 +35,7 @@ type ExponentialRetryer struct { ...@@ -34,6 +35,7 @@ type ExponentialRetryer struct {
InitialDelayInMilli int InitialDelayInMilli int
MaxDelayInMilli int MaxDelayInMilli int
MaxAttempts int MaxAttempts int
NonRetryableErrors []string
} }
// Init initializes the retryer // Init initializes the retryer
...@@ -63,7 +65,7 @@ func (retryer *ExponentialRetryer) Call() (channel interface{}, err error) { ...@@ -63,7 +65,7 @@ func (retryer *ExponentialRetryer) Call() (channel interface{}, err error) {
failedAttemptsSoFar := 0 failedAttemptsSoFar := 0
for { for {
channel, err := retryer.CallableFunc() channel, err := retryer.CallableFunc()
if err == nil || failedAttemptsSoFar == retryer.MaxAttempts { if err == nil || failedAttemptsSoFar == retryer.MaxAttempts || retryer.isNonRetryableError(err) {
return channel, err return channel, err
} }
sleep, exceedMaxDelay := retryer.NextSleepTime(attempt) sleep, exceedMaxDelay := retryer.NextSleepTime(attempt)
...@@ -74,3 +76,13 @@ func (retryer *ExponentialRetryer) Call() (channel interface{}, err error) { ...@@ -74,3 +76,13 @@ func (retryer *ExponentialRetryer) Call() (channel interface{}, err error) {
failedAttemptsSoFar++ failedAttemptsSoFar++
} }
} }
// isNonRetryableError returns true if passed error is in the list of NonRetryableErrors
func (retryer *ExponentialRetryer) isNonRetryableError(err error) bool {
for _, nonRetryableError := range retryer.NonRetryableErrors {
if strings.Contains(err.Error(), nonRetryableError) {
return true
}
}
return false
}
...@@ -37,6 +37,8 @@ var ( ...@@ -37,6 +37,8 @@ var (
totalAttempts = totalAttempts + 1 totalAttempts = totalAttempts + 1
return RetryCounter{TotalAttempts: totalAttempts}, errors.New("error occured in callable function") return RetryCounter{TotalAttempts: totalAttempts}, errors.New("error occured in callable function")
} }
nonRetryableError = "non retryable error"
retryableError = "retryable error"
) )
func TestRepeatableExponentialRetryerRetriesForGivenNumberOfMaxAttempts(t *testing.T) { func TestRepeatableExponentialRetryerRetriesForGivenNumberOfMaxAttempts(t *testing.T) {
...@@ -47,6 +49,7 @@ func TestRepeatableExponentialRetryerRetriesForGivenNumberOfMaxAttempts(t *testi ...@@ -47,6 +49,7 @@ func TestRepeatableExponentialRetryerRetriesForGivenNumberOfMaxAttempts(t *testi
initialDelayInMilli, initialDelayInMilli,
maxDelayInMilli, maxDelayInMilli,
maxAttempts, maxAttempts,
[]string{},
} }
retryCounterInterface, err := retryer.Call() retryCounterInterface, err := retryer.Call()
...@@ -65,9 +68,82 @@ func TestExponentialRetryerWithJitter(t *testing.T) { ...@@ -65,9 +68,82 @@ func TestExponentialRetryerWithJitter(t *testing.T) {
initialDelayInMilli, initialDelayInMilli,
maxDelayInMilli, maxDelayInMilli,
1, 1,
[]string{},
} }
minDelay := int64(initialDelayInMilli) * time.Millisecond.Nanoseconds() minDelay := int64(initialDelayInMilli) * time.Millisecond.Nanoseconds()
maxDelay := int64(float64(minDelay) * (1.0 + jitterRatio)) maxDelay := int64(float64(minDelay) * (1.0 + jitterRatio))
sleep, _ := retryerWithJitter.NextSleepTime(0) sleep, _ := retryerWithJitter.NextSleepTime(0)
assert.True(t, sleep.Nanoseconds() >= minDelay && sleep.Nanoseconds() < maxDelay) assert.True(t, sleep.Nanoseconds() >= minDelay && sleep.Nanoseconds() < maxDelay)
} }
func TestRepeatableExponentialRetryerDoesNotRetryInCaseOfNoError(t *testing.T) {
totalAttempts := 0
callableFunc := func() (interface{}, error) {
totalAttempts = totalAttempts + 1
return RetryCounter{TotalAttempts: totalAttempts}, nil
}
retryer := ExponentialRetryer{
callableFunc,
retryGeometricRatio,
jitterRatio,
initialDelayInMilli,
maxDelayInMilli,
maxAttempts,
[]string{nonRetryableError},
}
retryCounterInterface, err := retryer.Call()
retryCounter := retryCounterInterface.(RetryCounter)
assert.Nil(t, err)
assert.Equal(t, retryCounter.TotalAttempts, 1)
}
func TestRepeatableExponentialRetryerDoesNotRetryInCaseOfNonRetryableError(t *testing.T) {
totalAttempts := 0
callableFunc := func() (interface{}, error) {
totalAttempts = totalAttempts + 1
return RetryCounter{TotalAttempts: totalAttempts}, errors.New(nonRetryableError)
}
retryer := ExponentialRetryer{
callableFunc,
retryGeometricRatio,
jitterRatio,
initialDelayInMilli,
maxDelayInMilli,
maxAttempts,
[]string{nonRetryableError},
}
retryCounterInterface, err := retryer.Call()
retryCounter := retryCounterInterface.(RetryCounter)
assert.NotNil(t, err)
assert.Equal(t, retryCounter.TotalAttempts, 1)
}
func TestRepeatableExponentialRetryerRetriesInCaseOfRetryableError(t *testing.T) {
totalAttempts := 0
callableFunc := func() (interface{}, error) {
totalAttempts = totalAttempts + 1
return RetryCounter{TotalAttempts: totalAttempts}, errors.New(retryableError)
}
retryer := ExponentialRetryer{
callableFunc,
retryGeometricRatio,
jitterRatio,
initialDelayInMilli,
maxDelayInMilli,
maxAttempts,
[]string{nonRetryableError},
}
retryCounterInterface, err := retryer.Call()
retryCounter := retryCounterInterface.(RetryCounter)
assert.NotNil(t, err)
assert.Equal(t, retryCounter.TotalAttempts, maxAttempts+1)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment