赞
踩
官网上有关于http的例子:
- DataStream<String> input = ...;
-
- List<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
- httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
-
- // use a ElasticsearchSink.Builder to create an ElasticsearchSink
- ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
- httpHosts,
- new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }
- );

如果只是这样的话,写入elasticsearch时报错,将会导致flink不断的重启重试写入,原因是没有添加一旦错误写入elasticsearch的处理方式。
添加错误处理方式:
- esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
- @Override
- public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
- logger.warn(failure.getMessage());
- logger.warn("write message to es failed and drop the message ");
- }
- });
如果在http基础上添加了用户名和密码:
- final CredentialsProvider credentialsProvider = new SerializableCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(esUsername, esPassword));
-
-
- esSinkBuilder.setRestClientFactory(
- restClientBuilder -> {
- restClientBuilder.setHttpClientConfigCallback(
- httpAsyncClientBuilder -> {
- return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-
- }
- );
-
- })
那如果是https呢?
- List<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("127.0.0.1", 9200, "https"));
- httpHosts.add(new HttpHost("10.2.3.1", 9200, "https"));
-
- // use a ElasticsearchSink.Builder to create an ElasticsearchSink
- ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
- httpHosts,
- new ElasticsearchSinkFunction<String>() {
- public IndexRequest createIndexRequest(String element) {
- Map<String, String> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
-
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element));
- }
- }
- );
-
-
- esSinkBuilder.setRestClientFactory(
- restClientBuilder -> {
- restClientBuilder.setHttpClientConfigCallback(
- httpAsyncClientBuilder -> {
- httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- return httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
- }
- );
-
- })

-
- import java.io.Serializable;
- import java.security.cert.CertificateException;
- import java.security.cert.X509Certificate;
- import javax.net.ssl.HostnameVerifier;
- import javax.net.ssl.SSLContext;
- import javax.net.ssl.SSLSession;
-
- import org.apache.http.config.Registry;
- import org.apache.http.config.RegistryBuilder;
-
- import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
- import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
- import org.apache.http.nio.conn.SchemeIOSessionStrategy;
- import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
- import org.apache.http.ssl.SSLContextBuilder;
- import org.apache.http.ssl.SSLContexts;
- import org.apache.http.ssl.TrustStrategy;
-
-
- public class SslUtil implements Serializable {
-
- public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
- SSLContextBuilder builder = SSLContexts.custom();
- builder.loadTrustMaterial(null, new TrustStrategy() {
- @Override
- public boolean isTrusted(X509Certificate[] chain, String authType)
- throws CertificateException {
- return true;
- }
- });
- SSLContext sslContext = builder.build();
- SchemeIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(sslContext,
- new HostnameVerifier() {
- @Override
- public boolean verify(String hostname, SSLSession session) {
- return true;
- }
- });
- Registry<SchemeIOSessionStrategy> sslioSessionRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
- .register("https", sslioSessionStrategy).build();
- PoolingNHttpClientConnectionManager ncm =
- new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), sslioSessionRegistry);
- return ncm;
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。