Commit 6dd9cfe6 authored by 周昊's avatar 周昊

Merge branch 'master' into production

parents d492d11f e3f262be
...@@ -93,6 +93,7 @@ public class StreamingJob { ...@@ -93,6 +93,7 @@ public class StreamingJob {
warnPushBase.setStatus("wait"); warnPushBase.setStatus("wait");
warnPushBase.setSendUserIds("|1|"); warnPushBase.setSendUserIds("|1|");
warnPushBase.setDelFlag(0); warnPushBase.setDelFlag(0);
warnPushBase.setRemark(algorithmPushDto.getRemark());
return JSON.toJSONString(warnPushBase); return JSON.toJSONString(warnPushBase);
}); });
......
...@@ -34,6 +34,9 @@ public class AlgorithmPushDto { ...@@ -34,6 +34,9 @@ public class AlgorithmPushDto {
//算法类别 不需要算法传 分类后赋值 //算法类别 不需要算法传 分类后赋值
private String algorithmType; private String algorithmType;
//备注 放算法计算后所需信息
private String remark;
} }
...@@ -15,4 +15,7 @@ public class AlgorithmPushResultBoxDto { ...@@ -15,4 +15,7 @@ public class AlgorithmPushResultBoxDto {
private int y2; private int y2;
//预警标签 //预警标签
private String label; private String label;
//备注 放算法所需信息
private String remark;
} }
...@@ -36,4 +36,7 @@ public class AlgorithmWarnPushDto { ...@@ -36,4 +36,7 @@ public class AlgorithmWarnPushDto {
/** 删除标记 */ /** 删除标记 */
private Integer delFlag; private Integer delFlag;
/** 备注 */
private String remark;
} }
...@@ -56,7 +56,7 @@ public class AlarmSecondFilterFunction implements AlgorithmBaseFilterFunction { ...@@ -56,7 +56,7 @@ public class AlarmSecondFilterFunction implements AlgorithmBaseFilterFunction {
} }
//判断是否超过报警间隔 //判断是否超过报警间隔
if (alarmTimeStamp + alarmSecond * 1000 <= algorithmPushDto.getTimeStamp()) { if (alarmTimeStamp + alarmSecond <= algorithmPushDto.getTimeStamp()) {
cache.put(algorithmType, algorithmPushDto.getTimeStamp()); cache.put(algorithmType, algorithmPushDto.getTimeStamp());
return true; return true;
} }
......
package com.censoft.flink.transform; package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto; import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmPushResultBoxDto;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
...@@ -9,6 +10,7 @@ import org.apache.log4j.Logger; ...@@ -9,6 +10,7 @@ import org.apache.log4j.Logger;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* @author 周昊 * @author 周昊
...@@ -23,8 +25,12 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu ...@@ -23,8 +25,12 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
@Override @Override
public void flatMap(AlgorithmPushDto algorithmPushDto, Collector<AlgorithmPushDto> collector) { public void flatMap(AlgorithmPushDto algorithmPushDto, Collector<AlgorithmPushDto> collector) {
//1、筛选所配置的预警label的Key值
//2、将相同label的Key值分出多个预警
List<AlgorithmPushDto> collect = algorithmPushDto.getResult() List<AlgorithmPushResultBoxDto> boxDtos = algorithmPushDto.getResult();
List<String> types = boxDtos
.stream() .stream()
.filter(result -> { .filter(result -> {
String label = result.getLabel(); String label = result.getLabel();
...@@ -32,23 +38,31 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu ...@@ -32,23 +38,31 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
return algorithmTypes.contains(type); return algorithmTypes.contains(type);
}) })
.map(result -> { .map(result -> {
//复制
AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setAlgorithmName(algorithmPushDto.getAlgorithmName());
pushDto.setCameraName(algorithmPushDto.getCameraName());
pushDto.setCameraUrl(algorithmPushDto.getCameraUrl());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
String label = result.getLabel(); String label = result.getLabel();
String type = label.substring(0, label.indexOf("_")); return label.substring(0, label.indexOf("_"));
pushDto.setAlgorithmType(type); })
pushDto.setResult(Collections.singletonList(result)); .distinct()
return pushDto; .collect(Collectors.toList());
}).collect(Collectors.toList());
for (String type : types) {
AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setAlgorithmName(algorithmPushDto.getAlgorithmName());
pushDto.setCameraName(algorithmPushDto.getCameraName());
pushDto.setCameraUrl(algorithmPushDto.getCameraUrl());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
pushDto.setAlgorithmType(type);
List<AlgorithmPushResultBoxDto> result = boxDtos.stream().filter(boxDto -> {
String label = boxDto.getLabel();
return label.substring(0, label.indexOf("_")).equals(type);
}).collect(Collectors.toList());
pushDto.setResult(result);
for (AlgorithmPushDto pushDto : collect) {
collector.collect(pushDto); collector.collect(pushDto);
} }
......
...@@ -69,8 +69,8 @@ public class FrameFilterFunction implements AlgorithmBaseFilterFunction { ...@@ -69,8 +69,8 @@ public class FrameFilterFunction implements AlgorithmBaseFilterFunction {
//获取对应帧数匹配规则 //获取对应帧数匹配规则
//3.判断是否符合对应帧数限制 //3.判断是否符合对应帧数限制
//如果为连续 redis计数器加一 不连续制空 //如果为连续 redis计数器加一 不连续制空
if (frameNumberDto.getTimeStamp() + 1000 / frameSecond >= algorithmPushDto.getTimeStamp()) { if (frameNumberDto.getTimeStamp() * 1000 + 1000 / frameSecond >= algorithmPushDto.getTimeStamp() * 1000) {
frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp()); frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp()* 1000);
frameNumberDto.setFrameNumber(frameNumberDto.getFrameNumber() + 1); frameNumberDto.setFrameNumber(frameNumberDto.getFrameNumber() + 1);
cache.put(algorithmType, frameNumberDto); cache.put(algorithmType, frameNumberDto);
...@@ -79,7 +79,7 @@ public class FrameFilterFunction implements AlgorithmBaseFilterFunction { ...@@ -79,7 +79,7 @@ public class FrameFilterFunction implements AlgorithmBaseFilterFunction {
return false; return false;
} }
} else { } else {
frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp()); frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp() *1000);
frameNumberDto.setFrameNumber(0); frameNumberDto.setFrameNumber(0);
cache.put(algorithmType, frameNumberDto); cache.put(algorithmType, frameNumberDto);
return false; return false;
......
package com.censoft.flink.transform;
import cn.hutool.core.util.NumberUtil;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import com.censoft.flink.domain.AlgorithmScenePieceVariablePo;
import com.censoft.flink.exception.ParameterTransformException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 周昊
* @desc 人员脱岗超员算法筛选
*/
public class PersonNumberFilterFunction implements AlgorithmBaseFilterFunction {
//personNumberMax 最多人员数量
private final Integer personNumberMax;
//personNumberMin 最少人员数量
private final Integer personNumberMin;
//最多人员数量 参数KEY
private static String personNumberMaxKey = "person_number_max";
//最多人员数量 参数KEY
private static String personNumberMinKey = "person_number_min";
public PersonNumberFilterFunction(Integer personNumberMax, Integer personNumberMin) {
this.personNumberMax = personNumberMax;
this.personNumberMin = personNumberMin;
}
public static PersonNumberFilterFunction getFilterFunction(AlgorithmSceneBasePo algorithmSceneBasePo, AlgorithmScenePiecePo algorithmScenePiecePo) {
//参数准备
Optional<AlgorithmScenePieceVariablePo> maxVariablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> personNumberMaxKey.equals(po.getVariableKey()))
.findFirst();
//参数准备
Optional<AlgorithmScenePieceVariablePo> minVariablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> personNumberMinKey.equals(po.getVariableKey()))
.findFirst();
//判断参数是否存在,如果不存在抛出异常
if (!maxVariablePo.isPresent() || !minVariablePo.isPresent()) {
throw new ParameterTransformException();
}
Integer personNumberMax = Integer.valueOf(maxVariablePo.get().getVariableValue());
Integer personNumberMin = Integer.valueOf(minVariablePo.get().getVariableValue());
return new PersonNumberFilterFunction(personNumberMax, personNumberMin);
}
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) {
//判断是否为0/1的情况
int personNumber;
if (algorithmPushDto.getResult().size() == 1 ){
String remark = algorithmPushDto.getResult().get(0).getRemark();
if(NumberUtil.isInteger(remark)){
personNumber = Integer.valueOf(remark);
}else {
personNumber = 0;
}
}else {
personNumber = algorithmPushDto.getResult().size();
}
if (personNumber > personNumberMax) {
algorithmPushDto.setRemark("人员超岗,数量:" + personNumber);
return true;
}
if (personNumber < personNumberMin) {
algorithmPushDto.setRemark("人员脱岗,数量:" + personNumber);
return true;
}
return false;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment