-
Notifications
You must be signed in to change notification settings - Fork 634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: shutdown logger after container process exits #2337
base: main
Are you sure you want to change the base?
Changes from all commits
a45f5d1
5d5aa5c
6316ae9
17a33f9
808dd0c
0050ac5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,6 +18,7 @@ package main | |||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"runtime" | ||||||
"strings" | ||||||
"testing" | ||||||
"time" | ||||||
|
@@ -143,3 +144,52 @@ func TestLogsWithFailingContainer(t *testing.T) { | |||||
base.Cmd("logs", "-f", containerName).AssertNoOut("baz") | ||||||
base.Cmd("rm", "-f", containerName).AssertOK() | ||||||
} | ||||||
|
||||||
func TestLogsWithRunningContainer(t *testing.T) { | ||||||
t.Parallel() | ||||||
base := testutil.NewBase(t) | ||||||
containerName := testutil.Identifier(t) | ||||||
defer base.Cmd("rm", "-f", containerName).Run() | ||||||
expected := make([]string, 10) | ||||||
for i := 0; i < 10; i++ { | ||||||
expected[i] = fmt.Sprint(i + 1) | ||||||
} | ||||||
|
||||||
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, | ||||||
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added sleep to have both the container task and the logs command running concurrently |
||||||
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) | ||||||
} | ||||||
|
||||||
func TestLogsWithoutNewlineOrEOF(t *testing.T) { | ||||||
if runtime.GOOS != "linux" { | ||||||
t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") | ||||||
} | ||||||
t.Parallel() | ||||||
base := testutil.NewBase(t) | ||||||
containerName := testutil.Identifier(t) | ||||||
defer base.Cmd("rm", "-f", containerName).Run() | ||||||
expected := []string{"Hello World!", "There is no newline"} | ||||||
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, | ||||||
"printf", "'Hello World!\nThere is no newline'").AssertOK() | ||||||
time.Sleep(3 * time.Second) | ||||||
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) | ||||||
} | ||||||
|
||||||
func TestLogsAfterRestartingContainer(t *testing.T) { | ||||||
if runtime.GOOS != "linux" { | ||||||
t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem <id>: The requested operation for attach namespace failed.: unknown") | ||||||
} | ||||||
fahedouch marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
t.Parallel() | ||||||
base := testutil.NewBase(t) | ||||||
containerName := testutil.Identifier(t) | ||||||
defer base.Cmd("rm", "-f", containerName).Run() | ||||||
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, | ||||||
"printf", "'Hello World!\nThere is no newline'").AssertOK() | ||||||
expected := []string{"Hello World!", "There is no newline"} | ||||||
time.Sleep(3 * time.Second) | ||||||
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) | ||||||
// restart and check logs again | ||||||
base.Cmd("start", containerName) | ||||||
time.Sleep(3 * time.Second) | ||||||
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -27,9 +27,12 @@ import ( | |||||||
"path/filepath" | ||||||||
"sort" | ||||||||
"sync" | ||||||||
"time" | ||||||||
|
||||||||
"github.com/containerd/containerd" | ||||||||
"github.com/containerd/containerd/errdefs" | ||||||||
"github.com/containerd/containerd/runtime/v2/logging" | ||||||||
"github.com/containerd/nerdctl/pkg/lockutil" | ||||||||
"github.com/sirupsen/logrus" | ||||||||
) | ||||||||
|
||||||||
|
@@ -113,9 +116,10 @@ func Main(argv2 string) error { | |||||||
|
||||||||
// LogConfig is marshalled as "log-config.json" | ||||||||
type LogConfig struct { | ||||||||
Driver string `json:"driver"` | ||||||||
Opts map[string]string `json:"opts,omitempty"` | ||||||||
LogURI string `json:"-"` | ||||||||
Driver string `json:"driver"` | ||||||||
Opts map[string]string `json:"opts,omitempty"` | ||||||||
HostAddress string `json:"host"` | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Ideally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need a default value for |
||||||||
LogURI string `json:"-"` | ||||||||
} | ||||||||
|
||||||||
// LogConfigFilePath returns the path of log-config.json | ||||||||
|
@@ -140,10 +144,76 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { | |||||||
return logConfig, nil | ||||||||
} | ||||||||
|
||||||||
func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error { | ||||||||
func getLockPath(dataStore, ns, id string) string { | ||||||||
return filepath.Join(dataStore, "containers", ns, id, "logger-lock") | ||||||||
} | ||||||||
|
||||||||
// WaitForLogger waits until the logger has finished executing and processing container logs | ||||||||
func WaitForLogger(dataStore, ns, id string) error { | ||||||||
return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error { | ||||||||
return nil | ||||||||
}) | ||||||||
} | ||||||||
|
||||||||
// getContainerWait loads the container from ID and returns its wait channel | ||||||||
func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { | ||||||||
client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
con, err := client.LoadContainer(ctx, config.ID) | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
|
||||||||
task, err := con.Task(ctx, nil) | ||||||||
if err == nil { | ||||||||
return task.Wait(ctx) | ||||||||
} | ||||||||
if !errdefs.IsNotFound(err) { | ||||||||
return nil, err | ||||||||
} | ||||||||
|
||||||||
// If task was not found, it's possible that the container runtime is still being created. | ||||||||
// Retry every 100ms. | ||||||||
ticker := time.NewTicker(100 * time.Millisecond) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
for { | ||||||||
select { | ||||||||
case <-ctx.Done(): | ||||||||
return nil, fmt.Errorf("timed out waiting for container task to start") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
case <-ticker.C: | ||||||||
task, err = con.Task(ctx, nil) | ||||||||
if err != nil { | ||||||||
if errdefs.IsNotFound(err) { | ||||||||
continue | ||||||||
} | ||||||||
return nil, err | ||||||||
} | ||||||||
return task.Wait(ctx) | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { | ||||||||
if err := driver.PreProcess(dataStore, config); err != nil { | ||||||||
return err | ||||||||
} | ||||||||
|
||||||||
// initialize goroutines to copy stdout and stderr streams to a closable pipe | ||||||||
stdoutR, stdoutW := io.Pipe() | ||||||||
stderrR, stderrW := io.Pipe() | ||||||||
copyStream := func(reader io.Reader, writer *io.PipeWriter) { | ||||||||
// copy using a buffer of size 32K | ||||||||
buf := make([]byte, 32<<10) | ||||||||
_, err := io.CopyBuffer(writer, reader, buf) | ||||||||
if err != nil { | ||||||||
logrus.Errorf("failed to copy stream: %s", err) | ||||||||
} | ||||||||
} | ||||||||
go copyStream(config.Stdout, stdoutW) | ||||||||
go copyStream(config.Stderr, stderrW) | ||||||||
|
||||||||
// scan and process logs from pipes | ||||||||
var wg sync.WaitGroup | ||||||||
wg.Add(3) | ||||||||
stdout := make(chan string, 10000) | ||||||||
|
@@ -161,12 +231,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf | |||||||
} | ||||||||
} | ||||||||
|
||||||||
go processLogFunc(config.Stdout, stdout) | ||||||||
go processLogFunc(config.Stderr, stderr) | ||||||||
go processLogFunc(stdoutR, stdout) | ||||||||
go processLogFunc(stderrR, stderr) | ||||||||
go func() { | ||||||||
defer wg.Done() | ||||||||
fahedouch marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
driver.Process(stdout, stderr) | ||||||||
}() | ||||||||
go func() { | ||||||||
// close stdout and stderr upon container exit | ||||||||
defer stdoutW.Close() | ||||||||
defer stderrW.Close() | ||||||||
|
||||||||
exitCh, err := getContainerWait(ctx, hostAddress, config) | ||||||||
if err != nil { | ||||||||
logrus.Errorf("failed to get container task wait channel: %v", err) | ||||||||
return | ||||||||
} | ||||||||
<-exitCh | ||||||||
}() | ||||||||
wg.Wait() | ||||||||
return driver.PostProcess() | ||||||||
} | ||||||||
|
@@ -175,7 +257,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { | |||||||
if dataStore == "" { | ||||||||
return nil, errors.New("got empty data store") | ||||||||
} | ||||||||
return func(_ context.Context, config *logging.Config, ready func() error) error { | ||||||||
return func(ctx context.Context, config *logging.Config, ready func() error) error { | ||||||||
if config.Namespace == "" || config.ID == "" { | ||||||||
return errors.New("got invalid config") | ||||||||
} | ||||||||
|
@@ -189,11 +271,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { | |||||||
if err != nil { | ||||||||
return err | ||||||||
} | ||||||||
if err := ready(); err != nil { | ||||||||
|
||||||||
loggerLock := getLockPath(dataStore, config.Namespace, config.ID) | ||||||||
f, err := os.Create(loggerLock) | ||||||||
if err != nil { | ||||||||
return err | ||||||||
} | ||||||||
defer f.Close() | ||||||||
|
||||||||
// the logger will obtain an exclusive lock on a file until the container is | ||||||||
// stopped and the driver has finished processing all output, | ||||||||
// so that waiting log viewers can be signalled when the process is complete. | ||||||||
return lockutil.WithDirLock(loggerLock, func() error { | ||||||||
if err := ready(); err != nil { | ||||||||
return err | ||||||||
} | ||||||||
|
||||||||
return loggingProcessAdapter(driver, dataStore, config) | ||||||||
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) | ||||||||
}) | ||||||||
} else if !errors.Is(err, os.ErrNotExist) { | ||||||||
// the file does not exist if the container was created with nerdctl < 0.20 | ||||||||
return err | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what this test covers comparing to this one ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other test runs the log command after the container has stopped running, but this one tests logs during the task execution and as logs are being sent to the logger with the
--follow
option.