1 RestHighLevelClient介绍

默认情况下,ElasticSearch使用两个端口来监听外部TCP流量。

9200端口:用于所有通过HTTP协议进行的API调用,包括搜索、聚合、监控以及其他任何使用HTTP协议的请求。所有基于REST的客户端库都使用该端口与ElasticSearch进行交互。

9300端口:使用自定义的二进制传输协议,用于集群中各节点之间的通信,例如集群变更、主节点选举、节点加入/离开、分片分配等。

RestHighLevelClient是ES的Java客户端,它通过HTTP(默认9200端口)与ES集群进行通信。

2 引入ES依赖

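<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.10.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.10.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>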

3 使用

3.1 es的配置

# es配置

# es用户名

elasticsearch.userName=elastic

# es密码

elasticsearch.password=elastic

# es host ip 地址(集群),多个以","间隔

elasticsearch.hosts=127.0.0.1:9200

# es 请求方式

elasticsearch.scheme=http

# es 连接超时时间(ms)

elasticsearch.connectTimeOut=1000

# es socket 超时时间,即等待响应数据的最长时间(ms)

elasticsearch.socketTimeOut=30000

# es 从连接池获取连接的超时时间(ms)

elasticsearch.connectionRequestTimeOut=500

# es 最大连接数

elasticsearch.maxConnectNum=100

# es 每个路由的最大连接数

elasticsearch.maxConnectNumPerRoute=100

3.2 es客户端配置类

/**
 * Spring configuration that binds the {@code elasticsearch.*} properties and exposes a
 * {@link RestHighLevelClient} bean built from them.
 */
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchConfig {

    /** Node addresses as a comma-separated list of {@code host:port} entries. */
    private String hosts;

    /** User name used for basic authentication. */
    private String userName;

    /** Password used for basic authentication. */
    private String password;

    /** URI scheme applied to every configured node. */
    private String scheme;

    /** Value handed to {@code setConnectTimeout} on the request config. */
    private int connectTimeOut;

    /** Value handed to {@code setSocketTimeout} on the request config. */
    private int socketTimeOut;

    /** Value handed to {@code setConnectionRequestTimeout} on the request config. */
    private int connectionRequestTimeOut;

    /** Value handed to {@code setMaxConnTotal} on the HTTP client builder. */
    private int maxConnectNum;

    /** Value handed to {@code setMaxConnPerRoute} on the HTTP client builder. */
    private int maxConnectNumPerRoute;

    /**
     * Builds the client from the bound properties.
     *
     * <p>The bean name is given explicitly; without it the method name would be used.
     *
     * @return a client configured with the hosts, timeouts, pool limits and credentials above
     * @throws IllegalArgumentException if {@code hosts} is blank or contains a malformed entry
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        RestClientBuilder builder = RestClient.builder(getEsHost());

        // Timeout settings.
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeOut);
            requestConfigBuilder.setSocketTimeout(socketTimeOut);
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
            return requestConfigBuilder;
        });

        // Connection pool limits and credentials.
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute);
            httpClientBuilder.setDefaultCredentialsProvider(getCredentialsProvider());
            return httpClientBuilder;
        });

        return new RestHighLevelClient(builder);
    }

    /**
     * Parses {@code hosts} into one {@link HttpHost} per {@code host:port} entry.
     *
     * <p>Entries are trimmed and empty entries (e.g. from a doubled comma) are skipped.
     *
     * @return the configured nodes, in declaration order
     * @throws IllegalArgumentException if {@code hosts} is blank, or an entry has no port or a
     *     non-numeric port ({@link NumberFormatException} is a subclass)
     */
    private HttpHost[] getEsHost() {
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("elasticsearch.hosts must not be empty");
        }

        // A typed list is required: a raw List would make toArray(...) return Object[].
        List<HttpHost> nodes = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String address = entry.trim();
            if (address.isEmpty()) {
                continue;
            }
            String[] parts = address.split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException(
                        "Invalid elasticsearch.hosts entry (expected host:port): " + address);
            }
            nodes.add(new HttpHost(parts[0].trim(), Integer.parseInt(parts[1].trim()), scheme));
        }
        return nodes.toArray(new HttpHost[0]);
    }

    /**
     * Creates a credentials provider that offers the configured user name and password for any
     * authentication scope.
     */
    private CredentialsProvider getCredentialsProvider() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        return credentialsProvider;
    }
}

3.3 es的使用

3.3.1 创建es索引

3.3.1.1 创建es索引的工具类

创建es索引的工具类如下所示。

/**

* 操作ES索引

*

*/

@Slf4j

@Service

public class EsIndexOperation {

@Resource

private RestHighLevelClient restHighLevelClient;

private final RequestOptions options = RequestOptions.DEFAULT;

/**

* 判断索引是否存在

*/

public boolean checkIndex(String index) {

try {

return restHighLevelClient.indices().exists(new GetIndexRequest(index), options);

} catch (Exception e) {

log.error("EsIndexOperation checkIndex error.", e);

}

return Boolean.FALSE;

}

/**

* 创建索引

*

* @param indexName es索引名

* @param esSettingFilePath es索引的alias、settings和mapping的配置文件

*/

public boolean createIndex(String indexName, String esSettingFilePath) {

String aliases = null;

String mappings = null;

String settings = null;

if (StringUtils.isNotBlank(esSettingFilePath)) {

try {

String fileContent = FileUtils.readFileContent(esSettingFilePath);

if (StringUtils.isNotBlank(fileContent)) {

JSONObject jsonObject = JSON.parseObject(fileContent);

aliases = jsonObject.getString("aliases");

mappings = jsonObject.getString("mappings");

settings = jsonObject.getString("settings");

}

} catch (Exception e) {

log.error("createIndex error.", e);

return false;

}

}

if (checkIndex(indexName)) {

log.error("createIndex indexName:[{}]已存在", indexName);

return false;

}

CreateIndexRequest request = new CreateIndexRequest(indexName);

if ((StringUtils.isNotBlank(aliases))) {

request.aliases(aliases, XContentType.JSON);

}

if (StringUtils.isNotBlank(mappings)) {

request.mapping(mappings, XContentType.JSON);

}

if (StringUtils.isNotBlank(settings)) {

request.settings(settings, XContentType.JSON);

}

try {

this.restHighLevelClient.indices().create(request, options);

return true;

} catch (IOException e) {

log.error("EsIndexOperation createIndex error.", e);

return false;

}

}

/**

* 删除索引

*/

public boolean deleteIndex(String indexName) {

try {

if (checkIndex(indexName)) {

DeleteIndexRequest request = new DeleteIndexRequest(indexName);

AcknowledgedResponse response = restHighLevelClient.indices().delete(request, options);

return response.isAcknowledged();

}

} catch (Exception e) {

log.error("EsIndexOperation deleteIndex error.", e);

}

return Boolean.FALSE;

}

}

3.3.1.2 读取文件的工具类

/**

* 文件操作类

*/

@Slf4j

public class FileUtils {

/**

* 读取项目resources文件夹下的文件

*

* @param filePath 文件路径

* @return 文件内容

*/

public static String readFileContent(String filePath) {

try {

BufferedReader reader = new BufferedReader(new FileReader(filePath));

String line;

StringBuilder stringBuilder = new StringBuilder();

while ((line = reader.readLine()) != null) {

stringBuilder.append(line);

}

reader.close();

return stringBuilder.toString();

} catch (IOException e) {

log.error("readFileContent error.", e);

}

return null;

}

public static void main(String[] args) {

String filePath = "src/main/resources/es/mappings_test20231216.txt";

String fileContent = readFileContent(filePath);

}

}

3.3.1.3 测试创建es索引

(1)在“resources”文件夹下创建es索引的配置文件

配置文件内容如下所示。

{

"aliases": {

"test": {}

},

"mappings": {

"properties": {

"name": {

"type": "text",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 256

}

}

},

"address": {

"type": "text",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 256

}

}

}

}

},

"settings": {

"index": {

"number_of_shards": "1",

"number_of_replicas": "1"

}

}

}

(2)读取es索引的配置文件,创建es索引

/** Creates index {@code test_1216} from the sample configuration file and expects success. */
@Test
public void createIndex() {
    final String configPath = "src/main/resources/es/mappings_test20231216.txt";
    final String targetIndex = "test_1216";

    Assert.assertTrue(esIndexOperation.createIndex(targetIndex, configPath));
}

(3)查看创建结果

通过命令 GET /test 查看es索引创建结果(test 是配置文件中为索引 test_1216 声明的别名,因此可以通过别名查询),结果如下所示。

{

"test_1216" : {

"aliases" : {

"test" : { }

},

"mappings" : {

"properties" : {

"address" : {

"type" : "text",

"fields" : {

"keyword" : {

"type" : "keyword",

"ignore_above" : 256

}

}

},

"name" : {

"type" : "text",

"fields" : {

"keyword" : {

"type" : "keyword",

"ignore_above" : 256

}

}

}

}

},

"settings" : {

"index" : {

"routing" : {

"allocation" : {

"include" : {

"_tier_preference" : "data_content"

}

}

},

"number_of_shards" : "1",

"provided_name" : "test_1216",

"creation_date" : "1702723364945",

"number_of_replicas" : "1",

"uuid" : "RCAhqjPZSG-n4fse3cot4A",

"version" : {

"created" : "7100099"

}

}

}

}

}

3.3.2 查询操作

3.3.2.1 常用查询

/**

* 查询操作

*

*/

@Slf4j

@Service

public class EsQueryOperation {

@Resource

private RestHighLevelClient client;

private final RequestOptions options = RequestOptions.DEFAULT;

/**

* 查询总数

*/

public Long count(String indexName) {

CountRequest countRequest = new CountRequest(indexName);

try {

CountResponse countResponse = client.count(countRequest, options);

return countResponse.getCount();

} catch (Exception e) {

log.error("EsQueryOperation count error.", e);

}

return 0L;

}

/**

* 查询数据集

*/

public List> list(String indexName, SearchSourceBuilder sourceBuilder) {

SearchRequest searchRequest = new SearchRequest(indexName);

searchRequest.source(sourceBuilder);

try {

SearchResponse searchResp = client.search(searchRequest, options);

List> data = new ArrayList<>();

SearchHit[] searchHitArr = searchResp.getHits().getHits();

for (SearchHit searchHit : searchHitArr) {

Map temp = searchHit.getSourceAsMap();

temp.put("id", searchHit.getId());

data.add(temp);

}

return data;

} catch (Exception e) {

log.error("EsQueryOperation list error.", e);

}

return null;

}

}

3.3.2.2 测试

/** Runs a paged bool query against alias {@code test} and checks that the search succeeded. */
@Test
public void list() {
    String indexName = "test";

    // Query: address must match the term "hunan", name must not match "Jack".
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    queryBuilder.must(QueryBuilders.termQuery("address", "hunan"));
    queryBuilder.mustNot(QueryBuilders.matchQuery("name", "Jack"));
    sourceBuilder.query(queryBuilder);

    // Paging: first page, one hit.
    sourceBuilder.from(0);
    sourceBuilder.size(1);

    List<Map<String, Object>> list = esQueryOperation.list(indexName, sourceBuilder);

    // list(...) returns null when the search fails, so a non-null result means it succeeded.
    Assert.assertNotNull(list);
    Assert.assertTrue(list.size() <= 1);
}

3.3.3 增删改操作

3.3.3.1 常用增删改操作

/**

* 增删改数据

*

*/

@Slf4j

@Service

public class EsDataOperation {

@Resource

private RestHighLevelClient client;

private final RequestOptions options = RequestOptions.DEFAULT;

/**

* 写入数据

*/

public boolean insert(String indexName, Map dataMap) {

try {

BulkRequest request = new BulkRequest();

request.add(new IndexRequest(indexName).opType("create")

.id(dataMap.get("id").toString())

.source(dataMap, XContentType.JSON));

this.client.bulk(request, options);

return Boolean.TRUE;

} catch (Exception e) {

log.error("EsDataOperation insert error.", e);

}

return Boolean.FALSE;

}

/**

* 批量写入数据

*/

public boolean batchInsert(String indexName, List> userIndexList) {

try {

BulkRequest request = new BulkRequest();

for (Map dataMap : userIndexList) {

request.add(new IndexRequest(indexName).opType("create")

.id(dataMap.get("id").toString())

.source(dataMap, XContentType.JSON));

}

this.client.bulk(request, options);

return Boolean.TRUE;

} catch (Exception e) {

log.error("EsDataOperation batchInsert error.", e);

}

return Boolean.FALSE;

}

/**

* 根据id更新数据,可以直接修改索引结构

*

* @param refreshPolicy 数据刷新策略

*/

public boolean update(String indexName, Map dataMap, WriteRequest.RefreshPolicy refreshPolicy) {

try {

UpdateRequest updateRequest = new UpdateRequest(indexName, dataMap.get("id").toString());

updateRequest.setRefreshPolicy(refreshPolicy);

updateRequest.doc(dataMap);

this.client.update(updateRequest, options);

return Boolean.TRUE;

} catch (Exception e) {

log.error("EsDataOperation update error.", e);

}

return Boolean.FALSE;

}

/**

* 删除数据

*/

public boolean delete(String indexName, String id) {

try {

DeleteRequest deleteRequest = new DeleteRequest(indexName, id);

this.client.delete(deleteRequest, options);

return Boolean.TRUE;

} catch (Exception e) {

log.error("EsDataOperation delete error.", e);

}

return Boolean.FALSE;

}

}

3.3.3.2 测试

/** Inserts one document through alias {@code test} and checks the reported outcome. */
@Test
public void insert() {
    String indexName = "test";

    HashMap<String, Object> hashMap = new HashMap<>();
    hashMap.put("id", 4);
    hashMap.put("name", "tom");
    hashMap.put("address", "Jiangsu");

    boolean flag = esDataOperation.insert(indexName, hashMap);

    // Assert on the actual result; asserting a constant can never fail.
    Assert.assertTrue(flag);
}

/** Updates the name of document 5 through alias {@code test} and checks the reported outcome. */
@Test
public void update() {
    String indexName = "test";

    HashMap<String, Object> hashMap = new HashMap<>();
    hashMap.put("id", 5);
    hashMap.put("name", "jack7");

    boolean updated =
            esDataOperation.update(indexName, hashMap, WriteRequest.RefreshPolicy.WAIT_UNTIL);

    // Assert on the actual result; asserting a constant can never fail.
    Assert.assertTrue(updated);
}

4 参考文献

(1)elasticsearch学习(七):es客户端RestHighLevelClient_炎升的博客

(2)中间件:ElasticSearch组件RestHighLevelClient用法详解

(3)java api 实现es中的索引管理_createindexrequest

精彩链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。