Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
my-kafka-project
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
AI算法平台
my-kafka-project
Commits
3864c971
Commit
3864c971
authored
Nov 17, 2023
by
周昊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Initial commit
parents
Changes
16
Show whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
985 additions
and
0 deletions
+985
-0
.gitignore
.gitignore
+33
-0
pom.xml
pom.xml
+133
-0
src/main/java/com/example/mykafkaproject/EimpKafkaConsumerListener.java
...com/example/mykafkaproject/EimpKafkaConsumerListener.java
+37
-0
src/main/java/com/example/mykafkaproject/KafkaCondition.java
src/main/java/com/example/mykafkaproject/KafkaCondition.java
+60
-0
src/main/java/com/example/mykafkaproject/KafkaConsumerConfig.java
.../java/com/example/mykafkaproject/KafkaConsumerConfig.java
+85
-0
src/main/java/com/example/mykafkaproject/MyKafkaProjectApplication.java
...com/example/mykafkaproject/MyKafkaProjectApplication.java
+16
-0
src/main/java/com/example/mykafkaproject/bean/IotWarnPushDto.java
.../java/com/example/mykafkaproject/bean/IotWarnPushDto.java
+90
-0
src/main/java/com/example/mykafkaproject/bean/WarnPushBase.java
...in/java/com/example/mykafkaproject/bean/WarnPushBase.java
+167
-0
src/main/java/com/example/mykafkaproject/mapper/WarnPushBaseMapper.java
...com/example/mykafkaproject/mapper/WarnPushBaseMapper.java
+12
-0
src/main/java/com/example/mykafkaproject/server/WarnPushBaseServer.java
...com/example/mykafkaproject/server/WarnPushBaseServer.java
+134
-0
src/main/java/com/example/mykafkaproject/utils/PinYinUtil.java
...ain/java/com/example/mykafkaproject/utils/PinYinUtil.java
+34
-0
src/main/java/com/example/mykafkaproject/utils/RtspToMP4.java
...main/java/com/example/mykafkaproject/utils/RtspToMP4.java
+87
-0
src/main/resources/application.yml
src/main/resources/application.yml
+19
-0
src/main/resources/config/iccSdk.properties
src/main/resources/config/iccSdk.properties
+25
-0
src/main/resources/mapper/WarnPushBaseMapper.xml
src/main/resources/mapper/WarnPushBaseMapper.xml
+40
-0
src/test/java/com/example/mykafkaproject/MyKafkaProjectApplicationTests.java
...xample/mykafkaproject/MyKafkaProjectApplicationTests.java
+13
-0
No files found.
.gitignore
0 → 100644
View file @
3864c971
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
pom.xml
0 → 100644
View file @
3864c971
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
com.example
</groupId>
<artifactId>
my-kafka-project
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
<name>
my-kafka-project
</name>
<description>
my-kafka-project
</description>
<properties>
<java.version>
1.8
</java.version>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<project.reporting.outputEncoding>
UTF-8
</project.reporting.outputEncoding>
<spring-boot.version>
2.6.13
</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-test
</artifactId>
<scope>
test
</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>
org.springframework.kafka
</groupId>
<artifactId>
spring-kafka
</artifactId>
</dependency>
<!--JDBC-->
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-jdbc
</artifactId>
</dependency>
<!--Mybatis-->
<dependency>
<groupId>
org.mybatis.spring.boot
</groupId>
<artifactId>
mybatis-spring-boot-starter
</artifactId>
<version>
2.2.0
</version>
</dependency>
<!--MySQL-->
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<scope>
runtime
</scope>
</dependency>
<!-- 阿里JSON解析器 -->
<dependency>
<groupId>
com.alibaba.fastjson2
</groupId>
<artifactId>
fastjson2
</artifactId>
<version>
2.0.25
</version>
</dependency>
<!-- ICC鉴权 -->
<dependency>
<groupId>
com.dahuatech.icc
</groupId>
<artifactId>
java-sdk-oauth
</artifactId>
<version>
1.0.9.1
</version>
<exclusions>
<exclusion>
<artifactId>
java-sdk-core
</artifactId>
<groupId>
com.dahuatech.icc
</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>
com.dahuatech.icc
</groupId>
<artifactId>
java-sdk-core
</artifactId>
<version>
1.0.9.1
</version>
</dependency>
<dependency>
<groupId>
cn.hutool
</groupId>
<artifactId>
hutool-all
</artifactId>
<version>
5.8.16
</version>
</dependency>
<dependency>
<groupId>
com.belerweb
</groupId>
<artifactId>
pinyin4j
</artifactId>
<version>
2.5.1
</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-dependencies
</artifactId>
<version>
${spring-boot.version}
</version>
<type>
pom
</type>
<scope>
import
</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.8.1
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<plugin>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-maven-plugin
</artifactId>
<!--<configuration>
<mainClass>com.example.mykafkaproject.MyKafkaProjectApplication</mainClass>
<skip>true</skip>
</configuration>-->
<executions>
<execution>
<id>
repackage
</id>
<goals>
<goal>
repackage
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
src/main/java/com/example/mykafkaproject/EimpKafkaConsumerListener.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
;
import
com.example.mykafkaproject.server.WarnPushBaseServer
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Conditional
;
import
org.springframework.kafka.annotation.KafkaListener
;
import
org.springframework.stereotype.Component
;
import
org.slf4j.Logger
;
/**
* Kafka消费者Listener
*
* @author thcb
* @date 2023-05-24
*/
@Component
@Conditional
(
KafkaCondition
.
class
)
public
class
EimpKafkaConsumerListener
{
public
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
EimpKafkaConsumerListener
.
class
);
public
final
static
String
TOPIC
=
"test-topic"
;
public
final
static
String
GROUP_ID
=
"test_group"
;
@Autowired
private
WarnPushBaseServer
warnPushBaseServer
;
//监听kafka消费
@KafkaListener
(
topics
=
TOPIC
,
groupId
=
GROUP_ID
,
containerFactory
=
"kafkaListenerContainerFactory"
)
@Conditional
(
KafkaCondition
.
class
)
public
void
onMessage
(
String
message
)
{
warnPushBaseServer
.
addMessage
(
message
);
log
.
info
(
"EimpKafkaConsumerListener onMessage:{}"
,
message
);
}
}
src/main/java/com/example/mykafkaproject/KafkaCondition.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.context.annotation.ConditionContext
;
import
org.springframework.core.type.AnnotatedTypeMetadata
;
import
java.io.IOException
;
import
java.net.InetSocketAddress
;
import
java.net.Socket
;
import
java.net.URI
;
import
org.springframework.context.annotation.Condition
;
import
org.slf4j.Logger
;
/**
* kafka动态启动
* kafka代理服务器正常时启动kafka服务
* kafka代理服务器不可用时,不启动kafka服务
*
* @author thcb
* @date 2023-05-24
*/
public
class
KafkaCondition
implements
Condition
{
public
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
KafkaCondition
.
class
);
@Override
public
boolean
matches
(
ConditionContext
context
,
AnnotatedTypeMetadata
metadata
)
{
URI
uri
=
URI
.
create
(
"http://"
+
KafkaConsumerConfig
.
BOOTSTRAP_SERVERS
);
String
host
=
uri
.
getHost
();
int
port1
=
uri
.
getPort
();
boolean
b
=
this
.
isHostConnectable
(
host
,
port1
);
log
.
info
(
"matches:{}"
,
b
);
return
b
;
}
/**
* 判断kafka服务器,能否正常连接
*
* @param host
* @param port
* @return
*/
public
boolean
isHostConnectable
(
String
host
,
int
port
)
{
log
.
info
(
"isHostConnectable:host:{},port:{}"
,
host
,
port
);
Socket
socket
=
new
Socket
();
try
{
//判断kafka网络是否能联通,不能连通则返回false
socket
.
connect
(
new
InetSocketAddress
(
host
,
port
),
2000
);
}
catch
(
IOException
e
)
{
log
.
error
(
"isHostConnectable:{}"
,
e
.
getMessage
());
return
false
;
}
finally
{
try
{
socket
.
close
();
}
catch
(
IOException
e
)
{
log
.
error
(
"isHostConnectable:{}"
,
e
.
getMessage
());
}
}
return
true
;
}
}
src/main/java/com/example/mykafkaproject/KafkaConsumerConfig.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Conditional
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.kafka.annotation.EnableKafka
;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
;
import
org.springframework.kafka.config.KafkaListenerContainerFactory
;
import
org.springframework.kafka.core.ConsumerFactory
;
import
org.springframework.kafka.core.DefaultKafkaConsumerFactory
;
import
org.springframework.kafka.listener.ConcurrentMessageListenerContainer
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
* Kafka消费者配置类
*
* @author thcb
* @date 2023-05-24
*/
@Configuration
@EnableKafka
public
class
KafkaConsumerConfig
{
public
final
static
String
BOOTSTRAP_SERVERS
=
"172.16.20.211:9092"
;
// public final static String BOOTSTRAP_SERVERS = "192.168.10.137:9092";
public
final
static
String
GROUP_ID
=
"test_group"
;
@Bean
@Conditional
(
KafkaCondition
.
class
)
KafkaListenerContainerFactory
<
ConcurrentMessageListenerContainer
<
String
,
String
>>
kafkaListenerContainerFactory
()
{
ConcurrentKafkaListenerContainerFactory
<
String
,
String
>
factory
=
new
ConcurrentKafkaListenerContainerFactory
<>();
// 设置消费者工厂
factory
.
setConsumerFactory
(
consumerFactory
());
// 消费者组中线程数量
factory
.
setConcurrency
(
3
);
// 拉取超时时间
factory
.
getContainerProperties
().
setPollTimeout
(
3000
);
// 当使用批量监听器时需要设置为true
factory
.
setBatchListener
(
true
);
return
factory
;
}
@Bean
@Conditional
(
KafkaCondition
.
class
)
public
ConsumerFactory
<
String
,
String
>
consumerFactory
()
{
return
new
DefaultKafkaConsumerFactory
<>(
consumerConfigs
());
}
@Bean
@Conditional
(
KafkaCondition
.
class
)
public
Map
<
String
,
Object
>
consumerConfigs
()
{
Map
<
String
,
Object
>
propsMap
=
new
HashMap
<>();
// Kafka地址
propsMap
.
put
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
BOOTSTRAP_SERVERS
);
//配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
propsMap
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
GROUP_ID
);
// 是否自动提交offset偏移量(默认true)
propsMap
.
put
(
ConsumerConfig
.
ENABLE_AUTO_COMMIT_CONFIG
,
true
);
// 自动提交的频率(ms)
propsMap
.
put
(
ConsumerConfig
.
AUTO_COMMIT_INTERVAL_MS_CONFIG
,
"100"
);
// Session超时设置
propsMap
.
put
(
ConsumerConfig
.
SESSION_TIMEOUT_MS_CONFIG
,
"15000"
);
//请求超时时间
propsMap
.
put
(
ConsumerConfig
.
REQUEST_TIMEOUT_MS_CONFIG
,
"15000"
);
// 键的反序列化方式
propsMap
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
// 值的反序列化方式
propsMap
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
propsMap
.
put
(
ConsumerConfig
.
AUTO_OFFSET_RESET_CONFIG
,
"latest"
);
return
propsMap
;
}
}
src/main/java/com/example/mykafkaproject/MyKafkaProjectApplication.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
;
import
org.apache.ibatis.annotations.Mapper
;
import
org.mybatis.spring.annotation.MapperScan
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
@SpringBootApplication
@MapperScan
(
value
=
"com.example.mykafkaproject.mapper"
)
public
class
MyKafkaProjectApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
MyKafkaProjectApplication
.
class
,
args
);
}
}
src/main/java/com/example/mykafkaproject/bean/IotWarnPushDto.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
bean
;
/**
* @author 周昊
* @desc ...
* @date 2023-11-06 16:27:37
*/
public
class
IotWarnPushDto
{
private
String
order_name
;
private
String
alarm_device_name
;
private
String
grade_name
;
private
String
grade_type
;
private
String
grade_type_code
;
private
String
alarm_time
;
private
String
picture
;
private
String
belong
;
private
String
create_time
;
public
String
getOrder_name
()
{
return
order_name
;
}
public
void
setOrder_name
(
String
order_name
)
{
this
.
order_name
=
order_name
;
}
public
String
getAlarm_device_name
()
{
return
alarm_device_name
;
}
public
void
setAlarm_device_name
(
String
alarm_device_name
)
{
this
.
alarm_device_name
=
alarm_device_name
;
}
public
String
getGrade_name
()
{
return
grade_name
;
}
public
void
setGrade_name
(
String
grade_name
)
{
this
.
grade_name
=
grade_name
;
}
public
String
getGrade_type
()
{
return
grade_type
;
}
public
void
setGrade_type
(
String
grade_type
)
{
this
.
grade_type
=
grade_type
;
}
public
String
getAlarm_time
()
{
return
alarm_time
;
}
public
void
setAlarm_time
(
String
alarm_time
)
{
this
.
alarm_time
=
alarm_time
;
}
public
String
getPicture
()
{
return
picture
;
}
public
void
setPicture
(
String
picture
)
{
this
.
picture
=
picture
;
}
public
String
getBelong
()
{
return
belong
;
}
public
void
setBelong
(
String
belong
)
{
this
.
belong
=
belong
;
}
public
String
getCreate_time
()
{
return
create_time
;
}
public
void
setCreate_time
(
String
create_time
)
{
this
.
create_time
=
create_time
;
}
public
String
getGrade_type_code
()
{
return
grade_type_code
;
}
public
void
setGrade_type_code
(
String
grade_type_code
)
{
this
.
grade_type_code
=
grade_type_code
;
}
}
src/main/java/com/example/mykafkaproject/bean/WarnPushBase.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
bean
;
import
java.util.Date
;
/**
* 预警记录对象 warn_push_base
*
* @author censoft
* @date 2023-08-08
*/
public
class
WarnPushBase
{
/** null */
private
Long
id
;
/** 所属系统名称 */
private
String
orderName
;
/** 预警设备名称 */
private
String
alarmDeviceName
;
/** 预警等级 */
private
String
gradeName
;
/** 预警类型 */
private
String
gradeType
;
/** 报警时间 */
private
Date
alarmTime
;
/** 图片地址 */
private
String
picture
;
/** 视频地址 */
private
String
video
;
/** 处置状态 */
private
String
status
;
/** 接受发送人 */
private
String
sendUserIds
;
/** 删除标记 */
private
Integer
delFlag
;
/** 备注 */
private
String
remark
;
/** 归属矿场 */
private
String
belong
;
/** 摄像头Id */
private
String
channelId
;
public
Long
getId
()
{
return
id
;
}
public
void
setId
(
Long
id
)
{
this
.
id
=
id
;
}
public
String
getOrderName
()
{
return
orderName
;
}
public
void
setOrderName
(
String
orderName
)
{
this
.
orderName
=
orderName
;
}
public
String
getAlarmDeviceName
()
{
return
alarmDeviceName
;
}
public
void
setAlarmDeviceName
(
String
alarmDeviceName
)
{
this
.
alarmDeviceName
=
alarmDeviceName
;
}
public
String
getGradeName
()
{
return
gradeName
;
}
public
void
setGradeName
(
String
gradeName
)
{
this
.
gradeName
=
gradeName
;
}
public
String
getGradeType
()
{
return
gradeType
;
}
public
void
setGradeType
(
String
gradeType
)
{
this
.
gradeType
=
gradeType
;
}
public
Date
getAlarmTime
()
{
return
alarmTime
;
}
public
void
setAlarmTime
(
Date
alarmTime
)
{
this
.
alarmTime
=
alarmTime
;
}
public
String
getPicture
()
{
return
picture
;
}
public
void
setPicture
(
String
picture
)
{
this
.
picture
=
picture
;
}
public
String
getStatus
()
{
return
status
;
}
public
void
setStatus
(
String
status
)
{
this
.
status
=
status
;
}
public
String
getSendUserIds
()
{
return
sendUserIds
;
}
public
void
setSendUserIds
(
String
sendUserIds
)
{
this
.
sendUserIds
=
sendUserIds
;
}
public
Integer
getDelFlag
()
{
return
delFlag
;
}
public
void
setDelFlag
(
Integer
delFlag
)
{
this
.
delFlag
=
delFlag
;
}
public
String
getRemark
()
{
return
remark
;
}
public
void
setRemark
(
String
remark
)
{
this
.
remark
=
remark
;
}
public
String
getBelong
()
{
return
belong
;
}
public
void
setBelong
(
String
belong
)
{
this
.
belong
=
belong
;
}
public
String
getVideo
()
{
return
video
;
}
public
void
setVideo
(
String
video
)
{
this
.
video
=
video
;
}
public
String
getChannelId
()
{
return
channelId
;
}
public
void
setChannelId
(
String
channelId
)
{
this
.
channelId
=
channelId
;
}
}
src/main/java/com/example/mykafkaproject/mapper/WarnPushBaseMapper.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
mapper
;
import
com.example.mykafkaproject.bean.WarnPushBase
;
import
org.springframework.stereotype.Repository
;
@Repository
public
interface
WarnPushBaseMapper
{
public
void
add
(
WarnPushBase
warnPushBase
);
}
src/main/java/com/example/mykafkaproject/server/WarnPushBaseServer.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
server
;
import
cn.hutool.http.HttpRequest
;
import
com.alibaba.fastjson2.JSON
;
import
com.dahuatech.hutool.http.Method
;
import
com.dahuatech.hutool.json.JSONObject
;
import
com.dahuatech.hutool.json.JSONUtil
;
import
com.dahuatech.icc.exception.ClientException
;
import
com.dahuatech.icc.oauth.http.DefaultClient
;
import
com.dahuatech.icc.oauth.http.IClient
;
import
com.dahuatech.icc.oauth.model.v202010.GeneralRequest
;
import
com.dahuatech.icc.oauth.model.v202010.GeneralResponse
;
import
com.example.mykafkaproject.bean.IotWarnPushDto
;
import
com.example.mykafkaproject.bean.WarnPushBase
;
import
com.example.mykafkaproject.mapper.WarnPushBaseMapper
;
import
com.example.mykafkaproject.utils.PinYinUtil
;
import
com.example.mykafkaproject.utils.RtspToMP4
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
@Service
(
"WarnPushBaseServer"
)
public
class
WarnPushBaseServer
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
WarnPushBaseServer
.
class
);
@Autowired
WarnPushBaseMapper
warnPushBaseMapper
;
@Autowired
private
RtspToMP4
rtspToMP4
;
public
void
addMessage
(
String
message
)
{
LOGGER
.
info
(
"预警信息:"
+
message
);
WarnPushBase
warnPushBase
=
JSON
.
parseObject
(
message
,
WarnPushBase
.
class
);
//获取视频地址
//获取前后五秒时间戳
Date
alarmTime
=
warnPushBase
.
getAlarmTime
();
String
video
=
getVideo
(
warnPushBase
.
getChannelId
(),
alarmTime
.
getTime
()
/
1000
-
5
,
alarmTime
.
getTime
()
/
1000
+
5
);
warnPushBase
.
setVideo
(
video
);
warnPushBaseMapper
.
add
(
warnPushBase
);
//发送iot的http请求
iotPush
(
warnPushBase
);
}
public
String
getVideo
(
String
channelId
,
Long
startTime
,
Long
endTime
)
{
try
{
//获取录像视频流地址
String
url
=
rtsp1
(
channelId
,
startTime
,
endTime
);
//录像视频流转mp4文件
final
Process
[]
process
=
new
Process
[
1
];
final
String
fileName
=
channelId
+
"_"
+
startTime
+
".mp4"
;
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
process
[
0
]
=
rtspToMP4
.
StartRecord
(
"F:\\ffmpeg\\ffmpeg-2023-10-29-git-2532e832d2-full_build\\bin\\ffmpeg.exe"
,
url
,
"F:\\ruoyi\\uploadPath\\MP4\\"
+
fileName
);
}
}).
start
();
Thread
.
sleep
(
3
*
1000
);
rtspToMP4
.
stopRecord
(
process
[
0
]);
return
"http://172.16.20.211:8080/profile/MP4/"
+
fileName
;
}
catch
(
ClientException
e
)
{
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
return
"录像错误"
;
}
public
String
rtsp1
(
String
channelId
,
Long
startTime
,
Long
endTime
)
throws
ClientException
{
IClient
iClient
=
new
DefaultClient
();
GeneralRequest
generalRequest
=
new
GeneralRequest
(
"/evo-apigw/admin/API/SS/Playback/StartPlaybackByTime"
,
Method
.
POST
);
String
body
=
"{\n"
+
" \"data\": {\n"
+
" \"channelId\": \""
+
channelId
+
"\",\n"
+
" \"recordSource\": \"3\",\n"
+
" \"startTime\": \""
+
startTime
+
"\",\n"
+
" \"endTime\": \""
+
endTime
+
"\",\n"
+
" \"streamType\": \"1\",\n"
+
" \"recordType\": \"1\"\n"
+
" }\n"
+
"}"
;
generalRequest
.
body
(
body
);
GeneralResponse
response
=
iClient
.
doAction
(
generalRequest
,
generalRequest
.
getResponseClass
());
String
result
=
response
.
getResult
();
JSONObject
obj
=
JSONUtil
.
parseObj
(
response
.
getResult
());
String
url
=
obj
.
getJSONObject
(
"data"
).
getStr
(
"url"
);
String
token
=
obj
.
getJSONObject
(
"data"
).
getStr
(
"token"
);
String
[]
parts
=
url
.
split
(
"\\|"
);
return
parts
[
0
]
+
"?token="
+
token
+
"&trackID=0"
;
}
public
void
iotPush
(
WarnPushBase
warnPushBase
){
SimpleDateFormat
simpleDateFormat
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
String
url
=
"http://172.16.20.170:8089/api/v1/aqgl/warn"
;
IotWarnPushDto
iotWarnPushDto
=
new
IotWarnPushDto
();
iotWarnPushDto
.
setOrder_name
(
"智能安全系纺"
);
iotWarnPushDto
.
setAlarm_device_name
(
warnPushBase
.
getAlarmDeviceName
());
iotWarnPushDto
.
setGrade_name
(
warnPushBase
.
getGradeName
());
iotWarnPushDto
.
setGrade_type
(
"摄像头预警"
);
iotWarnPushDto
.
setGrade_type_code
(
PinYinUtil
.
getPinYinHeadChar
(
iotWarnPushDto
.
getGrade_type
()));
iotWarnPushDto
.
setAlarm_time
(
simpleDateFormat
.
format
(
warnPushBase
.
getAlarmTime
()));
iotWarnPushDto
.
setPicture
(
warnPushBase
.
getPicture
());
iotWarnPushDto
.
setBelong
(
warnPushBase
.
getBelong
());
iotWarnPushDto
.
setCreate_time
(
simpleDateFormat
.
format
(
new
Date
()));
String
json
=
JSON
.
toJSONString
(
iotWarnPushDto
);
String
result
=
HttpRequest
.
post
(
url
)
.
body
(
json
)
.
execute
().
body
();
System
.
out
.
println
(
"IOT返回结果:"
+
result
);
}
}
src/main/java/com/example/mykafkaproject/utils/PinYinUtil.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
utils
;
import
net.sourceforge.pinyin4j.PinyinHelper
;
/**
* @Author: yf
* @Description: 提取每个字符的首字母(大写)
* @DateTime: 17:20 2023/4/27
* @Params:
* @Return
*/
public
class
PinYinUtil
{
public
static
String
getPinYinHeadChar
(
String
str
)
{
if
(
str
==
null
||
str
.
trim
().
equals
(
""
))
{
return
""
;
}
String
convert
=
""
;
for
(
int
j
=
0
;
j
<
str
.
length
();
j
++)
{
char
word
=
str
.
charAt
(
j
);
// 提取字符的首字母
String
[]
pinyinArray
=
PinyinHelper
.
toHanyuPinyinStringArray
(
word
);
if
(
pinyinArray
!=
null
)
{
convert
+=
pinyinArray
[
0
].
charAt
(
0
);
}
else
{
convert
+=
word
;
}
}
// 去除字符中包含的空格
convert
=
convert
.
replace
(
" "
,
""
);
// 字符转小写
// convert.toLowerCase();
return
convert
.
toUpperCase
();
}
}
src/main/java/com/example/mykafkaproject/utils/RtspToMP4.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
.
utils
;
import
org.springframework.stereotype.Component
;
import
java.io.InputStream
;
import
java.io.InputStreamReader
;
import
java.io.OutputStream
;
import
java.text.SimpleDateFormat
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.List
;
@Component
public
class
RtspToMP4
{
public
static
class
In
implements
Runnable
{
private
InputStream
inputStream
;
public
In
(
InputStream
inputStream
)
{
this
.
inputStream
=
inputStream
;
}
@Override
public
void
run
()
{
try
{
//转成字符输入流
InputStreamReader
inputStreamReader
=
new
InputStreamReader
(
inputStream
,
"gbk"
);
int
len
=
-
1
;
char
[]
c
=
new
char
[
1024
];
//读取进程输入流中的内容
while
((
len
=
inputStreamReader
.
read
(
c
))
!=
-
1
)
{
String
s
=
new
String
(
c
,
0
,
len
);
System
.
out
.
print
(
s
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
public
Process
StartRecord
(
String
ffmpegPath
,
String
streamUrl
,
String
FilePath
){
ProcessBuilder
processBuilder
=
new
ProcessBuilder
();
//定义命令内容
List
<
String
>
command
=
new
ArrayList
<>();
command
.
add
(
ffmpegPath
);
command
.
add
(
"-i"
);
command
.
add
(
streamUrl
);
command
.
add
(
"-c:v"
);
command
.
add
(
"copy"
);
command
.
add
(
"-an"
);
command
.
add
(
"-reset_timestamps"
);
command
.
add
(
"1"
);
command
.
add
(
"-f"
);
command
.
add
(
"mp4"
);
command
.
add
(
FilePath
);
processBuilder
.
command
(
command
);
System
.
out
.
println
(
"脚本:"
+
command
.
toString
());
//将标准输入流和错误输入流合并,通过标准输入流读取信息
processBuilder
.
redirectErrorStream
(
true
);
try
{
//启动进程
Process
process
=
processBuilder
.
start
();
System
.
out
.
println
(
"开始时间:"
+
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
).
format
(
new
Date
(
System
.
currentTimeMillis
())));
//获取输入流
InputStream
inputStream
=
process
.
getInputStream
();
Thread
inThread
=
new
Thread
(
new
In
(
inputStream
));
inThread
.
start
();
return
process
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
null
;
}
public
boolean
stopRecord
(
Process
process
)
{
try
{
OutputStream
os
=
process
.
getOutputStream
();
os
.
write
(
"q"
.
getBytes
());
// 一定要刷新
os
.
flush
();
os
.
close
();
}
catch
(
Exception
err
)
{
err
.
printStackTrace
();
return
false
;
}
return
true
;
}
}
src/main/resources/application.yml
0 → 100644
View file @
3864c971
# 开发环境配置
server
:
# 服务器的HTTP端口,默认为80
port
:
8088
spring
:
datasource
:
driver-class-name
:
com.mysql.cj.jdbc.Driver
url
:
jdbc:mysql://172.16.20.60:3306/whaq?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowMultiQueries=true
username
:
root
password
:
mysql
mybatis
:
mapper-locations
:
classpath:/mapper/*.xml
type-aliases-package
:
com.example.mykafkaproject.bean
logging
:
file
:
path
:
F:/log
src/main/resources/config/iccSdk.properties
0 → 100644
View file @
3864c971
#服务地址 端口默认是443
#icc.sdk.host=115.236.17.59:9021
icc.sdk.host
=
172.16.51.2
#是否启用https访问,默认:是
#icc.sdk.enable.https=false
#认证类型=[client_credentials],[password],client_credentials:客户端鉴权模式;password:用户密码鉴权模式
icc.sdk.grantType
=
password
#-----------客户端鉴权模式--------------
#客户端鉴权模式申请访问凭证id
icc.sdk.clientId
=
CompanyName
#客户端鉴权模式申请访问凭证密钥
icc.sdk.clientSecret
=
67987667-8651-44f1-a3da-f6796e5f23e6
#客户端鉴权模式默认用户,默认是1,超级管理员
#icc.sdk.config.client.userId=1
#-----------用户密码鉴权模式--------------
#用户密码鉴权模式申请访问凭证id
icc.sdk.pwdClientId
=
CompanyName
#用户密码鉴权模式申请访问凭证密钥
icc.sdk.pwdClientSecret
=
67987667-8651-44f1-a3da-f6796e5f23e6
#用户名
icc.sdk.username
=
system
#密码
icc.sdk.password
=
Admin@123
#http调试模式 false:开启,true:关闭
icc.sdk.enable.https
=
true
src/main/resources/mapper/WarnPushBaseMapper.xml
0 → 100644
View file @
3864c971
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace=
"com.example.mykafkaproject.mapper.WarnPushBaseMapper"
>
<insert
id=
"add"
parameterType=
"com.example.mykafkaproject.bean.WarnPushBase"
useGeneratedKeys=
"true"
keyProperty=
"id"
>
insert into warn_push_base
<trim
prefix=
"("
suffix=
")"
suffixOverrides=
","
>
<if
test=
"orderName != null and orderName != ''"
>
order_name,
</if>
<if
test=
"alarmDeviceName != null and alarmDeviceName != ''"
>
alarm_device_name,
</if>
<if
test=
"gradeName != null and gradeName != ''"
>
grade_name,
</if>
<if
test=
"gradeType != null and gradeType != ''"
>
grade_type,
</if>
<if
test=
"alarmTime != null "
>
alarm_time,
</if>
<if
test=
"picture != null and picture != ''"
>
picture,
</if>
<if
test=
"video != null and video != ''"
>
video,
</if>
<if
test=
"belong != null and belong != ''"
>
belong,
</if>
<if
test=
"status != null and status != ''"
>
status,
</if>
<if
test=
"sendUserIds != null and sendUserIds != ''"
>
send_user_ids,
</if>
<if
test=
"delFlag != null "
>
del_flag,
</if>
<if
test=
"remark != null "
>
remark,
</if>
</trim>
<trim
prefix=
"values ("
suffix=
")"
suffixOverrides=
","
>
<if
test=
"orderName != null and orderName != ''"
>
#{orderName},
</if>
<if
test=
"alarmDeviceName != null and alarmDeviceName != ''"
>
#{alarmDeviceName},
</if>
<if
test=
"gradeName != null and gradeName != ''"
>
#{gradeName},
</if>
<if
test=
"gradeType != null and gradeType != ''"
>
#{gradeType},
</if>
<if
test=
"alarmTime != null "
>
#{alarmTime},
</if>
<if
test=
"picture != null and picture != ''"
>
#{picture},
</if>
<if
test=
"video != null and video != ''"
>
#{video},
</if>
<if
test=
"belong != null and belong != ''"
>
#{belong},
</if>
<if
test=
"status != null and status != ''"
>
#{status},
</if>
<if
test=
"sendUserIds != null and sendUserIds != ''"
>
#{sendUserIds},
</if>
<if
test=
"delFlag != null "
>
#{delFlag},
</if>
<if
test=
"remark != null "
>
#{remark},
</if>
</trim>
</insert>
</mapper>
src/test/java/com/example/mykafkaproject/MyKafkaProjectApplicationTests.java
0 → 100644
View file @
3864c971
package
com
.
example
.
mykafkaproject
;
import
org.junit.jupiter.api.Test
;
import
org.springframework.boot.test.context.SpringBootTest
;
@SpringBootTest
class
MyKafkaProjectApplicationTests
{
@Test
void
contextLoads
()
{
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment