
Flink elasticsearch-sink over http and https


The official documentation has an example for http:

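    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    // elasticsearch7 connector assumed here; use the elasticsearch6 package for a 6.x cluster
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    DataStream<String> input = ...;

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
    httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                    .index("my-index")
                    // mapping types are deprecated in Elasticsearch 7.x; omit this call there
                    .type("my-type")
                    .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }
    );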

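With only this configuration, an error while writing to Elasticsearch causes Flink to restart and retry the write over and over, because no handler has been set for failed writes to Elasticsearch.

Add a failure handler: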

    esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            // Log and drop the failed request instead of rethrowing, so the sink does not fail the job.
            // `logger` is assumed to be a logger defined in the enclosing class.
            logger.warn(failure.getMessage());
            logger.warn("write message to es failed and drop the message");
        }
    });
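The handler above drops every failed request, whatever the cause. A more selective policy can decide per failure whether to retry, drop, or fail the job. The sketch below models only that decision in plain Java, with no Flink or Elasticsearch types; the class name, the enum, and the status-code thresholds are illustrative assumptions rather than connector defaults.

```java
// Plain-Java model of a failure-handling policy; no Flink or Elasticsearch types are used.
final class FailurePolicySketch {

    /** What a failure handler could do with a failed request. */
    enum Decision { RETRY, DROP, FAIL }

    /**
     * Maps the REST status code of a failed request to a decision.
     * The thresholds are illustrative assumptions, not connector defaults.
     */
    static Decision decide(int restStatusCode) {
        if (restStatusCode == 429) {
            return Decision.RETRY;   // Elasticsearch is overloaded: the request itself is fine
        }
        if (restStatusCode == 400) {
            return Decision.DROP;    // malformed request: retrying cannot succeed
        }
        return Decision.FAIL;        // anything else, including -1 (no status): surface the error
    }

    public static void main(String[] args) {
        for (int status : new int[] {429, 400, 503, -1}) {
            System.out.println(status + " -> " + decide(status));
        }
    }
}
```

Inside an actual `onFailure`, RETRY would correspond to `indexer.add(action)`, DROP to logging and returning, and FAIL to `throw failure`.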

If a username and password are required on top of http:

    // SerializableCredentialsProvider is not part of Apache HttpClient. It is presumably the author's own
    // Serializable subclass of BasicCredentialsProvider, needed because Flink serializes the lambda below.
    final CredentialsProvider credentialsProvider = new SerializableCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials(esUsername, esPassword));

    esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
            restClientBuilder.setHttpClientConfigCallback(
                httpAsyncClientBuilder -> {
                    return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            );
        });
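To make clearer what the credentials provider contributes, the following plain-Java sketch builds the `Authorization` header value that HTTP Basic authentication uses, assuming the cluster is set up for basic authentication. The class and method names are hypothetical, and the credentials are the example pair from RFC 7617, not real ones.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicAuthHeaderSketch {

    /** Builds the value of an HTTP Basic "Authorization" header (RFC 7617). */
    static String basicAuthHeader(String username, String password) {
        String pair = username + ":" + password;
        String encoded = Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Example credentials taken from RFC 7617, not real ones.
        System.out.println(basicAuthHeader("Aladdin", "open sesame"));
        // prints "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="
    }
}
```

Base64 is an encoding, not encryption, so over plain http the password is readable by anyone who can see the traffic.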

What about https?

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "https"));
    httpHosts.add(new HttpHost("10.2.3.1", 9200, "https"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                return Requests.indexRequest()
                    .index("my-index")
                    // mapping types are deprecated in Elasticsearch 7.x; omit this call there
                    .type("my-type")
                    .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }
    );

    // credentialsProvider is the one created in the username/password example above
    esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
            restClientBuilder.setHttpClientConfigCallback(
                httpAsyncClientBuilder -> {
                    httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    try {
                        return httpAsyncClientBuilder.setConnectionManager(SslUtil.getConnectionManager());
                    } catch (Exception e) {
                        // getConnectionManager() declares a checked exception, which this callback cannot throw
                        throw new RuntimeException("failed to create the SSL connection manager", e);
                    }
                }
            );
        });

The SslUtil class used above:

    import java.io.Serializable;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;
    import javax.net.ssl.HostnameVerifier;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSession;
    import org.apache.http.config.Registry;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
    import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
    import org.apache.http.nio.conn.SchemeIOSessionStrategy;
    import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
    import org.apache.http.ssl.SSLContextBuilder;
    import org.apache.http.ssl.SSLContexts;
    import org.apache.http.ssl.TrustStrategy;

    public class SslUtil implements Serializable {

        // Note: this trusts every certificate and skips hostname verification,
        // so the connection is encrypted but the server's identity is not checked.
        public static PoolingNHttpClientConnectionManager getConnectionManager() throws Exception {
            SSLContextBuilder builder = SSLContexts.custom();
            // Accept any certificate chain
            builder.loadTrustMaterial(null, new TrustStrategy() {
                @Override
                public boolean isTrusted(X509Certificate[] chain, String authType)
                        throws CertificateException {
                    return true;
                }
            });
            SSLContext sslContext = builder.build();

            // Accept any hostname
            SchemeIOSessionStrategy sslioSessionStrategy = new SSLIOSessionStrategy(sslContext,
                new HostnameVerifier() {
                    @Override
                    public boolean verify(String hostname, SSLSession session) {
                        return true;
                    }
                });

            // Only the "https" scheme is registered with this connection manager
            Registry<SchemeIOSessionStrategy> sslioSessionRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                .register("https", sslioSessionStrategy).build();
            PoolingNHttpClientConnectionManager ncm =
                new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor(), sslioSessionRegistry);
            return ncm;
        }
    }
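
SslUtil accepts any certificate and any hostname. Where the cluster's CA certificate is available, a stricter alternative is to build the `SSLContext` from a truststore. The sketch below does this with JDK classes only. It is not part of the original post: the class and method names are hypothetical, and an empty in-memory `KeyStore` stands in for a real truststore file.

```java
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

final class TrustStoreSslContextSketch {

    /** Builds an SSLContext that trusts only the certificates contained in the given KeyStore. */
    static SSLContext fromTrustStore(KeyStore trustStore) throws Exception {
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        // No client key material (null); default SecureRandom (null).
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext;
    }

    public static void main(String[] args) throws Exception {
        // An empty in-memory store stands in for a real truststore here;
        // in practice it would be loaded from a file holding the cluster's CA certificate.
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        trustStore.load(null, null);

        SSLContext sslContext = fromTrustStore(trustStore);
        System.out.println(sslContext.getProtocol());
    }
}
```

The resulting context could be passed to `new SSLIOSessionStrategy(sslContext)` in place of the trust-all context, which also keeps the default hostname verification.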

 
