加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

码头工人 – 由于Luigi工作分布不均而导致工人死亡(2.6.1)

发布时间:2020-12-16 03:27:08 所属栏目:安全 来源:网络整理
导读:我们正在尝试运行分布在docker swarm集群上的简单管道. luigi worker被部署为复制的docker服务.他们成功启动,在向luigi-server请求工作几秒钟之后,由于没有为他们分配工作而他们开始死亡,所有任务最终都分配给一个工作人员. 我们不得不在我们工人的luigi.cfg

我们正在尝试运行分布在docker swarm集群上的简单管道. luigi worker被部署为复制的docker服务.他们成功启动,在向luigi-server请求工作几秒钟之后,由于没有为他们分配工作而他们开始死亡,所有任务最终都分配给一个工作人员.

我们不得不在我们工人的luigi.cfg中设置keep_alive = True来强迫他们不要死,但是在管道完成后让工人保持身边似乎是一个坏主意.
有没有办法控制工作分配?

我们的测试管道:

class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i,self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd,stderr=subprocess.STDOUT,shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
最佳答案
您的问题是一次产生一个要求的结果,而您希望立即产生所有这些要求,如下所示:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i,self.sleep_time))
    yield reqs

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读