ElasticSearch裡我明明指定了long,為什麼卻變成了keyword

語言: CN / TW / HK

背景

實體類定義屬性idLong型別,但在呼叫 spring-data-elasticsearch:3.2.10.RELEASE中的putMapping(Class<T>)方法時卻被轉換成了keyword型別

原始碼

檢視putMapping方法,可以發現最終呼叫最下邊的過載方法

class ElasticsearchRestTemplate { ...  @Override  public <T> boolean putMapping(Class<T> clazz) {    return putMapping(clazz, buildMapping(clazz)); } ​  @Override  public <T> boolean putMapping(Class<T> clazz, Object mapping) {    return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(),                      mapping); } ​  @Override  public <T> boolean putMapping(String indexName, String type, Class<T> clazz) {    return putMapping(indexName, type, buildMapping(clazz)); } ​  @Override  public boolean putMapping(String indexName, String type, Object mapping) {    Assert.notNull(indexName, "No index defined for putMapping()");    Assert.notNull(type, "No type defined for putMapping()");    PutMappingRequest request = new PutMappingRequest(indexName).type(type);    if (mapping instanceof String) {      request.source(String.valueOf(mapping), XContentType.JSON);   } else if (mapping instanceof Map) {      request.source((Map) mapping);   } else if (mapping instanceof XContentBuilder) {      request.source((XContentBuilder) mapping);   }    try {      return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged();   } catch (IOException e) {      throw new ElasticsearchException("Failed to put mapping for " + indexName, e);   } } ... }

檢視buildMapping方法,因為並沒有定義外部mappingPath配置檔案,所以走最下邊的mappingBuilder.buildPropertyMapping(clazz)來進行解析出String型別的json檔案

abstract class AbstractElasticsearchTemplate { ...  protected String buildMapping(Class<?> clazz) {    // load mapping specified in Mapping annotation if present    if (clazz.isAnnotationPresent(Mapping.class)) {      String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();      if (!StringUtils.isEmpty(mappingPath)) {        String mappings = ResourceUtil.readFileFromClasspath(mappingPath);        if (!StringUtils.isEmpty(mappings)) {          return mappings;       }     } else {        LOGGER.info("mappingPath in @Mapping has to be defined. Building mappings using @Field");     }   } ​    // build mapping from field annotations    try {      MappingBuilder mappingBuilder = new MappingBuilder(elasticsearchConverter);      return mappingBuilder.buildPropertyMapping(clazz);   } catch (Exception e) {      throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);   } } ... }

檢視buildPropertyMapping方法 ``` class MappingBuilder { ... String buildPropertyMapping(Class<?> clazz) throws IOException { // 提前解析出一些通用屬性,比如indexName,indexType等等 ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext() .getRequiredPersistentEntity(clazz); // 構造一個json構造器,以indexType開始 XContentBuilder builder = jsonBuilder().startObject().startObject(entity.getIndexType());

    // 新增dynamic template
    addDynamicTemplatesMapping(builder, entity);

    // 父子文件判斷
    String parentType = entity.getParentType();
    if (hasText(parentType)) {
        builder.startObject(FIELD_PARENT).field(FIELD_TYPE, parentType).endObject();
    }

    // 屬性解析開始標誌properties
    builder.startObject(FIELD_PROPERTIES);
    // 具體的properties解析,為根物件非nested物件
    mapEntity(builder, entity, true, "", false, FieldType.Auto, null);

    builder.endObject() // FIELD_PROPERTIES
            .endObject() // indexType
            .endObject() // root object
            .close();

    return builder.getOutputStream().toString();

}

private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity entity, boolean isRootObject, String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType, @Nullable Field parentFieldAnnotation) throws IOException {

    boolean writeNestedProperties = !isRootObject && (isAnyPropertyAnnotatedWithField(entity) || nestedOrObjectField);
    if (writeNestedProperties) {

        String type = nestedOrObjectField ? fieldType.toString().toLowerCase()
                : FieldType.Object.toString().toLowerCase();
        builder.startObject(nestedObjectFieldName).field(FIELD_TYPE, type);

        if (nestedOrObjectField && FieldType.Nested == fieldType && parentFieldAnnotation != null
                && parentFieldAnnotation.includeInParent()) {

            builder.field("include_in_parent", parentFieldAnnotation.includeInParent());
        }

        builder.startObject(FIELD_PROPERTIES);
    }
// 物件欄位屬性的解析
    if (entity != null) {

        entity.doWithProperties((PropertyHandler<ElasticsearchPersistentProperty>) property -> {
            try {
                if (property.isAnnotationPresent(Transient.class) || isInIgnoreFields(property, parentFieldAnnotation)) {
                    return;
                }

                buildPropertyMapping(builder, isRootObject, property);
            } catch (IOException e) {
                logger.warn("error mapping property with name {}", property.getName(), e);
            }
        });
    }

    if (writeNestedProperties) {
        builder.endObject().endObject();
    }
}

// 解析每個property的方法 private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject, ElasticsearchPersistentProperty property) throws IOException {

    if (property.isAnnotationPresent(Mapping.class)) {

        String mappingPath = property.getRequiredAnnotation(Mapping.class).mappingPath();
        if (!StringUtils.isEmpty(mappingPath)) {

            ClassPathResource mappings = new ClassPathResource(mappingPath);
            if (mappings.exists()) {
                builder.rawField(property.getFieldName(), mappings.getInputStream(), XContentType.JSON);
                return;
            }
        }
    }
    // geo標識
    boolean isGeoPointProperty = isGeoPointProperty(property);
// completion標識
    boolean isCompletionProperty = isCompletionProperty(property);
// nested object標識
    boolean isNestedOrObjectProperty = isNestedOrObjectProperty(property);
    // 屬性上的Field註解
    Field fieldAnnotation = property.findAnnotation(Field.class);
    if (!isGeoPointProperty && !isCompletionProperty && property.isEntity() && hasRelevantAnnotation(property)) {

        if (fieldAnnotation == null) {
            return;
        }

        Iterator<? extends TypeInformation<?>> iterator = property.getPersistentEntityTypes().iterator();
        ElasticsearchPersistentEntity<?> persistentEntity = iterator.hasNext()
                ? elasticsearchConverter.getMappingContext().getPersistentEntity(iterator.next())
                : null;

        mapEntity(builder, persistentEntity, false, property.getFieldName(), isNestedOrObjectProperty,
                fieldAnnotation.type(), fieldAnnotation);

        if (isNestedOrObjectProperty) {
            return;
        }
    }

    MultiField multiField = property.findAnnotation(MultiField.class);

    if (isGeoPointProperty) {
        applyGeoPointFieldMapping(builder, property);
        return;
    }

    if (isCompletionProperty) {
        CompletionField completionField = property.findAnnotation(CompletionField.class);
        applyCompletionFieldMapping(builder, property, completionField);
    }
    // 判斷是否為id屬性
    if (isRootObject && fieldAnnotation != null && property.isIdProperty()) {
        applyDefaultIdFieldMapping(builder, property);
    } else if (multiField != null) {
        addMultiFieldMapping(builder, property, multiField, isNestedOrObjectProperty);
    } else if (fieldAnnotation != null) {
        addSingleFieldMapping(builder, property, fieldAnnotation, isNestedOrObjectProperty);
    }
}

... } ```

至此可以看到,只要fieldNameiddocument就判定為是id屬性,然後將type設定為keyword並且可被索引。疑問到這裡解決

``` class SimpleElasticsearchPersistentProperty { ...  private static final List SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document");  public SimpleElasticsearchPersistentProperty(Property property, PersistentEntity<?, ElasticsearchPersistentProperty> owner, SimpleTypeHolder simpleTypeHolder) { ... this.isId = super.isIdProperty() || SUPPORTED_ID_PROPERTY_NAMES.contains(getFieldName()); ... }

@Override public boolean isIdProperty() { return isId; }  private void applyDefaultIdFieldMapping(XContentBuilder builder, ElasticsearchPersistentProperty property) throws IOException { ​ builder.startObject(property.getFieldName()).field(FIELD_TYPE, TYPE_VALUE_KEYWORD).field(FIELD_INDEX, true) .endObject(); } ... } ```

話外題

專案中使用的ElasticSearch實體類都是採取@Document指定indexName來操作的,但是索引和表都涉及到分庫分表,所以又不能寫死,然後就採取的SpEL配合ThreadLocal從上下文裡setget,其實Springelasticsearch操作類似於關係型資料庫也封裝的有一層Repository抽象,名為ElasticsearchRepository,我們可以直接定義實體類操作介面繼承就可以完成對單索引的CRUD以及Page等操作,但這樣有一個問題,那就是indexName無法動態去調整,所以就放棄了這種,改用更底層的RestHighLevelClient封裝的ElasticSearchRestTemplate模版類,這樣在面對分庫分表時就可以手動去對每個Document進行set不同的indexName,跨索引查詢時也可以指定多個,也可以直接指定索引的alias,需要注意的時,在進行更新時,只指定alias是不被允許的,需要手動查出符合條件的Document在進行索引的分組批量更新,即呼叫ElasticSearchbulk api

在對ElasticSearch和資料庫的一致性問題上,我是通過封裝不同的方法來確保強一致性和最終一致性

強一致性

類似插入、更新、刪除等場景下,都是放在一個事務裡,先操作資料庫,再操作ElasticSearch,這樣可以確保操作ElasticSearch失敗時,資料庫可以成功回滾。一般只運用於對資料實時性要求敏感的場景,並且資料量不大的情況,但即便這樣還是會有至少1s的延遲,這裡就涉及到ElasticSearch的刷盤策略問題上了,這裡不展開研究

最終一致性

批量的插入、更新這些操作,如果放在一個大事務裡,對資料庫也是一種壓力,所以一般是分批操作資料庫,另起一個執行緒池對事務提交進行監聽,將資料庫資料同步到ElasticSearch裡,在同步成功後反轉資料庫的同步狀態欄位。為了確保萬無一失,後臺會啟動一個定時掃描資料庫同步欄位的執行緒去定時掃描同步。這種一般適用於大資料量的場景。當然你也可以去監聽MySQLbinlog日誌來進行同步。