ElasticSearch: I clearly specified long, so why did it become keyword?
Background
The entity class declares an `id` property of type `Long`, yet when calling the `putMapping(Class<T>)` method of spring-data-elasticsearch:3.2.10.RELEASE, the field ends up mapped as the `keyword` type.
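As a sketch of the symptom: even with the entity's `id` declared as a `Long` (e.g. annotated `@Field(type = FieldType.Long)`), the generated mapping contains something like the fragment below (the exact JSON shape is my reconstruction, not copied from the source):

```json
{
  "properties": {
    "id": {
      "type": "keyword",
      "index": true
    }
  }
}
```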
Source code
Looking at `putMapping`, we can see that it ultimately delegates to the bottommost overload:
```
class ElasticsearchRestTemplate {
    ...
    @Override
    public <T> boolean putMapping(Class<T> clazz) {
        return putMapping(clazz, buildMapping(clazz));
    }

    @Override
    public <T> boolean putMapping(Class<T> clazz, Object mapping) {
        return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(),
                mapping);
    }

    @Override
    public <T> boolean putMapping(String indexName, String type, Class<T> clazz) {
        return putMapping(indexName, type, buildMapping(clazz));
    }

    @Override
    public boolean putMapping(String indexName, String type, Object mapping) {
        Assert.notNull(indexName, "No index defined for putMapping()");
        Assert.notNull(type, "No type defined for putMapping()");
        PutMappingRequest request = new PutMappingRequest(indexName).type(type);
        if (mapping instanceof String) {
            request.source(String.valueOf(mapping), XContentType.JSON);
        } else if (mapping instanceof Map) {
            request.source((Map) mapping);
        } else if (mapping instanceof XContentBuilder) {
            request.source((XContentBuilder) mapping);
        }
        try {
            return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged();
        } catch (IOException e) {
            throw new ElasticsearchException("Failed to put mapping for " + indexName, e);
        }
    }
    ...
}
```
Looking at `buildMapping`: since no external `mappingPath` configuration file is defined, it falls through to `mappingBuilder.buildPropertyMapping(clazz)` at the bottom, which builds the mapping as a JSON `String`:
```
abstract class AbstractElasticsearchTemplate {
    ...
    protected String buildMapping(Class<?> clazz) {
        // load mapping specified in Mapping annotation if present
        if (clazz.isAnnotationPresent(Mapping.class)) {
            String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();
            if (!StringUtils.isEmpty(mappingPath)) {
                String mappings = ResourceUtil.readFileFromClasspath(mappingPath);
                if (!StringUtils.isEmpty(mappings)) {
                    return mappings;
                }
            } else {
                LOGGER.info("mappingPath in @Mapping has to be defined. Building mappings using @Field");
            }
        }
        // build mapping from field annotations
        try {
            MappingBuilder mappingBuilder = new MappingBuilder(elasticsearchConverter);
            return mappingBuilder.buildPropertyMapping(clazz);
        } catch (Exception e) {
            throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);
        }
    }
    ...
}
```
Looking at the `buildPropertyMapping` method:
```
class MappingBuilder {
    ...
    String buildPropertyMapping(Class<?> clazz) throws IOException {
        // Resolve common metadata up front, e.g. indexName, indexType, etc.
        ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
                .getRequiredPersistentEntity(clazz);
        // Build a JSON builder, opening with the indexType
        XContentBuilder builder = jsonBuilder().startObject().startObject(entity.getIndexType());
        // Add dynamic templates
        addDynamicTemplatesMapping(builder, entity);
        // Parent/child document handling
        String parentType = entity.getParentType();
        if (hasText(parentType)) {
            builder.startObject(FIELD_PARENT).field(FIELD_TYPE, parentType).endObject();
        }
        // Open the "properties" object that starts the property mapping
        builder.startObject(FIELD_PROPERTIES);
        // Map the concrete properties; this is the root object, not a nested one
        mapEntity(builder, entity, true, "", false, FieldType.Auto, null);
        builder.endObject() // FIELD_PROPERTIES
                .endObject() // indexType
                .endObject() // root object
                .close();
        return builder.getOutputStream().toString();
    }

    private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity entity, boolean isRootObject,
            String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
            @Nullable Field parentFieldAnnotation) throws IOException {
        boolean writeNestedProperties = !isRootObject && (isAnyPropertyAnnotatedWithField(entity) || nestedOrObjectField);
        if (writeNestedProperties) {
            String type = nestedOrObjectField ? fieldType.toString().toLowerCase()
                    : FieldType.Object.toString().toLowerCase();
            builder.startObject(nestedObjectFieldName).field(FIELD_TYPE, type);
            if (nestedOrObjectField && FieldType.Nested == fieldType && parentFieldAnnotation != null
                    && parentFieldAnnotation.includeInParent()) {
                builder.field("include_in_parent", parentFieldAnnotation.includeInParent());
            }
            builder.startObject(FIELD_PROPERTIES);
        }
        // Walk the entity's fields
        if (entity != null) {
            entity.doWithProperties((PropertyHandler<ElasticsearchPersistentProperty>) property -> {
                try {
                    if (property.isAnnotationPresent(Transient.class) || isInIgnoreFields(property, parentFieldAnnotation)) {
                        return;
                    }
                    buildPropertyMapping(builder, isRootObject, property);
                } catch (IOException e) {
                    logger.warn("error mapping property with name {}", property.getName(), e);
                }
            });
        }
        if (writeNestedProperties) {
            builder.endObject().endObject();
        }
    }

    // Maps a single property
    private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
            ElasticsearchPersistentProperty property) throws IOException {
        if (property.isAnnotationPresent(Mapping.class)) {
            String mappingPath = property.getRequiredAnnotation(Mapping.class).mappingPath();
            if (!StringUtils.isEmpty(mappingPath)) {
                ClassPathResource mappings = new ClassPathResource(mappingPath);
                if (mappings.exists()) {
                    builder.rawField(property.getFieldName(), mappings.getInputStream(), XContentType.JSON);
                    return;
                }
            }
        }
        // geo flag
        boolean isGeoPointProperty = isGeoPointProperty(property);
        // completion flag
        boolean isCompletionProperty = isCompletionProperty(property);
        // nested/object flag
        boolean isNestedOrObjectProperty = isNestedOrObjectProperty(property);
        // @Field annotation on the property
        Field fieldAnnotation = property.findAnnotation(Field.class);
        if (!isGeoPointProperty && !isCompletionProperty && property.isEntity() && hasRelevantAnnotation(property)) {
            if (fieldAnnotation == null) {
                return;
            }
            Iterator<? extends TypeInformation<?>> iterator = property.getPersistentEntityTypes().iterator();
            ElasticsearchPersistentEntity<?> persistentEntity = iterator.hasNext()
                    ? elasticsearchConverter.getMappingContext().getPersistentEntity(iterator.next())
                    : null;
            mapEntity(builder, persistentEntity, false, property.getFieldName(), isNestedOrObjectProperty,
                    fieldAnnotation.type(), fieldAnnotation);
            if (isNestedOrObjectProperty) {
                return;
            }
        }
        MultiField multiField = property.findAnnotation(MultiField.class);
        if (isGeoPointProperty) {
            applyGeoPointFieldMapping(builder, property);
            return;
        }
        if (isCompletionProperty) {
            CompletionField completionField = property.findAnnotation(CompletionField.class);
            applyCompletionFieldMapping(builder, property, completionField);
        }
        // Is this the id property?
        if (isRootObject && fieldAnnotation != null && property.isIdProperty()) {
            applyDefaultIdFieldMapping(builder, property);
        } else if (multiField != null) {
            addMultiFieldMapping(builder, property, multiField, isNestedOrObjectProperty);
        } else if (fieldAnnotation != null) {
            addSingleFieldMapping(builder, property, fieldAnnotation, isNestedOrObjectProperty);
        }
    }
    ...
}
```
At this point we can see that any property whose `fieldName` is `id` or `document` is judged to be the id property, and its type is then set to `keyword` with indexing enabled. That resolves the question:
```
class SimpleElasticsearchPersistentProperty {
    ...
    private static final List<String> SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document");

    @Override
    public boolean isIdProperty() {
        return isId;
    }
    ...
}

// From MappingBuilder:
private void applyDefaultIdFieldMapping(XContentBuilder builder, ElasticsearchPersistentProperty property)
        throws IOException {
    builder.startObject(property.getFieldName()).field(FIELD_TYPE, TYPE_VALUE_KEYWORD).field(FIELD_INDEX, true)
            .endObject();
}
```
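The name-based id detection described above can be boiled down to a tiny self-contained sketch (plain Java, no Spring dependencies; only the two field names come from the source):

```java
import java.util.Arrays;
import java.util.List;

// Self-contained sketch of the name-based id detection: a property is
// treated as the id purely because of its field name.
public class IdPropertyCheck {

    static final List<String> SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document");

    static boolean isIdProperty(String fieldName) {
        return SUPPORTED_ID_PROPERTY_NAMES.contains(fieldName);
    }

    public static void main(String[] args) {
        System.out.println(isIdProperty("id"));      // true  -> forced to keyword
        System.out.println(isIdProperty("orderId")); // false -> @Field type is honored
    }
}
```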
Side note
The ElasticSearch entity classes in our project all use `@Document` to specify the `indexName`. But both the indexes and the database tables are sharded, so the name cannot be hard-coded; instead we use SpEL together with a `ThreadLocal`, setting the name into the context and reading it back when needed. Spring does provide a Repository abstraction over elasticsearch, similar to the one for relational databases, named `ElasticsearchRepository`: define an interface for your entity class, extend it, and you get CRUD, Page, and similar operations on a single index for free. The problem is that the `indexName` cannot then be adjusted dynamically, so we abandoned that approach in favor of the lower-level `ElasticsearchRestTemplate` template class built on `RestHighLevelClient`. With it, when facing sharded indexes we can manually set a different `indexName` on each `Document`, specify multiple indexes for cross-index queries, or address an index `alias` directly. Note, however, that for updates, specifying only an `alias` is not allowed: you must first query the matching `Document`s and then run grouped batch updates per index, i.e. call ElasticSearch's bulk api.
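The SpEL-plus-`ThreadLocal` trick mentioned above can be sketched as follows. The class and index names are hypothetical, and the `@Document` expression in the comment is an assumption about how the holder would be wired in, not the project's actual configuration:

```java
// Hypothetical per-thread holder for the current index name; an entity could
// reference it via SpEL, e.g. @Document(indexName = "#{T(IndexNameContext).get()}").
public class IndexNameContext {

    private static final ThreadLocal<String> INDEX_NAME = new ThreadLocal<>();

    public static void set(String indexName) { INDEX_NAME.set(indexName); }

    public static String get() { return INDEX_NAME.get(); }

    // Always clear after use to avoid leaking names across pooled threads.
    public static void clear() { INDEX_NAME.remove(); }

    public static void main(String[] args) {
        IndexNameContext.set("order_2023_07"); // pick the shard for this request
        System.out.println(IndexNameContext.get()); // prints order_2023_07
        IndexNameContext.clear();
    }
}
```

Clearing the value after each request matters because application servers reuse threads, so a stale index name would otherwise bleed into the next request on the same thread.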
On the consistency problem between ElasticSearch and the database, I ensure either strong consistency or eventual consistency by wrapping the two patterns in different methods.
Strong consistency
For scenarios like inserts, updates, and deletes, everything runs inside a single transaction: operate on the database first, then on ElasticSearch, so that if the ElasticSearch operation fails the database can roll back successfully. This generally only suits scenarios that are sensitive to data freshness and whose data volume is small. Even then there is still a delay of at least 1s, which comes down to ElasticSearch's refresh-and-flush policy; I won't expand on that here.
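A minimal sketch of the "database first, ElasticSearch second, roll back on failure" ordering, using hypothetical `Db`/`Es` interfaces rather than the real Spring or client APIs:

```java
// Hypothetical interfaces standing in for the real database transaction
// and the ElasticSearch client.
public class StrongConsistencyWriter {

    interface Db { void save(String row); void commit(); void rollback(); }
    interface Es { void index(String doc); }

    private final Db db;
    private final Es es;

    StrongConsistencyWriter(Db db, Es es) {
        this.db = db;
        this.es = es;
    }

    // Write the database first, then ElasticSearch; if the ES write fails,
    // the database transaction is rolled back so the two stores stay in step.
    void save(String record) {
        db.save(record);
        try {
            es.index(record);
            db.commit();
        } catch (RuntimeException e) {
            db.rollback();
            throw e;
        }
    }
}
```

In a Spring application the same ordering would typically live inside one transactional service method, with the ES failure propagating to trigger the rollback.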
Eventual consistency
Putting bulk inserts and updates into one large transaction also puts pressure on the database, so they are usually applied to the database in batches. A separate thread pool listens for transaction commits and synchronizes the database rows to ElasticSearch, flipping each row's sync-status column once synchronization succeeds. As a safety net, a background thread periodically scans the database's sync column and re-synchronizes anything that was missed. This approach generally suits large data volumes. Alternatively, you can synchronize by listening to MySQL's binlog.
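The safety-net scan over the sync-status column can be sketched like this (names are hypothetical; `indexToEs` stands in for the real bulk call and reports per-row success or failure):

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the periodic sweep: rows carry a "synced" flag,
// unsynced rows are pushed to ElasticSearch, and the flag is flipped only
// when the push succeeds, so failed rows are retried on the next sweep.
public class SyncSweeper {

    static class Row {
        final long id;
        boolean synced;
        Row(long id) { this.id = id; }
    }

    static int sweep(List<Row> rows, Predicate<Row> indexToEs) {
        int pushed = 0;
        for (Row row : rows) {
            if (!row.synced && indexToEs.test(row)) {
                row.synced = true; // flip the sync-status column on success
                pushed++;
            }
        }
        return pushed;
    }
}
```

A scheduler would call `sweep` at a fixed interval; rows whose push failed keep `synced = false` and are picked up again on the next run, which is what makes the scheme eventually consistent.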