Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
my-flink-project
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
AI算法平台
my-flink-project
Commits
5fb44595
You need to sign in or sign up before continuing.
Commit
5fb44595
authored
Nov 29, 2023
by
薄玉虎
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1.增加算法场景日志信息
2.增加flink的运行程序jobname 3.调整乌海矿于海南矿摄像头id不同区别的逻辑
parent
a20dfdc7
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
18 additions
and
4 deletions
+18
-4
src/main/java/com/censoft/flink/StreamingJob.java
src/main/java/com/censoft/flink/StreamingJob.java
+13
-4
src/main/java/com/censoft/flink/transform/AlarmSecondFilterFunction.java
...om/censoft/flink/transform/AlarmSecondFilterFunction.java
+1
-0
src/main/java/com/censoft/flink/transform/AlarmThresholdFunction.java
...a/com/censoft/flink/transform/AlarmThresholdFunction.java
+1
-0
src/main/java/com/censoft/flink/transform/AlgorithmTypeFlatMapFunction.java
...censoft/flink/transform/AlgorithmTypeFlatMapFunction.java
+3
-0
No files found.
src/main/java/com/censoft/flink/StreamingJob.java
View file @
5fb44595
...
...
@@ -49,6 +49,7 @@ public class StreamingJob {
//2、接收消息并将数据转化为流
//根据场景id 拼接对应的mqtt频道名称
// DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/cpptest/10"));
DataStream
<
AlgorithmPushDto
>
mqttStream
=
env
.
addSource
(
new
MqttConsumer
(
"/censoft/cpptest/"
+
(
sceneId
-
1
)));
//3、进行处理
...
...
@@ -86,9 +87,14 @@ public class StreamingJob {
warnPushBase
.
setAlarmDeviceName
(
cameraPo
.
getCameraName
());
//获取摄像头归属
warnPushBase
.
setBelong
(
cameraPo
.
getBelong
());
//获取摄像头Id
warnPushBase
.
setChannelId
(
cameraPo
.
getRtsp
()+
"$1$0$0"
);
warnPushBase
.
setGradeName
(
"二级预警"
);
//获取摄像头Id 乌海矿与海南矿实际情况不同
if
(
"乌海矿"
.
equals
(
cameraPo
.
getBelong
())){
warnPushBase
.
setChannelId
(
cameraPo
.
getRtsp
()+
"$1$0$0"
);
}
else
{
warnPushBase
.
setChannelId
(
cameraPo
.
getRtsp
().
replace
(
"_"
,
"$1$0$"
));
}
warnPushBase
.
setGradeName
(
"预警"
);
//获取摄像头名称
String
algorithmName
=
getAlgorithmPoById
(
algorithmPoList
,
Long
.
valueOf
(
algorithmPushDto
.
getAlgorithmName
()))
.
getAlgorithmName
();
...
...
@@ -116,7 +122,10 @@ public class StreamingJob {
outputStreamOperator
.
print
();
outputStreamOperator
.
writeAsText
(
"F:/word"
+
sceneId
+
".txt"
,
FileSystem
.
WriteMode
.
OVERWRITE
).
setParallelism
(
1
);
//3、7 自动执行
env
.
execute
();
env
.
execute
(
sceneId
.
toString
());
}
...
...
src/main/java/com/censoft/flink/transform/AlarmSecondFilterFunction.java
View file @
5fb44595
...
...
@@ -60,6 +60,7 @@ public class AlarmSecondFilterFunction implements AlgorithmBaseFilterFunction {
cache
.
put
(
algorithmType
,
algorithmPushDto
.
getTimeStamp
());
return
true
;
}
System
.
out
.
println
(
algorithmPushDto
.
getCameraName
()+
"--"
+
algorithmPushDto
.
getAlgorithmName
()+
"--"
+
"未超过报警时间间隔"
);
return
false
;
}
}
src/main/java/com/censoft/flink/transform/AlarmThresholdFunction.java
View file @
5fb44595
...
...
@@ -50,6 +50,7 @@ public class AlarmThresholdFunction implements AlgorithmBaseFilterFunction {
.
stream
()
.
map
(
po
->
{
String
label
=
po
.
getLabel
();
System
.
out
.
println
(
label
);
if
(
label
.
contains
(
"_"
))
{
String
thresholdString
=
label
.
substring
(
label
.
indexOf
(
"_"
)
+
1
);
if
(
NumberUtil
.
isInteger
(
thresholdString
))
{
...
...
src/main/java/com/censoft/flink/transform/AlgorithmTypeFlatMapFunction.java
View file @
5fb44595
...
...
@@ -29,6 +29,7 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
//2、将相同label的Key值分出多个预警
List
<
AlgorithmPushResultBoxDto
>
boxDtos
=
algorithmPushDto
.
getResult
();
System
.
out
.
println
(
algorithmPushDto
.
toString
());
List
<
String
>
types
=
boxDtos
.
stream
()
...
...
@@ -53,6 +54,8 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
pushDto
.
setPictureAddress
(
algorithmPushDto
.
getPictureAddress
());
pushDto
.
setSort
(
algorithmPushDto
.
getSort
());
pushDto
.
setTimeStamp
(
algorithmPushDto
.
getTimeStamp
());
pushDto
.
setServerIp
(
algorithmPushDto
.
getServerIp
());
pushDto
.
setRemark
(
algorithmPushDto
.
getRemark
());
pushDto
.
setAlgorithmType
(
type
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment