Flink重写watermark
问题
时间信息是形如”2022/10/22 10:34”的字符串,怎样针对这种时间信息,为Flink重写assignTimestampsAndWatermarks
如果这个字符串已经被解析为一个Java时间对象,例如java.util.Date或java.time.Instant,那么可以在assignTimestampsAndWatermarks函数中将它作为参数传入。例如:
Copy codeDataStream<YourType> stream = ...;
// 假设eventTime是java.util.Date类型
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
@Override
public long extractTimestamp(YourType element) {
return element.getEventTime().getTime();
}
});
如果字符串未被解析为Java时间对象,则需要先将其解析为一个时间对象,然后再在assignTimestampsAndWatermarks函数中使用它。可以使用SimpleDateFormat类来解析时间字符串:
Copy codeDataStream<YourType> stream = ...;
// 假设timeString是"2022/10/22 10:34"这样的字符串,并且它已经被解析为java.util.Date类型
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm");
final Date eventTime = format.parse(timeString);
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
@Override
public long extractTimestamp(YourType element) {
return eventTime.getTime();
}
});
需要注意的是,在实际的Flink程序中,需要确保这些代码在并行执行的情况下也能正常工作。例如,如果在一个MapFunction中使用了SimpleDateFormat类,那么这个类必须是线程安全的,否则会导致程序出错。