Commit 31d0ca6b authored by 周昊's avatar 周昊

1、修改正式环境数据库配置

parent 0d40ecc9
...@@ -99,7 +99,7 @@ public class StreamingJob { ...@@ -99,7 +99,7 @@ public class StreamingJob {
}); });
//3、5 输出kafka //3、5 输出kafka
outputStreamOperator.addSink(new FlinkKafkaProducer("127.0.0.1:9092", "test-topic", new SimpleStringSchema())); outputStreamOperator.addSink(new FlinkKafkaProducer("172.16.20.211:9092", "test-topic", new SimpleStringSchema()));
//3、6 打印输出 //3、6 打印输出
outputStreamOperator.print(); outputStreamOperator.print();
......
...@@ -32,7 +32,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> { ...@@ -32,7 +32,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> {
//包装连接的方法 //包装连接的方法
private void connect() throws MqttException { private void connect() throws MqttException {
//配置连接参数 //配置连接参数
MqttConfig mqttConfigBean = new MqttConfig("", "", "tcp://127.0.0.1:1883", "DC" + (int) (Math.random() * 100000000), msgTopic); MqttConfig mqttConfigBean = new MqttConfig("", "", "tcp://172.16.20.211:1883", "DC" + (int) (Math.random() * 100000000), msgTopic);
//连接mqtt服务器 //连接mqtt服务器
client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence()); client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment