1.前言

大家好,我是楚生辉,在未来的日子里我们一起来学习大数据相关的技术,一起努力奋斗,遇见更好的自己!

本文详细的介绍了如何使用Scala语言连接上Elasticsearch客户端,有需要的小伙伴可以自行获取与学习~

2.ES工具类

package com.xxxx

import com.alibaba.fastjson.JSON

import com.alibaba.fastjson.serializer.SerializeConfig

import org.apache.http.HttpHost

import org.elasticsearch.action.bulk.BulkRequest

import org.elasticsearch.action.index.IndexRequest

import org.elasticsearch.action.search.{SearchRequest, SearchResponse}

import org.elasticsearch.client.indices.GetIndexRequest

import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}

import org.elasticsearch.common.xcontent.XContentType

import org.elasticsearch.search.{SearchHit, SearchHits}

import org.elasticsearch.search.builder.SearchSourceBuilder

import java.util

import scala.collection.mutable.ListBuffer

/**

* ES工具类

* 用于对ES读写操作

*/

object MyEsutils {

def searchField(indexName: String, fieldName: String): List[String] = {

// 先判断索引是否存在

val request = new GetIndexRequest(indexName)

val bool: Boolean = esClient.indices().exists(request, RequestOptions.DEFAULT)

if (!bool){

return null

}

// 正常从ES中提取指定的字段

val mids: ListBuffer[String] = ListBuffer[String]()

val searchRequest = new SearchRequest(indexName)

val searchSourceBuilder = new SearchSourceBuilder()

searchSourceBuilder.fetchSource(fieldName,null).size(10000)

searchRequest.source(searchSourceBuilder)

val searchResponse: SearchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT)

val hits: Array[SearchHit] = searchResponse.getHits.getHits

for (hit <- hits) {

val sourceAsMap: util.Map[String, AnyRef] = hit.getSourceAsMap

val mid: String = sourceAsMap.get(fieldName).toString

mids.append(mid)

}

mids.toList

}

// ES客户端对象

val esClient : RestHighLevelClient = build()

// 创建ES客户端

def build():RestHighLevelClient = {

val host: String = "localhost"

val port: String = "9200"

val builder: RestClientBuilder = RestClient.builder(new HttpHost(host,port.toInt))

val client = new RestHighLevelClient(builder)

client

}

// 关闭ES对象

def close():Unit={

if (esClient != null){

esClient.close()

}

}

/*

* 1.批量写

* 2.幂等写

*/

def bulkSave(indexName:String,docs: List[(String,AnyRef)]):Unit = {

val bulkRequest = new BulkRequest()

for ((docId,docObj) <- docs) {

val indexRequest = new IndexRequest(indexName)

val dataJson: String = JSON.toJSONString(docObj, new SerializeConfig(true))

indexRequest.source(dataJson,XContentType.JSON)

indexRequest.id(docId)

bulkRequest.add(indexRequest)

}

esClient.bulk(bulkRequest,RequestOptions.DEFAULT)

}

}

使用方法

MyEsutils.bulkSave(indexName,orderWides)

MyEsutils.searchField(indexName,orderWides)

写入ES中,就要规划,是写入到一个索引中,还是分割索引(依据什么进行分割),建索引字段的类型,模板是什么,以及方便后续查询起索引别名

3.ES客户端常见API使用

package com.xxxx

import com.alibaba.fastjson.JSON

import com.alibaba.fastjson.serializer.SerializeConfig

import org.apache.http.HttpHost

import org.apache.lucene.search.TotalHits

import org.elasticsearch.action.bulk.BulkRequest

import org.elasticsearch.action.delete.DeleteRequest

import org.elasticsearch.action.get.{GetRequest, GetResponse}

import org.elasticsearch.action.index.IndexRequest

import org.elasticsearch.action.search.{SearchRequest, SearchResponse}

import org.elasticsearch.action.update.UpdateRequest

import org.elasticsearch.client.{RequestOptions, RestClient, RestClientBuilder, RestHighLevelClient}

import org.elasticsearch.common.text.Text

import org.elasticsearch.common.xcontent.XContentType

import org.elasticsearch.index.query.{BoolQueryBuilder, MatchQueryBuilder, QueryBuilder, QueryBuilders, RangeQueryBuilder, TermQueryBuilder}

import org.elasticsearch.index.reindex.UpdateByQueryRequest

import org.elasticsearch.script.{Script, ScriptType}

import org.elasticsearch.search.SearchHit

import org.elasticsearch.search.aggregations.bucket.terms.{ParsedTerms, Terms, TermsAggregationBuilder}

import org.elasticsearch.search.aggregations.metrics.{AvgAggregationBuilder, ParsedAvg}

import org.elasticsearch.search.aggregations.{Aggregation, AggregationBuilder, AggregationBuilders, Aggregations, BucketOrder}

import org.elasticsearch.search.builder.SearchSourceBuilder

import org.elasticsearch.search.fetch.subphase.highlight.{HighlightBuilder, HighlightField}

import org.elasticsearch.search.sort.SortOrder

import org.elasticsearch.search.suggest.term.TermSuggestion.Score

import java.util

/**

* 测试ES客户端

*/

object EsTest {

def main(args: Array[String]): Unit = {

// post()

// bulk()

// getById()

// searchByFilter()

searchByAggs()

close()

/*

增:幂等

*/

def put(): Unit = {

val indexRequest = new IndexRequest()

// 指定索引

indexRequest.index("movie_test")

// 指定doc

val movie: Movie = Movie("1001", "速度与激情1")

val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))

indexRequest.source(movieJson, XContentType.JSON)

indexRequest.index("movie_test")

indexRequest.id("1001")

client.index(indexRequest, RequestOptions.DEFAULT)

}

/*

增:非幂等写,不指定id

*/

def post(): Unit = {

val indexRequest = new IndexRequest()

// 指定索引

indexRequest.index("movie_test")

// 指定doc

val movie: Movie = Movie("1002", "速度与激情2")

val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))

indexRequest.source(movieJson, XContentType.JSON)

indexRequest.index("movie_test")

client.index(indexRequest, RequestOptions.DEFAULT)

}

/*

批量写

*/

def bulk(): Unit = {

val bulkRequest = new BulkRequest()

val movies: List[Movie] = List[Movie](

Movie("1002", "长津湖"),

Movie("1003", "熊出没"),

Movie("1004", "狙击手"),

Movie("1005", "长门桥")

)

for (movie <- movies) {

// 指定索引

val indexRequest = new IndexRequest("movie_test")

val movieJson: String = JSON.toJSONString(movie, new SerializeConfig(true))

indexRequest.source(movieJson, XContentType.JSON)

// 如果是幂等,就指定id,不是幂等就不指定

indexRequest.id(movie.id)

// 将indexRequest加入到bulk批次中

bulkRequest.add(indexRequest)

}

// 最后一次批次执行

client.bulk(bulkRequest, RequestOptions.DEFAULT)

}

/*

修改:单条修改

*/

def update(): Unit = {

val updateRequest = new UpdateRequest()

updateRequest.index("movie_test")

updateRequest.id("1001")

// 把docid为1001的数据,修改movie_name的值

updateRequest.doc("movie_name", "功夫")

client.update(updateRequest, RequestOptions.DEFAULT)

}

/*

修改:条件修改 把电影名为速度与激情的都修改

*/

def updateByQuery(): Unit = {

val updateByQueryRequest = new UpdateByQueryRequest("movie_test")

// query

val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()

val termQueryBuilder: TermQueryBuilder = QueryBuilders.termQuery("movie_name.keyword", "速度与激情")

boolQueryBuilder.filter(termQueryBuilder)

updateByQueryRequest.setQuery(boolQueryBuilder)

// update

val params = new util.HashMap[String, AnyRef]()

params.put("newName", "湄公河行动")

val script = new Script(

ScriptType.INLINE,

Script.DEFAULT_SCRIPT_LANG,

"ctx._source['movie_name']=params.newName",

params

)

updateByQueryRequest.setScript(script)

client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)

}

/*

删除

*/

def delete(): Unit = {

val deleteRequest = new DeleteRequest("movie_test", "1001")

client.delete(deleteRequest, RequestOptions.DEFAULT)

}

/*

查询:单条查询

*/

def getById(): Unit = {

val getRequest = new GetRequest("movie_test", "1001")

val response: GetResponse = client.get(getRequest, RequestOptions.DEFAULT)

println(response)

// {"_index":"movie_test","_type":"_doc","_id":"1001","_version":2,"_seq_no":1,"_primary_term":1,"found":true,"_source":{"id":"1001","movie_name":"速度与激情1"}}

val dataStr: String = response.getSourceAsString

println(dataStr)

}

/*

查询:条件查询

查询doubanScore>=5.0,关键词搜索red sea

关键词高亮显示,显示第一页,每页2条,按照查询doubanScore从大到小排序

*/

def searchByFilter(): Unit = {

// 这样复杂的条件搜索,先写DSL

/*

POST movie_index/_update_by_query{"query":{"bool":{"filter":[{"range":{"doubanScore":{"gte":5.0}}}],"must":[{"match":{"name":"redsea"}}]}},"highlight":{"fields":{"name":{}}},"from":0,"size":2,"sort":[{"doubanScore":{"order":"desc"}}]}

*/

val searchRequest = new SearchRequest("movie_index")

val searchSourceBuilder = new SearchSourceBuilder()

// bool

val boolQueryBuilder: BoolQueryBuilder = QueryBuilders.boolQuery()

// filter

val rangeQueryBuilder: RangeQueryBuilder = QueryBuilders.rangeQuery("doubanScore").gte(5.0)

boolQueryBuilder.filter(rangeQueryBuilder)

// must

val matchQueryBuilder: MatchQueryBuilder = QueryBuilders.matchQuery("name", "red sea")

boolQueryBuilder.must(matchQueryBuilder)

searchSourceBuilder.query(boolQueryBuilder)

// 高亮

val highlightBuilder = new HighlightBuilder()

highlightBuilder.field("name")

searchSourceBuilder.highlighter(highlightBuilder)

// 分页

searchSourceBuilder.from(0)

searchSourceBuilder.size(2)

// 排序

searchSourceBuilder.sort("doubanScore", SortOrder.DESC)

searchRequest.source(searchSourceBuilder)

val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

// 获取总条数

val totalDocs: Long = searchResponse.getHits.getTotalHits.value

// 获取明细数据

val hits: Array[SearchHit] = searchResponse.getHits.getHits

for (hit <- hits) {

// 提取数据

val dataJson: String = hit.getSourceAsString()

// 提取高亮

val highlightFields: util.Map[String, HighlightField] = hit.getHighlightFields

val highlightField: HighlightField = highlightFields.get("name")

val fragments: Array[Text] = highlightField.getFragments

val highlightValue: String = fragments(0).toString

println("明细数据" + dataJson)

println("高亮数据" + highlightValue)

}

}

/*

查询:聚合查询

查询每位演员参演的电影的平均分,倒叙排列

*/

def searchByAggs(): Unit = {

// GET/movie_index/_search{"aggs":{"groupbyactorname":{"terms":{"field":"actorList.name.keyword","size":10,"order":{"doubanscoreavg":"desc"}},"aggs":{"doubanscoreavg":{"avg":{"field":"doubanScore"}}}}},"size":0}

val searchRequest = new SearchRequest("movie_index")

val searchSourceBuilder = new SearchSourceBuilder()

// 不要明细

searchSourceBuilder.size(0)

// group

val termsAggregationBuilder: TermsAggregationBuilder = AggregationBuilders

.terms("groupbyactorname")

.field("actorList.name.keyword")

.size(10)

.order(BucketOrder.aggregation("doubanscoreavg", false))

// avg

val avgAggregationBuilder: AvgAggregationBuilder = AggregationBuilders.avg("doubanscoreavg").field("doubanScore")

termsAggregationBuilder.subAggregation(avgAggregationBuilder)

searchSourceBuilder.aggregation(termsAggregationBuilder)

searchRequest.source(searchSourceBuilder)

val searchResponse: SearchResponse = client.search(searchRequest, RequestOptions.DEFAULT)

// 拿到演员与平均分

val aggregations: Aggregations = searchResponse.getAggregations

val groupbyactornameParsedTerms: ParsedTerms = aggregations.get[ParsedTerms]("groupbyactorname")

val buckets: util.List[_ <: Terms.Bucket] = groupbyactornameParsedTerms.getBuckets

import scala.collection.JavaConverters._

for (bucket <- buckets.asScala) {

// 演员名字

val actorName: String = bucket.getKeyAsString

// 电影个数

val movieCount: Long = bucket.getDocCount

// 平均分

val aggregations: Aggregations = bucket.getAggregations

val doubanscoreavgParsedAvg: ParsedAvg = aggregations.get[ParsedAvg]("doubanscoreavg")

val avgScore: Double = doubanscoreavgParsedAvg.getValue

println(s"${actorName} 共参演了 ${movieCount} 部,平均分为 ${avgScore}")

}

}

}

// 声明客户端对象

var client: RestHighLevelClient = create()

// 创建客户端对象

def create(): RestHighLevelClient = {

val builder: RestClientBuilder = RestClient.builder(new HttpHost("127.0.0.1", 9200))

val esClient = new RestHighLevelClient(builder)

esClient

}

// 关闭客户端对象

def close(): Unit = {

client.close()

client = null

}

case class Movie(id: String, movie_name: String)

}

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。