minecraft-src/net/minecraft/util/thread/ParallelMapTransform.java
2025-07-04 03:45:38 +03:00

207 lines
6.8 KiB
Java

package net.minecraft.util.thread;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import net.minecraft.Util;
import net.minecraft.util.Mth;
import org.jetbrains.annotations.Nullable;
public class ParallelMapTransform {
private static final int DEFAULT_TASKS_PER_THREAD = 16;
public static <K, U, V> CompletableFuture<Map<K, V>> schedule(Map<K, U> inputs, BiFunction<K, U, V> operation, int maxTasksPerBatch, Executor executor) {
int i = inputs.size();
if (i == 0) {
return CompletableFuture.completedFuture(Map.of());
} else if (i == 1) {
Entry<K, U> entry = (Entry<K, U>)inputs.entrySet().iterator().next();
K object = (K)entry.getKey();
U object2 = (U)entry.getValue();
return CompletableFuture.supplyAsync(() -> {
V object3 = (V)operation.apply(object, object2);
return object3 != null ? Map.of(object, object3) : Map.of();
}, executor);
} else {
ParallelMapTransform.SplitterBase<K, U, V> splitterBase = (ParallelMapTransform.SplitterBase<K, U, V>)(i <= maxTasksPerBatch
? new ParallelMapTransform.SingleTaskSplitter<>(operation, i)
: new ParallelMapTransform.BatchedTaskSplitter<>(operation, i, maxTasksPerBatch));
return splitterBase.scheduleTasks(inputs, executor);
}
}
public static <K, U, V> CompletableFuture<Map<K, V>> schedule(Map<K, U> inputs, BiFunction<K, U, V> operation, Executor executor) {
int i = Util.maxAllowedExecutorThreads() * 16;
return schedule(inputs, operation, i, executor);
}
static class BatchedTaskSplitter<K, U, V> extends ParallelMapTransform.SplitterBase<K, U, V> {
private final Map<K, V> result;
private final int batchSize;
private final int firstUndersizedBatchIndex;
BatchedTaskSplitter(BiFunction<K, U, V> biFunction, int i, int j) {
super(biFunction, i, j);
this.result = new HashMap(i);
this.batchSize = Mth.positiveCeilDiv(i, j);
int k = this.batchSize * j;
int l = k - i;
this.firstUndersizedBatchIndex = j - l;
assert this.firstUndersizedBatchIndex > 0 && this.firstUndersizedBatchIndex <= j;
}
@Override
protected CompletableFuture<?> scheduleBatch(ParallelMapTransform.Container<K, U, V> container, int lastScheduledIndex, int currentIndex, Executor executor) {
int i = currentIndex - lastScheduledIndex;
assert i == this.batchSize || i == this.batchSize - 1;
return CompletableFuture.runAsync(createTask(this.result, lastScheduledIndex, currentIndex, container), executor);
}
@Override
protected int batchSize(int batchIndex) {
return batchIndex < this.firstUndersizedBatchIndex ? this.batchSize : this.batchSize - 1;
}
private static <K, U, V> Runnable createTask(Map<K, V> result, int lastScheduledIndex, int currentIndex, ParallelMapTransform.Container<K, U, V> container) {
return () -> {
for (int k = lastScheduledIndex; k < currentIndex; k++) {
container.applyOperation(k);
}
synchronized (result) {
for (int l = lastScheduledIndex; l < currentIndex; l++) {
container.copyOut(l, result);
}
}
};
}
@Override
protected CompletableFuture<Map<K, V>> scheduleFinalOperation(CompletableFuture<?> future, ParallelMapTransform.Container<K, U, V> container) {
Map<K, V> map = this.result;
return future.thenApply(object -> map);
}
}
record Container<K, U, V>(BiFunction<K, U, V> operation, Object[] keys, Object[] values) {
public Container(BiFunction<K, U, V> operation, int size) {
this(operation, new Object[size], new Object[size]);
}
public void put(int index, K key, U value) {
this.keys[index] = key;
this.values[index] = value;
}
@Nullable
private K key(int index) {
return (K)this.keys[index];
}
@Nullable
private V output(int index) {
return (V)this.values[index];
}
@Nullable
private U input(int index) {
return (U)this.values[index];
}
public void applyOperation(int index) {
this.values[index] = this.operation.apply(this.key(index), this.input(index));
}
public void copyOut(int index, Map<K, V> outputMap) {
V object = this.output(index);
if (object != null) {
K object2 = this.key(index);
outputMap.put(object2, object);
}
}
public int size() {
return this.keys.length;
}
}
static class SingleTaskSplitter<K, U, V> extends ParallelMapTransform.SplitterBase<K, U, V> {
SingleTaskSplitter(BiFunction<K, U, V> operation, int size) {
super(operation, size, size);
}
@Override
protected int batchSize(int batchIndex) {
return 1;
}
@Override
protected CompletableFuture<?> scheduleBatch(ParallelMapTransform.Container<K, U, V> container, int lastScheduledIndex, int currentIndex, Executor executor) {
assert lastScheduledIndex + 1 == currentIndex;
return CompletableFuture.runAsync(() -> container.applyOperation(lastScheduledIndex), executor);
}
@Override
protected CompletableFuture<Map<K, V>> scheduleFinalOperation(CompletableFuture<?> future, ParallelMapTransform.Container<K, U, V> container) {
return future.thenApply(object -> {
Map<K, V> map = new HashMap(container.size());
for (int i = 0; i < container.size(); i++) {
container.copyOut(i, map);
}
return map;
});
}
}
abstract static class SplitterBase<K, U, V> {
private int lastScheduledIndex;
private int currentIndex;
private final CompletableFuture<?>[] tasks;
private int batchIndex;
private final ParallelMapTransform.Container<K, U, V> container;
SplitterBase(BiFunction<K, U, V> operation, int containerSize, int numBatches) {
this.container = new ParallelMapTransform.Container<>(operation, containerSize);
this.tasks = new CompletableFuture[numBatches];
}
private int pendingBatchSize() {
return this.currentIndex - this.lastScheduledIndex;
}
public CompletableFuture<Map<K, V>> scheduleTasks(Map<K, U> inputs, Executor executor) {
inputs.forEach((object, object2) -> {
this.container.put(this.currentIndex++, (K)object, (U)object2);
if (this.pendingBatchSize() == this.batchSize(this.batchIndex)) {
this.tasks[this.batchIndex++] = this.scheduleBatch(this.container, this.lastScheduledIndex, this.currentIndex, executor);
this.lastScheduledIndex = this.currentIndex;
}
});
assert this.currentIndex == this.container.size();
assert this.lastScheduledIndex == this.currentIndex;
assert this.batchIndex == this.tasks.length;
return this.scheduleFinalOperation(CompletableFuture.allOf(this.tasks), this.container);
}
protected abstract int batchSize(int batchIndex);
protected abstract CompletableFuture<?> scheduleBatch(
ParallelMapTransform.Container<K, U, V> container, int lastScheduledIndex, int currentIndex, Executor executor
);
protected abstract CompletableFuture<Map<K, V>> scheduleFinalOperation(CompletableFuture<?> future, ParallelMapTransform.Container<K, U, V> container);
}
}