Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions external-service-impl/mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@
</ignoredDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.mqtt.i18n;

public final class MqttMessages {

// --- LinePayloadFormatter ---
public static final String INVALID_LINE_PROTOCOL = "Invalid line protocol format ,line is {}";
public static final String TAGS_ERROR = "The tags is error , line is {}";
public static final String ATTRIBUTES_ERROR = "The attributes is error , line is {}";
public static final String FIELDS_ERROR = "The fields is error , line is {}";
public static final String TIMESTAMP_ERROR = "The timestamp is error , line is {}";

// --- MPPPublishHandler ---
public static final String ON_PUBLISH_EXCEPTION =
"onPublish execution exception, msg is [{}], error is ";
public static final String PROCESS_RESULT = "process result: {}";

// --- MQTTService ---
public static final String SERVER_START_EXCEPTION = "Exception while starting server";
public static final String STOPPING_MQTT_SERVICE = "Stopping IoTDB MQTT service...";
public static final String MQTT_SERVICE_STOPPED = "IoTDB MQTT service stopped.";

// --- PayloadFormatManager ---
public static final String MQTT_DIR = "mqttDir: {}";
public static final String PAYLOAD_FORMAT_MANAGER_INIT_ERROR =
"MQTT PayloadFormatManager init() error.";
public static final String FORMATTER_IS_NULL = "PayloadFormatManager(), formatter is null.";
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(), find MQTT Payload Plugin {}.";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT Plugin jarURLs: {}";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload is invalidate";

private MqttMessages() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.mqtt.i18n;

public final class MqttMessages {

// --- LinePayloadFormatter ---
public static final String INVALID_LINE_PROTOCOL = "行协议格式无效,行内容:{}";
public static final String TAGS_ERROR = "标签格式错误,行内容:{}";
public static final String ATTRIBUTES_ERROR = "属性格式错误,行内容:{}";
public static final String FIELDS_ERROR = "字段格式错误,行内容:{}";
public static final String TIMESTAMP_ERROR = "时间戳格式错误,行内容:{}";

// --- MPPPublishHandler ---
public static final String ON_PUBLISH_EXCEPTION =
"onPublish 执行异常,消息为 [{}],错误:";
public static final String PROCESS_RESULT = "处理结果:{}";

// --- MQTTService ---
public static final String SERVER_START_EXCEPTION = "启动服务器时发生异常";
public static final String STOPPING_MQTT_SERVICE = "正在停止 IoTDB MQTT 服务...";
public static final String MQTT_SERVICE_STOPPED = "IoTDB MQTT 服务已停止。";

// --- PayloadFormatManager ---
public static final String MQTT_DIR = "mqttDir:{}";
public static final String PAYLOAD_FORMAT_MANAGER_INIT_ERROR =
"MQTT PayloadFormatManager init() 出错。";
public static final String FORMATTER_IS_NULL = "PayloadFormatManager(),formatter 为 null。";
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(),找到 MQTT Payload 插件 {}。";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT 插件 jarURLs:{}";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload 无效";

private MqttMessages() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.iotdb.mqtt;

import org.apache.iotdb.mqtt.i18n.MqttMessages;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down Expand Up @@ -79,7 +81,7 @@ public List<Message> format(String topic, ByteBuf payload) {
}
return messages;
}
throw new JsonParseException("payload is invalidate");
throw new JsonParseException(MqttMessages.PAYLOAD_INVALID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.iotdb.mqtt;

import org.apache.iotdb.mqtt.i18n.MqttMessages;

import io.netty.buffer.ByteBuf;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.external.commons.lang3.NotImplementedException;
Expand Down Expand Up @@ -83,7 +85,7 @@ public List<Message> format(String topic, ByteBuf payload) {
try {
Matcher matcher = pattern.matcher(line.trim());
if (!matcher.matches()) {
log.warn("Invalid line protocol format ,line is {}", line);
log.warn(MqttMessages.INVALID_LINE_PROTOCOL, line);
continue;
}

Expand All @@ -95,25 +97,25 @@ public List<Message> format(String topic, ByteBuf payload) {

// Parsing Tags
if (!setTags(matcher, message)) {
log.warn("The tags is error , line is {}", line);
log.warn(MqttMessages.TAGS_ERROR, line);
continue;
}

// Parsing Attributes
if (!setAttributes(matcher, message)) {
log.warn("The attributes is error , line is {}", line);
log.warn(MqttMessages.ATTRIBUTES_ERROR, line);
continue;
}

// Parsing Fields
if (!setFields(matcher, message)) {
log.warn("The fields is error , line is {}", line);
log.warn(MqttMessages.FIELDS_ERROR, line);
continue;
}

// Parsing timestamp
if (!setTimestamp(matcher, message)) {
log.warn("The timestamp is error , line is {}", line);
log.warn(MqttMessages.TIMESTAMP_ERROR, line);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.mqtt.i18n.MqttMessages;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;

Expand Down Expand Up @@ -159,7 +160,7 @@ public void onPublish(InterceptPublishMessage msg) {
}
}
} catch (Throwable t) {
LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, t);
LOG.warn(MqttMessages.ON_PUBLISH_EXCEPTION, msg, t);
} finally {
// release the payload of the message
super.onPublish(msg);
Expand Down Expand Up @@ -191,7 +192,7 @@ private void insertTable(TableMessage message, MqttClientSession session) {

tsStatus = result.status;
if (LOG.isDebugEnabled()) {
LOG.debug("process result: {}", tsStatus);
LOG.debug(MqttMessages.PROCESS_RESULT, tsStatus);
}
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
Expand Down Expand Up @@ -310,7 +311,7 @@ private void insertTree(TreeMessage message, MqttClientSession session) {
false);
tsStatus = result.status;
if (LOG.isDebugEnabled()) {
LOG.debug("process result: {}", tsStatus);
LOG.debug(MqttMessages.PROCESS_RESULT, tsStatus);
}
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.externalservice.api.IExternalService;
import org.apache.iotdb.mqtt.i18n.MqttMessages;

import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void startup() {
try {
server.startServer(config, handlers, null, authenticator, null);
} catch (IOException e) {
throw new RuntimeException("Exception while starting server", e);
throw new RuntimeException(MqttMessages.SERVER_START_EXCEPTION, e);
}

LOG.info(
Expand All @@ -74,9 +75,9 @@ public void startup() {
.addShutdownHook(
new Thread(
() -> {
LOG.info("Stopping IoTDB MQTT service...");
LOG.info(MqttMessages.STOPPING_MQTT_SERVICE);
shutdown();
LOG.info("IoTDB MQTT service stopped.");
LOG.info(MqttMessages.MQTT_SERVICE_STOPPED);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.mqtt.i18n.MqttMessages;

import com.google.common.base.Preconditions;
import org.apache.tsfile.external.commons.io.FileUtils;
Expand Down Expand Up @@ -54,13 +55,13 @@ private PayloadFormatManager() {}

private static void init() {
mqttDir = IoTDBDescriptor.getInstance().getConfig().getMqttDir();
logger.info("mqttDir: {}", mqttDir);
logger.info(MqttMessages.MQTT_DIR, mqttDir);

try {
makeMqttPluginDir();
buildMqttPluginMap();
} catch (IOException e) {
logger.error("MQTT PayloadFormatManager init() error.", e);
logger.error(MqttMessages.PAYLOAD_FORMAT_MANAGER_INIT_ERROR, e);
}
}

Expand All @@ -83,17 +84,17 @@ private static void buildMqttPluginMap() throws IOException {
ServiceLoader.load(PayloadFormatter.class, PayloadFormatManager.class.getClassLoader());
for (PayloadFormatter formatter : payloadFormatters) {
if (formatter == null) {
logger.error("PayloadFormatManager(), formatter is null.");
logger.error(MqttMessages.FORMATTER_IS_NULL);
continue;
}

String pluginName = formatter.getName();
mqttPayloadPluginMap.put(pluginName, formatter);
logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
logger.info(MqttMessages.FIND_MQTT_PLUGIN, pluginName);
}

URL[] jarURLs = getPluginJarURLs(mqttDir);
logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs));
logger.debug(MqttMessages.MQTT_PLUGIN_JAR_URLS, Arrays.toString(jarURLs));

for (URL jarUrl : jarURLs) {
ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl});
Expand All @@ -104,7 +105,7 @@ private static void buildMqttPluginMap() throws IOException {

for (PayloadFormatter formatter : payloadFormatters2) {
if (formatter == null) {
logger.error("PayloadFormatManager(), formatter is null.");
logger.error(MqttMessages.FORMATTER_IS_NULL);
continue;
}

Expand All @@ -113,7 +114,7 @@ private static void buildMqttPluginMap() throws IOException {
continue;
}
mqttPayloadPluginMap.put(pluginName, formatter);
logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
logger.info(MqttMessages.FIND_MQTT_PLUGIN, pluginName);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions external-service-impl/rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@
</usedDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rest.i18n;

public final class RestMessages {

// --- RestService ---
public static final String REST_SERVICE_START_FAILED = "RestService failed to start: {}";
public static final String REST_SERVICE_START_SUCCESS = "start RestService successfully";
public static final String REST_SERVICE_STOP_FAILED = "RestService failed to stop: {}";

// --- StatementConstructionHandler (v1 / v2 / table) ---
public static final String INVALID_INPUT = "Invalid input: ";

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths should not be empty";

private RestMessages() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rest.i18n;

public final class RestMessages {

// --- RestService ---
public static final String REST_SERVICE_START_FAILED = "RestService 启动失败:{}";
public static final String REST_SERVICE_START_SUCCESS = "RestService 启动成功";
public static final String REST_SERVICE_STOP_FAILED = "RestService 停止失败:{}";

// --- StatementConstructionHandler (v1 / v2 / table) ---
public static final String INVALID_INPUT = "无效输入:";

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths 不能为空";

private RestMessages() {}
}
Loading
Loading