Commit a4ce79f1 authored by Darrell Kienzle's avatar Darrell Kienzle Committed by Daniel Robinson
Browse files

Accomodate Windows filepaths, Fix locking

cr: https://code.amazon.com/reviews/CR-54853932
parent 46cd37e8
......@@ -4,7 +4,6 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime/debug"
"strings"
......@@ -50,7 +49,7 @@ type fileWatcherChannel struct {
recvCounter int
startTime string
watcher *fsnotify.Watcher
mu sync.RWMutex
mu sync.Mutex
closed bool
shouldDeleteAfterConsume bool
shouldReadRetry bool
......@@ -65,7 +64,10 @@ type fileWatcherChannel struct {
*/
func NewFileWatcherChannel(logger log.T, mode Mode, name string, shouldReadRetry bool) (*fileWatcherChannel, error) {
tmpPath := path.Join(name, "tmp")
// Clean the name -- replace any "/" with Separator (e.g. "\" on Windows) -- before it is passed to any other functions
name = filepath.Clean(name)
tmpPath := filepath.Join(name, "tmp")
curTime := time.Now()
//TODO if client is RunAs, server needs to grant client user R/W access respectively
if err := createIfNotExist(name); err != nil {
......@@ -134,21 +136,21 @@ func createIfNotExist(dir string) (err error) {
*/
func (ch *fileWatcherChannel) Send(rawJson string) error {
ch.mu.Lock()
defer ch.mu.Unlock()
if ch.closed {
return errors.New("channel already closed")
}
log := ch.logger
ch.mu.RLock()
defer ch.mu.RUnlock()
sequenceID := fmt.Sprintf("%v-%s-%03d", ch.mode, ch.startTime, ch.counter)
filepath := path.Join(ch.path, sequenceID)
tmp_filepath := path.Join(ch.tmpPath, sequenceID)
pathname := filepath.Join(ch.path, sequenceID)
tmp_pathname := filepath.Join(ch.tmpPath, sequenceID)
//ensure sync exclusive write
if err := ioutil.WriteFile(tmp_filepath, []byte(rawJson), defaultFileWriteMode); err != nil {
log.Errorf("write file %v encountered error: %v \n", tmp_filepath, err)
if err := ioutil.WriteFile(tmp_pathname, []byte(rawJson), defaultFileWriteMode); err != nil {
log.Errorf("write file %v encountered error: %v \n", tmp_pathname, err)
return err
}
if err := os.Rename(tmp_filepath, filepath); err != nil {
if err := os.Rename(tmp_pathname, pathname); err != nil {
log.Errorf("send renaming file encountered error: %v", err)
return err
}
......@@ -248,9 +250,8 @@ func (ch *fileWatcherChannel) Close() {
//parse the counter out of the sequence id, return -1 if parsing fails
//counter is defined as the padding last element of - separated integer
//On windows, path.Base() does not work
func parseSequenceCounter(filepath string) int {
_, name := path.Split(filepath)
func parseSequenceCounter(pathname string) int {
name := filepath.Base(pathname)
parts := strings.Split(name, "-")
counter, err := strconv.ParseInt(parts[len(parts)-1], 10, 64)
if err != nil {
......@@ -268,7 +269,7 @@ func (ch *fileWatcherChannel) consumeAll() {
for _, info := range fileInfos {
name := info.Name()
if ch.isReadable(name) {
ch.consume(path.Join(ch.path, name))
ch.consume(filepath.Join(ch.path, name))
} else {
ch.logger.Debugf("IPC file not readable: %s", name)
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment