Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
my-flink-project
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
AI算法平台
my-flink-project
Commits
f0e5bef6
Commit
f0e5bef6
authored
Sep 15, 2023
by
周昊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1、修改传输kafka数据
parent
62e34938
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
203 additions
and
6 deletions
+203
-6
src/main/java/com/censoft/flink/StreamingJob.java
src/main/java/com/censoft/flink/StreamingJob.java
+45
-6
src/main/java/com/censoft/flink/domain/AlgorithmCameraPo.java
...main/java/com/censoft/flink/domain/AlgorithmCameraPo.java
+32
-0
src/main/java/com/censoft/flink/domain/AlgorithmPo.java
src/main/java/com/censoft/flink/domain/AlgorithmPo.java
+31
-0
src/main/java/com/censoft/flink/domain/AlgorithmWarnPushDto.java
...n/java/com/censoft/flink/domain/AlgorithmWarnPushDto.java
+39
-0
src/main/java/com/censoft/flink/mapper/AlgorithmSceneDao.java
...main/java/com/censoft/flink/mapper/AlgorithmSceneDao.java
+6
-0
src/main/java/com/censoft/flink/mapper/AlgorithmSceneDao.xml
src/main/java/com/censoft/flink/mapper/AlgorithmSceneDao.xml
+24
-0
src/main/java/com/censoft/flink/mapper/SqlFactory.java
src/main/java/com/censoft/flink/mapper/SqlFactory.java
+26
-0
No files found.
src/main/java/com/censoft/flink/StreamingJob.java
View file @
f0e5bef6
package
com
.
censoft
.
flink
;
package
com
.
censoft
.
flink
;
import
com.censoft.flink.domain.AlgorithmPushDto
;
import
com.alibaba.fastjson2.JSON
;
import
com.censoft.flink.domain.AlgorithmSceneBasePo
;
import
com.censoft.flink.domain.*
;
import
com.censoft.flink.domain.AlgorithmScenePiecePo
;
import
com.censoft.flink.mapper.SqlFactory
;
import
com.censoft.flink.mapper.SqlFactory
;
import
com.censoft.flink.mqtt.MqttConsumer
;
import
com.censoft.flink.mqtt.MqttConsumer
;
import
com.censoft.flink.transform.AlgorithmBaseFilterFunction
;
import
com.censoft.flink.transform.AlgorithmBaseFilterFunction
;
import
com.censoft.flink.transform.AlgorithmTypeFlatMapFunction
;
import
com.censoft.flink.transform.AlgorithmTypeFlatMapFunction
;
import
com.censoft.flink.transform.ResultExistFilterFunction
;
import
com.censoft.flink.transform.ResultExistFilterFunction
;
import
com.censoft.flink.transform.UpdateLiveFilterFunction
;
import
com.censoft.flink.transform.UpdateLiveFilterFunction
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.core.fs.FileSystem
;
import
org.apache.flink.core.fs.FileSystem
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
;
import
java.lang.reflect.Method
;
import
java.lang.reflect.Method
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Optional
;
public
class
StreamingJob
{
public
class
StreamingJob
{
...
@@ -35,6 +38,12 @@ public class StreamingJob {
...
@@ -35,6 +38,12 @@ public class StreamingJob {
//2.2 获取场景算法块数据
//2.2 获取场景算法块数据
List
<
AlgorithmScenePiecePo
>
algorithmScenePieceList
=
sqlFactory
.
getAlgorithmScenePieceList
(
sceneId
);
List
<
AlgorithmScenePiecePo
>
algorithmScenePieceList
=
sqlFactory
.
getAlgorithmScenePieceList
(
sceneId
);
//2.3 获取算法列表
List
<
AlgorithmPo
>
algorithmPoList
=
sqlFactory
.
getAlgorithmPoList
();
//2.4 获取摄像头列表
List
<
AlgorithmCameraPo
>
algorithmCameraPoList
=
sqlFactory
.
getAlgorithmCameraPoList
();
//1、创建流式执行环境
//1、创建流式执行环境
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
...
@@ -69,15 +78,45 @@ public class StreamingJob {
...
@@ -69,15 +78,45 @@ public class StreamingJob {
}
}
//3、4 Object -> String
//3、4 Object -> String
SingleOutputStreamOperator
<
String
>
outputStreamOperator
=
streamOperator
.
map
(
AlgorithmPushDto:
:
toString
);
SingleOutputStreamOperator
<
String
>
outputStreamOperator
=
streamOperator
.
map
(
algorithmPushDto
->
{
AlgorithmWarnPushDto
warnPushBase
=
new
AlgorithmWarnPushDto
();
warnPushBase
.
setOrderName
(
"AI预警平台"
);
//获取摄像头名称
warnPushBase
.
setAlarmDeviceName
(
getAlgorithmCameraPoByRtsp
(
algorithmCameraPoList
,
algorithmPushDto
.
getCameraName
())
.
getCameraName
());
warnPushBase
.
setGradeName
(
"二级预警"
);
//获取摄像头名称
warnPushBase
.
setGradeType
(
getAlgorithmPoById
(
algorithmPoList
,
Long
.
valueOf
(
algorithmPushDto
.
getAlgorithmName
()))
.
getAlgorithmName
());
warnPushBase
.
setAlarmTime
(
new
Date
(
algorithmPushDto
.
getTimeStamp
()
*
1000
));
warnPushBase
.
setPicture
(
"http://172.16.21.3"
+
algorithmPushDto
.
getPictureAddress
());
warnPushBase
.
setStatus
(
"wait"
);
warnPushBase
.
setSendUserIds
(
"|1|"
);
warnPushBase
.
setDelFlag
(
0
);
return
JSON
.
toJSONString
(
warnPushBase
);
});
//3、5 输出kafka
//3、5 输出kafka
// outputStreamOperator.addSink(new FlinkKafkaProducer("172.16.33.152
:9092", "test-topic", new SimpleStringSchema()));
outputStreamOperator
.
addSink
(
new
FlinkKafkaProducer
(
"192.168.10.137
:9092"
,
"test-topic"
,
new
SimpleStringSchema
()));
//3、6 打印输出
//3、6 打印输出
outputStreamOperator
.
print
();
outputStreamOperator
.
print
();
outputStreamOperator
.
writeAsText
(
"D:/word"
+
sceneId
+
".txt"
,
FileSystem
.
WriteMode
.
OVERWRITE
).
setParallelism
(
1
);
outputStreamOperator
.
writeAsText
(
"D:/word"
+
sceneId
+
".txt"
,
FileSystem
.
WriteMode
.
OVERWRITE
).
setParallelism
(
1
);
//3、7 自动执行
//3、7 自动执行
env
.
execute
();
env
.
execute
();
}
}
private
static
AlgorithmCameraPo
getAlgorithmCameraPoByRtsp
(
List
<
AlgorithmCameraPo
>
cameraPos
,
String
rtsp
)
{
return
cameraPos
.
stream
()
.
filter
(
cameraPo
->
rtsp
.
equals
(
cameraPo
.
getRtsp
())).
findFirst
().
get
();
}
private
static
AlgorithmPo
getAlgorithmPoById
(
List
<
AlgorithmPo
>
algorithmPos
,
Long
id
)
{
return
algorithmPos
.
stream
()
.
filter
(
algorithmPo
->
id
.
equals
(
algorithmPo
.
getId
())).
findFirst
().
get
();
}
}
}
src/main/java/com/censoft/flink/domain/AlgorithmCameraPo.java
0 → 100644
View file @
f0e5bef6
package
com
.
censoft
.
flink
.
domain
;
import
lombok.Data
;
import
java.io.Serializable
;
/**
* @author 周昊
* @desc ...
* @date 2023-09-15 16:43:51
*/
@Data
public
class
AlgorithmCameraPo
implements
Serializable
{
/** 主键id */
private
Long
id
;
/** 摄像头名 */
private
String
cameraName
;
/** 摄像头视频流地址 */
private
String
rtsp
;
/** 排序 */
private
Long
sort
;
/** 帐号状态(0正常 */
private
String
status
;
/** 删除标志(0代表存在 */
private
String
delFlag
;
}
src/main/java/com/censoft/flink/domain/AlgorithmPo.java
0 → 100644
View file @
f0e5bef6
package
com
.
censoft
.
flink
.
domain
;
import
lombok.Data
;
import
java.io.Serializable
;
/**
* @author 周昊
* @desc ...
* @date 2023-09-15 16:47:21
*/
@Data
public
class
AlgorithmPo
implements
Serializable
{
/** 主键id */
private
Long
id
;
/** 算法名 */
private
String
algorithmName
;
/** 算法唯一标识 */
private
String
algorithmKey
;
/** 排序 */
private
Long
sort
;
/** 帐号状态(0正常 */
private
String
status
;
/** 删除标志(0代表存在 */
private
String
delFlag
;
}
src/main/java/com/censoft/flink/domain/AlgorithmWarnPushDto.java
0 → 100644
View file @
f0e5bef6
package
com
.
censoft
.
flink
.
domain
;
import
lombok.Data
;
import
java.util.Date
;
/**
* 算法计算结果标准接收对象
*/
@Data
public
class
AlgorithmWarnPushDto
{
/** 所属系统名称 */
private
String
orderName
;
/** 预警设备名称 */
private
String
alarmDeviceName
;
/** 预警等级 */
private
String
gradeName
;
/** 预警类型 */
private
String
gradeType
;
/** 报警时间 */
private
Date
alarmTime
;
/** 图片地址 */
private
String
picture
;
/** 处置状态 */
private
String
status
;
/** 接受发送人 */
private
String
sendUserIds
;
/** 删除标记 */
private
Integer
delFlag
;
}
src/main/java/com/censoft/flink/mapper/AlgorithmSceneDao.java
View file @
f0e5bef6
package
com
.
censoft
.
flink
.
mapper
;
package
com
.
censoft
.
flink
.
mapper
;
import
com.censoft.flink.domain.AlgorithmCameraPo
;
import
com.censoft.flink.domain.AlgorithmPo
;
import
com.censoft.flink.domain.AlgorithmSceneBasePo
;
import
com.censoft.flink.domain.AlgorithmSceneBasePo
;
import
com.censoft.flink.domain.AlgorithmScenePiecePo
;
import
com.censoft.flink.domain.AlgorithmScenePiecePo
;
import
org.apache.ibatis.annotations.Param
;
import
org.apache.ibatis.annotations.Param
;
...
@@ -13,4 +15,8 @@ public interface AlgorithmSceneDao {
...
@@ -13,4 +15,8 @@ public interface AlgorithmSceneDao {
public
List
<
AlgorithmScenePiecePo
>
getAlgorithmScenePieceList
(
@Param
(
"sceneId"
)
Long
sceneId
);
public
List
<
AlgorithmScenePiecePo
>
getAlgorithmScenePieceList
(
@Param
(
"sceneId"
)
Long
sceneId
);
public
int
updateLiveBySceneId
(
Long
sceneId
);
public
int
updateLiveBySceneId
(
Long
sceneId
);
public
List
<
AlgorithmCameraPo
>
getAlgorithmCameraPoList
();
public
List
<
AlgorithmPo
>
getAlgorithmPoList
();
}
}
src/main/java/com/censoft/flink/mapper/AlgorithmSceneDao.xml
View file @
f0e5bef6
...
@@ -45,5 +45,29 @@
...
@@ -45,5 +45,29 @@
ORDER BY asp.sort ASC
ORDER BY asp.sort ASC
</select>
</select>
<select
id=
"getAlgorithmCameraPoList"
resultType=
"com.censoft.flink.domain.AlgorithmCameraPo"
>
SELECT
id,
camera_name as cameraName,
rtsp,
sort,
STATUS,
del_flag as delFlag
FROM
algorithm_camera_base
</select>
<select
id=
"getAlgorithmPoList"
resultType=
"com.censoft.flink.domain.AlgorithmPo"
>
SELECT
id,
algorithm_name AS algorithmName,
algorithm_key AS algorithmKey,
sort,
STATUS,
del_flag AS delFlag
FROM
algorithm_base
</select>
</mapper>
</mapper>
\ No newline at end of file
src/main/java/com/censoft/flink/mapper/SqlFactory.java
View file @
f0e5bef6
package
com
.
censoft
.
flink
.
mapper
;
package
com
.
censoft
.
flink
.
mapper
;
import
com.censoft.flink.domain.AlgorithmCameraPo
;
import
com.censoft.flink.domain.AlgorithmPo
;
import
com.censoft.flink.domain.AlgorithmSceneBasePo
;
import
com.censoft.flink.domain.AlgorithmSceneBasePo
;
import
com.censoft.flink.domain.AlgorithmScenePiecePo
;
import
com.censoft.flink.domain.AlgorithmScenePiecePo
;
import
org.apache.ibatis.io.Resources
;
import
org.apache.ibatis.io.Resources
;
...
@@ -42,6 +44,30 @@ public class SqlFactory {
...
@@ -42,6 +44,30 @@ public class SqlFactory {
return
algorithmScenePiecePoList
;
return
algorithmScenePiecePoList
;
}
}
public
List
<
AlgorithmCameraPo
>
getAlgorithmCameraPoList
()
throws
IOException
{
SqlSession
session
=
initSqlSession
();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String
SQLID
=
"com.censoft.flink.mapper.AlgorithmSceneDao"
+
"."
+
"getAlgorithmCameraPoList"
;
//通过SqlSession对象的方法执行SQL语句
List
<
AlgorithmCameraPo
>
algorithmCameraPos
=
session
.
selectList
(
SQLID
);
//最后我们关闭SqlSession对象
session
.
close
();
return
algorithmCameraPos
;
}
public
List
<
AlgorithmPo
>
getAlgorithmPoList
()
throws
IOException
{
SqlSession
session
=
initSqlSession
();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String
SQLID
=
"com.censoft.flink.mapper.AlgorithmSceneDao"
+
"."
+
"getAlgorithmPoList"
;
//通过SqlSession对象的方法执行SQL语句
List
<
AlgorithmPo
>
algorithmPos
=
session
.
selectList
(
SQLID
);
//最后我们关闭SqlSession对象
session
.
close
();
return
algorithmPos
;
}
public
int
updateLiveBySceneId
(
Long
sceneId
)
throws
IOException
{
public
int
updateLiveBySceneId
(
Long
sceneId
)
throws
IOException
{
SqlSession
session
=
initSqlSession
();
SqlSession
session
=
initSqlSession
();
//指定要执行的SQL语句的id
//指定要执行的SQL语句的id
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment