
scala – Elasticsearch-Hadoop library fails to connect to a docker container

Published: 2020-12-16 18:28:57 | Category: Security | Source: web
I have a Spark job that reads from Cassandra, processes/transforms/filters the data, and writes the results to Elasticsearch. I use docker for integration testing, and I am having trouble writing from Spark to Elasticsearch.

Dependencies:

"joda-time"              %  "joda-time"                 % "2.9.4",
"javax.servlet"          %  "javax.servlet-api"         % "3.1.0",
"org.elasticsearch"      %  "elasticsearch"             % "2.3.2",
"org.scalatest"          %% "scalatest"                 % "2.2.1",
"com.github.nscala-time" %% "nscala-time"               % "2.10.0",
"cascading"              %  "cascading-hadoop"          % "2.6.3",
"cascading"              %  "cascading-local"           % "2.6.3",
"com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" %  "cassandra-driver-core"     % "2.1.5",
"org.elasticsearch"      %  "elasticsearch-hadoop"      % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark"       %% "spark-catalyst"            % "1.4.0" % "provided"

In my unit tests I can connect to Elasticsearch with a TransportClient to set up my template and index.

In other words, this works:

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.input.split.size_in_mb","67108864")
  .set("spark.cassandra.connection.host",cassandraHostString)
  .set("es.nodes",elasticsearchHostString)
  .set("es.port","9200")
  .set("http.publish_host","")
sc = new SparkContext(conf)
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString),9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex,"hot").execute().actionGet()

However, when I try to run

EsSpark.saveToEs(
  myRDD,
  "hot/mytype",
  Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)

I get this stack trace:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
[same stack trace as above]

Using `docker network inspect bridge` I can verify that it is trying to connect to the correct IP address.

docker network inspect bridge
[
    {
        "Name": "bridge",
        "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.17.0.0/16",
                    "Gateway": "172.17.0.1"
                }
            ]
        },
        "Internal": false,
        "Containers": {
            "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": {
                "Name": "analytics_rabbitmq_1",
                "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4",
                "MacAddress": "02:42:ac:11:00:04",
                "IPv4Address": "172.17.0.4/16",
                "IPv6Address": ""
            },
            "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": {
                "Name": "analytics_elasticsearch_1",
                "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56",
                "MacAddress": "02:42:ac:11:00:02",
                "IPv4Address": "172.17.0.2/16",
                "IPv6Address": ""
            },
            "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": {
                "Name": "analytics_cassandra_1",
                "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6",
                "MacAddress": "02:42:ac:11:00:03",
                "IPv4Address": "172.17.0.3/16",
                "IPv6Address": ""
            }
        },
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        },
        "Labels": {}
    }
]

I am running everything locally on a MacBook/OSX. I cannot figure out why I can connect to the docker container with the TransportClient and through a browser, but the EsSpark.saveToEs(...) call always fails.
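One detail worth noting is that the TransportClient in the working test talks the binary transport protocol on port 9300, while elasticsearch-hadoop talks HTTP on port 9200, so the two clients can behave differently against the same container. A minimal sketch for probing the two ports independently from the host (the IP and ports are the ones from the question; the helper itself is a generic illustration, not from the original post):

```scala
import java.net.{InetSocketAddress, Socket}

// Returns true if a TCP connection to host:port succeeds within the timeout.
def portReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
  try {
    val s = new Socket()
    try { s.connect(new InetSocketAddress(host, port), timeoutMs); true }
    finally s.close()
  } catch { case _: java.io.IOException => false }

// portReachable("172.17.0.2", 9300)  // transport port, used by TransportClient
// portReachable("172.17.0.2", 9200)  // HTTP port, used by elasticsearch-hadoop
```

If only one of the two probes succeeds, that narrows the problem down to port publishing or routing rather than the Spark job itself.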

Solution

Setting

.config("es.nodes.wan.only","true")

solves the problem.
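With `es.nodes.wan.only` enabled, the connector skips node discovery and routes every request through the declared `es.nodes` address only, which matters here because discovery would otherwise hand back container-internal addresses that are not routable from the Mac host. A minimal sketch of the resulting connector settings (the IP is the container address from `docker network inspect`; the settings map is illustrative, not from the original post):

```scala
// Hypothetical sketch: elasticsearch-hadoop settings with discovery disabled,
// so only the declared, reachable node is ever contacted.
val esSettings = Map(
  "es.nodes"          -> "172.17.0.2", // container IP from docker network inspect
  "es.port"           -> "9200",       // HTTP port used by elasticsearch-hadoop
  "es.nodes.wan.only" -> "true"        // disable discovery; use es.nodes only
)

// These would be applied to the SparkConf before creating the context, e.g.:
//   esSettings.foreach { case (k, v) => conf.set(k, v) }
```

Note that in WAN-only mode all reads and writes go through the single declared node, so throughput is lower than with discovery enabled; for a local integration test that trade-off is usually acceptable.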

es.nodes.wan.only

(default false) Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.
