加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 缓存流处理大文件

发布时间:2020-12-15 02:14:11 所属栏目:Java 来源:网络整理
导读:我正在研究一个主题“使用Apache Camel和如何处理大文件”. 目的是使用camel处理大文件而不将文件加载到内存中,因为它是一个超过5个GO的大文件. 我们找到了几个轨道,第一个轨道是使用分离器组件,允许我们逐行或逐块读取文件,但是如果我们使用分离器,我们无法
我正在研究一个主题“使用Apache Camel和如何处理大文件”.

目的是使用camel处理大文件而不将文件加载到内存中,因为它是一个超过5个GO的大文件.

我们找到了几个轨道,第一个轨道是使用分离器组件,允许我们逐行或逐块读取文件,但是如果我们使用分离器,我们无法再从头开始读取文件,功能需求是即使分割完成也能够读取文件的某些部分.

因此,我们必须使用缓存系统,将块放入缓存中以重用它们.

所以我们认为必须使用类CachedOutputStream在分割器之后在磁盘上写入文件的某些部分,这个类还提供了加密磁盘上数据的能力.

示例如下:

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true">

    <streamCaching id="myCacheConfig"  spoolDirectory="target/cachedir" spoolThreshold="16"/>

    <route id="SPLIT-FLOW" streamCache="true">
        <from uri="file:src/data/forSplitCachingSimpleRecord?noop=true"/>
        <split streaming="true">
            <tokenize token="n"/>
            <to uri="direct:PROCESS-BUSINESS"/>
        </split>
    </route>

    <route id="PROCESS-BUSINESS" streamCache="true">
        <from uri="direct:PROCESS-BUSINESS"/>
        <bean ref="ProcessBusiness" method="dealRecord"/>
        <choice>
            <when>
                <simple>${in.header.CamelSplitComplete} == "true"</simple>
                <to uri="direct:STREAM-CACHING"/>
            </when>
        </choice>
    </route>

    <route id="STREAM-CACHING">
        <from uri="direct:STREAM-CACHING"/>
        <bean ref="ProcessStreamCaching" method="usingStream"/>
        <setHeader headerName="CamelFileName">
            <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple>
        </setHeader>
        <to uri="file:src/out"/>
    </route>

</camelContext>

方法dealRecord将每行拆分为一个缓存:

public void dealRecord(Exchange exchange) throws Exception { 

   String body; 
   File file; 
   String[] files; 
   boolean isSplitComplete; 

   body = (String) exchange.getIn().getBody(); 
   isSplitComplete = (boolean) exchange.getProperties().get("CamelSplitComplete"); 

   CachedOutputStream cos = new CachedOutputStream(exchange,false); 
   cos.write(body.getBytes("UTF-8")); 

   file = new File("target/cachedir"); 
   files = file.list(); 
   for (String nameTmpfile : files) { 
      LOG.info("Genered File [" + nameTmpfile + "]"); 
   } 

   lstCache.add(cos); 

   if(isSplitComplete){ 
      exchange.getIn().setHeader("Cached",lstCache); 
   } 
}

usingStream方法可以使用标头中存在的每个缓存

public byte[] usingStream(Exchange exchange) throws InputStreamException { 

   final ArrayList<CachedOutputStream> lstcache; 
   byte[] bytesMessage; 
   StringBuilder messageCompleteOut = new StringBuilder(); 
   InputStream is = null; 

   lstcache = (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached"); 
   for (CachedOutputStream oneCache : lstcache) { 
      try { 
         is = oneCache.getWrappedInputStream(); 
         String messageInputstream = toString(is); 
         LOG.info("Message of Cache ["+ messageInputstream +"]"); 
         messageCompleteOut.append(messageInputstream); 
         messageCompleteOut.append(System.lineSeparator()); 
      } catch (IOException e) { 
         LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL); 
         throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL,e); 
      } 
      // On ferme le flux 
      IOHelper.close(is); 
   } 
   bytesMessage = messageCompleteOut.toString().getBytes(Charset.forName("UTF-8")); 
   return bytesMessage; 
}

这个解决方案好吗?或者有更好的方法?

thxs

解决方法

GenericFileMessage(文件组件使用的消息实现)不会将文件内容加载到所需的内存中.所以实际上你只需要确保你不会以强制转换它的方式访问正文.您还可以编写自己的消息(继承自GenericFileMessage)并阻止此类转换,或返回不同的内容(某种“摘要”).

沿途的处理器可以在文件系统中获取文件的位置(来自消息头)并直接打开它,可能用其他消息替换文件消息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读