Skip to content

Commit

Permalink
Merge pull request #74 from drone-runners/kube-error-rework
Browse files Browse the repository at this point in the history
(fix) updated code for k8s error detection: removed 'aborted container' error
  • Loading branch information
marko-gacesa authored Sep 9, 2021
2 parents b358a1e + 9c8d91a commit 25a6660
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 47 deletions.
2 changes: 1 addition & 1 deletion engine/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (l *Launcher) startContainers(requests map[string]*request) {
if err != nil {
logrus.
WithError(err).
Debugf("Launch of %d containers failed. Duration=%.2fs", len(requests), time.Since(t).Seconds())
Errorf("Launch of %d containers failed. Duration=%.2fs", len(requests), time.Since(t).Seconds())

for _, req := range requests {
req.chErr <- err
Expand Down
31 changes: 31 additions & 0 deletions engine/podwatcher/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package podwatcher
import (
"context"
"fmt"
"time"
)

type ContainerWatcher interface {
Expand Down Expand Up @@ -40,6 +41,36 @@ type containerInfo struct {
placeholder string
image string
exitCode int32

// _failAt is used by PodWatcher to recover from invalid Kubernetes events: it records when a doubtful termination event was first seen.
_failAt time.Time
}

// diff describes how info differs from an earlier snapshot, old, as a map
// suitable for structured logging.
//
// Only the state, stateInfo, image and exitCode fields are compared. Changed
// string-like fields are rendered as "previous->current", except that a
// stateInfo that was previously empty is rendered as the current value alone.
// A changed exitCode is reported as the current value only.
//
// The result is nil when old is nil, and an empty (non-nil) map when none of
// the compared fields changed.
func (info *containerInfo) diff(old *containerInfo) (m map[string]interface{}) {
	if old == nil {
		// Nothing to compare against.
		return nil
	}

	m = map[string]interface{}{}

	if prev, cur := old.state, info.state; prev != cur {
		m["state"] = prev.String() + "->" + cur.String()
	}

	switch prev, cur := old.stateInfo, info.stateInfo; {
	case prev == cur:
		// unchanged, nothing to record
	case prev == "":
		m["stateInfo"] = cur
	default:
		m["stateInfo"] = prev + "->" + cur
	}

	if prev, cur := old.image, info.image; prev != cur {
		m["image"] = prev + "->" + cur
	}

	if cur := info.exitCode; cur != old.exitCode {
		m["exitCode"] = cur
	}

	return m
}

type containerState int
Expand Down
15 changes: 0 additions & 15 deletions engine/podwatcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,6 @@ func (e FailedContainerError) Error() string {
e.container, e.exitCode, e.reason)
}

// AbortedContainerError is the error returned when a container that had earlier started successfully
// suddenly reverts to the placeholder image and terminates.
type AbortedContainerError struct {
// container is the container id; it is printed as "id=" in the error message.
container string
// state is the container state reported alongside the failure.
state containerState
// exitCode is the exit code reported for the container.
exitCode int32
// reason is the textual reason reported for the container.
reason string
}

// Error implements the error interface. The message names the container id,
// its state, its exit code and the reported reason.
func (e AbortedContainerError) Error() string {
	const format = "kubernetes has failed: container failed to start and reverted back to placeholder image: id=%s state=%s exitCode=%d reason=%s"

	return fmt.Sprintf(format, e.container, e.state, e.exitCode, e.reason)
}

// StartTimeoutContainerError is returned as an error when a container fails to run after some predefined time.
type StartTimeoutContainerError struct {
Container string
Expand Down
144 changes: 113 additions & 31 deletions engine/podwatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,52 +155,135 @@ func (pw *PodWatcher) updateContainers(containers []containerInfo) {
continue // unknown container
}

if c.image == cs.image && c.state == cs.state && c.stateInfo == cs.stateInfo {
// If we already declared a container as finished, just notify all
// that the container is terminated (with or without an error)
if c.state == stateTerminated {
var err error

if cs.state != stateTerminated {
// Should not happen... a container that was marked as terminated is now running again...
diff := cs.diff(c)
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Trace("PodWatcher: Container zombie found...")

err = FailedContainerError{
container: c.id,
exitCode: c.exitCode,
reason: c.stateInfo,
}
} else if c.image == c.placeholder {
err = FailedContainerError{
container: c.id,
exitCode: c.exitCode,
reason: c.stateInfo,
}
}

pw.notifyClientsContainerChange(c, err)
continue
}

if cs.image == c.placeholder && c.image != c.placeholder {
err := AbortedContainerError{
container: c.id,
state: cs.state,
exitCode: cs.exitCode,
reason: cs.stateInfo,
if cs.state == stateTerminated {
diff := cs.diff(c)

// Sometimes, kubernetes sends an event about a terminated container with:
// Terminated.ExitCode=2 and Terminated.Reason="Error".
// Often container image will revert back to the placeholder image.
// In these cases we give Kubernetes some time to send the correct event,
// but if it fails, we do declare the container as terminated.
// Note: For this logic to work, the periodic container status check must work.
if cs.exitCode == 2 && cs.stateInfo == "Error" {
if c._failAt.IsZero() {
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Trace("PodWatcher: Container failed. Trying recovery...")
c._failAt = time.Now()
} else if time.Since(c._failAt) < 15*time.Second {
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Trace("PodWatcher: Container failed. Waiting to recover...")
} else {
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Warn("PodWatcher: Container failed.")

c.state = stateTerminated
c.stateInfo = cs.stateInfo
c.image = cs.image
c.exitCode = cs.exitCode

err := FailedContainerError{
container: c.id,
exitCode: c.exitCode,
reason: c.stateInfo,
}

pw.notifyClientsContainerChange(c, err)
}

continue
}

c.state = stateTerminated
c.stateInfo = cs.stateInfo
c.image = cs.image
c.exitCode = cs.exitCode
c._failAt = time.Time{}

var err error

if c.image == c.placeholder {
err = FailedContainerError{
container: c.id,
exitCode: c.exitCode,
reason: c.stateInfo,
}
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Warn("PodWatcher: Container failed.")
} else {
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Debug("PodWatcher: Container terminated.")
}

pw.notifyClientsContainerChange(c, err)

continue
}

c.image = cs.image
if c.image == cs.image && c.state == cs.state && c.stateInfo == cs.stateInfo {
continue // container unchanged
}

diff := cs.diff(c)

c.state = cs.state
c.image = cs.image
c.stateInfo = cs.stateInfo
c.exitCode = cs.exitCode
c._failAt = time.Time{}

if c.image == c.placeholder {
if c.state == stateTerminated {
err := FailedContainerError{
container: c.id,
exitCode: c.exitCode,
reason: c.stateInfo,
}
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithFields(diff).
Debug("PodWatcher: Container state changed")

pw.notifyClientsContainerChange(c, err)
}
} else {
logrus.
WithField("pod", pw.podName).
WithField("container", c.id).
WithField("image", c.image).
WithField("state", c.state).
WithField("stateInfo", c.stateInfo).
Debug("PodWatcher: Container state changed")

pw.notifyClientsContainerChange(c, nil)
}
pw.notifyClientsContainerChange(c, nil)
}
}

Expand Down Expand Up @@ -241,8 +324,7 @@ func _tryResolveWaitClient(cl *waitClient, c *containerInfo, err error) bool {
func (pw *PodWatcher) notifyClientsContainerChange(c *containerInfo, err error) {
if err != nil {
_, isFailed := err.(FailedContainerError)
_, isAborted := err.(AbortedContainerError)
if isKubeError := isFailed || isAborted; isKubeError {
if isKubeError := isFailed; isKubeError {
for _, cl := range pw.clientList {
if cl.containerId == c.id {
cl.resolveCh <- err
Expand Down Expand Up @@ -291,7 +373,7 @@ func (pw *PodWatcher) waitForEvent(containerId string, state containerState) (er
WithField("pod", pw.podName).
WithField("container", containerId).
WithField("state", state.String()).
Trace("PodWatcher: Waiting...")
Debug("PodWatcher: Waiting...")

defer func(t time.Time) {
logrus.
Expand Down

0 comments on commit 25a6660

Please sign in to comment.