流式处理和批处理

Kafka不光是一个消息引擎系统,同时也是一个分布式流式处理框架.

批处理

批量的查询,统计数据.比如分页从mysql获取数据,进行统计.

优势:数据精确.

劣势:速度慢.

流式处理

细粒度的对数据的统计,比如监控某个服务器运行情况统计,实时计算每秒产生的参数对统计的影响.

优势:速度快.

劣势:数据会趋于正确性.

流表二元性

流就是一个永不停止的事件序列,用KStream表示.

表是事件的聚合,一组行记录,用KTable表示.

流在时间维度上不断聚合形成表,表在时间维度上不断更新形成流,这就是流表二元性.

时间窗口

固定时间窗口,滑动时间窗口,会话窗口.

实例:统计名称出现次数

package kafkalearn.demo.wordcount;
 
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
 
 
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
 
public final class WordCountDemo {
    public static void main(final String[] args) {
        //设置
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         //构建流计算
        final StreamsBuilder builder = new StreamsBuilder();
         //监听事件,构建流
        final KStream<String, String> source = builder.stream("wordcount-input-topic");
         //聚合流转化为表
        final KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .groupBy((key, value) -> value)
            .count();
         //表转化为流
        counts.toStream().to("wordcount-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
 
         //运行流计算
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
 
         //钩子
        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
 
 
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0)

增加时间窗口

  //聚合流转化为表,时间窗口
        final KTable<Windowed, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count();

Kafka流式处理优势

1.能容易实现端到端的正确性.

常见的大数据流式框架,更多是的对其框架内部的处理.

2.流式计算的定位

不需要部署太大的规模,对于一些只需要部署十几台的机器的数据上面来说,更有优势.

扩展

大数据领域中,map-reduce是批处理框架,spark和flink是流计算框架.

批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。

流处理的特点是无界、实时,无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在Spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。

而在Flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个无界限的流,这就是所谓额有界流和无界流

Last updated

Was this helpful?