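Table of Contents
- Dependencies
- Initializing the client
- Sending requests
- Request parameters
- Request headers
- Setting timeouts
- Setting the number of threads
- Setting the username and password
- Parsing the response
- Node selector
- Configuring the sniffer
- Complete example
- Troubleshooting
- References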
Setting up an OpenSearch development environment: Docker and Docker-Compose
Dependencies
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.13.4</version> <!-- this version is recommended -->
</dependency>
<!-- or -->
<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-java</artifactId>
    <version>2.8.1</version>
</dependency>
Initializing the client
// Build the client; several hosts can be passed to the builder
RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"),
        new HttpHost("localhost", 9201, "http")).build();
Sending requests
- performRequest: synchronous. It blocks the calling thread and returns the Response when the request succeeds, or throws an exception when it fails.
- performRequestAsync: asynchronous. It takes a ResponseListener as an argument; the listener is called with the Response if the request succeeds, or with the exception if it fails.
// Synchronous request
Request request = new Request("GET", "/posts/_search");
Response response = restClient.performRequest(request); // blocks until the response arrives
log.info(response.toString());

// Asynchronous request
Request asyncRequest = new Request("GET", "/posts/_search");
restClient.performRequestAsync(asyncRequest, new ResponseListener() {
    @Override
    public void onSuccess(Response response) {
        log.info("Async request succeeded: " + response.toString());
    }

    @Override
    public void onFailure(Exception e) {
        log.error("Async request failed", e);
    }
});
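When the rest of the code works with futures rather than callbacks, the listener style described above can be bridged to a CompletableFuture. The following is only a sketch of that pattern: the Listener interface is a hypothetical stand-in with the same two callbacks as ResponseListener, so the example runs on the JDK alone and does not touch the real client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AsyncBridgeSketch {

    /** Hypothetical stand-in for a success/failure callback such as ResponseListener. */
    interface Listener<T> {
        void onSuccess(T response);
        void onFailure(Exception e);
    }

    /**
     * Adapts a callback-style call to a CompletableFuture.
     * "call" is any method that accepts a listener and eventually invokes it once.
     */
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> call) {
        CompletableFuture<T> future = new CompletableFuture<>();
        call.accept(new Listener<T>() {
            @Override
            public void onSuccess(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Simulated asynchronous call that succeeds.
        CompletableFuture<String> ok = toFuture(l -> l.onSuccess("200 OK"));
        System.out.println(ok.join());

        // Simulated asynchronous call that fails.
        CompletableFuture<String> failed =
                toFuture(l -> l.onFailure(new java.io.IOException("connection refused")));
        System.out.println(failed.isCompletedExceptionally());
    }
}
```

With the real client, the lambda passed to toFuture would be the place to call performRequestAsync with a listener that forwards to the two callbacks.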
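Request parameters
// Option 1: add a query-string parameter
request.addParameter("pretty", "true");
// Option 2: set the request body as an HttpEntity
request.setEntity(new NStringEntity("{\"json\":\"text\"}", ContentType.APPLICATION_JSON));
// Option 3: set the request body directly from a JSON string
request.setJsonEntity("{\"json\":\"text\"}");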
Request headers
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Authorization", "Bearer " + "my-token");
// The buffer limit is an int number of bytes. 30 * 1024 * 1024 * 1024 overflows int,
// so a 30 MiB limit is used here.
builder.setHttpAsyncResponseConsumerFactory(
        new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
RequestOptions COMMON_OPTIONS = builder.build();

Request request = new Request("GET", "/");
request.setOptions(COMMON_OPTIONS);
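A note on the buffer limit passed to HeapBufferedResponseConsumerFactory: it is an int number of bytes, and a product such as 30 * 1024 * 1024 * 1024 does not fit in an int. Java evaluates it with wrap-around arithmetic and produces a negative number. The sketch below shows the wrap-around and one way to guard against it; the helper name mebibytesToBytes is an assumption made for this example.

```java
final class BufferLimitSketch {

    /** Converts mebibytes to bytes, throwing ArithmeticException instead of silently overflowing. */
    static int mebibytesToBytes(int mib) {
        return Math.multiplyExact(mib, 1024 * 1024);
    }

    public static void main(String[] args) {
        int wrapped = 30 * 1024 * 1024 * 1024;    // int arithmetic wraps around
        System.out.println(wrapped);              // -2147483648
        System.out.println(mebibytesToBytes(30)); // 31457280
    }
}
```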
Setting timeouts
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
        return requestConfigBuilder
                .setConnectTimeout(50000)            // connect timeout (default 1 s)
                .setSocketTimeout(10000)             // socket timeout (default 30 s)
                .setConnectionRequestTimeout(10000); // timeout for obtaining a connection from the pool
    }
});
Setting the number of threads
By default, the Apache HTTP async client starts one dispatcher thread, plus a number of worker threads used by the connection manager.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                // threadNumber is the desired number of I/O worker threads, defined by the caller
                return httpAsyncClientBuilder.setDefaultIOReactorConfig(
                        IOReactorConfig.custom().setIoThreadCount(threadNumber).build());
            }
        });
Setting the username and password
// Create a credentials provider and set the username and password
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));

// Connect to OpenSearch with the RestClient builder
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
        .setHttpClientConfigCallback(httpClientBuilder -> {
            // Configure the connect timeout, the socket timeout for data transfer
            // once the connection is established, and the connection request timeout.
            // Connect timeout: the maximum time allowed for client and server to establish a connection.
            RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                    .setConnectTimeout(5000)             // connect timeout: 5 s
                    .setSocketTimeout(10000)             // socket timeout: 10 s
                    .setConnectionRequestTimeout(10000); // connection request timeout: 10 s
            httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
            // Set the credentials provider
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        })
        .build();
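With HTTP Basic authentication, the username and password end up on the wire as an Authorization header. The sketch below shows how that header value is formed according to RFC 7617. It illustrates the format only and says nothing about how the Apache client negotiates authentication internally; the class and method names are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicAuthSketch {

    /** Builds the value of an HTTP Basic Authorization header: "Basic " + base64(user:password). */
    static String basicAuthHeader(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(basicAuthHeader("admin", "admin")); // Basic YWRtaW46YWRtaW4=
    }
}
```

A value built this way could also be attached per request with addHeader("Authorization", ...) on a RequestOptions builder, in the same way as the bearer token in the request headers section.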
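Parsing the response
Response response = restClient.performRequest(new Request("GET", "/"));
// Information about the request that was performed
RequestLine requestLine = response.getRequestLine();
// The host that returned the response
HttpHost httpHost = response.getHost();
// Response status line, from which the status code can be read
int statusCode = response.getStatusLine().getStatusCode();
// Response headers; a single header can also be fetched by name with getHeader(String)
Header[] headers = response.getHeaders();
// Response body as a string
String responseBody = EntityUtils.toString(response.getEntity());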
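Node selector
By default, the client sends each request to the configured nodes in round-robin fashion.
ES lets you choose which nodes to connect to: a node selector, configured when the client is initialized, filters the nodes. This is useful when sniffing is enabled, to keep HTTP requests away from dedicated master nodes.
Once configured, the client passes the candidate nodes through the node selector for every request.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(new NodeSelector() {
    @Override
    public void select(Iterable<Node> nodes) {
        // First pass: check whether any node belongs to the target rack
        boolean foundOne = false;
        for (Node node : nodes) {
            String rackId = node.getAttributes().get("rack_id").get(0);
            if ("targetId".equals(rackId)) {
                foundOne = true;
                break;
            }
        }
        // Second pass: if so, remove every node that is not in the target rack
        if (foundOne) {
            Iterator<Node> nodesIt = nodes.iterator();
            while (nodesIt.hasNext()) {
                Node node = nodesIt.next();
                String rackId = node.getAttributes().get("rack_id").get(0);
                if ("targetId".equals(rackId) == false) {
                    nodesIt.remove();
                }
            }
        }
    }
});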
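The selector above follows a "prefer matching nodes, otherwise keep everything" rule: nodes are removed only when at least one node in the target rack exists, so a request is never left without candidates. The sketch below isolates that rule on plain JDK collections. Its Node class is a hypothetical stand-in with only a host and a rack id, not the client's Node type.

```java
import java.util.ArrayList;
import java.util.List;

final class RackFilterSketch {

    /** Hypothetical, simplified node: just a host name and a rack id. */
    static final class Node {
        final String host;
        final String rackId;

        Node(String host, String rackId) {
            this.host = host;
            this.rackId = rackId;
        }
    }

    /** Keeps only nodes in targetRack, unless no node is in that rack, in which case the list is left untouched. */
    static void select(List<Node> nodes, String targetRack) {
        boolean foundOne = false;
        for (Node node : nodes) {
            if (targetRack.equals(node.rackId)) {
                foundOne = true;
                break;
            }
        }
        if (foundOne) {
            nodes.removeIf(node -> !targetRack.equals(node.rackId));
        }
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("es-1", "rackA"));
        nodes.add(new Node("es-2", "rackB"));
        select(nodes, "rackB");
        System.out.println(nodes.size() + " " + nodes.get(0).host); // 1 es-2
    }
}
```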
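Configuring the sniffer
The sniffer automatically discovers the nodes of a running ES cluster and sets them on an existing RestClient instance.
By default, the sniffer retrieves the nodes that belong to the cluster through the nodes info API and parses the JSON response with Jackson.
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
Once a RestClient instance has been created, a sniffer can be associated with it. The sniffer uses the RestClient to fetch the list of current nodes from the cluster periodically (every 5 minutes by default) and updates the client by calling the setNodes method of the RestClient class.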
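The periodic refresh described above can be pictured as a scheduled task that fetches a fresh node list and swaps it in. The sketch below illustrates that mechanism with JDK classes only. It is not the sniffer's implementation: the class name, the fetchNodes supplier, and the choice to keep the previous list when a fetch fails or returns nothing are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class NodeRefreshSketch implements AutoCloseable {

    private final AtomicReference<List<String>> nodes;
    private final Supplier<List<String>> fetchNodes; // hypothetical: asks the cluster for its nodes
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    NodeRefreshSketch(List<String> initialNodes, Supplier<List<String>> fetchNodes) {
        this.nodes = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(initialNodes)));
        this.fetchNodes = fetchNodes;
    }

    /** Schedules a refresh every intervalMillis milliseconds. */
    void start(long intervalMillis) {
        scheduler.scheduleWithFixedDelay(this::refreshOnce, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** One refresh round: replaces the node list, keeping the old one if the fetch fails or is empty. */
    void refreshOnce() {
        try {
            List<String> fresh = fetchNodes.get();
            if (fresh != null && !fresh.isEmpty()) {
                nodes.set(Collections.unmodifiableList(new ArrayList<>(fresh)));
            }
        } catch (RuntimeException e) {
            // Keep the previous node list; the next round will try again.
        }
    }

    List<String> currentNodes() {
        return nodes.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        try (NodeRefreshSketch refresher = new NodeRefreshSketch(
                Arrays.asList("localhost:9200"),
                () -> Arrays.asList("localhost:9200", "localhost:9201"))) {
            refresher.refreshOnce(); // what the scheduler would do on each tick
            System.out.println(refresher.currentNodes()); // [localhost:9200, localhost:9201]
        }
    }
}
```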
Complete example
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class OpenSearchExample {

    public static void main(String[] args) throws IOException {
        // Connect to OpenSearch
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("admin", "admin"));
        RestClient restClient = RestClient.builder(new HttpHost("10.12.23.1", 9200, "http"))
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                            .setConnectTimeout(5000)
                            .setSocketTimeout(10000)
                            .setConnectionRequestTimeout(10000);
                    httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    return httpClientBuilder;
                })
                .build();
        try {
            // Delete the index (throws ResponseException if it does not exist yet)
            deleteIndex(restClient, "my_index");
            // Create the index
            createIndex(restClient, "my_index");
            // Index a document through the bulk API
            indexDocument(restClient, "{\"index\":{\"_index\":\"my_index\",\"_id\":1}}\n{ \"field\": \"value\" }\n");
            // Get the document
            getDocument(restClient, "my_index");
            // Delete the document
            deleteDocument(restClient, "my_index", "1");
            // Delete the index
            deleteIndex(restClient, "my_index");
        } catch (ResponseException e) {
            // Handle error responses returned by the server
            e.printStackTrace();
            System.err.println("Error: " + e.getResponse().getStatusLine().getReasonPhrase());
        } finally {
            // Close the client
            restClient.close();
        }
    }

    private static void createIndex(RestClient restClient, String index) throws IOException {
        // PUT /<index> creates the index
        Request request = new Request("PUT", "/" + index);
        restClient.performRequest(request);
    }

    private static void indexDocument(RestClient restClient, String s) throws IOException {
        // POST /_bulk with a newline-delimited JSON body
        Request request = new Request("POST", "/_bulk");
        request.setJsonEntity(s);
        restClient.performRequest(request);
    }

    private static void getDocument(RestClient restClient, String index) throws IOException {
        // GET /<index>/_search returns the documents in the index
        Request request = new Request("GET", "/" + index + "/_search");
        Response response = restClient.performRequest(request);
        System.out.println("Document found: " + EntityUtils.toString(response.getEntity()));
    }

    private static void deleteDocument(RestClient restClient, String index, String id) throws IOException {
        // DELETE /<index>/_doc/<id> deletes a single document
        Request request = new Request("DELETE", "/" + index + "/_doc/" + id);
        restClient.performRequest(request);
    }

    private static void deleteIndex(RestClient restClient, String index) throws IOException {
        // DELETE /<index> deletes the index
        Request request = new Request("DELETE", "/" + index);
        restClient.performRequest(request);
    }
}
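The body sent to /_bulk in the example is newline-delimited JSON: an action line, then the document source, each terminated by a newline. Writing that string by hand is error-prone, so a small helper can assemble it. The sketch below is a minimal version under loud assumptions: the helper names are made up for this example, the index name and id are assumed not to need JSON escaping, and sourceJson is assumed to be a single-line JSON object.

```java
final class BulkBodySketch {

    /** Builds one "index" action for the bulk API: an action line plus a source line, each ending in a newline. */
    static String indexAction(String index, String id, String sourceJson) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n"
                + sourceJson + "\n";
    }

    /** Concatenates several actions into one bulk request body. */
    static String bulkBody(String... actions) {
        return String.join("", actions);
    }

    public static void main(String[] args) {
        String body = bulkBody(
                indexAction("my_index", "1", "{\"field\":\"value\"}"),
                indexAction("my_index", "2", "{\"field\":\"other\"}"));
        System.out.print(body);
    }
}
```

The resulting string could be passed to the indexDocument method of the example in place of the hand-written literal.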
Troubleshooting
The following exception is thrown:
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:146)
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:127)
Solution
This is a certificate problem. Since this is a test environment, there is no need to go to the trouble of downloading and installing the private certificate; simply configure OpenSearch to serve plain HTTP.
opensearch.yml
plugins.security.ssl.http.enabled: false
Alternatively, disable the security plugin altogether.
References
- https://www.cnblogs.com/openmind-ink/p/13951767.html