Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
my-flink-project
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
AI算法平台
my-flink-project
Commits
7f44f535
Commit
7f44f535
authored
Nov 08, 2023
by
周昊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1、添加对于数据库无摄像头数据报错校验
parent
66eae195
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
26 additions
and
6 deletions
+26
-6
src/main/java/com/censoft/flink/StreamingJob.java
src/main/java/com/censoft/flink/StreamingJob.java
+26
-6
No files found.
src/main/java/com/censoft/flink/StreamingJob.java
View file @
7f44f535
...
@@ -80,6 +80,9 @@ public class StreamingJob {
...
@@ -80,6 +80,9 @@ public class StreamingJob {
warnPushBase
.
setOrderName
(
"AI预警平台"
);
warnPushBase
.
setOrderName
(
"AI预警平台"
);
//获取摄像头名称
//获取摄像头名称
AlgorithmCameraPo
cameraPo
=
getAlgorithmCameraPoByRtsp
(
algorithmCameraPoList
,
algorithmPushDto
.
getCameraName
());
AlgorithmCameraPo
cameraPo
=
getAlgorithmCameraPoByRtsp
(
algorithmCameraPoList
,
algorithmPushDto
.
getCameraName
());
if
(
cameraPo
==
null
){
return
null
;
}
warnPushBase
.
setAlarmDeviceName
(
cameraPo
.
getCameraName
());
warnPushBase
.
setAlarmDeviceName
(
cameraPo
.
getCameraName
());
//获取摄像头归属
//获取摄像头归属
warnPushBase
.
setBelong
(
cameraPo
.
getBelong
());
warnPushBase
.
setBelong
(
cameraPo
.
getBelong
());
...
@@ -87,8 +90,12 @@ public class StreamingJob {
...
@@ -87,8 +90,12 @@ public class StreamingJob {
warnPushBase
.
setChannelId
(
cameraPo
.
getRtsp
()+
"$1$0$0"
);
warnPushBase
.
setChannelId
(
cameraPo
.
getRtsp
()+
"$1$0$0"
);
warnPushBase
.
setGradeName
(
"二级预警"
);
warnPushBase
.
setGradeName
(
"二级预警"
);
//获取摄像头名称
//获取摄像头名称
warnPushBase
.
setGradeType
(
getAlgorithmPoById
(
algorithmPoList
,
Long
.
valueOf
(
algorithmPushDto
.
getAlgorithmName
()))
String
algorithmName
=
getAlgorithmPoById
(
algorithmPoList
,
Long
.
valueOf
(
algorithmPushDto
.
getAlgorithmName
()))
.
getAlgorithmName
());
.
getAlgorithmName
();
warnPushBase
.
setGradeType
(
algorithmName
);
if
(
cameraPo
==
null
){
return
null
;
}
warnPushBase
.
setAlarmTime
(
new
Date
(
algorithmPushDto
.
getTimeStamp
()
*
1000
));
warnPushBase
.
setAlarmTime
(
new
Date
(
algorithmPushDto
.
getTimeStamp
()
*
1000
));
//获取图片地址
//获取图片地址
warnPushBase
.
setPicture
(
"http://"
+
algorithmPushDto
.
getServerIp
()+
algorithmPushDto
.
getPictureAddress
());
warnPushBase
.
setPicture
(
"http://"
+
algorithmPushDto
.
getServerIp
()+
algorithmPushDto
.
getPictureAddress
());
...
@@ -99,6 +106,9 @@ public class StreamingJob {
...
@@ -99,6 +106,9 @@ public class StreamingJob {
return
JSON
.
toJSONString
(
warnPushBase
);
return
JSON
.
toJSONString
(
warnPushBase
);
});
});
//去空
outputStreamOperator
=
outputStreamOperator
.
filter
(
algorithmPushString
->
algorithmPushString
!=
null
&&
""
.
equals
(
algorithmPushString
));
//3、5 输出kafka
//3、5 输出kafka
outputStreamOperator
.
addSink
(
new
FlinkKafkaProducer
(
"192.168.10.137:9092"
,
"test-topic"
,
new
SimpleStringSchema
()));
outputStreamOperator
.
addSink
(
new
FlinkKafkaProducer
(
"192.168.10.137:9092"
,
"test-topic"
,
new
SimpleStringSchema
()));
...
@@ -111,15 +121,25 @@ public class StreamingJob {
...
@@ -111,15 +121,25 @@ public class StreamingJob {
private
static
AlgorithmCameraPo
getAlgorithmCameraPoByRtsp
(
List
<
AlgorithmCameraPo
>
cameraPos
,
String
rtsp
)
{
private
static
AlgorithmCameraPo
getAlgorithmCameraPoByRtsp
(
List
<
AlgorithmCameraPo
>
cameraPos
,
String
rtsp
)
{
return
cameraPos
Optional
<
AlgorithmCameraPo
>
first
=
cameraPos
.
stream
()
.
stream
()
.
filter
(
cameraPo
->
rtsp
.
equals
(
cameraPo
.
getRtsp
())).
findFirst
().
get
();
.
filter
(
cameraPo
->
rtsp
.
equals
(
cameraPo
.
getRtsp
())).
findFirst
();
if
(
first
.
isPresent
())
{
return
first
.
get
();
}
else
{
return
null
;
}
}
}
private
static
AlgorithmPo
getAlgorithmPoById
(
List
<
AlgorithmPo
>
algorithmPos
,
Long
id
)
{
private
static
AlgorithmPo
getAlgorithmPoById
(
List
<
AlgorithmPo
>
algorithmPos
,
Long
id
)
{
return
algorithmPos
Optional
<
AlgorithmPo
>
first
=
algorithmPos
.
stream
()
.
stream
()
.
filter
(
algorithmPo
->
id
.
equals
(
algorithmPo
.
getId
())).
findFirst
().
get
();
.
filter
(
algorithmPo
->
id
.
equals
(
algorithmPo
.
getId
())).
findFirst
();
if
(
first
.
isPresent
())
{
return
first
.
get
();
}
else
{
return
null
;
}
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment