
Zhifajiang Study Notes (2.6): Flink CDC

2025-06-24 12:02:23

1. Background

In some projects, especially purchase-sale-inventory SaaS products, caching was never considered at the start because the priority was shipping the product quickly. Yet this kind of software carries complex business logic, so retrofitting a Redis cache into the existing code would mean sweeping changes and a large amount of regression testing. Sometimes the situation is even worse: a project may have been written against JDK 1.6, and wiring a Redis cache into such a codebase is harder still.
This raises a question: just as MySQL master-slave replication does, can we listen to the MySQL binlog and update the cache from it? That is exactly where Flink CDC comes in.

2. Setting Up the MySQL Environment

Note that the current flink-cdc only supports MySQL 8.0; 8.4 is not supported at all.
Since my local MySQL is 8.4, for convenience we will run MySQL 8.0 in Docker.

2.1 docker-compose.yml

```yaml
services:
  master:
    image: mysql:8.0.41
    container_name: mysql-8
    restart: always
    #mem_limit: 512M
    environment:
      MYSQL_ROOT_PASSWORD: study@2025
      TZ: Asia/Shanghai
    ports:
      - "3307:3306"
    volumes:
      - ./cfg/my.cnf:/etc/my.cnf
      - ./data:/var/lib/mysql
      - ./initdb:/docker-entrypoint-initdb.d
      - ./dump:/var/dump
      - ./log:/var/log
    networks:
      - mysql-cluster

networks:
  mysql-cluster:
```

2.2 Initialization SQL

```sql
-- Create a replication user
CREATE ROLE role_app;
GRANT SELECT, UPDATE, INSERT, DELETE ON *.* TO role_app;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO role_app;
CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password BY 'study@2025'
    DEFAULT ROLE role_app COMMENT 'app user';
FLUSH PRIVILEGES;

-- Create a database for testing
CREATE SCHEMA `shop-center`;
FLUSH TABLES WITH READ LOCK;
```

2.3 Caveats

First comment out the volume line `- ./cfg/my.cnf:/etc/my.cnf` and start the service.
Then copy the config file out of the container (note that this is `docker cp`, not `docker exec`, since `./cfg` only exists on the host):

```shell
docker cp <id>:/etc/my.cnf ./cfg/my.cnf
```

After that, restore the commented-out line and restart the container.
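Flink CDC reads the binlog, so the `my.cnf` you mount back in must keep binary logging enabled in ROW format with a unique server id. MySQL 8.0 already defaults to `log_bin=ON` and `binlog_format=ROW`, so the following sketch (settings illustrative, not copied from the repository) is mostly about confirming them:

```ini
[mysqld]
# a unique id is required for anything that speaks the replication protocol
server-id                  = 1
# binary logging: on by default in MySQL 8.0, shown here for clarity
log_bin                    = mysql-bin
# Flink CDC (Debezium) requires row-based logging with full row images
binlog_format              = ROW
binlog_row_image           = FULL
# optional: keep binlogs for 7 days
binlog_expire_logs_seconds = 604800
```

Once the container is up, you can verify the effective values with `SHOW VARIABLES LIKE 'binlog_%';`.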

3. Project Setup and POM Dependencies

3.1 Main Module POM Dependencies

A Flink program does not need the Spring framework; a plain main method is enough to run it.
But we would still like to use the APIs we are familiar with for Redis and Elasticsearch.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>1.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.3.0</version>
</dependency>
```

3.2 The common-data Module

Entity classes shared across modules are best extracted into a common module.
I also wire up and wrap the Redis and Elasticsearch operations in this module.

3.2.1 POM Dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-x-content</artifactId>
        <version>8.17.0</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-core</artifactId>
        <version>5.8.32</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2-extension-spring6</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.12.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.54</version>
    </dependency>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.4.2.RELEASE</version>
    </dependency>
</dependencies>
```

3.2.2 Basic Entity Classes

```java
@Data
public class GenItemEntity {
    Long id;
    String name;
    Long price;
    String brand;
    String specification;
    Integer version;
}
```
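The captured `item` table itself is never shown in the post. Judging from the `GenItemEntity` fields, its DDL could plausibly look like the following (column types and lengths are my assumption, not taken from the repository):

```sql
CREATE TABLE `shop-center`.`item` (
    `id`            BIGINT NOT NULL AUTO_INCREMENT,
    `name`          VARCHAR(255),
    `price`         BIGINT,
    `brand`         VARCHAR(128),
    `specification` VARCHAR(255),
    `version`       INT,
    PRIMARY KEY (`id`)
);
```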

4. Wrapping Redis and Elasticsearch Operations

4.1 Wrapping Redis Operations

Add spring-data-redis to the POM.
We can then use the familiar RedisTemplate to talk to Redis:

```java
public class RedisConfig {

    public RedisConfig() {
        init();
    }

    protected FastJsonConfig redisFastJson() {
        FastJsonConfig config = new FastJsonConfig();
        config.setWriterFeatures(
                JSONWriter.Feature.WriteNullListAsEmpty,
                // write class names into the JSON
                JSONWriter.Feature.WriteClassName,
                // serialize a null Boolean as false
                JSONWriter.Feature.WriteNullBooleanAsFalse,
                JSONWriter.Feature.WriteEnumsUsingName);
        config.setReaderFeatures(
                JSONReader.Feature.SupportClassForName,
                // support autoType
                JSONReader.Feature.SupportAutoType);
        return config;
    }

    protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
        return fastJsonRedisSerializer;
    }

    protected RedisConnectionFactory redisConnectionFactory() {
        // better to read these from a config file; hard-coded here for brevity
        RedisStandaloneConfiguration redisConfiguration =
                new RedisStandaloneConfiguration("192.168.0.64", 6379);
        redisConfiguration.setPassword("study@2025");
        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(2);                      // max connections
        poolConfig.setMaxIdle(2);                       // max idle connections
        poolConfig.setMinIdle(2);                       // min idle connections
        poolConfig.setMaxWait(Duration.ofMillis(3000)); // max wait for a connection
        ClientResources clientResources = DefaultClientResources.create();
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .clientResources(clientResources)
                .commandTimeout(Duration.ofSeconds(5))
                .poolConfig(poolConfig)
                .build();
        LettuceConnectionFactory redisConnectionFactory =
                new LettuceConnectionFactory(redisConfiguration, clientConfig);
        redisConnectionFactory.afterPropertiesSet(); // initialize the connection factory
        return redisConnectionFactory;
    }

    protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory,
                                                          FastJsonRedisSerializer pFastJsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
        return redisTemplate;
    }

    protected void init() {
        mFastJsonConfig = redisFastJson();
        mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
        mRedisConnectionFactory = redisConnectionFactory();
        mRedisTemplate = redisTemplate(mRedisConnectionFactory, mFastJsonRedisSerializer);
        mRedisTemplate.afterPropertiesSet();
    }

    private FastJsonConfig mFastJsonConfig;
    private FastJsonRedisSerializer mFastJsonRedisSerializer;
    private RedisConnectionFactory mRedisConnectionFactory;
    private RedisTemplate<String, Object> mRedisTemplate;

    public static RedisTemplate<String, Object> redisTemplate() {
        return Holder.INSTANCE.mRedisTemplate;
    }

    public static <T> String serialize(T entity) {
        return JSON.toJSONString(entity, Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
    }

    private static class Holder {
        private static final RedisConfig INSTANCE = new RedisConfig();
    }
}
```

4.2 Wrapping Elasticsearch Operations

Because the Elasticsearch connector needs an API key and HTTPS, it is easiest to use a YAML config and put http_ca.crt into this module's resources.
Under IDEA, submodule resources are sometimes not found; in that case, declaring the submodule dependency in the main module like this is enough:

```xml
<dependency>
    <groupId>indi.zhifa.study2025</groupId>
    <artifactId>common-data</artifactId>
    <version>${project.version}</version>
    <scope>compile</scope>
</dependency>
```

Note: the key point is the `compile` scope.

```java
public class EsClientConfig {
    @Setter @Getter private String host;
    @Setter @Getter private Integer port;
    @Setter @Getter private String apiKey;
}
```
```java
public class ElasticSearchClientProvider {

    private EsClientConfig esClientConfig;
    private RestClientBuilder builder;

    public ElasticSearchClientProvider() {
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void init() throws IOException {
        Yaml yaml = new Yaml();
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
            if (inputStream == null) {
                throw new IllegalArgumentException("File not found: el-config.yml");
            }
            esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load YAML file", e);
        }
        SSLContext sslContext;
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")) {
            sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load http_ca.crt", e);
        }
        builder = RestClient.builder(
                        new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https")) // your Elasticsearch address
                .setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())})
                .setFailureListener(new RestClient.FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        super.onFailure(node);
                    }
                })
                .setHttpClientConfigCallback(hc -> hc.setSSLContext(sslContext));
    }

    public ElasticsearchClient get() {
        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    public static ElasticSearchClientProvider getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
    }
}
```
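The `el-config.yml` the provider loads is not shown in the post. Given the `EsClientConfig` properties that SnakeYAML binds to, it would be shaped like this (host, port, and key are placeholders for your own deployment):

```yaml
host: 192.168.0.64
port: 9200
apiKey: "<your-base64-api-key>"
```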

5. Writing Custom Sinks for Redis and Elasticsearch

5.1 The Redis Sink

We want data to arrive at Redis already fully prepared: the Redis sink carries no business logic and only sets or deletes cache entries.

5.1.1 RedisSinkCommand

```java
public class RedisSinkCommand<T> {
    @Setter @Getter protected ERedisCommand command;
    @Setter @Getter protected long dua;   // TTL in seconds
    @Setter @Getter protected String key;
    @Setter @Getter protected T value;

    public void initSet(String pKey, T pValue) {
        command = ERedisCommand.SET;
        dua = 300;
        key = pKey;
        value = pValue;
    }

    public void initDel(String pKey) {
        command = ERedisCommand.DEL;
        key = pKey;
    }
}
```

```java
public enum ERedisCommand {
    SET, DEL
}
```

5.1.2 SpringDataRedisSink

```java
@Slf4j
public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
        return null; // deprecated variant, not used
    }

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context) {
        return new LettuceRedisSinkWriter();
    }

    class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {
        @Override
        public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
            switch (pCmd.getCommand()) {
                case SET -> {
                    // dua is a TTL in seconds; pass a Duration so the expiry overload is
                    // selected (a bare long would pick the SETRANGE offset overload)
                    redisTemplate.opsForValue().set(pCmd.getKey(), pCmd.getValue(),
                            Duration.ofSeconds(pCmd.getDua()));
                }
                case DEL -> redisTemplate.delete(pCmd.getKey());
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException { }

        @Override
        public void close() throws Exception { }
    }
}
```

5.2 The Elasticsearch Sink

The Elasticsearch sink follows the same rule as the Redis one: no business logic inside the sink.

5.2.1 ElCommand

```java
@Data
public class ElCommand<T> {
    protected EElCommand command;
    protected String index;
    protected T entity;
    protected String id;
}
```

```java
public enum EElCommand {
    CREATE, UPDATE, DELETE
}
```

5.2.2 ElSearchSink

```java
public class ElSearchSink<T> implements Sink<ElCommand<T>> {

    @Override
    public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
        return null; // deprecated variant, not used
    }

    @Override
    public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context) {
        return new ElSearchSinkWriter();
    }

    class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {
        @Override
        public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            ElasticSearchClientProvider elasticSearchClientProvider = ElasticSearchClientProvider.getInstance();
            ElasticsearchClient elClient = elasticSearchClientProvider.get();
            String index = pCmd.getIndex();
            String id = pCmd.getId();
            T entity = pCmd.getEntity();
            switch (pCmd.getCommand()) {
                case CREATE, UPDATE -> elClient.index(i -> i.index(index).id(id).document(entity));
                case DELETE -> elClient.delete(d -> d.index(index).id(id));
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException { }

        @Override
        public void close() throws Exception { }
    }
}
```

6. The Main Program

```java
public class FlinkMain {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.0.64")
                .port(3307)
                .databaseList("shop-center")    // set captured database
                .tableList("shop-center.item")  // set captured table
                .username("app")
                .password("study@2025")
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(true)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Binlog -> Redis
        DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to redis")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {}),
                        TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item")
                        && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    String op = bi.getOp();
                    // note: for "d" events the after-image is null in Debezium records;
                    // a production version should build the key from the before-image instead
                    GenItemEntity itemEntity = bi.getAfter();
                    String key = "item:" + itemEntity.getId();
                    RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand<>();
                    switch (op) {
                        case "c", "u" -> redisSinkCommand.initSet(key, itemEntity);
                        default -> redisSinkCommand.initDel(key);
                    }
                    return redisSinkCommand;
                }, TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));
        newMysqlStream.sinkTo(new SpringDataRedisSink<GenItemEntity>());

        // Binlog -> Elasticsearch
        DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql source to el")
                .map(str -> JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {}),
                        TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi -> bi.getSource().getTable().equals("item")
                        && (bi.getOp().equals("c") || bi.getOp().equals("u") || bi.getOp().equals("d")))
                .map(bi -> {
                    ElCommand<GenItemEntity> elCommand = new ElCommand<>();
                    GenItemEntity itemEntity = bi.getAfter();
                    elCommand.setId(itemEntity.getId().toString());
                    elCommand.setEntity(itemEntity);
                    elCommand.setIndex("item_npc");
                    switch (bi.getOp()) {
                        case "c" -> elCommand.setCommand(EElCommand.CREATE);
                        case "u" -> elCommand.setCommand(EElCommand.UPDATE);
                        case "d" -> elCommand.setCommand(EElCommand.DELETE);
                    }
                    return elCommand;
                }, TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
        mySqlToElStream.sinkTo(new ElSearchSink<GenItemEntity>());

        env.execute();
    }
}
```
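The pipeline above deserializes each change record into a `BinlogInfo<T>` envelope that the post never defines. Here is a minimal sketch of what such a class could look like, following the Debezium change-event layout (`op`, `before`, `after`, and a `source` block carrying the table name); the field names are assumptions and the real class in the repository may differ. Plain accessors are used instead of Lombok so the sketch stands alone:

```java
// Hypothetical sketch of the BinlogInfo envelope assumed by FlinkMain.
// Declared package-private so it compiles as a single self-contained file.
class BinlogInfo<T> {
    private String op;        // "c" = create, "u" = update, "d" = delete
    private T before;         // row image before the change (null for inserts)
    private T after;          // row image after the change (null for deletes)
    private SourceInfo source;

    public String getOp() { return op; }
    public void setOp(String op) { this.op = op; }
    public T getBefore() { return before; }
    public void setBefore(T before) { this.before = before; }
    public T getAfter() { return after; }
    public void setAfter(T after) { this.after = after; }
    public SourceInfo getSource() { return source; }
    public void setSource(SourceInfo source) { this.source = source; }

    // mirror of Debezium's "source" metadata block, trimmed to what the job reads
    static class SourceInfo {
        private String db;
        private String table;

        public String getDb() { return db; }
        public void setDb(String db) { this.db = db; }
        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
    }
}
```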

7. Source Code

Fellow daoists, please head over to Gitee (码云) for the full code.

8. Reflections from Practice

8.1 Redis

The code here is purely for study. In a real project the Redis cache is usually filled on the read path: when a lookup misses the cache, query MySQL and put the result into Redis. When a change or delete is observed on the table, simply delete the corresponding cache entry and let the next read repopulate it. Under concurrent access to the same key, this can cause the cache to be rebuilt redundantly; this phenomenon is known as cache breakdown (not cache penetration, with which it is often confused). Taking a Redisson lock before refreshing the cache prevents the duplicate MySQL reads and Redis writes.

```java
public class CacheService {

    @Autowired private RedissonClient redissonClient;
    @Autowired private RedisTemplate<String, Object> redisTemplate;
    @Autowired private DataRepository dataRepository;

    public Object getData(String key) {
        // first cache check
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        RLock lock = redissonClient.getLock(key + ":LOCK");
        try {
            // try to acquire the lock; the lease time guards against deadlock
            if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
                try {
                    // double-check the cache
                    value = redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        return value;
                    }
                    // query the database
                    Object dbData = dataRepository.findById(key);
                    // update the cache with a sensible expiry
                    redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
                    return dbData;
                } finally {
                    lock.unlock();
                }
            } else {
                // lock not acquired: wait briefly, then retry the cache
                Thread.sleep(100);
                return redisTemplate.opsForValue().get(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("failed to acquire lock", e);
        }
    }
}
```

8.2 Elasticsearch

For Elasticsearch, this way of updating data is actually not recommended: an ES index needs a denormalized design, so its search documents can rarely be built from a single table.
For product search in an e-commerce system, a better flow is to update ES when a product is put on the shelf, and to forbid edits while it is on the shelf. When the product is taken off the shelf, delete its ES document. To change a product's data, take it off the shelf, edit it, then put it back on.
