Commit aa4d8424 authored by 周昊's avatar 周昊

1、添加更新场景存活时间接口

parent e294f93b
...@@ -8,6 +8,7 @@ import com.censoft.flink.mqtt.MqttConsumer; ...@@ -8,6 +8,7 @@ import com.censoft.flink.mqtt.MqttConsumer;
import com.censoft.flink.transform.AlgorithmBaseFilterFunction; import com.censoft.flink.transform.AlgorithmBaseFilterFunction;
import com.censoft.flink.transform.AlgorithmTypeFlatMapFunction; import com.censoft.flink.transform.AlgorithmTypeFlatMapFunction;
import com.censoft.flink.transform.ResultExistFilterFunction; import com.censoft.flink.transform.ResultExistFilterFunction;
import com.censoft.flink.transform.UpdateLiveFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystem;
...@@ -47,8 +48,12 @@ public class StreamingJob { ...@@ -47,8 +48,12 @@ public class StreamingJob {
DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/test/" + sceneId)); DataStream<AlgorithmPushDto> mqttStream = env.addSource(new MqttConsumer("/censoft/test/" + sceneId));
//3、进行处理 //3、进行处理
//3、1 默认 是否存在结果集判断 //3、0 更新场景在线时间
SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream SingleOutputStreamOperator<AlgorithmPushDto> streamOperator = mqttStream
.filter(new UpdateLiveFilterFunction(sceneId));
//3、1 默认 是否存在结果集判断
streamOperator = streamOperator
.filter(new ResultExistFilterFunction()); .filter(new ResultExistFilterFunction());
//3、2 默认 根据分类,分解多个推送信息 //3、2 默认 根据分类,分解多个推送信息
...@@ -74,7 +79,7 @@ public class StreamingJob { ...@@ -74,7 +79,7 @@ public class StreamingJob {
//3、6 打印输出 //3、6 打印输出
outputStreamOperator.print(); outputStreamOperator.print();
outputStreamOperator.writeAsText("D:/word1.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1); // outputStreamOperator.writeAsText("D:/word1.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//3、7 自动执行 //3、7 自动执行
env.execute(); env.execute();
} }
......
...@@ -11,4 +11,6 @@ public interface AlgorithmSceneDao { ...@@ -11,4 +11,6 @@ public interface AlgorithmSceneDao {
public AlgorithmSceneBasePo getAlgorithmSceneBasePo(@Param("sceneId") Long sceneId); public AlgorithmSceneBasePo getAlgorithmSceneBasePo(@Param("sceneId") Long sceneId);
public List<AlgorithmScenePiecePo> getAlgorithmScenePieceList(@Param("sceneId") Long sceneId); public List<AlgorithmScenePiecePo> getAlgorithmScenePieceList(@Param("sceneId") Long sceneId);
public int updateLiveBySceneId(Long sceneId);
} }
...@@ -15,6 +15,11 @@ ...@@ -15,6 +15,11 @@
<result property="variableValue" column="variable_value"/> <result property="variableValue" column="variable_value"/>
</collection> </collection>
</resultMap> </resultMap>
<update id="updateLiveBySceneId">
UPDATE algorithm_scene_base SET live = NOW() WHERE id = #{0}
</update>
<select id="getAlgorithmSceneBasePo" resultType="com.censoft.flink.domain.AlgorithmSceneBasePo"> <select id="getAlgorithmSceneBasePo" resultType="com.censoft.flink.domain.AlgorithmSceneBasePo">
SELECT scb.id AS id, SELECT scb.id AS id,
scb.scene_name AS sceneName, scb.scene_name AS sceneName,
...@@ -39,4 +44,6 @@ ...@@ -39,4 +44,6 @@
WHERE asp.scene_id = #{sceneId} WHERE asp.scene_id = #{sceneId}
ORDER BY asp.sort ASC ORDER BY asp.sort ASC
</select> </select>
</mapper> </mapper>
\ No newline at end of file
...@@ -42,6 +42,20 @@ public class SqlFactory { ...@@ -42,6 +42,20 @@ public class SqlFactory {
return algorithmScenePiecePoList; return algorithmScenePiecePoList;
} }
public int updateLiveBySceneId(Long sceneId) throws IOException {
SqlSession session = initSqlSession();
//指定要执行的SQL语句的id
//SQL语句的id=namespace+"."+SQL语句所在标签的id属性的值
String SQLID = "com.censoft.flink.mapper.AlgorithmSceneDao" + "." + "updateLiveBySceneId";
//通过SqlSession对象的方法执行SQL语句
int result = session.update(SQLID,sceneId);
//提交事务
session.commit();
//最后我们关闭SqlSession对象
session.close();
return result;
}
private SqlSession initSqlSession() throws IOException { private SqlSession initSqlSession() throws IOException {
/* /*
myBatis的核心类是SqlSessionFactory myBatis的核心类是SqlSessionFactory
......
...@@ -47,7 +47,6 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu ...@@ -47,7 +47,6 @@ public class AlgorithmTypeFlatMapFunction implements FlatMapFunction<AlgorithmPu
}).collect(Collectors.toList()); }).collect(Collectors.toList());
for (AlgorithmPushDto pushDto : collect) { for (AlgorithmPushDto pushDto : collect) {
logger.error("分流:" + pushDto);
collector.collect(pushDto); collector.collect(pushDto);
} }
......
package com.censoft.flink.transform;
import com.censoft.flink.domain.AlgorithmPushDto;
import com.censoft.flink.mapper.SqlFactory;
import java.io.IOException;
/**
* @author 周昊
* @desc 更新场景存活时间
*/
public class UpdateLiveFilterFunction implements AlgorithmBaseFilterFunction {
//sceneId 场景id
private final Long sceneId;
public UpdateLiveFilterFunction(Long sceneId) {
this.sceneId = sceneId;
}
@Override
public boolean filter(AlgorithmPushDto algorithmPushDto) throws IOException {
SqlFactory sqlFactory = new SqlFactory();
sqlFactory.updateLiveBySceneId(sceneId);
return true;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment