赞
踩
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.gridsum.mediad3</groupId> <artifactId>estest</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.2.4</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.2.4</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --> </dependencies> </project>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="warn"> <Appenders> <!-- 这个输出控制台的配置 --> <Console name="Console" target="SYSTEM_OUT"> <!-- 控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch) --> <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/> <!-- 这个都知道是输出日志的格式 --> <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/> </Console> <!-- 文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用 --> <!-- append为TRUE表示消息增加到指定文件中,false表示消息覆盖指定的文件内容,默认值是true --> <File name="log" fileName="log/test.log" append="false"> <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/> </File> <!-- 添加过滤器ThresholdFilter,可以有选择的输出某个级别以上的类别 onMatch="ACCEPT" onMismatch="DENY"意思是匹配就接受,否则直接拒绝 --> <File name="ERROR" fileName="logs/error.log"> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/> </File> <!-- 这个会打印出所有的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档 --> <RollingFile name="RollingFile" fileName="logs/web.log" filePattern="logs/$${date:yyyy-MM}/web-%d{MM-dd-yyyy}-%i.log.gz"> <PatternLayout pattern="%d{yyyy-MM-dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/> <SizeBasedTriggeringPolicy size="2MB"/> </RollingFile> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="Console" /> </Root> </Loggers> </Configuration>
ESTest.java
package com.gridsum.mediad3; import java.io.IOException; import java.net.InetAddress; import java.util.Date; import java.text.SimpleDateFormat; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.SearchHit; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.action.get.GetResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ESTest { private final static Logger logger = LogManager.getLogger(ESTest.class); public final static String HOST = "52.231.155.124"; public final static int PORT = 9300; static public TransportClient client; private static byte[] LOCK = new byte[0]; private final static String article="article"; private final static String content="content"; public static void main(String[] args) throws Exception { initElasticSearchService(); deleteIndex(article); createIndexAndMapping(); addIndexAndDocument(article,content); bulkRequest(article,content); multiSearchResponse(article); searchById(article,content,"101"); updateDocument(article,content,"101"); searchById(article,content,"101"); deleteById(article,content,"101"); deleteIndex(article); } public static void initElasticSearchService() throws Exception { synchronized (LOCK) { if (client == null) { Settings settings = Settings.builder().put("cluster.name", "my-application") // .put("client.transport.sniff", true) .build(); try { client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(InetAddress.getByName(HOST), PORT)); //logger.info("This is info message."); } catch (Exception e) { throw new Exception("es init failed!", e); } } } } public static void createIndex(String indexName) { client.admin().indices().create(new CreateIndexRequest(indexName)).actionGet(); } public static void createIndex(String index, String type) { client.prepareIndex(index, type).setSource().get(); } public static void createIndexAndMapping() throws IOException { CreateIndexRequestBuilder cib=client.admin().indices().prepareCreate(article); XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() .startObject("properties") //设置之定义字段 .startObject("author") .field("type","text") //设置数据类型 .endObject() .startObject("title") .field("type","text") .endObject() .startObject("content") .field("type","text") .endObject() .startObject("price") .field("type","text") .endObject() .startObject("view") .field("type","text") .endObject() .startObject("tag") .field("type","text") .endObject() .startObject("date") .field("type","date") //设置Date类型 .field("format","yyyy-MM-dd HH:mm:ss") //设置Date的格式 .endObject() .endObject() .endObject(); cib.addMapping(content, mapping); CreateIndexResponse res=cib.execute().actionGet(); logger.info("----------添加映射成功----------"); logger.info(res.isAcknowledged()); } public static void addIndexAndDocument(String index, String type) throws Exception{ Date time = new Date(); IndexResponse response = client.prepareIndex(index, type) .setSource(XContentFactory.jsonBuilder().startObject() .field("id","447") .field("author","fendo") .field("title","192.138.1.2") .field("content","这是JAVA有关的书籍") .field("price","20") .field("view","100") .field("tag","a,b,c,d,e,f") .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)) .endObject()) .get(); logger.info("添加索引成功,版本号:"+response.getVersion()); } public static void bulkRequest(String index,String type) throws Exception { BulkRequestBuilder bulkRequest = client.prepareBulk(); Date time = new Date(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex(index, type, "199") .setSource(XContentFactory.jsonBuilder() .startObject() .field("id","199") .field("author","fendo") .field("title","BULK") .field("content","这是BULK有关的书籍") .field("price","40") .field("view","300") .field("tag","a,b,c") .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)) .endObject() ) ); bulkRequest.add(client.prepareIndex(index,type, "101") .setSource(XContentFactory.jsonBuilder() .startObject() .field("id","101") .field("author","fendo") .field("title","ACKSE") .field("content","这是ACKSE有关的书籍") .field("price","50") .field("view","200") .field("tag","a,b,c") .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)) .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item logger.error(bulkResponse.getTook()); } } public static void updateDocument(String index, String type, String id) throws Exception{ Date time = new Date(); //创建修改请求 UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index(index); updateRequest.type(type); updateRequest.id(id); updateRequest.doc(XContentFactory.jsonBuilder() .startObject() .field("author","FKSE") .field("title","JAVA思想") .field("content","注意:这是JAVA有关的书籍") .field("date",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)) .endObject()); UpdateResponse response = client.update(updateRequest).get(); logger.info("更新索引成功",response.toString()); } public static void searchById(String index, String type, String id){ GetResponse response = client.prepareGet(index,type,id).execute() .actionGet(); String json = response.getSourceAsString(); if (null != json) { logger.info(json); } else { logger.info("未查询到任何结果!"); } } public static void deleteById(String index, String type, String id){ DeleteResponse dResponse = client.prepareDelete(index,type, id).execute().actionGet(); if (DocWriteResponse.Result.NOT_FOUND.equals(dResponse.getResult())) { logger.info("删除失败"); } else { logger.info("删除成功"); } } public static void deleteIndex(String index){ IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(index); IndicesExistsResponse inExistsResponse = client.admin().indices() .exists(inExistsRequest).actionGet(); if(inExistsResponse.isExists()){ logger.info("index:"+index+"是否存在:"+inExistsResponse.isExists()); DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index) .execute().actionGet(); logger.info("index:"+index+"是否删除成功:"+dResponse.isAcknowledged()); } else{ logger.info("index:"+index+"是否存在:"+inExistsResponse.isExists()); } } public static void multiSearchResponse(String index){ logger.info("multiSearch Query Start!"); SearchRequestBuilder srb1 = client.prepareSearch(index).setQuery(QueryBuilders.matchQuery("price","50")); logger.info(QueryBuilders.matchQuery("price","50").toString()); SearchRequestBuilder srb2 = client.prepareSearch(index).setQuery(QueryBuilders.matchQuery("title", "C")); logger.info(QueryBuilders.matchQuery("title", "C").toString()); MultiSearchResponse sr = client.prepareMultiSearch().add(srb1).get(); for (MultiSearchResponse.Item item : sr.getResponses()) { SearchResponse response = item.getResponse(); for (SearchHit searchHit : response.getHits()) { logger.info(searchHit); } } logger.info("multiSearch Query End!"); } }
访问HTTP端口,Java封装的API
package com.gridsum.mediad3; import org.apache.http.HttpHost; //import org.apache.http.protocol.HTTP; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.index.VersionType; import org.elasticsearch.common.Strings; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.Map; import java.util.Date; import java.util.HashMap; import java.util.Collections; public class ESRestFulAPI { private static final String HOST = "52.231.155.124"; private static final int HTTP_PORT = 9200; private static final String PROTOCOL = "HTTP"; private final static Logger logger = LogManager.getLogger(ESRestFulAPI.class); private static RestHighLevelClient client; public static void main(String[] args){ init(); createIndex("twitter_two"); openIndex("twitter_two"); deleteIndex("twitter_two"); } public static void init() { logger.info("REST API Start!"); client = new RestHighLevelClient( RestClient.builder( new HttpHost(HOST,HTTP_PORT,PROTOCOL) ) ); logger.info("REST API End!"); } public static void createIndex(String index) { CreateIndexRequest request = new CreateIndexRequest(index);//创建索引 //创建的每个索引都可以有与之关联的特定设置。 request.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 2) ); //创建索引时创建文档类型映射 request.mapping("tweet",//类型定义 " {\n" + " \"tweet\": {\n" + " \"properties\": {\n" + " \"message\": {\n" + " \"type\": \"text\"\n" + " }\n" + " }\n" + " }\n" + " }",//类型映射,需要的是一个JSON字符串 XContentType.JSON); //为索引设置一个别名 request.alias( new Alias(index+"_alias") ); //可选参数 request.timeout(TimeValue.timeValueMinutes(2));//超时,等待所有节点被确认(使用TimeValue方式) //request.timeout("2m");//超时,等待所有节点被确认(使用字符串方式) request.masterNodeTimeout(TimeValue.timeValueMinutes(1));//连接master节点的超时时间(使用TimeValue方式) //request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式) request.waitForActiveShards(2);//在创建索引API返回响应之前等待的活动分片副本的数量,以int形式表示。 //request.waitForActiveShards(ActiveShardCount.DEFAULT);//在创建索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。 //同步执行 try { CreateIndexResponse createIndexResponse = client.indices().create(request); //返回的CreateIndexResponse允许检索有关执行的操作的信息,如下所示: boolean acknowledged = createIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求 boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数 logger.info("acknowledged:"+acknowledged); logger.info("shardsAcknowledged:"+shardsAcknowledged); } catch (IOException IOe) { logger.error("IOException!"+IOe.toString()); } //异步执行 //异步执行创建索引请求需要将CreateIndexRequest实例和ActionListener实例传递给异步方法: //CreateIndexResponse的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { //如果执行成功,则调用onResponse方法; logger.info("createIndex successful!"); } @Override public void onFailure(Exception e) { //如果失败,则调用onFailure方法。 logger.info("createIndex failure!"); } }; client.indices().createAsync(request, listener);//要执行的CreateIndexRequest和执行完成时要使用的ActionListener } public static void deleteIndex(String index) { DeleteIndexRequest request = new DeleteIndexRequest(index);//指定要删除的索引名称 //可选参数: request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引删除(使用TimeValue形式) // request.timeout("2m"); //设置超时,等待所有节点确认索引删除(使用字符串形式) request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式) // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式) //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式 request.indicesOptions(IndicesOptions.lenientExpandOpen()); try { //同步执行 DeleteIndexResponse deleteIndexResponse = client.indices().delete(request); //Delete Index Response //返回的DeleteIndexResponse允许检索有关执行的操作的信息,如下所示: boolean acknowledged = deleteIndexResponse.isAcknowledged();//是否所有节点都已确认请求 logger.info("acknowledged"+acknowledged); } catch (IOException IOe){ logger.error(IOe.toString()); } /* //异步执行删除索引请求需要将DeleteIndexRequest实例和ActionListener实例传递给异步方法: //DeleteIndexResponse的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<DeleteIndexResponse> listener = new ActionListener<DeleteIndexResponse>() { @Override public void onResponse(DeleteIndexResponse deleteIndexResponse) { //如果执行成功,则调用onResponse方法; } @Override public void onFailure(Exception e) { //如果失败,则调用onFailure方法。 } }; client.indices().deleteAsync(request, listener);*/ //如果找不到索引,则会抛出ElasticsearchException: try { request = new DeleteIndexRequest("does_not_exist"); client.indices().delete(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.NOT_FOUND) { //如果没有找到要删除的索引,要执行某些操作 logger.error(exception.toString()); } } catch (IOException IOe) { logger.error(IOe.toString()); } } public static void openIndex(String index){ OpenIndexRequest request = new OpenIndexRequest("twitter");//打开索引 //可选参数: request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已打开(使用TimeValue形式) // request.timeout("2m"); //设置超时,等待所有节点确认索引已打开(使用字符串形式) request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式) // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式) request.waitForActiveShards(2);//在打开索引API返回响应之前等待的活动分片副本的数量,以int形式表示。 //request.waitForActiveShards(ActiveShardCount.ONE);//在打开索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。 //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式 request.indicesOptions(IndicesOptions.strictExpandOpen()); try { //同步执行 OpenIndexResponse openIndexResponse = client.indices().open(request); //Open Index Response //返回的OpenIndexResponse允许检索有关执行的操作的信息,如下所示: boolean acknowledged = openIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求 boolean shardsAcknowledged = openIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数 logger.info("acknowledged: " + acknowledged); logger.info("shardsAcknowledged: " + shardsAcknowledged); } catch (IOException IOe){ logger.error(IOe.toString()); } /*//异步执行打开索引请求需要将OpenIndexRequest实例和ActionListener实例传递给异步方法: //OpenIndexResponse的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<OpenIndexResponse> listener = new ActionListener<OpenIndexResponse>() { @Override public void onResponse(OpenIndexResponse openIndexResponse) { //如果执行成功,则调用onResponse方法; } @Override public void onFailure(Exception e) { //如果失败,则调用onFailure方法。 } }; client.indices().openAsync(request, listener);*/ } public static void closeIndex(String index){ CloseIndexRequest request = new CloseIndexRequest("index");//关闭索引 //可选参数: request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已关闭(使用TimeValue形式) // request.timeout("2m"); //设置超时,等待所有节点确认索引已关闭(使用字符串形式) request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式) // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式) //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式 request.indicesOptions(IndicesOptions.lenientExpandOpen()); try { //同步执行 CloseIndexResponse closeIndexResponse = client.indices().close(request); //Close Index Response //返回的CloseIndexResponse 允许检索有关执行的操作的信息,如下所示: boolean acknowledged = closeIndexResponse.isAcknowledged(); //指示是否所有节点都已确认请求 logger.info("acknowledged: " + acknowledged); } catch (IOException IOe){ logger.error(IOe.toString()); } /*//异步执行打开索引请求需要将CloseIndexRequest实例和ActionListener实例传递给异步方法: //CloseIndexResponse的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<CloseIndexResponse> listener = new ActionListener<CloseIndexResponse>() { @Override public void onResponse(CloseIndexResponse closeIndexResponse) { //如果执行成功,则调用onResponse方法; } @Override public void onFailure(Exception e) { //如果失败,则调用onFailure方法。 } }; client.indices().closeAsync(request, listener); */ } public static void createDoc(){ IndexRequest indexRequest1 = new IndexRequest( "posts",//索引名称 "doc",//类型名称 "1");//文档ID //==============================提供文档源======================================== //方式1:以字符串形式提供 String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; indexRequest1.source(jsonString, XContentType.JSON); //方式2:以Map形式提供 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); //Map会自动转换为JSON格式的文档源 IndexRequest indexRequest2 = new IndexRequest("posts", "doc", "1") .source(jsonMap); // 方式3:文档源以XContentBuilder对象的形式提供,Elasticsearch内部会帮我们生成JSON内容 try { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.field("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest3 = new IndexRequest("posts", "doc", "1") .source(builder); }catch (IOException IOe){ logger.error(IOe.toString()); } //方式4:以Object key-pairs提供的文档源,它会被转换为JSON格式 IndexRequest indexRequest4 = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch"); //===============================可选参数start==================================== indexRequest1.routing("routing");//设置路由值 indexRequest1.parent("parent");//设置parent值 //设置超时:等待主分片变得可用的时间 indexRequest1.timeout(TimeValue.timeValueSeconds(1));//TimeValue方式 indexRequest1.timeout("1s");//字符串方式 //刷新策略 indexRequest1.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式 indexRequest1.setRefreshPolicy("wait_for");//字符串方式 indexRequest1.version(2);//设置版本 indexRequest1.versionType(VersionType.EXTERNAL);//设置版本类型 //操作类型 indexRequest1.opType(DocWriteRequest.OpType.CREATE);//DocWriteRequest.OpType方式 indexRequest1.opType("create");//字符串方式, 可以是 create 或 update (默认) //The name of the ingest pipeline to be executed before indexing the document indexRequest1.setPipeline("pipeline"); //===============================执行==================================== //同步执行 try { IndexResponse indexResponse = client.index(indexRequest1); //Index Response //返回的IndexResponse允许检索有关执行操作的信息,如下所示: String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { //处理(如果需要)第一次创建文档的情况 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { //处理(如果需要)文档被重写的情况 } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { //处理成功分片数量少于总分片数量的情况 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason();//处理潜在的失败 } } }catch(IOException IOe){ logger.error(IOe.toString()); } //异步执行 //IndexResponse 的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { //执行成功时调用。 Response以参数方式提供 } @Override public void onFailure(Exception e) { //在失败的情况下调用。 引发的异常以参数方式提供 } }; //异步执行索引请求需要将IndexRequest实例和ActionListener实例传递给异步方法: client.indexAsync(indexRequest2, listener); //如果存在版本冲突,则会抛出ElasticsearchException: IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //引发的异常表示返回了版本冲突错误 } }catch (IOException IOe){ logger.error(IOe.toString()); } //如果opType设置为创建但是具有相同索引,类型和ID的文档已存在,则也会发生同样的情况: request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //引发的异常表示返回了版本冲突错误 } } catch (IOException IOe){ logger.error(IOe.toString()); } } public static void getDoc(){ GetRequest getRequest = new GetRequest( "posts",//索引 "doc",//类型 "1");//文档ID //===============================可选参数start==================================== //禁用_source检索,默认为启用 getRequest.fetchSourceContext(new FetchSourceContext(false)); //为特定字段配置_source_include String[] includes = new String[]{"message", "*Date"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); getRequest.fetchSourceContext(fetchSourceContext); //为指定字段配置_source_exclude String[] includes1 = Strings.EMPTY_ARRAY; String[] excludes1 = new String[]{"message"}; FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes, excludes); getRequest.fetchSourceContext(fetchSourceContext); //配置指定stored_fields的检索(要求字段在映射中单独存储) getRequest.storedFields("message"); try { GetResponse getResponse = client.get(getRequest); //检索message 存储字段(要求将字段分开存储在映射中) String message = getResponse.getField("message").getValue(); //Get Response //返回的GetResponse允许检索请求的文档及其元数据和最终存储的字段。 String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式) Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();//检索文档(Map<String, Object>形式) byte[] sourceAsBytes = getResponse.getSourceAsBytes();//检索文档(byte[]形式) } else { /* 处理找不到文档的情况。 请注意,尽管返回404状态码, 但返回的是有效的GetResponse,而不是抛出的异常。 此类Response不包含任何源文档,并且其isExists方法返回false。*/ } }catch(IOException IOe){ logger.error(IOe.toString()); } getRequest.routing("routing");//设置routing值 getRequest.parent("parent");//设置parent值 getRequest.preference("preference");//设置preference值 getRequest.realtime(false);//设置realtime为false,默认是true getRequest.refresh(true);//在检索文档之前执行刷新(默认为false) getRequest.version(2);//设置版本 getRequest.versionType(VersionType.EXTERNAL);//设置版本类型 //===============================可选参数end==================================== //同步执行 try { GetResponse getResponse1 = client.get(getRequest); }catch(IOException IOe){ logger.error(IOe.toString()); } //异步执行 //GetResponse 的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<GetResponse> listener = new ActionListener<GetResponse>() { @Override public void onResponse(GetResponse getResponse) { //执行成功时调用。 Response以参数方式提供 } @Override public void onFailure(Exception e) { //在失败的情况下调用。 引发的异常以参数方式提供 } }; //异步执行获取索引请求需要将GetRequest 实例和ActionListener实例传递给异步方法: client.getAsync(getRequest, listener); //当针对不存在的索引执行获取请求时,响应404状态码,将引发ElasticsearchException,需要按如下方式处理: GetRequest request = new GetRequest("does_not_exist", "doc", "1"); try { GetResponse getResponse2 = client.get(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { //处理因为索引不存在而抛出的异常情况 } }catch(IOException IOe){ logger.error(IOe.toString()); } //如果请求了特定的文档版本,并且现有文档具有不同的版本号,则会引发版本冲突: try { GetRequest request1 = new GetRequest("posts", "doc", "1").version(2); GetResponse getResponse3 = client.get(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { //引发的异常表示返回了版本冲突错误 } } catch(IOException IOe) { logger.error(IOe.toString()); } } public static void deleteDoc(){ DeleteRequest request = new DeleteRequest ( "posts",//索引 "doc",//类型 "1");//文档ID //===============================可选参数==================================== request.routing("routing");//设置routing值 request.parent("parent");//设置parent值 //设置超时:等待主分片变得可用的时间 request.timeout(TimeValue.timeValueMinutes(2));//TimeValue方式 request.timeout("1s");//字符串方式 //刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式 request.setRefreshPolicy("wait_for");//字符串方式 request.version(2);//设置版本 request.versionType(VersionType.EXTERNAL);//设置版本类型 //同步执行 try { DeleteResponse deleteResponse = client.delete(request); //Delete Response //返回的DeleteResponse允许检索有关执行操作的信息,如下所示: String index = deleteResponse.getIndex(); String type = deleteResponse.getType(); String id = deleteResponse.getId(); long version = deleteResponse.getVersion(); ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { //处理成功分片数量少于总分片数量的情况 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason();//处理潜在的失败 } } //还可以检查文档是否被找到: DeleteRequest request1 = new DeleteRequest("posts", "doc", "does_not_exist"); DeleteResponse deleteResponse1 = client.delete(request); if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { //如果找不到要删除的文档,执行某些操作 } }catch(IOException IOe){ logger.error(IOe.toString()); } //异步执行 //DeleteResponse 的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<DeleteResponse > listener = new ActionListener<DeleteResponse >() { @Override public void onResponse(DeleteResponse getResponse) { //执行成功时调用。 Response以参数方式提供 } @Override public void onFailure(Exception e) { //在失败的情况下调用。 引发的异常以参数方式提供 } }; //异步执行获取索引请求需要将DeleteRequest 实例和ActionListener实例传递给异步方法: client.deleteAsync(request, listener); //如果存在版本冲突,则会抛出ElasticsearchException: try { DeleteRequest request2 = new DeleteRequest("posts", "doc", "1").version(2); DeleteResponse deleteResponse2 = client.delete(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { //引发的异常表示返回了版本冲突错误 } } catch(IOException IOe){ logger.error(IOe.toString()); } } public static void updateDoc(){ UpdateRequest request = new UpdateRequest ( "test",//索引 "_doc",//类型 "1");//文档ID //更新API允许通过使用脚本或传递部分文档来更新现有文档。 //使用脚本 //方式1:该脚本可以作为内联脚本提供: Map<String, Object> parameters = Collections.singletonMap("count", 4);//脚本参数 //使用painless语言和上面的参数创建一个内联脚本 Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters); request.script(inline); //方式2:引用名称为increment-field的脚本,改脚本定义的位置还没搞清楚。 Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters); request.script(stored); //只更新部分 //更新部分文档时,更新的部分文档将与现有文档合并。 //方式1:使用字符串形式 UpdateRequest request1 = new UpdateRequest("posts", "doc", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request1.doc(jsonString, XContentType.JSON); //方式2:使用Map形式,会被自动转为json格式 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request2 = new UpdateRequest("posts", "doc", "1") .doc(jsonMap); //方式3:使用XContentBuilder形式 try { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request3 = new UpdateRequest("posts", "doc", "1") .doc(builder); }catch(IOException IOe){ logger.error(IOe.toString()); } //方式4:使用Object key-pairs形式 UpdateRequest request4 = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update"); //如果文档尚不存在,则可以使用upsert方法定义一些将作为新文档插入的内容: //与部分文档更新类似,可以使用接受String,Map,XContentBuilder或Object key-pairs的方式来定义upsert文档的内容。 String jsonString1 = "{\"created\":\"2017-01-01\"}"; request.upsert(jsonString1, XContentType.JSON); //=========================可选参数=========================== request.routing("routing");//设置routing值 request.parent("parent");//设置parent值 //设置超时:等待主分片变得可用的时间 request.timeout(TimeValue.timeValueSeconds(1));//TimeValue方式 request.timeout("1s");//字符串方式 //刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式 request.setRefreshPolicy("wait_for");//字符串方式 //如果要更新的文档在获取或者索引阶段已被另一操作更改,则重试更新操作的次数 request.retryOnConflict(3); request.version(2);//设置版本 request.fetchSource(true); //启用_source检索,默认为禁用 //为特定字段配置_source_include String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource(new FetchSourceContext(true, includes, excludes)); //为指定字段配置_source_exclude String[] includes1 = Strings.EMPTY_ARRAY; String[] excludes1 = new String[]{"updated"}; request.fetchSource(new FetchSourceContext(true, includes1, excludes1)); request.detectNoop(false);//禁用noop检测 //无论文档是否存在,脚本都必须运行,即如果脚本尚不存在,则脚本负责创建文档。 request.scriptedUpsert(true); //如果不存在,则表明部分文档必须用作upsert文档。 request.docAsUpsert(true); //设置在继续更新操作之前必须激活的分片副本的数量。 request.waitForActiveShards(2); //使用ActiveShardCount方式,可以是ActiveShardCount.ALL,ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认值) request.waitForActiveShards(ActiveShardCount.ALL); //同步执行 try { UpdateResponse updateResponse = client.update(request); //Update Response //返回的UpdateResponse允许检索有关执行操作的信息,如下所示: String index = updateResponse.getIndex(); String type = updateResponse.getType(); String id = updateResponse.getId(); long version = updateResponse.getVersion(); if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { //处理第一次创建文档的情况(upsert) } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { //处理文档被更新的情况 } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { //处理文档已被删除的情况 } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { //处理文档未受更新影响的情况,即文档上未执行任何操作(noop) } //当通过fetchSource方法在UpdateRequest中启用源检索时,响应会包含已更新文档: GetResult result = updateResponse.getGetResult();//获取已更新的文档 if (result.isExists()) { String sourceAsString = result.sourceAsString();//获取已更新的文档源(String方式) Map<String, Object> sourceAsMap = result.sourceAsMap();//获取已更新的文档源(Map方式) byte[] sourceAsBytes = result.source();//获取已更新的文档源(byte[]方式) } else { //处理不返回文档源的场景(默认就是这种情况) } //也可以检查分片失败: ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { //处理成功分片数量少于总分片数量的情况 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason();//处理潜在的失败 } } }catch(IOException IOe){ logger.error(IOe.toString()); } //异步执行 //DeleteResponse 的典型监听器如下所示: //异步方法不会阻塞并立即返回。 ActionListener<UpdateResponse > listener = new ActionListener<UpdateResponse >() { @Override public void onResponse(UpdateResponse updateResponse) { //执行成功时调用。 Response以参数方式提供 } @Override public void onFailure(Exception e) { //在失败的情况下调用。 引发的异常以参数方式提供 } }; //异步执行获取索引请求需要将UpdateRequest 实例和ActionListener实例传递给异步方法: client.updateAsync(request, listener); //当针对文档不存在时,响应404状态码,将引发ElasticsearchException,需要按如下方式处理: UpdateRequest request5 = new UpdateRequest("posts", "type", "does_not_exist").doc("field", "value"); try { UpdateResponse updateResponse5 = client.update(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { //处理由于文档不存在抛出的异常 } } catch(IOException IOe){ logger.error(IOe.toString()); } //如果存在版本冲突,则会抛出ElasticsearchException: UpdateRequest request6 = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse6 = client.update(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //引发的异常表示返回了版本冲突错误 } } catch(IOException IOe){ logger.error(IOe.toString()); } } }
直接通过Java HTTP接口访问ES
package com.gridsum.mediad3; import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.HttpHost; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import java.io.IOException; import java.util.Collections; public class ESHTTPAPI { private static RestClient restClient; private final static String HOST = "52.231.155.124"; private final static int PORT = 9200; public static void main(String[] args) { getRest(); try { catApi(); createIndex(); createDocument(); getDocument(); queryAll(); queryByField(); updateByScript(); geoBoundingBox(); } catch (Exception e){ System.out.println(e.toString()); } } public static void getRest(){ restClient = RestClient.builder(new HttpHost(HOST, PORT, "http")).build(); } public static void catApi() throws Exception{ String method = "GET"; String endpoint = "/_cat"; Response response = restClient.performRequest(method,endpoint); System.out.println(EntityUtils.toString(response.getEntity())); } public static void createIndex() throws Exception{ String method = "PUT"; String endpoint = "/test-index"; Response response = restClient.performRequest(method,endpoint); System.out.println(EntityUtils.toString(response.getEntity())); } public static void createDocument()throws Exception{ String method = "PUT"; String endpoint = "/test-index/test/1"; HttpEntity entity = new NStringEntity( "{\n" + " \"user\" : \"kimchy\",\n" + " \"post_date\" : \"2009-11-15T14:12:12\",\n" + " \"message\" : \"trying out Elasticsearch\"\n" + "}", ContentType.APPLICATION_JSON); Response response = restClient.performRequest(method,endpoint, Collections.<String, String>emptyMap(),entity); System.out.println(EntityUtils.toString(response.getEntity())); } public static void getDocument()throws Exception{ String method = "GET"; String endpoint = "/test-index/test/1"; Response response = restClient.performRequest(method,endpoint); System.out.println(EntityUtils.toString(response.getEntity())); } public static void queryAll() throws Exception { String method = "POST"; String endpoint = "/test-index/test/_search"; HttpEntity entity = new NStringEntity("{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}", ContentType.APPLICATION_JSON); Response response = restClient.performRequest(method,endpoint,Collections.<String, String>emptyMap(),entity); System.out.println(EntityUtils.toString(response.getEntity())); } public static void queryByField() throws Exception { String method = "POST"; String endpoint = "/test-index/test/_search"; HttpEntity entity = new NStringEntity("{\n" + " \"query\": {\n" + " \"match\": {\n" + " \"user\": \"kimchy\"\n" + " }\n" + " }\n" + "}", ContentType.APPLICATION_JSON); Response response = restClient.performRequest(method,endpoint,Collections.<String, String>emptyMap(),entity); System.out.println(EntityUtils.toString(response.getEntity())); } public static void updateByScript() throws Exception { String method = "POST"; String endpoint = "/test-index/test/1/_update"; HttpEntity entity = new NStringEntity("{\n" + " \"doc\": {\n" + " \"user\":\"大美女\"\n" + " }\n" + "}", ContentType.APPLICATION_JSON); Response response = restClient.performRequest(method,endpoint,Collections.<String, String>emptyMap(),entity); System.out.println(EntityUtils.toString(response.getEntity())); } public static void geoBoundingBox() throws IOException { String method = "POST"; String endpoint = "/attractions/restaurant/_search"; HttpEntity entity = new NStringEntity("{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " },\n" + " \"post_filter\": {\n" + " \"geo_bounding_box\": {\n" + " \"location\": {\n" + " \"top_left\": {\n" + " \"lat\": 39.990481,\n" + " \"lon\": 116.277144\n" + " },\n" + " \"bottom_right\": {\n" + " \"lat\": 39.927323,\n" + " \"lon\": 116.405638\n" + " }\n" + " }\n" + " }\n" + " }\n" + "}", ContentType.APPLICATION_JSON); Response response = restClient.performRequest(method,endpoint,Collections.<String, String>emptyMap(),entity); System.out.println(EntityUtils.toString(response.getEntity())); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。