Commit e294f93b authored by 周昊's avatar 周昊

Initial commit

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>my-flink-project</groupId>
<artifactId>my-flink-project</artifactId>
<name>Flink Quickstart Job</name>
<version>0.1</version>
<url>http://www.myorganization.org</url>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet />
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer>
<mainClass>com.censoft.flink.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<repositories>
<repository>
<releases>
<enabled>false</enabled>
</releases>
<snapshots />
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<properties>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.9.3</flink.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${java.version}</maven.compiler.target>
<fastjson.version>2.0.25</fastjson.version>
<mqtt.version>1.2.5</mqtt.version>
</properties>
</project>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>my-flink-project</groupId>
<artifactId>my-flink-project</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<java.version>1.8</java.version>
<flink.version>1.9.3</flink.version>
<mqtt.version>1.2.5</mqtt.version>
<fastjson.version>2.0.25</fastjson.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqtt.version}</version>
</dependency>
<!-- 阿里JSON解析器 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- lombok工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
<!-- 小而全的Java类库 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.0.7</version>
</dependency>
<!-- 引入MyBatis的jar包的依赖-->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.6</version>
</dependency>
<!-- MySQL驱动jar包的依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<!-- 所在的目录-->
<directory>src/main/java</directory>
<includes><!-- 这个目录下的所有.xml,.properties文件都会被扫描加载到-->
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<!-- 所在的目录-->
<directory>src/main/resources</directory>
<includes><!-- 这个目录下的所有.xml,.properties文件都会被扫描加载到-->
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<!--<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>-->
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.censoft.flink.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Manifest-Version: 1.0
Main-Class: com.censoft.flink.StreamingJob
package com.censoft.flink;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import com.censoft.flink.mapper.SqlFactory;
import com.censoft.flink.mqtt.MqttConsumer;
import com.censoft.flink.transform.AlgorithmBaseFilterFunction;
import com.censoft.flink.transform.AlgorithmTypeFlatMapFunction;
import com.censoft.flink.transform.ResultExistFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
public class StreamingJob {
public static void main(String[] args) throws Exception {
//1.获取对应场景id
ParameterTool parameterTool = ParameterTool.fromArgs(args);
Long sceneId = parameterTool.getLong("sceneId");
//2.获取对应算法参数
//2.1 获取场景数据
SqlFactory sqlFactory = new SqlFactory();
AlgorithmSceneBasePo algorithmSceneBasePo = sqlFactory.getAlgorithmSceneBasePo(sceneId);
//2.2 获取场景算法块数据
List<AlgorithmScenePiecePo> algorithmScenePieceList = sqlFactory.getAlgorithmScenePieceList(sceneId);
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2、接收消息并将数据转化为流
//todo 根据场景id 拼接对应的mqtt频道名称
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/test/" + sceneId));
//3、进行处理
//3、1 默认 是否存在结果集判断
SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream
.filter(new ResultExistFilterFunction());
//3、2 默认 根据分类,分解多个推送信息
List<String> algorithmTypes = Arrays.asList(algorithmSceneBasePo.getAlarmTypes().split(","));
streamOperator = streamOperator.flatMap(new AlgorithmTypeFlatMapFunction(algorithmTypes));
//3、3 根据配置算法流程 反射对应对象
for (AlgorithmScenePiecePo algorithmScenePiecePo : algorithmScenePieceList) {
//通过构造函数 获取算法块对象
Class<?> filterClazz = Class.forName(algorithmScenePiecePo.getClazz());
Method method = filterClazz.getMethod("getFilterFunction", AlgorithmSceneBasePo.class, AlgorithmScenePiecePo.class);
AlgorithmBaseFilterFunction filterFunction = (AlgorithmBaseFilterFunction) method.invoke(null, algorithmSceneBasePo, algorithmScenePiecePo);
streamOperator = streamOperator.filter(filterFunction);
}
//3、4 Object -> String
SingleOutputStreamOperator<String> outputStreamOperator = streamOperator.map(algorithmPushDto -> algorithmPushDto.toString());
//3、5 输出kafka
outputStreamOperator.addSink(new FlinkKafkaProducer("192.168.10.137:9092", "test-topic", new SimpleStringSchema()));
//3、6 打印输出
outputStreamOperator.print();
outputStreamOperator.writeAsText("D:/word1.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//3、7 自动执行
env.execute();
}
}
package com.censoft.flink.domain;
import lombok.Data;
/**
* 帧算法缓存对象
*/
@Data
public class AlgorithmFrameNumberDto {
//上次计数时间戳
private Long timeStamp;
//累计计数数量
private Integer frameNumber;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.util.List;
/**
* 算法计算结果标准接收对象
*/
@Data
public class AlgorithmPushDto {
//摄像头标识符
private Integer sort;
//场景id
private String sceneId;
//图片地址
private String pictureAddress;
//时间戳
private Long timeStamp;
//预警返回值
private List<AlgorithmPushResultBoxDto> result;
//算法类别 不需要算法传 分类后赋值
private String algorithmType;
}
package com.censoft.flink.domain;
import lombok.Data;
/**
* @author 周昊
* @desc ...
* @date 2023-04-11 15:38:31
*/
@Data
public class AlgorithmPushResultBoxDto {
private int x1;
private int x2;
private int y1;
private int y2;
//预警标签
private String label;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.io.Serializable;
/**
* 算法计算结果标准接收对象
*/
@Data
public class AlgorithmSceneBasePo implements Serializable {
private static final long serialVersionUID = 1L;
//主键id
private Long id;
//场景名
private String sceneName;
//算法id
private Long algorithmId;
//处理的预警类型(预警分类算法参数)
private String alarmTypes;
//每秒帧数(ai算法启动参数)
private Integer frameSecond;
//摄像头id列表
private String cameraIdList;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* 算法计算结果标准接收对象
*/
@Data
public class AlgorithmScenePiecePo implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 场景主键id
*/
private Long sceneId;
/**
* 算法块主键id
*/
private Long pieceId;
/**
* 算法顺序
*/
private Integer sort;
/**
* 算法块参数对象
*/
private List<AlgorithmScenePieceVariablePo> variablePos;
/**
* 算法块对应Class
*/
private String clazz;
}
package com.censoft.flink.domain;
import lombok.Data;
import java.io.Serializable;
/**
* 算法计算结果标准接收对象
*/
@Data
public class AlgorithmScenePieceVariablePo implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 算法块参数唯一标识
*/
private String variableKey;
/**
* 算法块参数值
*/
private String variableValue;
}
package com.censoft.flink.exception;
import org.apache.commons.lang3.StringUtils;
/**
* 基础异常
*
* @author ruoyi
*/
public class BaseException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* 所属模块
*/
private String module;
/**
* 错误码
*/
private String code;
/**
* 错误码对应的参数
*/
private Object[] args;
/**
* 错误消息
*/
private String defaultMessage;
public BaseException(String module, String code, Object[] args, String defaultMessage) {
this.module = module;
this.code = code;
this.args = args;
this.defaultMessage = defaultMessage;
}
public BaseException(String module, String code, Object[] args) {
this(module, code, args, null);
}
public BaseException(String module, String defaultMessage) {
this(module, null, null, defaultMessage);
}
public BaseException(String code, Object[] args) {
this(null, code, args, null);
}
public BaseException(String defaultMessage) {
this(null, null, null, defaultMessage);
}
@Override
public String getMessage() {
String message = null;
if (!StringUtils.isEmpty(code)) {
message = "错误码:" + code + " 参数:" + args;
}
if (message == null) {
message = defaultMessage;
}
return message;
}
public String getModule() {
return module;
}
public String getCode() {
return code;
}
public Object[] getArgs() {
return args;
}
public String getDefaultMessage() {
return defaultMessage;
}
}
package com.censoft.flink.exception;
/**
* 算法块入参异常类
*
* @author ruoyi
*/
public class ParameterTransformException extends TransformException
{
private static final long serialVersionUID = 1L;
public ParameterTransformException()
{
super("transform.parameter", null);
}
}
package com.censoft.flink.exception;
/**
* 算法块异常类
*
* @author ruoyi
*/
public class TransformException extends BaseException
{
private static final long serialVersionUID = 1L;
public TransformException(String code, Object[] args)
{
super("transform", code, args, null);
}
}
package com.censoft.flink.mapper;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface AlgorithmSceneDao {
public AlgorithmSceneBasePo getAlgorithmSceneBasePo(@Param("sceneId") Long sceneId);
public List<AlgorithmScenePiecePo> getAlgorithmScenePieceList(@Param("sceneId") Long sceneId);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.censoft.flink.mapper.AlgorithmSceneDao">
<resultMap type="com.censoft.flink.domain.AlgorithmScenePiecePo" id="AlgorithmScenePiecePoResult">
<result property="sceneId" column="scene_id"/>
<result property="pieceId" column="piece_id"/>
<result property="sort" column="sort"/>
<result property="clazz" column="clazz"/>
<collection property="variablePos" ofType="com.censoft.flink.domain.AlgorithmScenePieceVariablePo">
<result property="variableKey" column="variable_key"/>
<result property="variableValue" column="variable_value"/>
</collection>
</resultMap>
<select id="getAlgorithmSceneBasePo" resultType="com.censoft.flink.domain.AlgorithmSceneBasePo">
SELECT scb.id AS id,
scb.scene_name AS sceneName,
scb.algorithm_id AS algorithmId,
scb.alarm_types AS alarmTypes,
scb.camera_id_list AS cameraIdList,
scb.frame_second AS frameSecond
FROM algorithm_scene_base scb
WHERE scb.id = #{sceneId}
</select>
<select id="getAlgorithmScenePieceList"
resultMap="AlgorithmScenePiecePoResult">
SELECT asp.scene_id,
asp.piece_id,
asp.sort,
asp.variable_key,
asp.variable_value,
apb.clazz
FROM algorithm_scene_piece asp
LEFT JOIN algorithm_piece_base apb ON apb.id = asp.piece_id
WHERE asp.scene_id = #{sceneId}
ORDER BY asp.sort ASC
</select>
</mapper>
\ No newline at end of file
package com.censoft.flink.mapper;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
* @author 周昊
* @desc ...
* @date 2023-04-18 13:50:32
*/
public class SqlFactory {
public AlgorithmSceneBasePo getAlgorithmSceneBasePo(Long sceneId) throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String SQLID = "com.censoft.flink.mapper.AlgorithmSceneDao" + "." + "getAlgorithmSceneBasePo";
//通过SqlSession对象的方法执行SQL语句
AlgorithmSceneBasePo algorithmSceneBasePo = session.selectOne(SQLID,sceneId);
//最后我们关闭SqlSession对象
session.close();
return algorithmSceneBasePo;
}
public List<AlgorithmScenePiecePo> getAlgorithmScenePieceList(Long sceneId) throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String SQLID = "com.censoft.flink.mapper.AlgorithmSceneDao" + "." + "getAlgorithmScenePieceList";
//通过SqlSession对象的方法执行SQL语句
List<AlgorithmScenePiecePo> algorithmScenePiecePoList = session.selectList(SQLID,sceneId);
//最后我们关闭SqlSession对象
session.close();
return algorithmScenePiecePoList;
}
private SqlSession initSqlSession() throws IOException {
/*
myBatis的核心类是SqlSessionFactory
就是需要我们加载一下主配置文件,有点像我们使用SpringMVC加载spring.xml配置文件或者BeanFactory对象一样。
这些在官网是有的。
*/
//定义主配置文件的目录,从类路径开始:com/**/**这样的,如果是在resources目录下,那就是 **/**/
String config = "mybatis.xml";
//下面读取主配置文件,使用MyBatis框架中的Resources类
InputStream inputStream = Resources.getResourceAsStream(config);
//下面创建SqlSessionFactory对象,使用的是SqlSessionFactoryBuilder类,需要用到上面的inputStream参数
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
//MyBatis最终执行SQL语句,使用的是SqlSession类中的方法。SqlSession实例的获得,依靠的是SqlSessionFactory类。
//下面获取SqlSession对象,SqlSession是接口
return sqlSessionFactory.openSession();
}
}
package com.censoft.flink.mqtt;
import java.io.Serializable;
//该类需要实现序列化所以必须实现Serializable接口
public class MqttConfig implements Serializable {
public MqttConfig(String username, String password, String hostUrl, String clientId, String msgTopic) {
this.username = username;
this.password = password;
this.hostUrl = hostUrl;
this.clientId = clientId;
this.msgTopic = msgTopic;
}
//连接名称
private String username;
//连接密码
private String password;
//ip地址以及端口号
private String hostUrl;
//服务器ID注意不能与其他连接重复,否则会连接失败
private String clientId;
//订阅的主题
private String msgTopic;
//获得用户名
public String getUsername() {
return username;
}
//获得密码
public String getPassword() {
return password;
}
//获得客户端id
public String getClientId() {
return clientId;
}
//获得服务端url
public String getHostUrl() {
return hostUrl;
}
//获得订阅
public String[] getMsgTopic() {
String[] topic = msgTopic.split(",");
return topic;
}
}
\ No newline at end of file
package com.censoft.flink.mqtt;
import com.alibaba.fastjson2.JSON;
import com.censoft.flink.domain.AlgorithmPushDto;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* MQTT客户端订阅消息类
*
* @author zhongyulin
*/
public class MqttConsumer extends RichParallelSourceFunction<AlgorithmPushDto> {
//存储服务
private static MqttClient client;
//存储订阅主题
private static MqttTopic mqttTopic;
//阻塞队列存储订阅的消息
private BlockingQueue<AlgorithmPushDto> queue = new ArrayBlockingQueue(10);
//mqtt对应频道
private String msgTopic;
//包装连接的方法
private void connect() throws MqttException {
//配置连接参数
MqttConfig mqttConfigBean = new MqttConfig("", "", "tcp://192.168.4.221:1883", "DC" + (int) (Math.random() * 100000000), msgTopic);
//连接mqtt服务器
client = new MqttClient(mqttConfigBean.getHostUrl(), mqttConfigBean.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(mqttConfigBean.getUsername());
options.setPassword(mqttConfigBean.getPassword().toCharArray());
options.setCleanSession(false); //是否清除session
// 设置超时时间
options.setConnectionTimeout(30);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try {
String[] msgtopic = mqttConfigBean.getMsgTopic();
//订阅消息
int[] qos = new int[msgtopic.length];
for (int i = 0; i < msgtopic.length; i++) {
qos[i] = 0;
}
client.setCallback(new MsgCallback(client, options, msgtopic, qos) {
});
client.connect(options);
client.subscribe(msgtopic, qos);
System.out.println("MQTT连接成功:" + mqttConfigBean.getClientId() + ":" + client);
} catch (Exception e) {
System.out.println("MQTT连接异常:" + e);
}
}
//实现MqttCallback,内部函数可回调
class MsgCallback implements MqttCallback {
private MqttClient client;
private MqttConnectOptions options;
private String[] topic;
private int[] qos;
public MsgCallback() {
}
public MsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
this.client = client;
this.options = options;
this.topic = topic;
this.qos = qos;
}
//连接失败回调该函数
@Override
public void connectionLost(Throwable throwable) {
System.out.println("MQTT连接断开,发起重连");
while (true) {
try {
Thread.sleep(1000);
client.connect(options);
//订阅消息
client.subscribe(topic, qos);
System.out.println("MQTT重新连接成功:" + client);
break;
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
//收到消息回调该函数
@Override
public void messageArrived(String s, MqttMessage message) throws Exception {
//订阅消息字符
String msg = new String(message.getPayload());
byte[] bymsg = getBytesFromObject(msg);
AlgorithmPushDto algorithmPushDto = JSON.parseObject(msg, AlgorithmPushDto.class);
if (algorithmPushDto != null) {
queue.put(algorithmPushDto);
}
}
//对象转化为字节码
public byte[] getBytesFromObject(Serializable obj) throws Exception {
if (obj == null) {
return null;
}
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
return bo.toByteArray();
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
//flink线程启动函数
@Override
public void run(final SourceContext<AlgorithmPushDto> ctx) throws Exception {
connect();
//利用死循环使得程序一直监控主题是否有新消息
while (true) {
//使用阻塞队列的好处是队列空的时候程序会一直阻塞到这里不会浪费CPU资源
ctx.collect(queue.take());
}
}
@Override
public void cancel() {
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
System.out.println("topic:" + topic);
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public MqttClient getClient() {
return client;
}
public void setClient(MqttClient client) {
this.client = client;
}
public MqttTopic getMqttTopic() {
return mqttTopic;
}
public void setMqttTopic(MqttTopic mqttTopic) {
this.mqttTopic = mqttTopic;
}
public MqttConsumer() {
}
public MqttConsumer(String msgTopic) {
this.msgTopic = msgTopic;
}
}
\ No newline at end of file
package com.censoft.flink.sink;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {
private String topic;
public ProducerStringSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
}
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import com.censoft.flink.domain.AlgorithmScenePieceVariablePo;
import com.censoft.flink.exception.ParameterTransformException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 周昊
* @desc 报警间隔计算筛选
*/
public class AlarmSecondFilterFunction implements AlgorithmBaseFilterFunction {
//alarmSecond 对应报警间隔 毫秒
private final Long alarmSecond;
//用来存储报警间隔算法缓存对象
private final ConcurrentHashMap<String, Long> cache = new ConcurrentHashMap();
//对应报警间隔 参数KEY
private static String alarmSecondKey = "alarm_second";
public AlarmSecondFilterFunction(Long alarmSecond) {
this.alarmSecond = alarmSecond;
}
public static AlarmSecondFilterFunction getFilterFunction(AlgorithmSceneBasePo algorithmSceneBasePo, AlgorithmScenePiecePo algorithmScenePiecePo) {
//参数准备
Optional<AlgorithmScenePieceVariablePo> variablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> alarmSecondKey.equals(po.getVariableKey()))
.findFirst();
//判断参数是否存在,如果不存在抛出异常
if (!variablePo.isPresent()) {
throw new ParameterTransformException();
}
Long alarmSecond = Long.valueOf(variablePo.get().getVariableValue());
return new AlarmSecondFilterFunction(alarmSecond);
}
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) {
String algorithmType = algorithmPushDto.getAlgorithmType() + "_" + algorithmPushDto.getSort();
Long alarmTimeStamp = cache.get(algorithmType);
if (alarmTimeStamp == null) {
alarmTimeStamp = 0L;
cache.put(algorithmType, alarmTimeStamp);
}
//判断是否超过报警间隔
if (alarmTimeStamp + alarmSecond * 1000 <= algorithmPushDto.getTimeStamp()) {
cache.put(algorithmType, algorithmPushDto.getTimeStamp());
return true;
}
return false;
}
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.domain.AlgorithmSceneBasePo;
import com.censoft.flink.domain.AlgorithmScenePiecePo;
import org.apache.flink.api.common.functions.FilterFunction;
/**
* 编写算法块代码步骤
* 1.继承AlgorithmBaseFilterFunction接口
* 2.加入AlgorithmSceneBasePo、AlgorithmScenePiecePo 默认参数
* 3.编写getFrameFilterFunction静态方法并初始化算法块所需参数
* 4.重写filter过滤方法
*/
public interface AlgorithmBaseFilterFunction extends FilterFunction<AlgorithmPushDto> {
//获取对应算法块过滤器实例
public static AlgorithmBaseFilterFunction getFilterFunction(AlgorithmSceneBasePo algorithmSceneBasePo, AlgorithmScenePiecePo algorithmScenePiecePo) {
return null;
}
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author 周昊
* @desc ...
*/
public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPushDto, AlgorithmPushDto> {
private List<String> algorithmTypes;
private static Logger logger = LogManager.getLogger(AlgorithmTypeFlatMapFunction.class.getName());
@Override
public void flatMap(AlgorithmPushDto algorithmPushDto, Collector<AlgorithmPushDto> collector) {
List<AlgorithmPushDto> collect = algorithmPushDto.getResult()
.stream()
.filter(result -> {
String label = result.getLabel();
String type = label.substring(0, label.indexOf("_"));
return algorithmTypes.contains(type);
})
.map(result -> {
//复制
AlgorithmPushDto pushDto = new AlgorithmPushDto();
pushDto.setSort(algorithmPushDto.getSort());
pushDto.setSceneId(algorithmPushDto.getSceneId());
pushDto.setPictureAddress(algorithmPushDto.getPictureAddress());
pushDto.setTimeStamp(algorithmPushDto.getTimeStamp());
String label = result.getLabel();
String type = label.substring(0, label.indexOf("_"));
pushDto.setAlgorithmType(type);
pushDto.setResult(Collections.singletonList(result));
return pushDto;
}).collect(Collectors.toList());
for (AlgorithmPushDto pushDto : collect) {
logger.error("分流:" + pushDto);
collector.collect(pushDto);
}
}
public AlgorithmTypeFlatMapFunction(List<String> algorithmTypes) {
this.algorithmTypes = algorithmTypes;
}
public AlgorithmTypeFlatMapFunction() {
}
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.*;
import com.censoft.flink.exception.ParameterTransformException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 周昊
* @desc 帧计算筛选
*/
public class FrameFilterFunction implements AlgorithmBaseFilterFunction {
//frameSecondKey 每秒几帧
private final Integer frameSecond;
//frameNumberKey 连续多少帧算报警
private final Long frameNumber;
//用来存储帧算法缓存对象
private final ConcurrentHashMap<String, AlgorithmFrameNumberDto> cache = new ConcurrentHashMap();
//连续多少帧算报警 参数KEY
private static String frameNumberKey = "frame";
public FrameFilterFunction(Integer frameSecond, Long frameNumber) {
this.frameSecond = frameSecond;
this.frameNumber = frameNumber;
}
public static AlgorithmBaseFilterFunction getFilterFunction(AlgorithmSceneBasePo algorithmSceneBasePo, AlgorithmScenePiecePo algorithmScenePiecePo) {
//参数准备
Optional<AlgorithmScenePieceVariablePo> variablePo = algorithmScenePiecePo.getVariablePos()
.stream()
.filter(po -> frameNumberKey.equals(po.getVariableKey()))
.findFirst();
//判断参数是否存在,如果不存在抛出异常
if (algorithmSceneBasePo.getFrameSecond() == null || !variablePo.isPresent()) {
throw new ParameterTransformException();
}
Integer frameSecond = algorithmSceneBasePo.getFrameSecond();
Long frameNumber = Long.valueOf(variablePo.get().getVariableValue());
return new FrameFilterFunction(frameSecond, frameNumber);
}
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) {
String algorithmType = algorithmPushDto.getAlgorithmType() + "_" + algorithmPushDto.getSort();
AlgorithmFrameNumberDto frameNumberDto = cache.get(algorithmType);
if (frameNumberDto == null) {
frameNumberDto = new AlgorithmFrameNumberDto();
frameNumberDto.setFrameNumber(0);
frameNumberDto.setTimeStamp(0L);
cache.put(algorithmType, frameNumberDto);
}
//获取对应帧数匹配规则
//3.判断是否符合对应帧数限制
//如果为连续 redis计数器加一 不连续制空
if (frameNumberDto.getTimeStamp() + 1000 / frameSecond >= algorithmPushDto.getTimeStamp()) {
frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp());
frameNumberDto.setFrameNumber(frameNumberDto.getFrameNumber() + 1);
cache.put(algorithmType, frameNumberDto);
//连续且超过连续报警帧数 下一步 ,反之退出
if (frameNumberDto.getFrameNumber() < frameNumber) {
return false;
}
} else {
frameNumberDto.setTimeStamp(algorithmPushDto.getTimeStamp());
frameNumberDto.setFrameNumber(0);
cache.put(algorithmType, frameNumberDto);
return false;
}
return true;
}
}
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
/**
* @author 周昊
* @desc 有无结果筛选
*/
public class ResultExistFilterFunction implements AlgorithmBaseFilterFunction {
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) {
return algorithmPushDto.getResult() != null && !algorithmPushDto.getResult().isEmpty();
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"></transactionManager>
<!-- 下面是配置一下数据源,其实后面我们就不这样配置了,都在.yml文件中配置-->
<dataSource type="POOLED">
<property name="driver" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.4.221:3307/ry-vue?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false&amp;serverTimezone=Hongkong&amp;allowMultiQueries=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</dataSource>
</environment>
</environments>
<!-- 用于指定其它mapper.xml文件的位置,也就是路径
这个路径是从target/classes根路径开始的,也即是com/**/**/这样的
使用注意:使用 / 分隔路径;一个mapper标签指定一个文件;
-->
<mappers>
<mapper resource="com/censoft/flink/mapper/AlgorithmSceneDao.xml"></mapper>
</mappers>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment