Skip to content

Commit

Permalink
#16 Replace Dataset#getPartitioning() with Dataset#getNumPartitions()
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr authored and mareksimunek committed Jul 9, 2018
1 parent 7fe10e2 commit c4b12da
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ public interface Dataset<T> extends Serializable {
Collection<Operator<?, ?>> getConsumers();

/**
* Retrieve partitioning for this dataset.
* The dataset might be partitioned by some other type
* (using some extraction function).
* Determines the parallelism of this data set - if known. Typically,
* a data set is split into multiple partitions which can be processed
* in parallel.
*
* @return {@code < 0} if the partition count is unknown, otherwise the
* count of partitions of this dataset (which can potentially
* be processed in parallel)
*/
<X> Partitioning<X> getPartitioning();

int getNumPartitions();

/**
* @return {@code true} if this is a bounded data set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,13 @@ public class Datasets {
public static <IN, OUT> Dataset<OUT> createOutputFor(
Flow flow, Dataset<IN> input, Operator<IN, OUT> op) {

return new OutputDataset<OUT>(flow, (Operator) op, input.isBounded()) {
return new OutputDataset<OUT>(flow, op, input.isBounded()) {
@Override
@SuppressWarnings("unchecked")
public <X> Partitioning<X> getPartitioning() {
if (op instanceof PartitioningAware) {
// only partitioning aware operators change the partitioning
PartitioningAware<IN> pa = (PartitioningAware<IN>) op;
return (Partitioning<X>) pa.getPartitioning();
}
return input.getPartitioning();
public int getNumPartitions() {
// only partitioning aware operators can change the partition count
return (op instanceof PartitioningAware)
? ((PartitioningAware) op).getPartitioning().getNumPartitions()
: input.getNumPartitions();
}
};
}
Expand All @@ -70,13 +67,8 @@ public static <T> Dataset<T> createInputFromSource(

return new InputDataset<T>(flow, source, source.isBounded()) {
@Override
public <X> Partitioning<X> getPartitioning() {
return new Partitioning<X>() {
@Override
public int getNumPartitions() {
return source.getPartitions().size();
}
};
public int getNumPartitions() {
return source.getPartitions().size();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ private Flow(String name, Settings settings) {
this.settings = cloneSettings(settings);
}

/**
* Creates a new (anonymous) Flow.
*
* @return a new flow with an undefined name,
* i.e. either not named at all or with a system generated name
*/
public static Flow create() {
return create(null);
}

/**
* Creates a new Flow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static class WindowingBuilder<IN, KEY>

WindowingBuilder(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
// define default partitioning
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static class WindowingBuilder<IN, ELEM>
UnaryFunction<IN, ELEM> mapper /* optional */) {

// define default partitioning
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ public static class WindowingBuilder<LEFT, RIGHT, KEY, OUT>
BinaryFunctor<LEFT, RIGHT, OUT> joinFunc) {

// define default partitioning
super(new DefaultPartitioning<>(Math.max(
left.getPartitioning().getNumPartitions(),
right.getPartitioning().getNumPartitions())));
super(new DefaultPartitioning<>(
Math.max(left.getNumPartitions(), right.getNumPartitions())));

this.name = Objects.requireNonNull(name);
this.left = Objects.requireNonNull(left);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static class DatasetBuilder4<IN, KEY, VALUE, OUT>
ReduceFunction<VALUE, OUT> reducer) {

// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static class DatasetBuilder5<
CombinableReduceFunction<STATE> stateCombiner)
{
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ public Partitioner<Byte> getPartitioner() {
}
@Override
public int getNumPartitions() {
return numPartitions > 0
? numPartitions : input.getPartitioning().getNumPartitions();
return numPartitions > 0 ? numPartitions : input.getNumPartitions();
}
});
this.reducer = reducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static class OutputBuilder<IN>
private final Dataset<IN> input;
OutputBuilder(String name, Dataset<IN> input) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,6 @@ protected StateAwareWindowWiseSingleInputOperator(
this.output = createOutput(input);
}

protected StateAwareWindowWiseSingleInputOperator(
String name,
Flow flow,
Dataset<IN> input,
UnaryFunction<KIN, KEY> extractor,
Windowing<WIN, W> windowing /* optional */,
UnaryFunction<WIN, Long> eventTimeAssigner /* optional */) {
this(name, flow, input, extractor, windowing, eventTimeAssigner, input.getPartitioning());
}

@Override
public Collection<Dataset<IN>> listInputs() {
return Collections.singletonList(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static class ByBuilder2<IN, KEY>
private UnaryFunction<IN, Long> valueExtractor = e -> 1L;
ByBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ public static class WindowByBuilder<IN, K, V, S extends Comparable<S>>
UnaryFunction<IN, V> valueFn,
UnaryFunction<IN, S> scoreFn)
{
super(new DefaultPartitioning<>(
input.getPartitioning().getNumPartitions()));
super(new DefaultPartitioning<>(input.getNumPartitions()));

this.name = requireNonNull(name);
this.input = requireNonNull(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public static class OfBuilder {
this.name = name;
}

public <IN> OutputBuilder<IN> of(Dataset<IN> left, Dataset<IN> right)
{
public <IN> OutputBuilder<IN> of(Dataset<IN> left, Dataset<IN> right) {
if (right.getFlow() != left.getFlow()) {
throw new IllegalArgumentException("Pass inputs from the same flow");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.BinaryFunctor;
import cz.seznam.euphoria.core.client.io.Context;
import org.junit.Test;

Expand Down Expand Up @@ -57,4 +58,22 @@ public void testBuild_ImplicitName() {
FlatMap map = (FlatMap) flow.operators().iterator().next();
assertEquals("FlatMap", map.getName());
}

/**
* Verify that the number of partitions of the flat map
* operator's input is preserved in the output.
*/
@Test
public void testOutputNumPartitionsIsUnchanged() {
final int N_PARTITIONS = 78;

Flow f = Flow.create();
Dataset<Object> input = Util.createMockDataset(f, N_PARTITIONS);
assertEquals(N_PARTITIONS, input.getNumPartitions());

Dataset<Object> output = FlatMap.of(input)
.using((Object o, Context<Object> c) -> c.collect(o))
.output();
assertEquals(N_PARTITIONS, output.getNumPartitions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import cz.seznam.euphoria.core.client.dataset.HashPartitioning;
import cz.seznam.euphoria.core.client.dataset.windowing.Time;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.client.util.Sums;
import org.junit.Test;

import java.time.Duration;
import java.util.List;
import java.util.stream.StreamSupport;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -148,4 +149,45 @@ public void testBuild_Partitioner() {
assertTrue(reduce.getPartitioning().getPartitioner() instanceof HashPartitioner);
assertEquals(5, reduce.getPartitioning().getNumPartitions());
}

/**
* Verify that the number of partitions of the reduce-by-key
* operator's input is preserved in the output since no partitioning
* is explicitly specified.
*/
@Test
public void testOutputNumPartitionsIsPreserved() {
final int N_PARTITIONS = 78;

Flow f = Flow.create();
Dataset<Pair<String, Long>> input = Util.createMockDataset(f, N_PARTITIONS);
assertEquals(N_PARTITIONS, input.getNumPartitions());

Dataset<Pair<String, Long>> output =
ReduceByKey.of(input)
.keyBy(Pair::getFirst)
.valueBy(Pair::getSecond)
.combineBy(Sums.ofLongs())
.output();
assertEquals(N_PARTITIONS, output.getNumPartitions());
}

@Test
public void testOutputExplicitNumPartitionsIsRespected() {
final int INPUT_PARTITIONS = 78;
final int OUTPUT_PARTITIONS = 13;

Flow f = Flow.create();
Dataset<Pair<String, Long>> input = Util.createMockDataset(f, INPUT_PARTITIONS);
assertEquals(INPUT_PARTITIONS, input.getNumPartitions());

Dataset<Pair<String, Long>> output =
ReduceByKey.of(input)
.keyBy(Pair::getFirst)
.valueBy(Pair::getSecond)
.combineBy(Sums.ofLongs())
.setNumPartitions(OUTPUT_PARTITIONS)
.output();
assertEquals(OUTPUT_PARTITIONS, output.getNumPartitions());
}
}

0 comments on commit c4b12da

Please sign in to comment.