4949
5050import static org .zstack .core .Platform .*;
5151import static org .zstack .core .cloudbus .CloudBusGlobalProperty .SYNC_CALL_TIMEOUT ;
52+ import static org .zstack .header .errorcode .SysErrors .CLOUD_BUS_SEND_ERROR ;
5253import static org .zstack .utils .BeanUtils .getProperty ;
5354import static org .zstack .utils .BeanUtils .setProperty ;
5455
@@ -101,6 +102,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
101102 private final static TimeoutRestTemplate http = RESTFacade .createRestTemplate (CoreGlobalProperty .REST_FACADE_READ_TIMEOUT , CoreGlobalProperty .REST_FACADE_CONNECT_TIMEOUT );
102103
103104 public static final String HTTP_BASE_URL = "/cloudbus" ;
105+ public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion (null );
104106
105107 {
106108 if (CloudBusGlobalProperty .MESSAGE_LOG != null ) {
@@ -114,6 +116,8 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
114116 CloudBusGlobalProperty .HTTP_CONTEXT_PATH = "" ;
115117 CloudBusGlobalProperty .HTTP_PORT = 8989 ;
116118 }
119+
120+ SEND_CONFIRMED .success ();
117121 }
118122
119123 public static String getManagementNodeUUIDFromServiceID (String serviceID ) {
@@ -125,6 +129,12 @@ public static String getManagementNodeUUIDFromServiceID(String serviceID) {
125129 return ss [0 ];
126130 }
127131
132+ private FutureCompletion sendFail (ErrorCode errorCode ) {
133+ FutureCompletion c = new FutureCompletion (null );
134+ c .fail (errorCode );
135+ return c ;
136+ }
137+
128138 private abstract class Envelope {
129139 long startTime ;
130140
@@ -250,8 +260,8 @@ public void deActiveService(String id) {
250260 }
251261
252262 @ Override
253- public void send (Message msg ) {
254- send (msg , true );
263+ public FutureCompletion send (Message msg ) {
264+ return send (msg , true );
255265 }
256266
257267 @ Override
@@ -291,11 +301,11 @@ private MessageReply createTimeoutReply(NeedReplyMessage m) {
291301 }
292302
293303 @ Override
294- public void send (NeedReplyMessage msg , CloudBusCallBack callback ) {
304+ public FutureCompletion send (NeedReplyMessage msg , CloudBusCallBack callback ) {
295305 evaluateMessageTimeout (msg );
296306 if (msg .getTimeout () <= 1 ) {
297307 callback .run (createTimeoutReply (msg ));
298- return ;
308+ return SEND_CONFIRMED ;
299309 }
300310
301311 Envelope e = new Envelope () {
@@ -346,7 +356,7 @@ public void timeout() {
346356
347357 envelopes .put (msg .getId (), e );
348358 msgExts .forEach (m -> m .afterAddEnvelopes (msg .getId ()));
349- send (msg , false );
359+ return send (msg , false );
350360 }
351361
352362 @ Override
@@ -454,7 +464,7 @@ public void reply(Message request, MessageReply reply) {
454464 callReplyPreSendingExtensions (reply , (NeedReplyMessage ) request );
455465 } catch (Exception e ) {
456466 logger .error ("failed to call pre-sending reply extension:" , e );
457- reply .setError (operr ( e .getMessage ()));
467+ reply .setError (err ( CLOUD_BUS_SEND_ERROR , e .getMessage ()));
458468 }
459469 }
460470
@@ -526,29 +536,36 @@ public MessageSender(Message msg) {
526536 localSend = !CloudBusGlobalProperty .HTTP_ALWAYS && managementNodeId .equals (Platform .getManagementServerId ());
527537 }
528538
529- void send () {
539+ FutureCompletion send () {
530540 try {
531- doSend ();
541+ return doSend ();
532542 } catch (Throwable th ) {
533- replyErrorIfNeeded (operr (th .getMessage ()));
543+ ErrorCode err = err (CLOUD_BUS_SEND_ERROR , th .getMessage ());
544+ replyErrorIfNeeded (err );
545+
546+ FutureCompletion c = new FutureCompletion (null );
547+ c .fail (err );
548+ return c ;
534549 }
535550 }
536551
537- private void doSend () {
552+ private FutureCompletion doSend () {
538553 if (msg instanceof Event ) {
539554 eventSend ();
540- return ;
555+ return SEND_CONFIRMED ;
541556 }
542557
543558 if (localSend ) {
544559 localSend ();
560+ return SEND_CONFIRMED ;
545561 } else {
546- httpSend ();
562+ return httpSend ();
547563 }
548564 }
549565
550- private void httpSendInQueue (String ip ) {
551- thdf .chainSubmit (new ChainTask (null ) {
566+ private FutureCompletion httpSendInQueue (String ip ) {
567+ FutureCompletion sendCompletion = new FutureCompletion (null );
568+ thdf .chainSubmit (new ChainTask (sendCompletion ) {
552569 @ Override
553570 public String getSyncSignature () {
554571 return "http-send-in-queue" ;
@@ -557,6 +574,7 @@ public String getSyncSignature() {
557574 @ Override
558575 public void run (SyncTaskChain chain ) {
559576 httpSend (ip );
577+ sendCompletion .success ();
560578 chain .next ();
561579 }
562580
@@ -570,25 +588,29 @@ public String getName() {
570588 return getSyncSignature ();
571589 }
572590 });
591+ return sendCompletion ;
573592 }
574593
575- private void httpSend () {
594+ private FutureCompletion httpSend () {
576595 buildSchema (msg );
596+ String ip ;
577597 try {
578- String ip = destMaker .getNodeInfo (managementNodeId ).getNodeIP ();
579- httpSendInQueue (ip );
598+ ip = destMaker .getNodeInfo (managementNodeId ).getNodeIP ();
580599 } catch (ManagementNodeNotFoundException e ) {
581- if (msg instanceof MessageReply ) {
582- if (!deadMessageManager .handleManagementNodeNotFoundError (managementNodeId , msg , () -> {
583- String ip = destMaker .getNodeInfo (managementNodeId ).getNodeIP ();
584- httpSendInQueue (ip );
585- })) {
586- throw e ;
587- }
600+ boolean errorHandled = msg instanceof MessageReply &&
601+ deadMessageManager .handleManagementNodeNotFoundError (managementNodeId , msg , () -> {
602+ String otherIp = destMaker .getNodeInfo (managementNodeId ).getNodeIP ();
603+ logger .warn (String .format ("resend the message[id:%s] to node[ip:%s]" , msg .getId (), otherIp ));
604+ httpSendInQueue (otherIp );
605+ });
606+ if (errorHandled ) {
607+ return SEND_CONFIRMED ;
588608 } else {
589609 throw e ;
590610 }
591611 }
612+
613+ return httpSendInQueue (ip );
592614 }
593615
594616 private void httpSend (String ip ) {
@@ -612,12 +634,12 @@ protected ResponseEntity<String> call() {
612634 }.run ();
613635
614636 if (!rsp .getStatusCode ().is2xxSuccessful ()) {
615- replyErrorIfNeeded (operr ( "HTTP ERROR, status code: %s, body: %s" , rsp .getStatusCode (), rsp .getBody ()));
637+ replyErrorIfNeeded (err ( CLOUD_BUS_SEND_ERROR , "HTTP ERROR, status code: %s, body: %s" , rsp .getStatusCode (), rsp .getBody ()));
616638 }
617639 } catch (OperationFailureException e ) {
618640 replyErrorIfNeeded (e .getErrorCode ());
619641 } catch (Throwable e ) {
620- replyErrorIfNeeded (operr ( e .getMessage ()));
642+ replyErrorIfNeeded (err ( CLOUD_BUS_SEND_ERROR , e .getMessage ()));
621643 }
622644 }
623645
@@ -1194,7 +1216,7 @@ private void evalThreadContextToMessage(Message msg) {
11941216 }
11951217 }
11961218
1197- private void doSendAndCallExtensions (Message msg ) {
1219+ private FutureCompletion doSendAndCallExtensions (Message msg ) {
11981220 // for unit test finding invocation chain
11991221 MessageCommandRecorder .record (msg .getClass ());
12001222
@@ -1209,20 +1231,20 @@ private void doSendAndCallExtensions(Message msg) {
12091231 interceptor .beforeSendMessage (msg );
12101232 }
12111233
1212- doSend (msg );
1234+ return doSend (msg );
12131235 }
12141236
1215- private void doSend (Message msg ) {
1237+ private FutureCompletion doSend (Message msg ) {
12161238 evalThreadContextToMessage (msg );
12171239
12181240 if (logger .isTraceEnabled () && islogMessage (msg )) {
12191241 logger .trace (String .format ("[msg send]: %s" , dumpMessage (msg )));
12201242 }
12211243
1222- new MessageSender (msg ).send ();
1244+ return new MessageSender (msg ).send ();
12231245 }
12241246
1225- private void send (Message msg , Boolean noNeedReply ) {
1247+ private FutureCompletion send (Message msg , Boolean noNeedReply ) {
12261248 if (msg .getServiceId () == null ) {
12271249 throw new IllegalArgumentException (String .format ("service id cannot be null: %s" , msg .getClass ().getName ()));
12281250 }
@@ -1238,7 +1260,7 @@ private void send(Message msg, Boolean noNeedReply) {
12381260 msg .putHeaderEntry (NO_NEED_REPLY_MSG , noNeedReply .toString ());
12391261 }
12401262
1241- doSendAndCallExtensions (msg );
1263+ return doSendAndCallExtensions (msg );
12421264 }
12431265
12441266 private void restoreFromSchema (Message msg , Map raw ) throws ClassNotFoundException {
0 commit comments