Commit bd02b255 authored by 周昊's avatar 周昊

1、修改lable中算法类型筛选逻辑

2、开发超岗脱岗算法开发
3、传输kafka数据添加字段
parent 1cc3abec
......@@ -34,6 +34,9 @@ public class AlgorithmPushDto {
//算法类别 不需要算法传 分类后赋值
private String algorithmType;
//备注 放算法计算后所需信息
private String remark;
}
......@@ -15,4 +15,7 @@ public class AlgorithmPushResultBoxDto {
private int y2;
//预警标签
private String label;
//备注 放算法所需信息
private String remark;
}
......@@ -36,4 +36,7 @@ public class AlgorithmWarnPushDto {
/** 删除标记 */
private Integer delFlag;
/** 备注 */
private String remark;
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmPushResultBoxDto;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.LogManager;
......@@ -9,6 +10,7 @@ import org.apache.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author 周昊
......@@ -23,8 +25,12 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
@Override
public void flatMap(AlgorithmPushDto algorithmPushDto, Collector<AlgorithmPushDto> collector) {
//1、筛选所配置的预警label的Key值
//2、将相同label的Key值分出多个预警
List<AlgorithmPushDto> collect = algorithmPushDto.getResult()
List<AlgorithmPushResultBoxDto> boxDtos = algorithmPushDto.getResult();
List<String> types = boxDtos
.stream()
.filter(result -> {
String label = result.getLabel();
......@@ -32,23 +38,31 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
return algorithmTypes.contains(type);
})
.map(result -> {
//复制
AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setAlgorithmName(algorithmPushDto.getAlgorithmName());
pushDto.setCameraName(algorithmPushDto.getCameraName());
pushDto.setCameraUrl(algorithmPushDto.getCameraUrl());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
String label = result.getLabel();
String type = label.substring(0, label.indexOf("_"));
pushDto.setAlgorithmType(type);
pushDto.setResult(Collections.singletonList(result));
return pushDto;
}).collect(Collectors.toList());
return label.substring(0, label.indexOf("_"));
})
.distinct()
.collect(Collectors.toList());
for (String type : types) {
AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setAlgorithmName(algorithmPushDto.getAlgorithmName());
pushDto.setCameraName(algorithmPushDto.getCameraName());
pushDto.setCameraUrl(algorithmPushDto.getCameraUrl());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
pushDto.setAlgorithmType(type);
List<AlgorithmPushResultBoxDto> result = boxDtos.stream().filter(boxDto -> {
String label = boxDto.getLabel();
return label.substring(0, label.indexOf("_")).equals(type);
}).collect(Collectors.toList());
pushDto.setResult(result);
for (AlgorithmPushDto pushDto : collect) {
collector.collect(pushDto);
}
......
package com.censoft.flink.transform;
import cn.hutool.core.util.NumberUtil;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import com.censoft.flink.domain.AlgorithmScenePieceVariablePo;
import com.censoft.flink.exception.ParameterTransformException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 周昊
* @desc 人员脱岗超员算法筛选
*/
public class PersonNumberFilterFunction implements AlgorithmBaseFilterFunction {
//personNumberMax 最多人员数量
private final Integer personNumberMax;
//personNumberMin 最少人员数量
private final Integer personNumberMin;
//最多人员数量 参数KEY
private static String personNumberMaxKey = "person_number_max";
//最多人员数量 参数KEY
private static String personNumberMinKey = "person_number_min";
public PersonNumberFilterFunction(Integer personNumberMax, Integer personNumberMin) {
this.personNumberMax = personNumberMax;
this.personNumberMin = personNumberMin;
}
public static PersonNumberFilterFunction getFilterFunction(AlgorithmSceneBasePo algorithmSceneBasePo, AlgorithmScenePiecePo algorithmScenePiecePo) {
//参数准备
Optional<AlgorithmScenePieceVariablePo> maxVariablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> personNumberMaxKey.equals(po.getVariableKey()))
.findFirst();
//参数准备
Optional<AlgorithmScenePieceVariablePo> minVariablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> personNumberMinKey.equals(po.getVariableKey()))
.findFirst();
//判断参数是否存在,如果不存在抛出异常
if (!maxVariablePo.isPresent() || !minVariablePo.isPresent()) {
throw new ParameterTransformException();
}
Integer personNumberMax = Integer.valueOf(maxVariablePo.get().getVariableValue());
Integer personNumberMin = Integer.valueOf(minVariablePo.get().getVariableValue());
return new PersonNumberFilterFunction(personNumberMax, personNumberMin);
}
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) {
//判断是否为0/1的情况
int personNumber;
if (algorithmPushDto.getResult().size() == 1 ){
String remark = algorithmPushDto.getResult().get(0).getRemark();
if(NumberUtil.isInteger(remark)){
personNumber = Integer.valueOf(remark);
}else {
personNumber = 0;
}
}else {
personNumber = algorithmPushDto.getResult().size();
}
if (personNumber > personNumberMax) {
algorithmPushDto.setRemark("人员超岗,数量:" + personNumber);
return true;
}
if (personNumber < personNumberMin) {
algorithmPushDto.setRemark("人员脱岗,数量:" + personNumber);
return true;
}
return false;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment