Commit a4d5f9da authored by Ivan Aguilar's avatar Ivan Aguilar Committed by Leo Ahnn
Browse files

Update time comparison during message poll

parent 6a83aefd
...@@ -75,7 +75,7 @@ func (s *RunCommandService) ModuleExecute() (err error) { ...@@ -75,7 +75,7 @@ func (s *RunCommandService) ModuleExecute() (err error) {
return return
} }
log.Info("Starting message polling") log.Info("Scheduling message polling")
s.messagePollWaitGroup = &sync.WaitGroup{} s.messagePollWaitGroup = &sync.WaitGroup{}
if s.messagePollJob, err = scheduler.Every(pollMessageFrequencyMinutes).Minutes().Run(s.messagePollLoop); err != nil { if s.messagePollJob, err = scheduler.Every(pollMessageFrequencyMinutes).Minutes().Run(s.messagePollLoop); err != nil {
s.context.Log().Errorf("unable to schedule message poll job. %v", err) s.context.Log().Errorf("unable to schedule message poll job. %v", err)
......
...@@ -63,9 +63,12 @@ func (s *RunCommandService) messagePollLoop() { ...@@ -63,9 +63,12 @@ func (s *RunCommandService) messagePollLoop() {
// time lock to only have one loop active anytime. // time lock to only have one loop active anytime.
// this is extra insurance to prevent any race condition // this is extra insurance to prevent any race condition
pollStartTime := time.Now() pollStartTime := time.Now()
log := s.context.Log()
if s.name == mdsName {
log.Debug("Starting message poll")
}
updateLastPollTime(s.name, pollStartTime) updateLastPollTime(s.name, pollStartTime)
log := s.context.Log()
if err := s.checkStopPolicy(log); err != nil { if err := s.checkStopPolicy(log); err != nil {
return return
} }
...@@ -84,9 +87,13 @@ func (s *RunCommandService) messagePollLoop() { ...@@ -84,9 +87,13 @@ func (s *RunCommandService) messagePollLoop() {
// check if any other poll loop has started in the meantime // check if any other poll loop has started in the meantime
// to prevent any possible race condition due to the scheduler // to prevent any possible race condition due to the scheduler
if getLastPollTime(s.name) == pollStartTime { if pollStartTime.Equal(getLastPollTime(s.name)) {
// skip waiting for the next scheduler polling event and start polling immediately // skip waiting for the next scheduler polling event and start polling immediately
scheduleNextRun(s.messagePollJob) scheduleNextRun(s.messagePollJob)
} else {
if s.name == mdsName {
log.Debugf("Other message poll already started at %v, scheduler wait will not be skipped", getLastPollTime(s.name))
}
} }
} }
...@@ -150,9 +157,6 @@ func (s *RunCommandService) stop() { ...@@ -150,9 +157,6 @@ func (s *RunCommandService) stop() {
// pollOnce calls GetMessages once and processes the result. // pollOnce calls GetMessages once and processes the result.
func (s *RunCommandService) pollOnce() { func (s *RunCommandService) pollOnce() {
log := s.context.Log() log := s.context.Log()
if s.name == mdsName {
log.Debugf("Polling for messages")
}
messages, err := s.service.GetMessages(log, s.config.InstanceID) messages, err := s.service.GetMessages(log, s.config.InstanceID)
if err != nil { if err != nil {
sdkutil.HandleAwsError(log, err, s.processorStopPolicy) sdkutil.HandleAwsError(log, err, s.processorStopPolicy)
...@@ -166,6 +170,6 @@ func (s *RunCommandService) pollOnce() { ...@@ -166,6 +170,6 @@ func (s *RunCommandService) pollOnce() {
processMessage(s, msg) processMessage(s, msg)
} }
if s.name == mdsName { if s.name == mdsName {
log.Debugf("Done poll once") log.Debugf("Finished message poll")
} }
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment