Commit ee8ebf12 authored by Vishnu Karthik Ravindran's avatar Vishnu Karthik Ravindran Committed by Faho Shubladze
Browse files

fix random ssm agent worker hang issue

cr: https://code.amazon.com/reviews/CR-55921914
parent 980e3544
......@@ -129,10 +129,7 @@ func DefaultConfig() SsmagentConfig {
ForceFileIPC: false,
GoMaxProcForAgentWorker: 0,
}
// setting the default GoMaxProc value to 1 for windows
if runtime.GOOS == "windows" {
agent.GoMaxProcForAgentWorker = 1
}
var os = OsInfo{
Lang: "en-US",
Version: "1",
......
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package filewatcherbasedipc is used to establish IPC between master and workers using files.
package filewatcherbasedipc
import (
......@@ -50,9 +64,10 @@ type fileWatcherChannel struct {
startTime string
watcher *fsnotify.Watcher
mu sync.Mutex
closed bool
shouldDeleteAfterConsume bool
shouldReadRetry bool
isWatcherClosed bool
watcherClosedChan chan bool
}
//TODO make this constructor private
......@@ -101,16 +116,17 @@ func NewFileWatcherChannel(logger log.T, mode Mode, name string, shouldReadRetry
}
ch := &fileWatcherChannel{
path: name,
tmpPath: tmpPath,
watcher: watcher,
onMessageChan: onMessageChan,
logger: logger,
mode: mode,
counter: 0,
recvCounter: 0,
shouldReadRetry: shouldReadRetry,
startTime: fmt.Sprintf("%04d%02d%02d%02d%02d%02d", curTime.Year(), curTime.Month(), curTime.Day(), curTime.Hour(), curTime.Minute(), curTime.Second()),
path: name,
tmpPath: tmpPath,
watcher: watcher,
onMessageChan: onMessageChan,
logger: logger,
mode: mode,
counter: 0,
recvCounter: 0,
shouldReadRetry: shouldReadRetry,
watcherClosedChan: make(chan bool, 1),
startTime: fmt.Sprintf("%04d%02d%02d%02d%02d%02d", curTime.Year(), curTime.Month(), curTime.Day(), curTime.Hour(), curTime.Minute(), curTime.Second()),
}
if ch.mode == ModeRespondent {
ch.shouldDeleteAfterConsume = false
......@@ -138,7 +154,7 @@ func createIfNotExist(dir string) (err error) {
func (ch *fileWatcherChannel) Send(rawJson string) error {
ch.mu.Lock()
defer ch.mu.Unlock()
if ch.closed {
if ch.isWatcherClosed {
return errors.New("channel already closed")
}
log := ch.logger
......@@ -222,30 +238,43 @@ func (ch *fileWatcherChannel) isFileFromSameMode(filename string) bool {
// Close a filechannel
// non-blocking call, drain the buffered messages and clear file watcher resources
func (ch *fileWatcherChannel) Close() {
if ch.closed {
if ch.closeIfNotClosed() {
ch.logger.Infof("file channel already closed: %v", ch.path)
return
}
log := ch.logger
log.Infof("channel %v requested close", ch.path)
completedWatcherCleanup := make(chan bool, 1)
go ch.cleanUpWatcher(completedWatcherCleanup, log)
select {
case <-completedWatcherCleanup:
case <-time.After(time.Second):
log.Infof("allocated file watcher cleanup time expired")
}
ch.consumeAll() //read all the left over messages
// would be better to close after all the messages are read
close(ch.onMessageChan)
}
func (ch *fileWatcherChannel) closeIfNotClosed() bool {
ch.mu.Lock()
defer ch.mu.Unlock()
if ch.isWatcherClosed {
return true
}
// close the file watcher listener thread
ch.watcherClosedChan <- true
//block other threads to call Send()
ch.closed = true
//read all the left over messages
ch.consumeAll()
// fsnotify.watch.close() could be a blocking call, we should offload them to a different go-routine
go func() {
defer func() {
if msg := recover(); msg != nil {
log.Errorf("closing file watcher panics: %v", msg)
}
close(ch.onMessageChan)
log.Infof("channel %v closed", ch.path)
}()
//make sure the file watcher closed as well as the watch list is removed, otherwise can cause leak in ubuntu kernel
ch.watcher.Remove(ch.path)
ch.watcher.Close()
}()
ch.isWatcherClosed = true
return
return false
}
//parse the counter out of the sequence id, return -1 if parsing fails
......@@ -394,6 +423,9 @@ func (ch *fileWatcherChannel) watch() {
ch.consumeAll()
for {
select {
case <-ch.watcherClosedChan:
log.Infof("Closed the file watcher listener thread")
return
case event, ok := <-ch.watcher.Events:
if !ok {
log.Debug("fileWatcher already closed")
......
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package filewatcherbasedipc is used to establish IPC between master and workers using files.
//
//+build darwin freebsd linux netbsd openbsd
package filewatcherbasedipc
import (
"github.com/aws/amazon-ssm-agent/agent/log"
)
// cleanUpWatcher releases the file watcher held by this channel: it first
// removes the watch registered for ch.path and then closes the watcher.
// An error from either step is logged as a warning and does not prevent the
// other step from running.
//
// On exit - whether the steps returned normally or panicked - the deferred
// handler sends true on completedWatcherCleanup, logs any recovered panic as
// an error, and finally logs that the channel is closed.
func (ch *fileWatcherChannel) cleanUpWatcher(completedWatcherCleanup chan bool, log log.T) {
	defer func() {
		// Notify the waiter before doing any logging. This send blocks until
		// it is received unless the caller supplied a buffered channel.
		completedWatcherCleanup <- true
		if recovered := recover(); recovered != nil {
			log.Errorf("file watcher remove/close panics: %v", recovered)
		}
		log.Infof("channel %v closed", ch.path)
	}()

	// Drop the watch list entry explicitly in addition to closing the watcher;
	// per the earlier implementation, skipping this can cause a leak in the
	// ubuntu kernel.
	// TODO: Test on various platforms whether this Remove() can be dropped,
	// since Close() is said to perform the removal again. Kept for now to
	// follow the previous implementation.
	err := ch.watcher.Remove(ch.path)
	if err != nil {
		log.Warnf("file watcher remove error: %v", err)
	}

	err = ch.watcher.Close()
	if err != nil {
		log.Warnf("file watcher close error: %v", err)
	}
}
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package filewatcherbasedipc is used to establish IPC between master and workers using files.
//
//+build windows
package filewatcherbasedipc
import (
"github.com/aws/amazon-ssm-agent/agent/log"
)
// cleanUpWatcher closes the file watcher held by this channel (windows build).
//
// Unlike the non-windows implementation, it does not call watcher.Remove()
// before closing, because doing so leaks file handles on windows in this
// usage. An error returned by Close() is logged as a warning.
//
// On exit - whether Close() returned normally or panicked - the deferred
// handler sends true on completedWatcherCleanup, logs any recovered panic as
// an error, and finally logs that the channel is closed.
func (ch *fileWatcherChannel) cleanUpWatcher(completedWatcherCleanup chan bool, log log.T) {
	defer func() {
		// Notify the waiter before doing any logging. This send blocks until
		// it is received unless the caller supplied a buffered channel.
		completedWatcherCleanup <- true
		if msg := recover(); msg != nil {
			log.Errorf("file watcher remove/close panics: %v", msg)
		}
		// A completed close is routine, so it is reported at info level,
		// matching the non-windows implementation of this function.
		log.Infof("channel %v closed", ch.path)
	}()
	// do not call watcher.Remove() for windows as it leaks file handles when done in our case
	if closeError := ch.watcher.Close(); closeError != nil {
		log.Warnf("file watcher close error: %v", closeError)
	}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment