记一次kfd(kafka+flink+doris)的实时操作

如题所述

在一次关于kafka+flink+doris的实时操作项目中,我们的目标是为某市医院构建一个统一的数据中心主索引,同时处理历史离线数据和增量实时数据,确保数据的实时性和准确性。我们的技术栈包括centos 7.2作为操作系统,mysql 5.7.30用于数据存储,kafka 2.11负责数据传输,flink 1.10.1作为流处理引擎,以及doris 0.12.0来高效地进行数据存储和分析。

离线处理策略是利用doris的强大能力,通过创建外部映射表并加载到ods库,生成包含患者身份证号的主索引。这个过程是关键的初始化步骤,为后续实时处理奠定了基础。

在增量处理方面,我们采取了高效的策略:首先,canal监控mysql的binlog,实时捕获数据库的更改,然后将增量数据推送到kafka。flink作为实时数据处理引擎,从kafka中读取这些变更,通过Redis缓存进行实时检查。当新数据的身份证号、档案号和医院代码匹配主索引时,flink会进行数据验证和处理,确保数据一致性,不存在时则生成新的索引,并将更新同步到Redis和doris。这一步骤对于实时索引的维护至关重要,避免了重复数据的插入。

项目依赖的库包括flink的kafka连接器、scala编程语言,以及log4j用于日志管理和Redis缓存。以下是一些关键依赖的配置:

org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}

mysql
mysql-connector-java
5.1.47

redis.clients
jedis
${redis.version}

dorisdb-maven-releases
http://username:[email protected]/repository/maven-releases/

在实施过程中,我们遇到了一些挑战,如FE资源限制和数据格式的兼容性问题。通过不断优化,我们最终选择使用HTTP PUT方法进行数据写入,并采取每100条数据或3秒一次的频率,确保了数据的高效处理。整个流程包括mysql本地数据的写入、canal的实时同步、kafka的数据推送,以及flink和doris之间的无缝对接,实现了实时数据的实时处理和更新。

通过这次实践,我们不仅提升了数据处理的实时性,还锻炼了团队在分布式系统中的协作和问题解决能力。这次kafka+flink+doris的结合,为医院的数据中心管理带来了显著的效率提升和数据一致性保障。
温馨提示:答案为网友推荐,仅供参考