Commit 3a2778fd authored by 周昊's avatar 周昊

1、修改日志

parent 98a67e3b
...@@ -43,13 +43,19 @@ public class StreamingJob { ...@@ -43,13 +43,19 @@ public class StreamingJob {
//2、接收消息并将数据转化为流 //2、接收消息并将数据转化为流
//根据场景id 拼接对应的mqtt频道名称 //根据场景id 拼接对应的mqtt频道名称
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/" + sceneId)); DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/" + (sceneId - 1)));
//3、进行处理 //3、进行处理
//3、0 更新场景在线时间 //3、0 更新场景在线时间
SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream
.filter(new UpdateLiveFilterFunction(sceneId)); .filter(new UpdateLiveFilterFunction(sceneId));
//3、4 Object -> String
streamOperator = streamOperator.map(algorithmPushDto ->{
System.out.println(algorithmPushDto);
return algorithmPushDto;
});
//3、1 默认 是否存在结果集判断 //3、1 默认 是否存在结果集判断
streamOperator = streamOperator streamOperator = streamOperator
.filter(new ResultExistFilterFunction()); .filter(new ResultExistFilterFunction());
...@@ -69,10 +75,7 @@ public class StreamingJob { ...@@ -69,10 +75,7 @@ public class StreamingJob {
} }
//3、4 Object -> String //3、4 Object -> String
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto ->{ SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(AlgorithmPushDto::toString);
System.out.println(algorithmPushDto);
return algorithmPushDto.toString();
});
//3、5 输出kafka //3、5 输出kafka
// outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema())); // outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment