Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/shell-operator/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/deckhouse/deckhouse/pkg/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -42,6 +44,20 @@ func start(logger *log.Logger, cfg *app.Config) func(cmd *cobra.Command, args []

operator.Start()

// Listen for SIGUSR1 (soft reinit) and SIGUSR2 (full reset) in the background.
// Signals are handled one at a time: a Reinit call returns before the next
// signal is read from the channel.
go func() {
	// Buffer of one: signal.Notify never blocks when delivering, so a signal
	// that arrives while the buffer is already full is dropped, not queued.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGUSR1, syscall.SIGUSR2)
	// NOTE(review): nothing in this goroutine closes ch or calls signal.Stop,
	// so the loop has no exit path here — confirm that running until process
	// exit is intended.
	for sig := range ch {
		switch sig {
		case syscall.SIGUSR1:
			// Soft reinit: fullReset=false.
			operator.Reinit(ctx, false)
		case syscall.SIGUSR2:
			// Full reset: fullReset=true.
			operator.Reinit(ctx, true)
		}
	}
}()

// Block until OS signal.
utils_signal.WaitForProcessInterruption(func() {
operator.Shutdown()
Expand Down
87 changes: 87 additions & 0 deletions pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"log/slog"
"os"
"time"

"github.com/deckhouse/deckhouse/pkg/log"
Expand Down Expand Up @@ -825,6 +826,92 @@ func (op *ShellOperator) initAndStartHookQueues() {
}
}

// Reinit rebuilds the hook set of a running operator and re-queues the tasks
// that bring the hooks back into service.
//
// Steps, in order:
//  1. Disable kubernetes and schedule bindings on every currently known hook
//     that has a hook controller.
//  2. Re-run initHookManager and initAndStartHookQueues.
//  3. When fullReset is true (SIGUSR2), append a HookRun task with an
//     onStartup binding context to the main queue for every onStartup hook.
//  4. For every hook, append EnableKubernetesBindings and/or
//     EnableScheduleBindings tasks to the main queue, depending on which
//     bindings the hook config declares.
//
// With fullReset false (SIGUSR1) step 3 is skipped.
//
// If initHookManager or the onStartup hook lookup fails, the error is logged
// and the process exits with code 1.
//
// NOTE(review): ctx is accepted but not read anywhere in this method.
func (op *ShellOperator) Reinit(ctx context.Context, fullReset bool) {
	op.logger.Info("reinit begin", slog.Bool("fullReset", fullReset))

	// Turn off the bindings of the current hooks before the hook index is rebuilt.
	for _, name := range op.HookManager.GetHookNames() {
		ctrl := op.HookManager.GetHook(name).HookController
		if ctrl == nil {
			continue
		}
		ctrl.DisableKubernetesBindings()
		ctrl.DisableScheduleBindings()
	}

	initErr := op.initHookManager()
	if initErr != nil {
		op.logger.Error("reinit: hook manager init failed, exiting", log.Err(initErr))
		os.Exit(1)
	}

	// Hooks discovered by the reload may reference queues that do not exist yet.
	op.initAndStartHookQueues()

	queue := op.TaskQueues.GetMain()
	reinitLog := op.logger.With(pkg.LogKeyOperatorComponent, "reinit")

	// enqueue builds a task of the given kind for the hook named in meta,
	// appends it to the main queue and logs msg with the hook name attached.
	enqueue := func(kind task.TaskType, meta task_metadata.HookMetadata, msg string) {
		queued := task.NewTask(kind).
			WithMetadata(meta).
			WithCompactionID(meta.HookName).
			WithQueuedAt(time.Now())
		queue.AddLast(queued)
		reinitLog.Info(msg, slog.String(pkg.LogKeyHook, meta.HookName))
	}

	if fullReset {
		startupHooks, lookupErr := op.HookManager.GetHooksInOrder(types.OnStartup)
		if lookupErr != nil {
			op.logger.Error("reinit: get onStartup hooks failed, exiting", log.Err(lookupErr))
			os.Exit(1)
		}

		for _, name := range startupHooks {
			startupCtx := bindingcontext.BindingContext{Binding: string(types.OnStartup)}
			startupCtx.Metadata.BindingType = types.OnStartup

			enqueue(
				task_metadata.HookRun,
				task_metadata.HookMetadata{
					HookName:       name,
					BindingType:    types.OnStartup,
					BindingContext: []bindingcontext.BindingContext{startupCtx},
				},
				"queue onStartup task",
			)
		}
	}

	// The hook names are read again here: the index was rebuilt above.
	for _, name := range op.HookManager.GetHookNames() {
		hook := op.HookManager.GetHook(name)

		if hook.GetConfig().HasBinding(types.OnKubernetesEvent) {
			enqueue(
				task_metadata.EnableKubernetesBindings,
				task_metadata.HookMetadata{
					HookName: name,
					Binding:  string(task_metadata.EnableKubernetesBindings),
				},
				"queue EnableKubernetesBindings task",
			)
		}

		if hook.GetConfig().HasBinding(types.Schedule) {
			enqueue(
				task_metadata.EnableScheduleBindings,
				task_metadata.HookMetadata{
					HookName: name,
					Binding:  string(task_metadata.EnableScheduleBindings),
				},
				"queue EnableScheduleBindings task",
			)
		}
	}

	op.logger.Info("reinit complete", slog.Bool("fullReset", fullReset))
}

// Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop.
func (op *ShellOperator) Shutdown() {
op.logger.Info("shutdown begin", slog.String(pkg.LogKeyPhase, "shutdown"))
Expand Down
88 changes: 88 additions & 0 deletions pkg/shell-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"github.com/deckhouse/deckhouse/pkg/log"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/hook/controller"
. "github.com/flant/shell-operator/pkg/hook/task_metadata"
htypes "github.com/flant/shell-operator/pkg/hook/types"
"github.com/flant/shell-operator/pkg/metric"
Expand Down Expand Up @@ -82,3 +84,89 @@ func Test_Operator_startup_tasks(t *testing.T) {
i++
})
}

// newTestOperator returns a ShellOperator wired up for Reinit tests.
//
// Hooks are loaded from testdata/startup_tasks/hooks, the temp directory comes
// from t.TempDir, and logging goes to a no-op logger. The hook manager is
// initialised and bootstrapMainQueue is called; no queue is started by this
// helper. A missing hooks directory or a failed hook manager init fails the
// test immediately.
func newTestOperator(t *testing.T) *ShellOperator {
	t.Helper()

	hooksPath, dirErr := utils.RequireExistingDirectory("testdata/startup_tasks/hooks")
	require.NoError(t, dirErr)

	sop := NewShellOperator(context.Background(), nil, nil, WithLogger(log.NewNop()))
	sop.SetupEventManagers()
	sop.setupHookManagers(app.NewConfig(), hooksPath, t.TempDir())

	initErr := sop.initHookManager()
	require.NoError(t, initErr)

	sop.bootstrapMainQueue(sop.TaskQueues)

	return sop
}

// collectTasks gathers every task visited by IterateSnapshot on the main
// queue, in visiting order. The result is nil when no task is visited.
func collectTasks(op *ShellOperator) []task.Task {
	mainQueue := op.TaskQueues.GetMain()

	var snapshot []task.Task
	visit := func(item task.Task) {
		snapshot = append(snapshot, item)
	}
	mainQueue.IterateSnapshot(visit)

	return snapshot
}

// Test_Reinit_softReinit checks Reinit with fullReset=false: the tasks added
// to the main queue must include both binding-enable task types and must not
// include any task whose binding type is onStartup.
func Test_Reinit_softReinit(t *testing.T) {
	op := newTestOperator(t)

	// Tasks already queued by the bootstrap are not part of the reinit output.
	queuedBefore := len(collectTasks(op))

	op.Reinit(context.Background(), false)

	added := collectTasks(op)[queuedBefore:]

	seenTypes := map[task.TaskType]bool{}
	for _, queued := range added {
		meta, ok := HookMetadataAccessor(queued)
		require.True(t, ok)
		assert.NotEqual(t, htypes.OnStartup, meta.BindingType, "soft reinit must not enqueue onStartup tasks")

		seenTypes[queued.GetType()] = true
	}

	// Binding-enable tasks must be present for hooks that have those bindings.
	assert.True(t, seenTypes[EnableKubernetesBindings], "EnableKubernetesBindings tasks must be enqueued")
	assert.True(t, seenTypes[EnableScheduleBindings], "EnableScheduleBindings tasks must be enqueued")
}

// Test_Reinit_fullReset checks Reinit with fullReset=true: the tasks added to
// the main queue must include an onStartup task and both binding-enable task
// types, and every hook must end up with a different HookController instance
// than it had before the call.
func Test_Reinit_fullReset(t *testing.T) {
	op := newTestOperator(t)

	// Remember each hook's controller so replacement can be verified afterwards.
	controllersBefore := make(map[string]*controller.HookController)
	for _, hookName := range op.HookManager.GetHookNames() {
		controllersBefore[hookName] = op.HookManager.GetHook(hookName).HookController
	}

	// Tasks already queued by the bootstrap are not part of the reinit output.
	queuedBefore := len(collectTasks(op))

	op.Reinit(context.Background(), true)

	added := collectTasks(op)[queuedBefore:]

	seenTypes := map[task.TaskType]bool{}
	sawOnStartup := false
	for _, queued := range added {
		seenTypes[queued.GetType()] = true

		meta, ok := HookMetadataAccessor(queued)
		require.True(t, ok)
		sawOnStartup = sawOnStartup || meta.BindingType == htypes.OnStartup
	}

	assert.True(t, sawOnStartup, "full reset must re-enqueue onStartup tasks")
	assert.True(t, seenTypes[EnableKubernetesBindings], "EnableKubernetesBindings tasks must be enqueued")
	assert.True(t, seenTypes[EnableScheduleBindings], "EnableScheduleBindings tasks must be enqueued")

	// A controller that is not the same instance as before shows the hook was rebuilt.
	for _, hookName := range op.HookManager.GetHookNames() {
		current := op.HookManager.GetHook(hookName).HookController
		assert.NotSame(t, controllersBefore[hookName], current, "hook %s: controller must be a new instance after full reset", hookName)
	}
}