我正在尝试对Elasticsearch(ES)执行操作。到目前为止,我相信已经能够使用Jest(基于HTTP请求)正确建立连接。现在我想创建一个新索引并写入一些数据,以便可以通过elasticsearch head插件看到它。我运行代码时没有收到任何异常,但也没有任何反应。代码如下:
public class ElasticSearch {
private String ES_HOST = "localhost";
private String ES_PORT = "9200";
private static JestClient jestClient = null;
public JestClient getElasticSearchClient() {
return jestClient;
}
public void connectToElasticSearch() {
try {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig.Builder("http://" + ES_HOST + ":" + ES_PORT)
.multiThreaded(true)
// //Per default this implementation will create no more than 2 concurrent
// connections per given route
// .defaultMaxTotalConnectionPerRoute(<YOUR_DESIRED_LEVEL_OF_CONCURRENCY_PER_ROUTE>)
// // and no more 20 connections in total
// .maxTotalConnection(<YOUR_DESIRED_LEVEL_OF_CONCURRENCY_TOTAL>)
.build());
jestClient = factory.getObject();
} catch (Exception e) {
e.printStackTrace();
}
}
public void createIndex(String indexName, String indexType) throws IOException {
// jestClient.execute(new CreateIndex.Builder(indexName).build());
PutMapping putMapping = new PutMapping.Builder(
indexName,
indexType,
"{ \"my_type\" : { \"properties\" : { \"message\" : {\"type\" : \"string\", \"store\" : \"yes\"} } } }"
).build();
jestClient.execute(putMapping);
}
public void postInES() throws IOException {
String source = jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", "date")
.field("message", "trying out Elastic Search")
.endObject().string();
}
public static void main(String[] args) throws IOException {
ElasticSearch es = new ElasticSearch();
es.connectToElasticSearch();
es.getElasticSearchClient();
es.createIndex("ES TEST", "TEST");
es.postInES();
}
我使用的依赖如下:
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.2.4</version>
</dependency>
感谢您的帮助
谢谢
最佳答案
谢谢。
我在上面的代码中发现了一些问题,并且已经修复。首先,使用Java的TransportClient时端口必须是9300(传输协议端口),而不是9200(HTTP端口)。实际上我重写了整个代码,决定使用TransportClient代替JestClient,这解决了我的问题。我在这里分享我的代码,希望能帮助到有类似问题的人。
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Locale;
import java.util.Map;
/**
 * Demo wrapper around the Elasticsearch {@link TransportClient}: connect, check cluster health,
 * create an index, bulk-insert, delete by query and scroll through results.
 *
 * <p>A single client is created lazily and reused by every method; call {@link #close()} (or use
 * try-with-resources) when done. Index names are lower-cased consistently in every method, so
 * the name passed to {@code createIndex} addresses the same index in all other calls.
 *
 * @author YoavT @Date 6/26/2018 @Time 9:20 AM
 */
public class ElasticSearch implements AutoCloseable {
  private static final String ES_HOST = "localhost";
  // TransportClient uses the transport-protocol port (9300), not the HTTP port (9200).
  private static final int ES_PORT = 9300;
  // How long a scroll context is kept alive between two scroll requests.
  private static final TimeValue SCROLL_KEEP_ALIVE = TimeValue.timeValueSeconds(60);

  private TransportClient client = null;

  /**
   * Connects to the cluster unless a client is already open.
   *
   * <p>Because {@code client.transport.ignore_cluster_name} is enabled, an already open client is
   * reused regardless of the cluster name passed in.
   *
   * @param clusterName value for the {@code cluster.name} client setting
   * @return {@code true} if a client connected to at least one node is available
   */
  protected boolean connectToElasticSearch(String clusterName) {
    if (client != null) {
      // Reuse the open client instead of leaking a new one on every call.
      return true;
    }
    TransportClient candidate = null;
    try {
      Settings settings =
          Settings.builder()
              .put("cluster.name", clusterName)
              .put("client.transport.ignore_cluster_name", true)
              .put("client.transport.sniff", true)
              .build();
      candidate = new PreBuiltTransportClient(settings);
      candidate.addTransportAddress(new TransportAddress(InetAddress.getByName(ES_HOST), ES_PORT));
      // Adding an address does not fail for an unreachable node, so verify explicitly.
      if (candidate.connectedNodes().isEmpty()) {
        System.out.println("No Elasticsearch node reachable at " + ES_HOST + ":" + ES_PORT);
        candidate.close();
        return false;
      }
      client = candidate;
      System.out.println(
          "Connection " + clusterName + "@" + ES_HOST + ":" + ES_PORT + " established!");
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      if (candidate != null) {
        candidate.close();
      }
      return false;
    }
  }

  /** Closes the underlying client, if one is open. Safe to call more than once. */
  @Override
  public void close() {
    if (client != null) {
      client.close();
      client = null;
    }
  }

  /** Returns the connected client or throws if no connection could be established. */
  private TransportClient requireClient(String clusterName) {
    if (!connectToElasticSearch(clusterName)) {
      throw new IllegalStateException(
          "Not connected to Elasticsearch cluster '"
              + clusterName
              + "' at "
              + ES_HOST
              + ":"
              + ES_PORT);
    }
    return client;
  }

  /** Lower-cases an index name so that every method addresses the same index. */
  private static String normalizeIndexName(String indexName) {
    return indexName.toLowerCase(Locale.ROOT);
  }

  /**
   * Checks the health status of the cluster by waiting up to two seconds for green status.
   *
   * @param clusterName cluster to connect to
   * @return {@code false} if the wait timed out, {@code true} otherwise
   * @throws IllegalStateException if no connection could be established
   */
  public boolean isClusterHealthy(String clusterName) {
    final ClusterHealthResponse response =
        requireClient(clusterName)
            .admin()
            .cluster()
            .prepareHealth()
            .setWaitForGreenStatus()
            .setTimeout(TimeValue.timeValueSeconds(2))
            .execute()
            .actionGet();
    if (response.isTimedOut()) {
      System.out.println("The cluster is unhealthy: " + response.getStatus());
      return false;
    }
    System.out.println("The cluster is healthy: " + response.getStatus());
    return true;
  }

  /**
   * Checks whether the index already exists. Intended to be called once the cluster is healthy
   * and before creating the index.
   *
   * @param indexName index to look up (lower-cased before the lookup)
   * @param clusterName cluster to connect to
   * @return {@code true} if the index exists
   * @throws IllegalStateException if no connection could be established
   */
  public boolean isIndexRegistered(String indexName, String clusterName) {
    final IndicesExistsResponse ieResponse =
        requireClient(clusterName)
            .admin()
            .indices()
            .prepareExists(normalizeIndexName(indexName))
            .get(TimeValue.timeValueSeconds(1));
    if (!ieResponse.isExists()) {
      return false;
    }
    System.out.println("Index already created!");
    return true;
  }

  /**
   * Creates the index with the given shard and replica counts.
   *
   * @param indexName index to create (lower-cased before creation)
   * @param numberOfShards value for {@code index.number_of_shards}
   * @param numberOfReplicas value for {@code index.number_of_replicas}
   * @param clusterName cluster to connect to
   * @return {@code true} if the creation was acknowledged, {@code false} if it was not or failed
   * @throws IllegalStateException if no connection could be established
   */
  public boolean createIndex(
      String indexName, String numberOfShards, String numberOfReplicas, String clusterName) {
    TransportClient es = requireClient(clusterName);
    try {
      CreateIndexResponse createIndexResponse =
          es.admin()
              .indices()
              .prepareCreate(normalizeIndexName(indexName))
              .setSettings(
                  Settings.builder()
                      .put("index.number_of_shards", numberOfShards)
                      .put("index.number_of_replicas", numberOfReplicas))
              .get();
      if (createIndexResponse.isAcknowledged()) {
        System.out.println(
            "Created Index with "
                + numberOfShards
                + " Shard(s) and "
                + numberOfReplicas
                + " Replica(s)!");
        return true;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    final String clusterName = "elasticsearch";
    final String indexName = "TestIndex";
    try (ElasticSearch elasticSearch = new ElasticSearch()) {
      if (!elasticSearch.connectToElasticSearch(clusterName)) {
        System.out.println("Could not connect to cluster " + clusterName);
        return;
      }
      boolean isHealthy = elasticSearch.isClusterHealthy(clusterName);
      System.out.println("is cluster healthy= " + isHealthy);
      // Check the same index that is created below.
      boolean isIndexExsist = elasticSearch.isIndexRegistered(indexName, clusterName);
      System.out.println("is index exsist = " + isIndexExsist);
      if (!isIndexExsist) {
        boolean createIndex = elasticSearch.createIndex(indexName, "3", "1", clusterName);
        System.out.println("Is index created = " + createIndex);
      }
      boolean bulkInsert = elasticSearch.bulkInsert(indexName, "Json", clusterName);
      System.out.println("Bulk insert = " + bulkInsert);
      long deleteBulk = elasticSearch.deleteBulk(indexName, "name", "Mark Twain", clusterName);
      System.out.println("Delete bulk = " + deleteBulk);
    }
  }

  /**
   * Indexes a sample document (properties {@code name} and {@code age}) through a bulk request.
   *
   * <p>The request uses {@code RefreshPolicy.IMMEDIATE} so that the index is refreshed after the
   * request and the data can be searched or deleted directly afterwards.
   *
   * @param indexName target index (lower-cased before use)
   * @param indexType target mapping type
   * @param clusterName cluster to connect to
   * @return {@code true} if the bulk request completed without failures
   * @throws IOException if the document source cannot be built
   * @throws IllegalStateException if no connection could be established
   */
  public boolean bulkInsert(String indexName, String indexType, String clusterName)
      throws IOException {
    TransportClient es = requireClient(clusterName);
    BulkRequestBuilder bulkRequest = es.prepareBulk();
    bulkRequest
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .add(
            es.prepareIndex(normalizeIndexName(indexName), indexType, null)
                .setSource(
                    XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name", "Mark Twain")
                        .field("age", 75)
                        .endObject()));
    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
      System.out.println("Bulk insert failed!");
      // Surface the per-item reasons instead of dropping them.
      System.out.println(bulkResponse.buildFailureMessage());
      return false;
    }
    return true;
  }

  /**
   * Deletes all documents in the index that match the given key-value pair.
   *
   * @param indexName index to delete from (lower-cased before use)
   * @param key field name used in the match query
   * @param value value used in the match query
   * @param clusterName cluster to connect to
   * @return number of deleted documents
   * @throws IllegalStateException if no connection could be established
   */
  public long deleteBulk(String indexName, String key, String value, String clusterName) {
    BulkByScrollResponse response =
        DeleteByQueryAction.INSTANCE
            .newRequestBuilder(requireClient(clusterName))
            .filter(QueryBuilders.matchQuery(key, value))
            .source(normalizeIndexName(indexName))
            .refresh(true)
            .get();
    System.out.println("Deleted " + response.getDeleted() + " element(s)!");
    return response.getDeleted();
  }

  /**
   * Prints every document whose {@code filterField} lies in the range {@code from..to}, using a
   * scroll (the Elasticsearch counterpart to a cursor in a traditional SQL database).
   *
   * <p>A scroll is overkill for a handful of documents and is used here for demonstration only;
   * it is meant for reading large result sets, not for real-time user requests.
   *
   * @param indexName index to search (lower-cased before use)
   * @param from lower bound of the range filter
   * @param to upper bound of the range filter
   * @param clusterName cluster to connect to
   * @param filterField field the range filter is applied to
   * @throws IllegalStateException if no connection could be established
   */
  public void queryResultsWithFilter(
      String indexName, int from, int to, String clusterName, String filterField) {
    TransportClient es = requireClient(clusterName);
    SearchResponse scrollResp =
        es.prepareSearch(normalizeIndexName(indexName))
            .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
            .setScroll(SCROLL_KEEP_ALIVE)
            .setPostFilter(QueryBuilders.rangeQuery(filterField).from(from).to(to))
            // at most 100 hits are returned per scroll page
            .setSize(100)
            .get();
    // Running hit number across all pages (previously it restarted at 1 on every page).
    int count = 1;
    try {
      // A page with zero hits marks the end of the scroll.
      while (scrollResp.getHits().getHits().length != 0) {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
          Map<String, Object> res = hit.getSourceAsMap();
          for (Map.Entry<String, Object> entry : res.entrySet()) {
            System.out.println("[" + count + "] " + entry.getKey() + " --> " + entry.getValue());
          }
          count++;
        }
        scrollResp =
            es.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(SCROLL_KEEP_ALIVE)
                .execute()
                .actionGet();
      }
    } finally {
      // Release the server-side scroll context instead of waiting for the keep-alive to expire.
      es.prepareClearScroll().addScrollId(scrollResp.getScrollId()).get();
    }
  }
}