diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index bb2a80020..e5d97cef3 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -62,6 +62,7 @@ type SolutionManager struct { KeyLockProvider keylock.IKeyLockProvider IsTarget bool TargetNames []string + TargetNamespace string ApiClientHttp api_utils.ApiClient } @@ -122,6 +123,11 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. s.TargetNames = strings.Split(targetNames, ",") + s.TargetNamespace = "default" + if v, ok := config.Properties["targetNamespace"]; ok && strings.TrimSpace(v) != "" { + s.TargetNamespace = v + } + if s.IsTarget { if len(s.TargetNames) == 0 { return errors.New("target mode is set but target name is not set") @@ -134,7 +140,7 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. return err } } - s.ApiClientHttp, err = api_utils.GetParentApiClient(s.Context.SiteInfo.ParentSite.BaseUrl) + s.ApiClientHttp, err = api_utils.GetParentApiClient(s.Context.SiteInfo.CurrentSite.BaseUrl) if err != nil { return err } @@ -682,15 +688,20 @@ func (s *SolutionManager) Enabled() bool { return s.Config.Properties["poll.enabled"] == "true" } func (s *SolutionManager) Poll() []error { - if s.Config.Properties["poll.enabled"] == "true" && s.Context.SiteInfo.ParentSite.BaseUrl != "" && s.IsTarget { + log.InfofCtx(context.Background(), " M (Solution): Poll() called, pollEnabled=%s, currentUrl=%s, isTarget=%v, targetNames=%v", + s.Config.Properties["poll.enabled"], s.Context.SiteInfo.CurrentSite.BaseUrl, s.IsTarget, s.TargetNames) + if s.Config.Properties["poll.enabled"] == "true" && s.Context.SiteInfo.CurrentSite.BaseUrl != "" && s.IsTarget { + log.InfofCtx(context.Background(), " M (Solution): conditions met, starting to process %d targets", len(s.TargetNames)) for _, target := range s.TargetNames { + namespace := s.TargetNamespace catalogs, err := s.ApiClientHttp.GetCatalogsWithFilter(context.Background(), "", "label", "staged_target="+target, - s.Context.SiteInfo.ParentSite.Username, - s.Context.SiteInfo.ParentSite.Password) + s.Context.SiteInfo.CurrentSite.Username, + s.Context.SiteInfo.CurrentSite.Password) if err != nil { return []error{err} } for _, c := range catalogs { + namespace = c.ObjectMeta.Namespace if vs, ok := c.Spec.Properties["deployment"]; ok { deployment := model.DeploymentSpec{} jData, _ := json.Marshal(vs) @@ -719,13 +730,28 @@ func (s *SolutionManager) Poll() []error { err = s.ApiClientHttp.ReportCatalogs(context.Background(), deployment.Instance.ObjectMeta.Name+"-"+target, components, - s.Context.SiteInfo.ParentSite.Username, - s.Context.SiteInfo.ParentSite.Password) + s.Context.SiteInfo.CurrentSite.Username, + s.Context.SiteInfo.CurrentSite.Password) if err != nil { return []error{err} } } } + // Report target status on every poll cycle regardless of whether catalogs are staged + targetReportErr := s.ApiClientHttp.ReportTargetStatus(context.Background(), + target, + namespace, + map[string]string{ + "agent.status": "online", + "agent.lastReport": time.Now().UTC().Format(time.RFC3339), + }, + s.Context.SiteInfo.CurrentSite.Username, + s.Context.SiteInfo.CurrentSite.Password) + if targetReportErr != nil { + log.WarnfCtx(context.Background(), " M (Solution): failed to report target status for target %s: %v", target, targetReportErr) + } else { + log.InfofCtx(context.Background(), " M (Solution): successfully reported target status for target %s in namespace %s", target, namespace) + } } } return nil diff --git a/api/pkg/apis/v1alpha1/utils/apiclient.go b/api/pkg/apis/v1alpha1/utils/apiclient.go index 676756e89..89e3e2a20 100644 --- a/api/pkg/apis/v1alpha1/utils/apiclient.go +++ b/api/pkg/apis/v1alpha1/utils/apiclient.go @@ -38,6 +38,7 @@ type ( tokenProvider TokenProvider client *http.Client caCertPath string + useSATokens bool } ApiClientOption func(*apiClient) @@ -87,6 +88,7 @@ type ( SyncStageStatus(ctx context.Context, status model.StageStatus, user string, password string) error SendVisualizationPacket(ctx context.Context, payload []byte, user string, password string) error ReportCatalogs(ctx context.Context, instance string, components []model.ComponentSpec, user string, password string) error + ReportTargetStatus(ctx context.Context, target string, namespace string, properties map[string]string, user string, password string) error CreateSolutionContainer(ctx context.Context, instanceContainer string, payload []byte, namespace string, user string, password string) error DeleteSolutionContainer(ctx context.Context, instanceContainer string, namespace string, user string, password string) error GetSolutionContainer(ctx context.Context, instanceContainer string, namespace string, user string, password string) (model.SolutionContainerState, error) @@ -140,6 +142,7 @@ func noTokenProvider(ctx context.Context, baseUrl string, client *http.Client, u func WithUserPassword(ctx context.Context) ApiClientOption { return func(a *apiClient) { + a.useSATokens = false a.tokenProvider = func(ctx context.Context, baseUrl string, _ *http.Client, user string, password string) (string, error) { request := AuthRequest{UserName: user, Password: password} requestData, _ := json.Marshal(request) @@ -161,6 +164,7 @@ func WithUserPassword(ctx context.Context) ApiClientOption { func WithServiceAccountToken() ApiClientOption { return func(a *apiClient) { + a.useSATokens = true a.tokenProvider = func(ctx context.Context, _ string, _ *http.Client, _ string, _ string) (string, error) { path := os.Getenv(constants.SATokenPathName) if path == "" { @@ -198,6 +202,7 @@ func NewApiClient(ctx context.Context, baseUrl string, opts ...ApiClientOption) baseUrl: baseUrl, tokenProvider: noTokenProvider, client: client, + useSATokens: false, } for _, opt := range opts { @@ -738,6 +743,28 @@ func (a *apiClient) ReportCatalogs(ctx context.Context, instance string, compone return nil } +func (a *apiClient) ReportTargetStatus(ctx context.Context, target string, namespace string, properties map[string]string, user string, password string) error { + token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) + if err != nil { + return err + } + if properties == nil { + properties = map[string]string{} + } + path := "targets/status/" + url.QueryEscape(target) + "?namespace=" + url.QueryEscape(namespace) + payload := map[string]interface{}{ + "status": map[string]interface{}{ + "properties": properties, + }, + } + jData, _ := json.Marshal(payload) + _, err = a.callRestAPI(ctx, path, "PUT", jData, token) + if err != nil { + return err + } + return nil +} + func (a *apiClient) UpsertSolution(ctx context.Context, solution string, payload []byte, namespace string, user string, password string) error { token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) if err != nil { @@ -834,7 +861,9 @@ func (a *apiClient) SendVisualizationPacket(ctx context.Context, payload []byte, } func (a *apiClient) callRestAPI(ctx context.Context, route string, method string, payload []byte, token string) ([]byte, error) { - urlString := fmt.Sprintf("%s%s", a.baseUrl, path.Clean(route)) + baseURL := strings.TrimRight(a.baseUrl, "/") + cleanRoute := strings.TrimLeft(path.Clean(route), "/") + urlString := fmt.Sprintf("%s/%s", baseURL, cleanRoute) ctx, span := observability.StartSpan("Symphony-API-Client", ctx, &map[string]string{ "method": "callRestAPI", "http.method": method, @@ -896,8 +925,8 @@ func (a *apiClient) callRestAPI(ctx context.Context, route string, method string if resp.StatusCode >= 300 { if resp.StatusCode == http.StatusForbidden { // 403 is a retriable error, so we return a COAError with the same status code - // This should only happen at k8s token provider so can skip the username and password - if ShouldUseSATokens() { + // Refresh service-account token only when this client was configured to use SA tokens. + if a.useSATokens { token, err := a.tokenProvider(ctx, a.baseUrl, a.client, "", "") if err != nil { return err diff --git a/api/pkg/apis/v1alpha1/utils/symphony-api.go b/api/pkg/apis/v1alpha1/utils/symphony-api.go index bb8dd1b13..fb7ae6584 100644 --- a/api/pkg/apis/v1alpha1/utils/symphony-api.go +++ b/api/pkg/apis/v1alpha1/utils/symphony-api.go @@ -15,6 +15,7 @@ import ( "net/http" "net/url" "os" + "strconv" "strings" "sync" @@ -29,7 +30,6 @@ import ( var ( SymphonyAPIAddressBase = "http://symphony-service:8080/v1alpha2/" - useSAToken = os.Getenv(constants.UseServiceAccountTokenEnvName) apiCertPath = os.Getenv(constants.ApiCertEnvName) ) @@ -70,8 +70,10 @@ func getApiClient() (*apiClient, error) { } if ShouldUseSATokens() { + log.Infof("Configuring API client with service account token provider") clientOptions = append(clientOptions, WithServiceAccountToken()) } else { + log.Infof("Configuring API client with user/password token provider") clientOptions = append(clientOptions, WithUserPassword(context.TODO())) } @@ -89,6 +91,7 @@ func GetParentApiClient(baseUrl string) (*apiClient, error) { clientOptions = append(clientOptions, WithCertAuth(caCert)) } + log.Infof("Configuring parent API client with user/password token provider for baseUrl: %s", baseUrl) clientOptions = append(clientOptions, WithUserPassword(context.TODO())) client, err := NewApiClient(context.Background(), baseUrl, clientOptions...) if err != nil { @@ -98,11 +101,26 @@ func GetParentApiClient(baseUrl string) (*apiClient, error) { } func ShouldUseSATokens() bool { - return !ShouldUseUserCreds() + raw, ok := os.LookupEnv(constants.UseServiceAccountTokenEnvName) + if !ok { + return true + } + + raw = strings.TrimSpace(raw) + if raw == "" { + return true + } + + v, err := strconv.ParseBool(strings.ToLower(raw)) + if err != nil { + log.Warnf("Invalid value for %s: %q; defaulting to service account token auth", constants.UseServiceAccountTokenEnvName, raw) + return true + } + return v } func ShouldUseUserCreds() bool { - return strings.ToLower(os.Getenv(constants.UseServiceAccountTokenEnvName)) == "false" + return !ShouldUseSATokens() } var log = logger.NewLogger("coa.runtime") diff --git a/api/symphony-agent.json b/api/symphony-agent.json index fa6864d85..d277a555f 100644 --- a/api/symphony-agent.json +++ b/api/symphony-agent.json @@ -1,4 +1,5 @@ { + "shutdownGracePeriod": "30s", "siteInfo": { "siteId": "hq", "currentSite": { diff --git a/cli/cmd/agent.go b/cli/cmd/agent.go index c58918f03..66db04241 100644 --- a/cli/cmd/agent.go +++ b/cli/cmd/agent.go @@ -13,6 +13,8 @@ import ( "os" "os/user" "path/filepath" + "runtime" + "strings" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" "github.com/eclipse-symphony/symphony/cli/config" @@ -25,7 +27,6 @@ var ( mqttRequestTopic string mqttResponseTopic string mqttClientId string - asDaemon bool targetName string targetNamespace string upsertTarget bool @@ -34,49 +35,150 @@ var ( var AgentCmd = &cobra.Command{ Use: "agent", - Short: "Install Symphony agent on the machine", + Short: "Manage Symphony agent process and service", Run: func(cmd *cobra.Command, args []string) { - c := config.GetMaestroConfig(configFile) - ctx := c.DefaultContext - if configContext != "" { - ctx = configContext + runAgentWorkflow("run") + }, +} + +var AgentRunCmd = &cobra.Command{ + Use: "run", + Short: "Run Symphony agent in foreground", + Run: func(cmd *cobra.Command, args []string) { + runAgentWorkflow("run") + }, +} + +var AgentInstallCmd = &cobra.Command{ + Use: "install", + Short: "Install Symphony agent as a service", + Run: func(cmd *cobra.Command, args []string) { + runAgentWorkflow("install") + }, +} + +var AgentUninstallCmd = &cobra.Command{ + Use: "uninstall", + Short: "Uninstall Symphony agent service", + Run: func(cmd *cobra.Command, args []string) { + if strings.TrimSpace(targetName) == "" { + fmt.Printf("\n%s --target is required%s\n\n", utils.ColorRed(), utils.ColorReset()) + return + } + err := uninstallAgentService(targetName) + if err != nil { + fmt.Printf("\n%s Failed to uninstall agent service: %s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) + return } + fmt.Printf("\n%s Agent service removed for target: %s%s\n\n", utils.ColorGreen(), targetName, utils.ColorReset()) + }, +} + +func runAgentWorkflow(mode string) { + ctx, err := resolveCurrentContext() + if err != nil { + fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) + return + } + + err = resolveMqttSettingsFromContext(ctx) + if err != nil { + fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) + return + } - if ctx == "" { - ctx = "default" + agentFile, err := updateAgentConfig(ctx) + if err != nil { + fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) + return + } + if configOnly { + fmt.Printf("\n%s Agent configuration file updated: %s%s\n\n", utils.ColorGreen(), agentFile, utils.ColorReset()) + return + } + + if upsertTarget { + err = createTarget(ctx) + if err != nil { + fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) + return } + } + switch mode { + case "run": u, err := user.Current() if err != nil { fmt.Printf("\n%s Failed: %s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) return } - agentFile, err := updateAgentConfig() + _, err = utils.RunCommandNoCapture("Launching Symphony agent", "done", filepath.Join(u.HomeDir, ".symphony/symphony-api"), "-c", agentFile, "-l", "Debug") if err != nil { - fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) - return + fmt.Printf("\n%s Failed: %s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) } - if configOnly { - fmt.Printf("\n%s Agent configuration file updated: %s%s\n\n", utils.ColorGreen(), agentFile, utils.ColorReset()) + case "install": + err = installAgentService(targetName, agentFile) + if err != nil { + fmt.Printf("\n%s Failed to install agent service: %s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) return } - if upsertTarget { - err = createTarget(c.Contexts[ctx]) - if err != nil { - fmt.Printf("\n%s%s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) - return - } - } - if !asDaemon { - _, err := utils.RunCommandNoCapture("Launching Symphony in standalone mode", "done", filepath.Join(u.HomeDir, ".symphony/symphony-api"), "-c", filepath.Join(u.HomeDir, ".symphony/symphony-agent-"+targetName+".json"), "-l", "Debug") - if err != nil { - fmt.Printf("\n%s Failed: %s%s\n\n", utils.ColorRed(), err.Error(), utils.ColorReset()) - return - } + fmt.Printf("\n%s Agent service installed for target: %s%s\n\n", utils.ColorGreen(), targetName, utils.ColorReset()) + default: + fmt.Printf("\n%s Unsupported agent mode: %s%s\n\n", utils.ColorRed(), mode, utils.ColorReset()) + } +} + +func resolveCurrentContext() (config.MaestroContext, error) { + c := config.GetMaestroConfig(configFile) + ctxName := c.DefaultContext + if configContext != "" { + ctxName = configContext + } + if ctxName == "" { + ctxName = "default" + } + + ctx, ok := c.Contexts[ctxName] + if !ok { + return config.MaestroContext{}, fmt.Errorf("context %q not found in Maestro config", ctxName) + } + return ctx, nil +} + +func resolveMqttSettingsFromContext(ctx config.MaestroContext) error { + if strings.TrimSpace(targetName) == "" { + return fmt.Errorf("--target is required") + } + + if mqttBrokerAddress == "" { + mqttBrokerAddress = ctx.Mqtt.BrokerAddress + } + if mqttRequestTopic == "" { + if ctx.Mqtt.RequestTopic != "" { + mqttRequestTopic = ctx.Mqtt.RequestTopic } else { - fmt.Printf("\n%s Running as daemon is not currently supported%s\n\n", utils.ColorRed(), utils.ColorReset()) + mqttRequestTopic = "coa-request" } - }, + } + if mqttResponseTopic == "" { + if ctx.Mqtt.ResponseTopic != "" { + mqttResponseTopic = ctx.Mqtt.ResponseTopic + } else { + mqttResponseTopic = "coa-response" + } + } + if mqttClientId == "" { + mqttClientId = ctx.Mqtt.ClientID + } + if mqttClientId == "" { + mqttClientId = "symphony-agent-" + targetName + } + + if mqttBrokerAddress == "" { + return fmt.Errorf("MQTT broker is not configured. Use --mqtt-broker or run maestro up --with-mqtt-broker to persist broker info in context") + } + + return nil } func createTarget(ctx config.MaestroContext) error { @@ -108,7 +210,7 @@ func createTarget(ctx config.MaestroContext) error { return utils.Upsert(ctx.Url, ctx.User, ctx.Secret, "target", targetName, targetData) } -func updateAgentConfig() (string, error) { +func updateAgentConfig(ctx config.MaestroContext) (string, error) { dirname, err := os.UserHomeDir() if err != nil { return "", fmt.Errorf("failed to get user home directory: %s", err.Error()) @@ -128,8 +230,14 @@ func updateAgentConfig() (string, error) { if err != nil { return "", err } + ensureAgentConfigDefaults(&agentConfig) + + agentConfig.SiteInfo.CurrentSite.BaseURL = ctx.Url + agentConfig.SiteInfo.CurrentSite.Username = ctx.User + agentConfig.SiteInfo.CurrentSite.Password = ctx.Secret agentConfig.API.Vendors[1].Managers[0].Properties.TargetNames = targetName + agentConfig.API.Vendors[1].Managers[0].Properties.TargetNamespace = targetNamespace agentConfig.API.Vendors[1].Managers[0].Providers[targetName] = config.TargetProviderConfig{ Type: "providers.target.mock", Config: map[string]interface{}{}, @@ -153,21 +261,126 @@ func updateAgentConfig() (string, error) { return agentConfigPath, nil } +func ensureAgentConfigDefaults(agentConfig *config.SymphonyAgentConfig) { + if strings.TrimSpace(agentConfig.ShutdownGracePeriod) == "" { + agentConfig.ShutdownGracePeriod = "30s" + } + if len(agentConfig.API.Pubsub) == 0 { + agentConfig.API.Pubsub = map[string]interface{}{ + "shared": true, + "provider": map[string]interface{}{ + "type": "providers.pubsub.memory", + "config": map[string]interface{}{}, + }, + } + } + if len(agentConfig.API.Keylock) == 0 { + agentConfig.API.Keylock = map[string]interface{}{ + "shared": true, + "provider": map[string]interface{}{ + "type": "providers.keylock.memory", + "config": map[string]interface{}{ + "mode": "Global", + "cleanInterval": 30, + "purgeDuration": 43200, + }, + }, + } + } +} + +func installAgentService(target string, agentConfigPath string) error { + u, err := user.Current() + if err != nil { + return err + } + + binaryPath := filepath.Join(u.HomeDir, ".symphony", "symphony-api") + serviceName := "symphony-agent-" + target + + switch runtime.GOOS { + case "linux": + serviceDir := filepath.Join(u.HomeDir, ".config", "systemd", "user") + if err = os.MkdirAll(serviceDir, 0755); err != nil { + return err + } + serviceFile := filepath.Join(serviceDir, serviceName+".service") + serviceContent := fmt.Sprintf(`[Unit] +Description=Symphony Agent (%s) +After=network.target + +[Service] +Type=simple +ExecStart=%s -c %s -l Debug +Restart=always +RestartSec=5 + +[Install] +WantedBy=default.target +`, target, binaryPath, agentConfigPath) + if err = os.WriteFile(serviceFile, []byte(serviceContent), 0644); err != nil { + return err + } + + if _, _, err = utils.RunCommand("Reloading user systemd", "done", verbose, "systemctl", "--user", "daemon-reload"); err != nil { + return err + } + if _, _, err = utils.RunCommand("Enabling agent service", "done", verbose, "systemctl", "--user", "enable", "--now", serviceName); err != nil { + return err + } + return nil + case "windows": + binPath := fmt.Sprintf("\"%s\" -c \"%s\" -l Debug", binaryPath, agentConfigPath) + if _, _, err = utils.RunCommand("Creating agent Windows service", "done", verbose, "sc.exe", "create", serviceName, "binPath=", binPath, "start=", "auto"); err != nil { + return err + } + if _, _, err = utils.RunCommand("Starting agent Windows service", "done", verbose, "sc.exe", "start", serviceName); err != nil { + return err + } + return nil + default: + return fmt.Errorf("unsupported OS for install: %s", runtime.GOOS) + } +} + +func uninstallAgentService(target string) error { + u, err := user.Current() + if err != nil { + return err + } + + serviceName := "symphony-agent-" + target + + switch runtime.GOOS { + case "linux": + _, _, _ = utils.RunCommand("Stopping agent service", "done", verbose, "systemctl", "--user", "disable", "--now", serviceName) + serviceFile := filepath.Join(u.HomeDir, ".config", "systemd", "user", serviceName+".service") + _ = os.Remove(serviceFile) + _, _, _ = utils.RunCommand("Reloading user systemd", "done", verbose, "systemctl", "--user", "daemon-reload") + return nil + case "windows": + _, _, _ = utils.RunCommand("Stopping agent Windows service", "done", verbose, "sc.exe", "stop", serviceName) + _, _, err = utils.RunCommand("Deleting agent Windows service", "done", verbose, "sc.exe", "delete", serviceName) + return err + default: + return fmt.Errorf("unsupported OS for uninstall: %s", runtime.GOOS) + } +} + func init() { - AgentCmd.Flags().StringVarP(&targetNamespace, "namespace", "n", "default", "Target namespace") - AgentCmd.Flags().StringVarP(&targetName, "target", "t", "", "Target name") - AgentCmd.Flags().StringVarP(&mqttBrokerAddress, "mqtt-broker", "m", "", "MQTT broker address") - AgentCmd.Flags().StringVarP(&mqttRequestTopic, "request-topic", "i", "coa-request", "MQTT request topic") - AgentCmd.Flags().StringVarP(&mqttResponseTopic, "response-topic", "o", "coa-response", "MQTT response topic") - AgentCmd.Flags().StringVarP(&mqttClientId, "mqtt-client-id", "e", "", "MQTT client id") - AgentCmd.Flags().BoolVar(&asDaemon, "daemon", false, "Run agent as daemon") - AgentCmd.Flags().BoolVar(&upsertTarget, "upsert-target", false, "Upsert target") - AgentCmd.Flags().BoolVar(&configOnly, "config-only", false, "Only update the agent configuration file") - AgentCmd.Flags().StringVarP(&configFile, "config", "c", "", "Maestro CLI config file") - AgentCmd.Flags().StringVarP(&configContext, "context", "", "", "Maestro CLI configuration context") - - AgentCmd.MarkFlagRequired("target") - AgentCmd.MarkFlagRequired("mqtt-broker") - AgentCmd.MarkFlagRequired("mqtt-client-id") + AgentCmd.PersistentFlags().StringVarP(&targetNamespace, "namespace", "n", "default", "Target namespace") + AgentCmd.PersistentFlags().StringVarP(&targetName, "target", "t", "", "Target name") + AgentCmd.PersistentFlags().StringVarP(&mqttBrokerAddress, "mqtt-broker", "m", "", "MQTT broker address (overrides context)") + AgentCmd.PersistentFlags().StringVarP(&mqttRequestTopic, "request-topic", "i", "", "MQTT request topic (default: context or coa-request)") + AgentCmd.PersistentFlags().StringVarP(&mqttResponseTopic, "response-topic", "o", "", "MQTT response topic (default: context or coa-response)") + AgentCmd.PersistentFlags().StringVarP(&mqttClientId, "mqtt-client-id", "e", "", "MQTT client id (default: context or symphony-agent-)") + AgentCmd.PersistentFlags().BoolVar(&upsertTarget, "upsert-target", false, "Upsert target") + AgentCmd.PersistentFlags().BoolVar(&configOnly, "config-only", false, "Only update the agent configuration file") + AgentCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Maestro CLI config file") + AgentCmd.PersistentFlags().StringVarP(&configContext, "context", "", "", "Maestro CLI configuration context") + + AgentCmd.AddCommand(AgentRunCmd) + AgentCmd.AddCommand(AgentInstallCmd) + AgentCmd.AddCommand(AgentUninstallCmd) RootCmd.AddCommand(AgentCmd) } diff --git a/cli/cmd/up.go b/cli/cmd/up.go index e5193bd7b..5b60e60df 100644 --- a/cli/cmd/up.go +++ b/cli/cmd/up.go @@ -32,6 +32,8 @@ var ( //useWizard bool noK8s bool noRestApi bool + withMqttBroker bool + helmChartPath string storageRP string storageAccount string storageContainer string @@ -61,7 +63,7 @@ var UpCmd = &cobra.Command{ return } if noK8s { - if !updateSymphonyContext("no-k8s", "localhost") { + if !updateSymphonyContext("no-k8s", "localhost", config.MaestroMqttConfig{}) { return } os.Setenv("SYMPHONY_API_URL", "http://localhost:8082/v1alpha2/") @@ -110,7 +112,17 @@ var UpCmd = &cobra.Command{ return } - if !updateSymphonyContext(k8sContext, apiAddress) { + mqttConfig := config.MaestroMqttConfig{} + if withMqttBroker { + _, mqttAddress := checkMqttAddress() + mqttConfig = config.MaestroMqttConfig{ + BrokerAddress: mqttAddress, + RequestTopic: "coa-request", + ResponseTopic: "coa-response", + } + } + + if !updateSymphonyContext(k8sContext, apiAddress, mqttConfig) { return } @@ -157,6 +169,8 @@ func init() { UpCmd.Flags().StringVarP(&symphonyVersion, "symphony-version", "s", SymphonyAPIVersion, "Symphony API version") UpCmd.Flags().BoolVar(&noK8s, "no-k8s", false, "Launch in standalone mode (no Kubernetes)") UpCmd.Flags().BoolVar(&noRestApi, "no-rest-api", false, "Doesn't expose Symphony API, interact with k8s.") + UpCmd.Flags().BoolVar(&withMqttBroker, "with-mqtt-broker", false, "Install Mosquitto MQTT broker alongside Symphony API") + UpCmd.Flags().StringVar(&helmChartPath, "helm-chart-path", "", "Use a local Helm chart path instead of OCI chart (e.g. ./packages/helm/symphony)") //UpCmd.Flags().StringVarP(&portalVersion, "portal-version", "p", KANPortalVersion, "Symphony Portal version") //UpCmd.Flags().StringVarP(&portalType, "with-portal", "", "", "Install Symphony Portal") // UpCmd.Flags().StringVarP(&storageRP, "storage-resource-group", "", "", "Azure Storage account resource group") @@ -215,6 +229,22 @@ func checkSymphonyAddress() (bool, string) { } } +func checkMqttAddress() (bool, string) { + count := 0 + for { + str, _, err := utils.RunCommand("Checking public Symphony MQTT broker address", "", verbose, "kubectl", "get", "svc", "symphony-mqtt", "-n", namespace, "-o", "jsonpath={.status.loadBalancer.ingress[0].ip}") + if err == nil && str != "" { + return true, "tcp://" + str + ":1883" + } + count += 1 + if count > 5 { + fmt.Printf("\n%s Failed to check public Symphony MQTT broker address. Falling back to in-cluster service DNS.%s\n\n", utils.ColorYellow(), utils.ColorReset()) + return true, fmt.Sprintf("tcp://symphony-mqtt.%s.svc.cluster.local:1883", namespace) + } + time.Sleep(5 * time.Second) + } +} + // func handlePortal(apiAddress string) bool { // switch strings.ToLower(portalType) { // case "oss": @@ -253,8 +283,18 @@ func checkSymphonyAddress() (bool, string) { func handleSymphony(norest bool) bool { str, _, _ := utils.RunCommand("Checking Symphony API (Symphony)", "done", verbose, "helm", "list", "-q", "-l", "name=symphony") - if str != "symphony" { + // Ensure namespace exists, create if not + _, _, nsErr := utils.RunCommand("Checking namespace existence", "done", verbose, "kubectl", "get", "namespace", namespace) + if nsErr != nil { + fmt.Printf(" Namespace '%s' does not exist. Creating...\n", namespace) + _, _, createErr := utils.RunCommand("Creating namespace", "done", verbose, "kubectl", "create", "namespace", namespace) + if createErr != nil { + fmt.Printf("\n%s Failed to create namespace '%s'.%s\n\n", utils.ColorRed(), namespace, utils.ColorReset()) + return false + } + } + if str != "symphony" { cmd := exec.Command("kubectl", "get", "target", "--no-headers=true", "-o", "custom-columns=Name:.metadata.name") stdout, _ := cmd.Output() targets := strings.Fields(string(stdout)) @@ -272,9 +312,45 @@ func handleSymphony(norest bool) bool { } } - fmt.Printf(" Deploying Symphony API (Symphony), installServiceExt: %t\n", !norest) + chartRef := "oci://ghcr.io/eclipse-symphony/helm/symphony" + if strings.TrimSpace(helmChartPath) != "" { + absChartPath, absErr := filepath.Abs(helmChartPath) + if absErr != nil { + fmt.Printf("\n%s Invalid --helm-chart-path: %s%s\n\n", utils.ColorRed(), absErr.Error(), utils.ColorReset()) + return false + } + if _, statErr := os.Stat(absChartPath); statErr != nil { + fmt.Printf("\n%s --helm-chart-path not found: %s%s\n\n", utils.ColorRed(), absChartPath, utils.ColorReset()) + return false + } + chartRef = absChartPath + } + + fmt.Printf(" Deploying Symphony API (Symphony), chart: %s, installServiceExt: %t, with MQTT broker: %t\n", chartRef, !norest, withMqttBroker) installServiceExt := fmt.Sprintf("installServiceExt=%t", !norest) - _, errOutput, err := utils.RunCommandWithRetry("Deploying Symphony API (Symphony)", "done", verbose, debug, "helm", "upgrade", "--install", "symphony", "oci://ghcr.io/eclipse-symphony/helm/symphony", "--version", symphonyVersion, "--set", "CUSTOM_VISION_KEY=dummy", "--set", "symphonyImage.pullPolicy=Always", "--set", "paiImage.pullPolicy=Always", "--set", installServiceExt, "--namespace", namespace) + + helmArgs := []string{ + "upgrade", "--install", "symphony", chartRef, + "--set", "CUSTOM_VISION_KEY=dummy", + "--set", "symphonyImage.pullPolicy=Always", + "--set", "paiImage.pullPolicy=Always", + "--set", installServiceExt, + "--namespace", namespace, + } + if !strings.HasPrefix(chartRef, "oci://") { + helmArgs = append(helmArgs, + "--set", "symphonyImage.tag="+symphonyVersion, + "--set", "paiImage.tag="+symphonyVersion, + ) + } + if withMqttBroker { + helmArgs = append(helmArgs, "--set", "mqtt.enabled=true") + } + if strings.HasPrefix(chartRef, "oci://") { + helmArgs = append(helmArgs, "--version", symphonyVersion) + } + + _, errOutput, err := utils.RunCommandWithRetry("Deploying Symphony API (Symphony)", "done", verbose, debug, "helm", helmArgs...) if err != nil { fmt.Printf("\n%s Failed.%s\n\n", utils.ColorRed(), utils.ColorReset()) fmt.Printf("\n%s Detailed Messages: %s%s\n\n", utils.ColorRed(), errOutput, utils.ColorReset()) @@ -289,8 +365,8 @@ func handleDocker() bool { } return true } -func updateSymphonyContext(context string, apiAddress string) bool { - err := config.UpdateMaestroConfig(context, apiAddress) +func updateSymphonyContext(context string, apiAddress string, mqtt config.MaestroMqttConfig) bool { + err := config.UpdateMaestroConfig(context, apiAddress, mqtt) if err != nil { fmt.Printf("\n%s Failed to update maestro config file.%s\n\n", utils.ColorRed(), utils.ColorReset()) } diff --git a/cli/config/config.go b/cli/config/config.go index 676510b2c..3a6336bdf 100644 --- a/cli/config/config.go +++ b/cli/config/config.go @@ -20,7 +20,8 @@ type TargetProviderConfig struct { Config map[string]interface{} `json:"config"` } type SymphonyAgentConfig struct { - SiteInfo struct { + ShutdownGracePeriod string `json:"shutdownGracePeriod,omitempty"` + SiteInfo struct { SiteId string `json:"siteId"` CurrentSite struct { BaseURL string `json:"baseUrl"` @@ -29,6 +30,8 @@ type SymphonyAgentConfig struct { } `json:"currentSite"` } `json:"siteInfo"` API struct { + Pubsub map[string]interface{} `json:"pubsub,omitempty"` + Keylock map[string]interface{} `json:"keylock,omitempty"` Vendors []struct { Type string `json:"type"` Route string `json:"route"` @@ -38,8 +41,10 @@ type SymphonyAgentConfig struct { Type string `json:"type"` Properties struct { ProvidersPersistentState string `json:"providers.persistentstate"` + ProvidersKeylock string `json:"providers.keylock,omitempty"` IsTarget string `json:"isTarget"` TargetNames string `json:"targetNames"` + TargetNamespace string `json:"targetNamespace,omitempty"` ProvidersConfig string `json:"providers.config"` ProvidersSecret string `json:"providers.secret"` PollEnabled string `json:"poll.enabled"` @@ -60,16 +65,24 @@ type SymphonyAgentConfig struct { } type MaestroContext struct { - Url string `json:"url"` - User string `json:"user"` - Secret string `json:"secret,omitempty"` + Url string `json:"url"` + User string `json:"user"` + Secret string `json:"secret,omitempty"` + Mqtt MaestroMqttConfig `json:"mqtt,omitempty"` +} + +type MaestroMqttConfig struct { + BrokerAddress string `json:"brokerAddress,omitempty"` + RequestTopic string `json:"requestTopic,omitempty"` + ResponseTopic string `json:"responseTopic,omitempty"` + ClientID string `json:"clientID,omitempty"` } type MaestroConfig struct { DefaultContext string `json:"default,omitempty"` Contexts map[string]MaestroContext `json:"contexts,omitempty"` } -func UpdateMaestroConfig(context string, address string) error { +func UpdateMaestroConfig(context string, address string, mqtt MaestroMqttConfig) error { config := GetMaestroConfig("") if config.Contexts == nil { config.Contexts = make(map[string]MaestroContext) @@ -78,6 +91,7 @@ func UpdateMaestroConfig(context string, address string) error { Url: "http://" + address + ":8080/v1alpha2", User: "admin", Secret: "", + Mqtt: mqtt, } config.DefaultContext = context return SaveMaestroConfig(config) diff --git a/packages/helm/symphony/templates/mqtt.yaml b/packages/helm/symphony/templates/mqtt.yaml new file mode 100644 index 000000000..9a9c4ce80 --- /dev/null +++ b/packages/helm/symphony/templates/mqtt.yaml @@ -0,0 +1,86 @@ +{{- if .Values.mqtt.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "symphony.fullname" . }}-mqtt-config + namespace: {{ .Release.Namespace }} +data: + mosquitto.conf: | + listener 1883 + protocol mqtt + allow_anonymous true +--- +{{- if include "CheckMqttPvSetting" . }} +{{- if .Values.mqtt.persistentVolume.enabled}} +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: mqtt-pvc + namespace: {{ .Release.Namespace }} +spec: + storageClassName: {{ include "MqttPVCStorageClassName" . }} + accessModes: + - {{ .Values.mqtt.persistentVolume.accessMode }} + resources: + requests: + storage: {{ .Values.mqtt.persistentVolume.size }} +{{- end}} +{{- end }} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "symphony.fullname" . }}-mqtt + namespace: {{ .Release.Namespace }} + labels: + app: {{ include "symphony.name" . }}-mqtt +spec: + selector: + matchLabels: + app: {{ include "symphony.name" . }}-mqtt + replicas: 1 + template: + metadata: + labels: + app: {{ include "symphony.name" . }}-mqtt + spec: + containers: + - name: mqtt + image: {{ .Values.mqtt.image }} + ports: + - containerPort: {{ .Values.mqtt.port }} + name: mqtt + volumeMounts: + - name: mqtt-config + mountPath: /mosquitto/config + {{- if .Values.mqtt.persistentVolume.enabled }} + - name: mqtt-data + mountPath: /mosquitto/data + {{- end }} + volumes: + - name: mqtt-config + configMap: + name: {{ include "symphony.fullname" . }}-mqtt-config + {{- if .Values.mqtt.persistentVolume.enabled }} + - name: mqtt-data + persistentVolumeClaim: + claimName: mqtt-pvc + {{- end }} +--- +apiVersion: v1 +kind: Service +metadata: + name: {{ include "symphony.fullname" . }}-mqtt + namespace: {{ .Release.Namespace }} + labels: + app: {{ include "symphony.name" . }}-mqtt +spec: + selector: + app: {{ include "symphony.name" . }}-mqtt + ports: + - name: mqtt + port: {{ .Values.mqtt.port }} + targetPort: {{ .Values.mqtt.port }} + protocol: TCP + type: {{ ternary "LoadBalancer" "ClusterIP" .Values.installServiceExt }} +{{- end }} diff --git a/packages/helm/symphony/templates/symphony-core/_helpers.tpl b/packages/helm/symphony/templates/symphony-core/_helpers.tpl index 91a80bc13..599024ce0 100644 --- a/packages/helm/symphony/templates/symphony-core/_helpers.tpl +++ b/packages/helm/symphony/templates/symphony-core/_helpers.tpl @@ -279,3 +279,46 @@ true {{- define "symphony.tls.caBundleLabelValue" -}} {{- default "false" .Values.observability.tls.caBundleLabelValue }} {{- end }} + +{{- define "MqttPVCStorageClassName" -}} +{{- $pvcName := "mqtt-pvc" -}} +{{- $existingPVC := (lookup "v1" "PersistentVolumeClaim" .Release.Namespace $pvcName) -}} +{{- if .Values.mqtt.persistentVolume.storageClass }} +{{- $storageClass := .Values.mqtt.persistentVolume.storageClass }} +{{- $sc := lookup "storage.k8s.io/v1" "StorageClass" "" $storageClass }} +{{- if not $sc }} +{{- fail (printf "Error: StorageClass '%s' not found. Please create it before installing." $storageClass)}} +{{- end }} +{{- .Values.mqtt.persistentVolume.storageClass -}} +{{- else if $existingPVC }} +{{- $existingPVC.spec.storageClassName -}} +{{- else }} +{{- $defaultStorageClass := "" -}} +{{- range $sc := (lookup "storage.k8s.io/v1" "StorageClass" "" "").items -}} + {{- if and $sc.metadata.annotations (kindIs "map" $sc.metadata.annotations) (hasKey $sc.metadata.annotations "storageclass.kubernetes.io/is-default-class") -}} + {{- $annotations := $sc.metadata.annotations -}} + {{- $labelValue := index $annotations "storageclass.kubernetes.io/is-default-class" -}} + {{- if eq $labelValue "true" -}} + {{- $defaultStorageClass = $sc.metadata.name -}} + {{- end -}} + {{- end -}} +{{- end -}} +{{- if eq $defaultStorageClass "" -}} +{{- fail (printf "Error: No default storage class found. Please ensure a storage class with the label 'is-default-class' set to 'true' exists.")}} +{{- end -}} +{{- $defaultStorageClass -}} +{{- end -}} +{{- end -}} + +{{- define "CheckMqttPvSetting" -}} +{{- $configMap := (lookup "v1" "ConfigMap" .Release.Namespace "mqtt-config-map") -}} +{{- if not $configMap }} +true +{{- else if eq ($configMap.data.pvEnabled | quote) ""}} +true +{{- else if ne ($configMap.data.pvEnabled | quote) (.Values.mqtt.persistentVolume.enabled | quote)}} +{{- fail (printf ".Values.mqtt.persistentVolume.enabled is immutable. Unable to change %s to %s" ($configMap.data.pvEnabled | quote) (.Values.mqtt.persistentVolume.enabled | quote))}} +{{- else}} +true +{{- end -}} +{{- end -}} diff --git a/packages/helm/symphony/values.yaml b/packages/helm/symphony/values.yaml index 47e6e3f0c..f3bc49b1d 100644 --- a/packages/helm/symphony/values.yaml +++ b/packages/helm/symphony/values.yaml @@ -109,3 +109,16 @@ Azure: otlpLogsEndpointGrpc: otlpMetricsEndpointGrpc: otlpInsecureGrpc: true + +mqtt: + # whether to enable and deploy Mosquitto MQTT broker + enabled: false + image: eclipse-mosquitto:2.0 + port: 1883 + persistentVolume: + # whether to enable persistent volume for MQTT broker + enabled: false + # the storage class where the MQTT PV is provisioned. Default SC is used if not specified + storageClass: "" + accessMode: ReadWriteOnce + size: 1Gi