加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 如何使用camel将大文件发送到ActiveMQ

发布时间:2020-12-15 01:08:19 所属栏目:Java 来源:网络整理
导读:我正在尝试将一个X?-GB大文件作为流发送到ActiveMQ队列进行处理. 我知道ActiveMQ supports streams,camel-jms也是如此,但我尝试在队列上设置的任何内容似乎都没有任何区别.唯一改变的是关闭流缓存结果是一个“流关闭”异常. 我愿意使用处理器或自定义类(只

我正在尝试将一个X?-GB大文件作为流发送到ActiveMQ队列进行处理.

我知道ActiveMQ supports streams,camel-jms也是如此,但我尝试在队列上设置的任何内容似乎都没有任何区别.唯一改变的是关闭流缓存结果是一个“流关闭”异常.

我愿意使用处理器或自定义类(只要资源得到清理),但这应该可以从蓝图中获得.如何通过camel-activemq正确处理大文件而不会出现OutOfMemoryError?

运用

> servicemix-7.0.0
>骆驼-2.16.4
> activemq-5.14.3

这是我的骆驼蓝图

这是我不断得到的错误

2017-10-17 08:46:53,859 | ERROR |  - RecipientList | DefaultErrorHandler              | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[file_route        ] [file_route        ] [file://FileUploadBin?delete=false&moveFailed=.error                           ] [      3764]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-DESKTOP-H2O66PO-62468-1508242908251-4-5
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4,fileName=Die.txt}
        BodyType            org.apache.camel.converter.stream.FileInputStreamCache
        Body                [Body is instance of org.apache.camel.StreamCache]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        ... 24 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121]
        at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121]
        at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121]
        at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
2017-10-17 08:46:57,641 | WARN  | ://FileUploadBin | GenericFileOnCompletion          | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@294a3d48 for file: GenericFile[Die.txt]
最佳答案
经典的ActiveMQ代理不支持这种方式.

然而,下一代ActiveMQ Artemis支持大型消息,我们刚刚在camel-jms中添加了对此的支持.我写了一篇博客文章:http://www.davsclaus.com/2017/10/working-with-large-messages-using.html

我们还在camel-jms中添加了对javax.jms.StreamMessage类型的支持.但是,此API不适用于大型邮件,因此使用受限.但是你可以在Camel 2.21以后使用新选项streamMessageTypeEnabled在组件上打开它,然后如果消息体是流类型,那么Camel将作为StreamMessage而不是BytesMessage发送.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读