Flink重写watermark

问题

时间信息是形如”2022/10/22 10:34”的字符串,怎样针对这种时间信息,为Flink重写assignTimestampsAndWatermarks

如果这个字符串已经被解析为一个Java时间对象,例如java.util.Date或java.time.Instant,那么可以在assignTimestampsAndWatermarks函数中将它作为参数传入。例如:

Copy codeDataStream<YourType> stream = ...;

// 假设eventTime是java.util.Date类型
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return element.getEventTime().getTime();
    }
});

如果字符串未被解析为Java时间对象,则需要先将其解析为一个时间对象,然后再在assignTimestampsAndWatermarks函数中使用它。可以使用SimpleDateFormat类来解析时间字符串:

Copy codeDataStream<YourType> stream = ...;

// 假设timeString是"2022/10/22 10:34"这样的字符串,并且它已经被解析为java.util.Date类型
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm");
final Date eventTime = format.parse(timeString);

stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return eventTime.getTime();
    }
});

需要注意的是,在实际的Flink程序中,需要确保这些代码在并行执行的情况下也能正常工作。例如,如果在一个MapFunction中使用了SimpleDateFormat类,那么这个类必须是线程安全的,否则会导致程序出错。