Commit e647376d authored by 周昊's avatar 周昊

Merge branch 'master' into production

parents 75295791 3a2778fd
......@@ -45,13 +45,19 @@ public class StreamingJob {
//2、接收消息并将数据转化为流
//根据场景id 拼接对应的mqtt频道名称
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/" + sceneId));
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/" + (sceneId - 1)));
//3、进行处理
//3、0 更新场景在线时间
SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream
.filter(new UpdateLiveFilterFunction(sceneId));
//3、4 Object -> String
streamOperator = streamOperator.map(algorithmPushDto ->{
System.out.println(algorithmPushDto);
return algorithmPushDto;
});
//3、1 默认 是否存在结果集判断
streamOperator = streamOperator
.filter(new ResultExistFilterFunction());
......@@ -71,10 +77,7 @@ public class StreamingJob {
}
//3、4 Object -> String
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto ->{
System.out.println(algorithmPushDto);
return algorithmPushDto.toString();
});
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(AlgorithmPushDto::toString);
//3、5 输出kafka
outputStreamOperator.addSink(new FlinkKafkaProducer("172.16.33.152:9092", "test-topic", new SimpleStringSchema()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment