加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – Reader#lines()由于其拼接器中的不可配置的批量大小策略

发布时间:2020-12-14 17:41:23 所属栏目:Java 来源:网络整理
导读:当流源是Reader时,我无法实现流处理的良好并行化.在四核CPU上运行下面的代码我观察到3个内核首先被使用,然后突然下降到两个,然后是一个核心.整体CPU利用率在50%左右. 请注意以下示例的特点: 只有6000行; 每行约20ms进行处理; 整个过程大约需要一分钟. 这意
当流源是Reader时,我无法实现流处理的良好并行化.在四核CPU上运行下面的代码我观察到3个内核首先被使用,然后突然下降到两个,然后是一个核心.整体CPU利用率在50%左右.

请注意以下示例的特点:

>只有6000行;
>每行约20ms进行处理;
>整个过程大约需要一分钟.

这意味着所有的压力都在CPU上,I / O很小.这个例子是一个自动并行化的坐式鸭子.

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f sn",cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f sn",realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%",100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i),line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

我的典型输出:

Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

为了比较,如果我使用一个稍微修改的变体,我首先收集到列表中,然后处理:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

我得到这个典型的输出:

Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

什么可以解释这个效果,如何解决这个问题呢?

注意,我最初在servlet输入流的读者上观察到这一点,因此它不是FileReader的特定的.

解决方法

这里是答案,在Spliterators.IteratorSpliterator的源代码中,BufferedReader#lines()使用的代码:
@Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations,across combinations of #elements vs #cores,* whether or not either are known.  We generate
         * O(sqrt(#elements)) splits,allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a,j,characteristics);
        }
        return null;
    }

也值得注意的是常数:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

所以在我的例子中,我使用6,000个元素,因为批量大小为1024,所以我只需要批量批次.这正好解释了我的观察结果:最初使用三个内核,当两个小批次完成时,它们都会被丢弃.在此期间,我尝试了一个具有6万个元素的修改示例,然后我得到几乎100%的CPU利用率.

为了解决我的问题,我已经开发了下面的代码,它允许我将任何现有流转换成一个Spliterator#trySplit将其分割成指定大小的批次.从我的问题使用它的最简单的方法是这样的:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(),20)

在较低级别上,下面的类是一个Spliterator包装器,它改变了包装的spliterator的trySplit行为,并保留其他方面不变.

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,long est,int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,int batchSize) {
    this(toWrap,toWrap.estimateSize(),batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in,int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(),batchSize),true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a,characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读