Commit 98a67e3b authored by 周昊's avatar 周昊

1、修改mqttjson数据结构

parent 7708b1e5
...@@ -9,13 +9,11 @@ import com.censoft.flink.transform.AlgorithmBaseFilterFunction; ...@@ -9,13 +9,11 @@ import com.censoft.flink.transform.AlgorithmBaseFilterFunction;
import com.censoft.flink.transform.AlgorithmTypeFlatMapFunction; import com.censoft.flink.transform.AlgorithmTypeFlatMapFunction;
import com.censoft.flink.transform.ResultExistFilterFunction; import com.censoft.flink.transform.ResultExistFilterFunction;
import com.censoft.flink.transform.UpdateLiveFilterFunction; import com.censoft.flink.transform.UpdateLiveFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
...@@ -44,8 +42,8 @@ public class StreamingJob { ...@@ -44,8 +42,8 @@ public class StreamingJob {
//2、接收消息并将数据转化为流 //2、接收消息并将数据转化为流
//todo 根据场景id 拼接对应的mqtt频道名称 //根据场景id 拼接对应的mqtt频道名称
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/test1/" + sceneId)); DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/" + sceneId));
//3、进行处理 //3、进行处理
//3、0 更新场景在线时间 //3、0 更新场景在线时间
...@@ -71,15 +69,17 @@ public class StreamingJob { ...@@ -71,15 +69,17 @@ public class StreamingJob {
} }
//3、4 Object -> String //3、4 Object -> String
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto -> algorithmPushDto.toString()); SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto ->{
System.out.println(algorithmPushDto);
return algorithmPushDto.toString();
});
//3、5 输出kafka //3、5 输出kafka
// outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema()));
outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema()));
//3、6 打印输出 //3、6 打印输出
outputStreamOperator.print(); outputStreamOperator.print();
// outputStreamOperator.writeAsText("D:/word1.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1); outputStreamOperator.writeAsText("D:/word"+sceneId+".txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//3、7 自动执行 //3、7 自动执行
env.execute(); env.execute();
} }
......
...@@ -10,22 +10,30 @@ import java.util.List; ...@@ -10,22 +10,30 @@ import java.util.List;
@Data @Data
public class AlgorithmPushDto { public class AlgorithmPushDto {
//摄像头标识符 //算法名称
private Integer sort; private String algorithmName;
//场景id //摄像头名称
private String sceneId; private String cameraName;
//视频地址
private String cameraUrl;
//图片地址 //图片地址
private String pictureAddress; private String pictureAddress;
//时间戳
private Long timeStamp;
//预警返回值 //预警返回值
private List<AlgorithmPushResultBoxDto> result; private List<AlgorithmPushResultBoxDto> result;
//摄像头标识符
private Integer sort;
//时间戳
private Long timeStamp;
//算法类别 不需要算法传 分类后赋值 //算法类别 不需要算法传 分类后赋值
private String algorithmType; private String algorithmType;
} }
...@@ -32,7 +32,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> { ...@@ -32,7 +32,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> {
//包装连接的方法 //包装连接的方法
private void connect() throws MqttException { private void connect() throws MqttException {
//配置连接参数 //配置连接参数
MqttConfig mqttConfigBean = new MqttConfig("", "", "tcp://192.168.4.221:1883", "DC" + (int) (Math.random() * 100000000), msgTopic); MqttConfig mqttConfigBean = new MqttConfig("", "", "tcp://192.168.3.82:1883", "DC" + (int) (Math.random() * 100000000), msgTopic);
//连接mqtt服务器 //连接mqtt服务器
client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence()); client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
...@@ -82,6 +82,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> { ...@@ -82,6 +82,7 @@ public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> {
//连接失败回调该函数 //连接失败回调该函数
@Override @Override
public void connectionLost(Throwable throwable) { public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
System.out.println("MQTT连接断开,发起重连"); System.out.println("MQTT连接断开,发起重连");
while (true) { while (true) {
try { try {
......
...@@ -34,9 +34,11 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu ...@@ -34,9 +34,11 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
.map(result -> { .map(result -> {
//复制 //复制
AlgorithmPushDto pushDto = new AlgorithmPushDto(); AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setSort(algorithmPushDto.getSort()); pushDto.setAlgorithmName(algorithmPushDto.getAlgorithmName());
pushDto.setSceneId(algorithmPushDto.getSceneId()); pushDto.setCameraName(algorithmPushDto.getCameraName());
pushDto.setCameraUrl(algorithmPushDto.getCameraUrl());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress()); pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp()); pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
String label = result.getLabel(); String label = result.getLabel();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment