package net.minecraft.util.thread; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.BiFunction; import net.minecraft.Util; import net.minecraft.util.Mth; import org.jetbrains.annotations.Nullable; public class ParallelMapTransform { private static final int DEFAULT_TASKS_PER_THREAD = 16; public static CompletableFuture> schedule(Map inputs, BiFunction operation, int maxTasksPerBatch, Executor executor) { int i = inputs.size(); if (i == 0) { return CompletableFuture.completedFuture(Map.of()); } else if (i == 1) { Entry entry = (Entry)inputs.entrySet().iterator().next(); K object = (K)entry.getKey(); U object2 = (U)entry.getValue(); return CompletableFuture.supplyAsync(() -> { V object3 = (V)operation.apply(object, object2); return object3 != null ? Map.of(object, object3) : Map.of(); }, executor); } else { ParallelMapTransform.SplitterBase splitterBase = (ParallelMapTransform.SplitterBase)(i <= maxTasksPerBatch ? new ParallelMapTransform.SingleTaskSplitter<>(operation, i) : new ParallelMapTransform.BatchedTaskSplitter<>(operation, i, maxTasksPerBatch)); return splitterBase.scheduleTasks(inputs, executor); } } public static CompletableFuture> schedule(Map inputs, BiFunction operation, Executor executor) { int i = Util.maxAllowedExecutorThreads() * 16; return schedule(inputs, operation, i, executor); } static class BatchedTaskSplitter extends ParallelMapTransform.SplitterBase { private final Map result; private final int batchSize; private final int firstUndersizedBatchIndex; BatchedTaskSplitter(BiFunction biFunction, int i, int j) { super(biFunction, i, j); this.result = new HashMap(i); this.batchSize = Mth.positiveCeilDiv(i, j); int k = this.batchSize * j; int l = k - i; this.firstUndersizedBatchIndex = j - l; assert this.firstUndersizedBatchIndex > 0 && this.firstUndersizedBatchIndex <= j; } @Override protected CompletableFuture scheduleBatch(ParallelMapTransform.Container container, int lastScheduledIndex, int currentIndex, Executor executor) { int i = currentIndex - lastScheduledIndex; assert i == this.batchSize || i == this.batchSize - 1; return CompletableFuture.runAsync(createTask(this.result, lastScheduledIndex, currentIndex, container), executor); } @Override protected int batchSize(int batchIndex) { return batchIndex < this.firstUndersizedBatchIndex ? this.batchSize : this.batchSize - 1; } private static Runnable createTask(Map result, int lastScheduledIndex, int currentIndex, ParallelMapTransform.Container container) { return () -> { for (int k = lastScheduledIndex; k < currentIndex; k++) { container.applyOperation(k); } synchronized (result) { for (int l = lastScheduledIndex; l < currentIndex; l++) { container.copyOut(l, result); } } }; } @Override protected CompletableFuture> scheduleFinalOperation(CompletableFuture future, ParallelMapTransform.Container container) { Map map = this.result; return future.thenApply(object -> map); } } record Container(BiFunction operation, Object[] keys, Object[] values) { public Container(BiFunction operation, int size) { this(operation, new Object[size], new Object[size]); } public void put(int index, K key, U value) { this.keys[index] = key; this.values[index] = value; } @Nullable private K key(int index) { return (K)this.keys[index]; } @Nullable private V output(int index) { return (V)this.values[index]; } @Nullable private U input(int index) { return (U)this.values[index]; } public void applyOperation(int index) { this.values[index] = this.operation.apply(this.key(index), this.input(index)); } public void copyOut(int index, Map outputMap) { V object = this.output(index); if (object != null) { K object2 = this.key(index); outputMap.put(object2, object); } } public int size() { return this.keys.length; } } static class SingleTaskSplitter extends ParallelMapTransform.SplitterBase { SingleTaskSplitter(BiFunction operation, int size) { super(operation, size, size); } @Override protected int batchSize(int batchIndex) { return 1; } @Override protected CompletableFuture scheduleBatch(ParallelMapTransform.Container container, int lastScheduledIndex, int currentIndex, Executor executor) { assert lastScheduledIndex + 1 == currentIndex; return CompletableFuture.runAsync(() -> container.applyOperation(lastScheduledIndex), executor); } @Override protected CompletableFuture> scheduleFinalOperation(CompletableFuture future, ParallelMapTransform.Container container) { return future.thenApply(object -> { Map map = new HashMap(container.size()); for (int i = 0; i < container.size(); i++) { container.copyOut(i, map); } return map; }); } } abstract static class SplitterBase { private int lastScheduledIndex; private int currentIndex; private final CompletableFuture[] tasks; private int batchIndex; private final ParallelMapTransform.Container container; SplitterBase(BiFunction operation, int containerSize, int numBatches) { this.container = new ParallelMapTransform.Container<>(operation, containerSize); this.tasks = new CompletableFuture[numBatches]; } private int pendingBatchSize() { return this.currentIndex - this.lastScheduledIndex; } public CompletableFuture> scheduleTasks(Map inputs, Executor executor) { inputs.forEach((object, object2) -> { this.container.put(this.currentIndex++, (K)object, (U)object2); if (this.pendingBatchSize() == this.batchSize(this.batchIndex)) { this.tasks[this.batchIndex++] = this.scheduleBatch(this.container, this.lastScheduledIndex, this.currentIndex, executor); this.lastScheduledIndex = this.currentIndex; } }); assert this.currentIndex == this.container.size(); assert this.lastScheduledIndex == this.currentIndex; assert this.batchIndex == this.tasks.length; return this.scheduleFinalOperation(CompletableFuture.allOf(this.tasks), this.container); } protected abstract int batchSize(int batchIndex); protected abstract CompletableFuture scheduleBatch( ParallelMapTransform.Container container, int lastScheduledIndex, int currentIndex, Executor executor ); protected abstract CompletableFuture> scheduleFinalOperation(CompletableFuture future, ParallelMapTransform.Container container); } }