Commit 07c31e19 authored by 周昊's avatar 周昊

Merge branch 'master' into production

# Conflicts:
#	src/main/java/com/censoft/flink/StreamingJob.java
parents d34f7282 f0e5bef6
package com.censoft.flink;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import com.alibaba.fastjson2.JSON;
import com.censoft.flink.domain.*;
import com.censoft.flink.mapper.SqlFactory;
import com.censoft.flink.mqtt.MqttConsumer;
import com.censoft.flink.transform.AlgorithmBaseFilterFunction;
......@@ -19,7 +18,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
public class StreamingJob {
......@@ -37,6 +38,12 @@ public class StreamingJob {
//2.2 获取场景算法块数据
List<AlgorithmScenePiecePo> algorithmScenePieceList = sqlFactory.getAlgorithmScenePieceList(sceneId);
//2.3 获取算法列表
List<AlgorithmPo> algorithmPoList = sqlFactory.getAlgorithmPoList();
//2.4 获取摄像头列表
List<AlgorithmCameraPo> algorithmCameraPoList = sqlFactory.getAlgorithmCameraPoList();
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -71,15 +78,46 @@ public class StreamingJob {
}
//3、4 Object -> String
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(AlgorithmPushDto::toString);
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto -> {
AlgorithmWarnPushDto warnPushBase = new AlgorithmWarnPushDto();
warnPushBase.setOrderName("AI预警平台");
//获取摄像头名称
warnPushBase.setAlarmDeviceName(getAlgorithmCameraPoByRtsp(algorithmCameraPoList,algorithmPushDto.getCameraName())
.getCameraName());
warnPushBase.setGradeName("二级预警");
//获取摄像头名称
warnPushBase.setGradeType(getAlgorithmPoById(algorithmPoList,Long.valueOf(algorithmPushDto.getAlgorithmName()))
.getAlgorithmName());
warnPushBase.setAlarmTime(new Date(algorithmPushDto.getTimeStamp() * 1000));
warnPushBase.setPicture("http://172.16.21.3"+algorithmPushDto.getPictureAddress());
warnPushBase.setStatus("wait");
warnPushBase.setSendUserIds("|1|");
warnPushBase.setDelFlag(0);
return JSON.toJSONString(warnPushBase);
});
//3、5 输出kafka
outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema()));
outputStreamOperator.addSink(new FlinkKafkaProducer("172.16.33.152:9092", "test-topic", new SimpleStringSchema()));
//3、6 打印输出
outputStreamOperator.print();
outputStreamOperator.writeAsText("D:/word"+sceneId+".txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
outputStreamOperator.writeAsText("D:/word" + sceneId + ".txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//3、7 自动执行
env.execute();
}
private static AlgorithmCameraPo getAlgorithmCameraPoByRtsp(List<AlgorithmCameraPo> cameraPos, String rtsp) {
return cameraPos
.stream()
.filter(cameraPo -> rtsp.equals(cameraPo.getRtsp())).findFirst().get();
}
private static AlgorithmPo getAlgorithmPoById(List<AlgorithmPo> algorithmPos, Long id) {
return algorithmPos
.stream()
.filter(algorithmPo -> id.equals(algorithmPo.getId())).findFirst().get();
}
}
package com.censoft.flink.domain;
import lombok.Data;
import java.io.Serializable;
/**
* @author 周昊
* @desc ...
* @date 2023-09-15 16:43:51
*/
@Data
public class AlgorithmCameraPo implements Serializable {
/** 主键id */
private Long id;
/** 摄像头名 */
private String cameraName;
/** 摄像头视频流地址 */
private String rtsp;
/** 排序 */
private Long sort;
/** 帐号状态(0正常 */
private String status;
/** 删除标志(0代表存在 */
private String delFlag;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.io.Serializable;
/**
* @author 周昊
* @desc ...
* @date 2023-09-15 16:47:21
*/
@Data
public class AlgorithmPo implements Serializable {
/** 主键id */
private Long id;
/** 算法名 */
private String algorithmName;
/** 算法唯一标识 */
private String algorithmKey;
/** 排序 */
private Long sort;
/** 帐号状态(0正常 */
private String status;
/** 删除标志(0代表存在 */
private String delFlag;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.util.Date;
/**
* 算法计算结果标准接收对象
*/
@Data
public class AlgorithmWarnPushDto {
/** 所属系统名称 */
private String orderName;
/** 预警设备名称 */
private String alarmDeviceName;
/** 预警等级 */
private String gradeName;
/** 预警类型 */
private String gradeType;
/** 报警时间 */
private Date alarmTime;
/** 图片地址 */
private String picture;
/** 处置状态 */
private String status;
/** 接受发送人 */
private String sendUserIds;
/** 删除标记 */
private Integer delFlag;
}
package com.censoft.flink.mapper;
import com.censoft.flink.domain.AlgorithmCameraPo;
import com.censoft.flink.domain.AlgorithmPo;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import org.apache.ibatis.annotations.Param;
......@@ -13,4 +15,8 @@ public interface AlgorithmSceneDao {
public List<AlgorithmScenePiecePo> getAlgorithmScenePieceList(@Param("sceneId") Long sceneId);
public int updateLiveBySceneId(Long sceneId);
public List<AlgorithmCameraPo> getAlgorithmCameraPoList();
public List<AlgorithmPo> getAlgorithmPoList();
}
......@@ -45,5 +45,29 @@
ORDER BY asp.sort ASC
</select>
<select id="getAlgorithmCameraPoList" resultType="com.censoft.flink.domain.AlgorithmCameraPo">
SELECT
id,
camera_name as cameraName,
rtsp,
sort,
STATUS,
del_flag as delFlag
FROM
algorithm_camera_base
</select>
<select id="getAlgorithmPoList" resultType="com.censoft.flink.domain.AlgorithmPo">
SELECT
id,
algorithm_name AS algorithmName,
algorithm_key AS algorithmKey,
sort,
STATUS,
del_flag AS delFlag
FROM
algorithm_base
</select>
</mapper>
\ No newline at end of file
package com.censoft.flink.mapper;
import com.censoft.flink.domain.AlgorithmCameraPo;
import com.censoft.flink.domain.AlgorithmPo;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import org.apache.ibatis.io.Resources;
......@@ -42,6 +44,30 @@ public class SqlFactory {
return algorithmScenePiecePoList;
}
public List<AlgorithmCameraPo> getAlgorithmCameraPoList() throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String SQLID = "com.censoft.flink.mapper.AlgorithmSceneDao" + "." + "getAlgorithmCameraPoList";
//通过SqlSession对象的方法执行SQL语句
List<AlgorithmCameraPo> algorithmCameraPos = session.selectList(SQLID);
//最后我们关闭SqlSession对象
session.close();
return algorithmCameraPos;
}
public List<AlgorithmPo> getAlgorithmPoList() throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String SQLID = "com.censoft.flink.mapper.AlgorithmSceneDao" + "." + "getAlgorithmPoList";
//通过SqlSession对象的方法执行SQL语句
List<AlgorithmPo> algorithmPos = session.selectList(SQLID);
//最后我们关闭SqlSession对象
session.close();
return algorithmPos;
}
public int updateLiveBySceneId(Long sceneId) throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment