Kafka多生產者消費者自動配置

語言: CN / TW / HK

背景

專案中不同的業務可能會使用多個kafka,按預設的Kafka配置,最多是支援消費者和生產者使用不同的Kafka,如果兩個生產者使用不同的Kafka則需要自定義配置,生成對應的bean。

解決方案

多生產者,多消費者,使用不同的字首來區分,根據字首來區分配置,載入配置,例項化對應字首的KafkaProperties kafkaListenerContainerFactory KafkaTemplate ,每個bean的名稱都是帶字首的,使用的時候,按照需要注入對應的bean。

YML配置

yml spring: kafka: product: bootstrap-servers: 55.1.40.231:9091,55.6.70.231:9091,55.5.70.231:9091 properties: sasl: mechanism: PLAIN jaas: config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="xxxx"; security: protocol: SASL_PLAINTEXT producer: retries: 0 acks: -1 batch-size: 16384 linger-ms: 0 buffer-memory: 33554432 consumer: group-id: consumer-group-id enable-auto-commit: true auto-commit-interval-ms: 1000 auto-offset-reset: latest session-timeout-ms: 120000 request-timeout-ms: 180000 order: bootstrap-servers: 55.10.33.132:9091,55.10.33.132:9092,55.10.33.132:9093,55.10.33.132:9094,55.10.33.132:9095,55.10.33.132:9096,55.10.33.132:9097,55.10.33.132:9098,55.10.33.132:9099,55.10.33.132:9100 properties: sasl: mechanism: PLAIN jaas: config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user_order" password="xxxxxxx"; security: protocol: SASL_PLAINTEXT producer: retries: 3 acks: -1 batch-size: 16384 linger-ms: 0 buffer-memory: 33554432 consumer: group-id: order-migration enable-auto-commit: true auto-commit-interval-ms: 1000 auto-offset-reset: latest session-timeout-ms: 120000 request-timeout-ms: 180000

自定義KafkaProperties

使用KafkaProperties接收配置,但是需要修改下字首,但是KafkaProperties原始碼改不了,新寫一個類繼承KafkaProperties

```java @Component @Primary @ConfigurationProperties(prefix = "spring.kafka.order") public class OrderKafkaProperties extends KafkaProperties{

} ```

如果沒有Kafka預設配置,Kafka會自動例項化預設的KafkaProperties,如果有多個KafkaProperties例項,需要指定一個首選的bean,否則KafkaAnnotationDrivenConfiguration類中建構函式會報錯。

所以在其中一個加上@Primary註解

KafkaTemplate和KafkaListenerContainerFactory配置

有了KafkaProperties就可以生成KafkaTemplateKafkaListenerContainerFactory例項

```java @Configuration public class KafkaConfig {

@Autowired
private OrderKafkaProperties orderKafkaProperties;

@Bean("orderKafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

private ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

private Map<String, Object> producerConfigs() {
    return contractKafkaProperties.buildProducerProperties();
}


@Bean("orderKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

private ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

private Map<String, Object> consumerConfigs() {
    return contractKafkaProperties.buildConsumerProperties();
}

} ```

這樣就可以在其他地方直接使用了,生產者就直接@Autowired orderKafkaTemplate,如果是消費者,直接在@KafkaListenercontainerFactory屬性指定orderKafkaListenerContainerFactory

如果有多個生產者消費者,就增加對應的配置即可。這樣簡化了配置的讀取,除了加了字首,其他的配置都是和Kafka預設配置一樣的,複用Springboot的屬性繫結,後續如果有其他配置,加上後能直接生效,無需修改程式碼。如果修改配置的結構需要程式碼中讀取,然後手動設定,後期修改YML配置和程式碼都需要修改,比較麻煩。

方案演進

上述方案,如果需要新增一個Kafka的配置,需要新增一個字首,然後新增對應配置程式碼,來生成KafkaPropertiesKafkaTemplateKafkaListenerContainerFactory例項,但是不同的字首生成不同的例項程式碼都是重複的,而且所有的字首、屬性值都由YML配置可以得到,所以程式碼中生成帶字首的bean可以由程式碼自動生成,並註冊到spring容器中。根據這個思路,寫一個BeanFactoryAware的實現類。(Aware介面是框架提供給使用者使用者獲取框架中一些物件的介面,比如BeanFactoryAware就是獲取BeanFactory,框架會呼叫重寫的setBeanFactory方法,將BeanFactory傳給我們的實現類)

```java @Component @Slf4j public class EmallBeanFactoryAware implements BeanFactoryAware {

@Autowired
private Environment environment;

private static final String SPRING_KAFKA_PREFIX = "spring.kafka";

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    if (beanFactory instanceof DefaultListableBeanFactory) {
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;

        Binder binder = Binder.get(environment);
        //將YML中屬性值對映到MAP中,後面根據配置字首生成bean並註冊到容器中,TODO 繫結可能有異常,加try catch穩一點
        BindResult<Map> bindResultWithPrefix = binder.bind(SPRING_KAFKA_PREFIX, Bindable.of(Map.class));
        if (!bindResultWithPrefix.isBound()) {
            return;
        }

        Map map = bindResultWithPrefix.get();
        Set set = map.keySet();
        Set<String> kafkaPropertyFiledNames = getKafkaPropertyFiledNames();

        //如果配置多個primary, 只設置第一個,TODO專案啟動過程中,這個變數是否有併發問題
        boolean hasSetPrimary = false;
        //例項化每個帶字首的KafkaProperties、KafkaTemplate、
        for (Object object : set) {
            String prefix = object.toString();

            if (kafkaPropertyFiledNames.contains(prefix)) {
                //不帶字首的正常配置忽略
                continue;
            }

            String configPrefix = SPRING_KAFKA_PREFIX + "." + prefix;

            BindResult<KafkaProperties> kafkaPropertiesBindResult;
            try {
                kafkaPropertiesBindResult = binder.bind(configPrefix, Bindable.of(KafkaProperties.class));
                if (!kafkaPropertiesBindResult.isBound()) {
                    continue;
                }
            } catch (Exception e) {
                //一些配置不是在KafkaProperties屬性,但是也不是字首配置,在這一步會繫結失敗,比如spring.kafka.topics配置,
                //一些配置的名稱是帶-,KafkaProperties屬性是駝峰,繫結是會出異常的,異常忽略
                log.error("auto register kafka properties error, prefix is: {}", configPrefix);
                continue;
            }

            //註冊生產者(TODO 沒配置生產者是否會報錯)
            KafkaProperties kafkaProperties = kafkaPropertiesBindResult.get();
            String propertiesBeanName = prefix + "KafkaProperties";
            boolean isBeanExist = defaultListableBeanFactory.containsBean(propertiesBeanName);
            if (!isBeanExist) {
                String primaryConfig = configPrefix + ".primary";
                //沒有預設的kafka配置,需要設定下primary
                BindResult<Boolean> primaryBindResult = binder.bind(primaryConfig, Bindable.of(Boolean.class));
                if (primaryBindResult.isBound() && primaryBindResult.get() && !hasSetPrimary) {
                    BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(KafkaProperties.class);
                    defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition());
                    defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                    defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);
                    hasSetPrimary = true;
                } else {
                    defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties);
                }
            }

            //註冊生產者KafkaTemplate
            String templateBeanName = prefix + "KafkaTemplate";
            if (!defaultListableBeanFactory.containsBean(templateBeanName)) {
                KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(
                        new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()));
                defaultListableBeanFactory.registerSingleton(templateBeanName, kafkaTemplate);
            }

            String beanName = prefix + "KafkaListenerContainerFactory";
            if (!defaultListableBeanFactory.containsBean(beanName)) {
                //註冊消費者listener(TODO 沒配置消費者是否會報錯)
                ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                        new ConcurrentKafkaListenerContainerFactory<>();
                factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
                factory.setConcurrency(10);
                factory.getContainerProperties().setPollTimeout(3000);
                defaultListableBeanFactory.registerSingleton(beanName, factory);
            }
        }
    }
}

private static Set<String> getKafkaPropertyFiledNames () {
    Set<String> names = new HashSet<>();

    Field[] declaredFields = KafkaProperties.class.getDeclaredFields();
    if (declaredFields.length == 0) {
        return names;
    }

    for (Field declaredField : declaredFields) {
        names.add(declaredField.getName());
    }

    return names;
}

} ```

遇到的問題

手動註冊的bean程式碼中@Autowire無法注入

手動註冊的無法@Autowire,直接加@Lazy註解,先忽略bean註冊的先後順序

多個KafkaProperties例項,無法確定使用哪一個

因為使用字首的配置方式,bean名稱也是帶字首的,沒有預設的Kafka配置,框架會自動生成對應的bean,KafkaAnnotationDrivenConfiguration中的KafkaProperties 屬性是根據型別注入的,如果配置有多個字首,注入的時候無法確定使用哪一個,所以增加一個primary配置,自動生成的時候設定下。

既有帶字首,又有不帶字首使用預設配置的

自動配置程式碼中有一段是根據yml中配置的key,判斷是否是KafkaProperties類中的欄位,如果是就忽略,讓框架自動按預設配置,有些欄位yml中是帶-,如bootstrap-serversKafkaProperties中是駝峰命名bootstrapServers,繫結的時候會拋異常,影響應用啟動,這種異常可以忽略,直接用try catch捕獲。

設定Bean為Primary

第二個問題中,多個相同型別的Bean如何設定其中一個bean為Primary,手動註冊bean,如果有例項物件,可以直接使用BeanFactoryregisterSingleton(beanName, object),如果沒有例項物件,可以直接使用類名,通過BeanFactoryregisterBeanDefinition(beanName, beanDefinition)來註冊,如果要設定bean為Primary,必須通過BeanDefinition來設定,但是通過框架的繫結是直接生成例項物件的,如果通過registerSingleton來註冊,通過beanName獲取BeanDefinition是會拋異常的,因為沒有BeanDefinition,所以需要將物件例項和BeanDefinition關聯起來,就是上面這段程式碼

java //註冊BeanDefinition defaultListableBeanFactory.registerBeanDefinition(propertiesBeanName, beanDefinitionBuilder.getBeanDefinition()); //註冊物件例項,使用相同的bean名稱 defaultListableBeanFactory.registerSingleton(propertiesBeanName, kafkaProperties); //再獲取BeanDefinition就能獲取到,而且這個bean就是上面註冊的例項物件 defaultListableBeanFactory.getBeanDefinition(propertiesBeanName).setPrimary(true);