加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – newFixedThreadPool与对象池无法正常工作

发布时间:2020-12-15 02:33:56 所属栏目:Java 来源:网络整理
导读:我正在努力弄清楚如何集中资源,我开始怀疑我的线程可能是问题(不是100%,而是在试验它).我要做的就是为服务器创建一个通道池,然后查看线程是否正在使用它们.我已经成功地获得了为我上传的多个项目创建的频道数量(即它没有汇集,只是在每个线程中创建新频道)并
我正在努力弄清楚如何集中资源,我开始怀疑我的线程可能是问题(不是100%,而是在试验它).我要做的就是为服务器创建一个通道池,然后查看线程是否正在使用它们.我已经成功地获得了为我上传的多个项目创建的频道数量(即它没有汇集,只是在每个线程中创建新频道)并成功创建了一个频道(即没有汇集或创建新频道)根据需要的渠道).

我想也许线程与池进行交互的方式是,所以我试图创造newCachedThreadPool从而有工作,但是当我做,我得到用作被关闭的错误说法通道中的线程不只要死的问题.我的池中有一个destroyObject方法,但我从来没有调用它,所以我不明白为什么它会被触发(如果我将它注释掉然后它可以工作但只创建一个通道并且上传速度非常慢,大约300次操作/秒与没有线程池我得到30k /秒).我怀疑它的终止,有没有办法验证这个,如果它终止,我可以使用的替代品吗?

这是代码(忽略所有rabbitmq的东西,它只是这样我可以监视结果):

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor,numberOfThreads_ThreadPoolExecutor,1000,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(),50);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x,pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;
     Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {  
        //channel = connection.createChannel(); 
        return channel; 
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x,ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish( "","task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

附:我基本上问了几个问题并阅读了文档并尝试解决这个问题,并且我可能完全走错了方向,所以如果你看到任何问题或提示,请按我的方式发送.

情节变浓:

在main方法的for循环中(我将工作提交给线程)我添加了:

Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
    System.out.println(threadSet.size()); //number of threads
    System.out.println(pool.getNumActive());

它向我显示了25个线程(虽然我说20个)和池中的20个项目.但是当我查看rabbitmq UI时,我看到一个只有一个通道的连接.如果我创建频道并提交给runnable,那么它会创建许多频道(但它永远不会关闭它们).我不明白发生了什么,为什么结果不如预期.

解决方法

我认为问题是你的ConnectionPoolableObjectFactory只创建一个Channel对象.似乎每次调用makeObject时都应该创建一个新的Channel.

所以也许应该实现这样的事情:

public class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private final Connection connection;

    private ConnectionPoolableObjectFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

这假设每个工厂从单个连接创建多个通道.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读