FlinkSQL中Timestamp使⽤的坑
flink版本为1.10。苏梓玲三围
flink sql消费kafka消息,表定义为
CREATE TABLE start_log_source(
mid_id VARCHAR,
user_id INT,
...
app_time TIMESTAMP,-- 13位的时间戳(1587975971431)
WATERMARK FOR app_time AS app_time-INTERVAL'5'SECOND-- 在ts上定义5 秒延迟的 watermark
)WITH(
'pe'='kafka',-- 使⽤ kafka connector
'connector.version'='universal',-- kafka 版本,universal ⽀持 0.11 以上的版本
'pic'='start_log',-- kafka topic
'up.id'='start_log_group',
'connector.startup-mode'='earliest-offset',-- 从起始 offset 开始读取
't'='192.168.1.109:2181',-- zookeeper 地址印小天回应插刀教事件
'connector.properties.bootstrap.servers'='192.168.1.109:9092',-- kafka broker 地址
'pe'='json'-- 数据源格式为 json
);
关之琳喜欢刘德华吗以上代码运⾏时报错,⼤概信息是转换错误,查阅⽂档后发现应该是app_time定义有问题。如果app_time类型定义为TIMESTAMP,那它的值应该为2020-04-06T16:26:11类似的格式。
进⼀步查询发现watermark的⼀些使⽤要求:
WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS
watermark_strategy_expression 。
rowtime_column_name 把⼀个现有的列定义为⼀个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是⼀个计算列。
黄旭熙被抓但是app_time实际数据格式为13位的时间戳,也就是毫秒级的时间戳,TIMESTAMP(3)是秒级时间戳,则必须经过转换才⾏。刚开始使⽤TO_TIMESTAMP内置函数,但是它不⽀持数值型时间戳值转换为TIMESTAMP。经过⼀番折腾,终于成功,SQL如下:
CREATE TABLE start_log_source(
mid_id VARCHAR,
user_id INT,
李晨的石头...
app_time BIGINT,-- 13位的时间戳(1587975971431)
喻可欣个人资料ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time /1000,'yyyy-MM-dd HH:mm:ss')),-- 定义事件时间
WATERMARK FOR ts AS ts -INTERVAL'5'SECOND-- 在ts上定义5 秒延迟的 watermark
)WITH(
'pe'='kafka',-- 使⽤ kafka connector
'connector.version'='universal',-- kafka 版本,universal ⽀持 0.11 以上的版本
'pic'='start_log',-- kafka topic
'up.id'='start_log_group',
'connector.startup-mode'='earliest-offset',-- 从起始 offset 开始读取
't'='192.168.1.109:2181',-- zookeeper 地址
'connector.properties.bootstrap.servers'='192.168.1.109:9092',-- kafka broker 地址
'pe'='json'-- 数据源格式为 json
);
发布评论