diff --git a/framework/src/dslabs/framework/Node.java b/framework/src/dslabs/framework/Node.java index eb6dbeff..f02fb706 100644 --- a/framework/src/dslabs/framework/Node.java +++ b/framework/src/dslabs/framework/Node.java @@ -22,9 +22,11 @@ package dslabs.framework; +import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -35,17 +37,13 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import java.util.logging.Level; import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.SneakyThrows; import lombok.ToString; import lombok.extern.java.Log; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Nodes are the basic unit of computation. They can send and receive {@link Message}s, set and @@ -104,20 +102,78 @@ @EqualsAndHashCode(of = {"subNodes"}) @ToString(of = {"address", "subNodes"}) public abstract class Node implements Serializable { + + /** + * An {@code Environment} is how a {@link Node} sends messages to other nodes and sets timers. It + * also can handle exceptions which are thrown during the handling of events. + * + * @hidden + */ + public interface Environment { + void send(Message message, Address from, Address to); + + default void broadcast(Message message, Address from, ImmutableList
to) { + for (Address address : to) { + send(message, from, address); + } + } + + default void set(Timer timer, Address destination, Duration duration) { + set(timer, destination, duration, duration); + } + + void set(Timer timer, Address destination, Duration minDuration, Duration maxDuration); + + /** + * Possibly handles a throwable which was thrown during the execution of an event handler. + * + * @param throwable The exception which was thrown during the handling of an event. + * @return Whether the throwable was consumed. If {@code false}, {@code throwable} should be + * thrown. + */ + default boolean handleThrowable(Throwable throwable) { + return false; + } + } + + /** + * @param logExceptions Whether to log exceptions thrown by the node during message and timer + * handling, in addition to sending them to {@link Environment#handleThrowable}. + * @hidden + */ + public record Settings(boolean logExceptions) { + Settings() { + this(true); + } + } + private static final Map, Map>> methods = new ConcurrentHashMap<>(); /** This Node's address. */ @VizIgnore @NonNull private final Address address; - private transient Consumer> messageAdder; - private transient Consumer> batchMessageAdder; - private transient Consumer>> timerAdder; - private transient Consumer throwableCatcher; - private transient Boolean logExceptions = true; + /* + * INVARIANT: (environment == null && settings == null) || parentNode == null + * + * We do not store these as a sealed type, as environment and settings must be transient, while + * parentNode must not be transient. + */ + + /** + * This node's {@link Environment}, if this node is a root node (i.e., not a sub-node). If this + * node is a sub-node, this must be {@code null}. + */ + private transient @Nullable Environment environment; + + /** + * This node's current {@link Settings}, if this node is a root node (i.e., not a sub-node). If + * this node is a sub-node, this must be {@code null}. + */ + private transient @Nullable Settings settings; - /** The Node's parent (or null if this Node is the root Node in the hierarchy). */ - @VizIgnore private Node parentNode; + /** The Node's parent (or {@code null} if this Node is a root node). */ + @VizIgnore private @Nullable Node parentNode; /** * This Node's sub-Nodes, indexed by their ID. Sub-Nodes must have a {@link SubAddress} composed @@ -134,6 +190,26 @@ protected Node(@NonNull Address address) { this.address = address; } + /** The {@link Environment} for this Node's root Node. */ + private @Nullable Environment environment() { + if (parentNode != null) { + return parentNode.environment(); + } else { + return environment; + } + } + + /** The {@link Settings} for this Node's root Node. */ + private Settings settings() { + if (parentNode != null) { + return parentNode.settings(); + } else if (settings == null) { + return new Settings(); + } else { + return settings; + } + } + /** * Takes any initialization steps necessary (potentially sending {@link Message}s and setting * {@link Timer}s). @@ -153,9 +229,7 @@ protected final void addSubNode(@NonNull Node subNode) { "Attempting to add subNode with address that isn't a subAddress of this node."); } - if (subNode.messageAdder != null - || subNode.batchMessageAdder != null - || subNode.timerAdder != null) { + if (subNode.environment != null) { throw new IllegalArgumentException( "Cannot configure node; already configured as stand-alone."); } @@ -167,6 +241,7 @@ protected final void addSubNode(@NonNull Node subNode) { } subNode.parentNode = this; + subNode.settings = null; subNodes.put(subAddress.id(), subNode); } @@ -261,24 +336,17 @@ private void send(Message message, Address from, Address to) { return; } - // If this Node is a sub-Node, use the parent to send the message. - if (parentNode != null && messageAdder == null && batchMessageAdder == null) { - parentNode.send(message, from, to); - return; - } - - LOG.finest(() -> String.format("MessageSend(%s -> %s, %s)", from, to, message)); - - if (messageAdder != null) { - messageAdder.accept(new ImmutableTriple<>(from, to, message)); - } else if (batchMessageAdder != null) { - batchMessageAdder.accept(new ImmutableTriple<>(from, new Address[] {to}, message)); - } else { + Environment env = environment(); + if (env == null) { LOG.severe( String.format( "Attempting to send %s from %s to %s before node configured, not sending", message, from, to)); + return; } + + LOG.finest(() -> String.format("MessageSend(%s -> %s, %s)", from, to, message)); + env.send(message, from, to); } private void broadcast(Message message, Address from, Address[] to) { @@ -303,27 +371,18 @@ private void broadcast(Message message, Address from, Address[] to) { } } - // If this Node is a sub-Node, use the parent to broadcast the message. - if (parentNode != null && messageAdder == null && batchMessageAdder == null) { - parentNode.broadcast(message, from, to); - return; - } - - LOG.finest( - () -> String.format("MessageSend(%s -> %s, %s)", from, Arrays.toString(to), message)); - - if (batchMessageAdder != null) { - batchMessageAdder.accept(new ImmutableTriple<>(from, to, message)); - } else if (messageAdder != null) { - for (Address a : to) { - messageAdder.accept(new ImmutableTriple<>(from, a, message)); - } - } else { + Environment env = environment(); + if (env == null) { LOG.severe( String.format( "Attempting to send %s from %s to %s before node configured, not sending", message, from, Arrays.toString(to))); + return; } + + LOG.finest( + () -> String.format("MessageSend(%s -> %s, %s)", from, Arrays.toString(to), message)); + env.broadcast(message, from, ImmutableList.copyOf(to)); } private void set(Timer timer, int minTimerLengthMillis, int maxTimerLengthMillis, Address from) { @@ -332,23 +391,20 @@ private void set(Timer timer, int minTimerLengthMillis, int maxTimerLengthMillis return; } - // If this Node is a sub-Node, use the parent to set the timer. - if (parentNode != null && timerAdder == null) { - parentNode.set(timer, minTimerLengthMillis, maxTimerLengthMillis, from); - return; - } - - LOG.finest(() -> String.format("TimerSet(-> %s, %s)", from, timer)); - - if (timerAdder != null) { - timerAdder.accept( - new ImmutableTriple<>( - from, timer, new ImmutablePair<>(minTimerLengthMillis, maxTimerLengthMillis))); - } else { + Environment env = environment(); + if (env == null) { LOG.severe( String.format( "Attempting to set %s from %s before node configured, not setting", timer, from)); + return; } + + LOG.finest(() -> String.format("TimerSet(-> %s, %s)", from, timer)); + env.set( + timer, + from, + Duration.ofMillis(minTimerLengthMillis), + Duration.ofMillis(maxTimerLengthMillis)); } private Object handleMessageInternal( @@ -544,7 +600,7 @@ private Object callMethod( throw t; } - if (logExceptions) { + if (settings().logExceptions) { LOG.log( Level.SEVERE, String.format( @@ -553,8 +609,9 @@ private Object callMethod( t); } - if (throwableCatcher != null) { - throwableCatcher.accept(t); + Environment env = environment(); + if (env == null || !env.handleThrowable(t)) { + throw t; } } @@ -564,39 +621,19 @@ private Object callMethod( /** * Do not use. Only used by testing framework. * - *

Configures the node to allow it to send messages and set timers. + *

Configures the node to allow it to send messages and set timers. Should only be set on a + * root Node. * - *

At least one of {@code messageAdder}/{@code batchMessageAdder} must be non-null. - * - * @param messageAdder a function which consumes messages sent by the node, or {@code null} to - * have the node send all messages to the {@code batchMessageAdder} - * @param batchMessageAdder a function which consumes messages sent by the node to multiple - * recipients, or {@code null} to have the node send all messages to the {@code messageAdder} - * @param timerAdder a function which consumes timers set by the node - * @param throwableCatcher a function which consumes exceptions thrown by the node during message - * and timer handling, or {@code null} to have the node drop exceptions - * @param logExceptions whether to log exceptions thrown by the node during message and timer - * handling, in addition to sending them to the {@code throwableCatcher} + * @param environment The environment this node should use. + * @param settings The settings for this node. * @hidden */ - public void config( - Consumer> messageAdder, - Consumer> batchMessageAdder, - @NonNull Consumer>> timerAdder, - Consumer throwableCatcher, - boolean logExceptions) { + public void config(Environment environment, Settings settings) { if (parentNode != null) { + // TODO: Throw IllegalStateException? LOG.severe("Cannot configure Node already configured as sub-Node."); } - - if (messageAdder == null && batchMessageAdder == null) { - LOG.severe("Cannot configure Node without messageAdder or batchMessageAdder."); - } - - this.messageAdder = messageAdder; - this.batchMessageAdder = batchMessageAdder; - this.timerAdder = timerAdder; - this.throwableCatcher = throwableCatcher; - this.logExceptions = logExceptions; + this.environment = environment; + this.settings = settings; } } diff --git a/framework/tst/dslabs/framework/testing/ClientWorker.java b/framework/tst/dslabs/framework/testing/ClientWorker.java index 69531ea8..b1b7b6e1 100644 --- a/framework/tst/dslabs/framework/testing/ClientWorker.java +++ b/framework/tst/dslabs/framework/testing/ClientWorker.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import javax.annotation.Nullable; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -44,7 +43,6 @@ import lombok.ToString; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; @EqualsAndHashCode( of = {"client", "results"}, @@ -297,14 +295,9 @@ public synchronized void onTimer(Timer timer, Address destination) { } @Override - public void config( - Consumer> messageAdder, - Consumer> batchMessageAdder, - Consumer>> timerAdder, - Consumer throwableCatcher, - boolean logExceptions) { + public void config(Environment environment, Settings settings) { // TODO: make sure there's no overhead for having the config both places - super.config(messageAdder, batchMessageAdder, timerAdder, throwableCatcher, logExceptions); - client().config(messageAdder, batchMessageAdder, timerAdder, throwableCatcher, logExceptions); + super.config(environment, settings); + client().config(environment, settings); } } diff --git a/framework/tst/dslabs/framework/testing/TimerEnvelope.java b/framework/tst/dslabs/framework/testing/TimerEnvelope.java index 142fa609..8b5f4d4f 100644 --- a/framework/tst/dslabs/framework/testing/TimerEnvelope.java +++ b/framework/tst/dslabs/framework/testing/TimerEnvelope.java @@ -25,6 +25,7 @@ import dslabs.framework.Address; import dslabs.framework.Timer; import dslabs.framework.VizIgnore; +import java.time.Duration; import java.util.Random; import lombok.Data; import lombok.EqualsAndHashCode; @@ -44,12 +45,11 @@ public final class TimerEnvelope implements Event, Comparable { private final Address to; private final Timer timer; - @VizIgnore private final int minTimerLengthMillis, maxTimerLengthMillis, timerLengthMillis; - - @VizIgnore private final long startTimeNanos; + @VizIgnore + private final long minTimerLengthMillis, maxTimerLengthMillis, timerLengthMillis, startTimeNanos; public TimerEnvelope( - Address to, Timer timer, int minTimerLengthMillis, int maxTimerLengthMillis) { + Address to, Timer timer, long minTimerLengthMillis, long maxTimerLengthMillis) { this.to = to; this.timer = timer; this.minTimerLengthMillis = minTimerLengthMillis; @@ -63,19 +63,24 @@ public TimerEnvelope( this.timerLengthMillis = minTimerLengthMillis; } else { this.timerLengthMillis = - minTimerLengthMillis + rand.nextInt(1 + maxTimerLengthMillis - minTimerLengthMillis); + minTimerLengthMillis + rand.nextLong(1 + maxTimerLengthMillis - minTimerLengthMillis); } this.startTimeNanos = System.nanoTime(); } + public TimerEnvelope(Address to, Timer timer, Duration minTimerLength, Duration maxTimerLength) { + this(to, timer, minTimerLength.toMillis(), maxTimerLength.toMillis()); + // TODO: Make this the canonical constructor, store Duration/Instant in TimerEnvelope. + } + @Override public Address locationRootAddress() { return to.rootAddress(); } public long endTimeNanos() { - return startTimeNanos + (((long) timerLengthMillis()) * 1000000); + return startTimeNanos + (timerLengthMillis() * 1000000); } public long timeRemainingNanos() { diff --git a/framework/tst/dslabs/framework/testing/runner/RunState.java b/framework/tst/dslabs/framework/testing/runner/RunState.java index b45b6c18..e188d8fe 100644 --- a/framework/tst/dslabs/framework/testing/runner/RunState.java +++ b/framework/tst/dslabs/framework/testing/runner/RunState.java @@ -27,6 +27,8 @@ import dslabs.framework.Client; import dslabs.framework.Message; import dslabs.framework.Node; +import dslabs.framework.Node.Environment; +import dslabs.framework.Node.Settings; import dslabs.framework.Timer; import dslabs.framework.testing.AbstractState; import dslabs.framework.testing.ClientWorker; @@ -46,7 +48,6 @@ import lombok.Getter; import lombok.ToString; import lombok.extern.java.Log; -import org.apache.commons.lang3.tuple.Pair; @Log @ToString(callSuper = true) @@ -97,22 +98,30 @@ protected synchronized void setupNode(Address address) { final Inbox inbox = network.inbox(address); node.config( - me -> { - // Clone on message send - Message m = Cloning.clone(me.getRight()); - network.send(new MessageEnvelope(me.getLeft(), me.getMiddle(), m)); - }, - null, - te -> { - // Clone timer on set - Timer t = Cloning.clone(te.getMiddle()); - Pair bounds = te.getRight(); - inbox.set(new TimerEnvelope(te.getLeft(), t, bounds.getLeft(), bounds.getRight())); - }, - e -> { - exceptionThrown = true; + new Environment() { + @Override + public void send(Message message, Address from, Address to) { + // Clone on message send + Message m = Cloning.clone(message); + network.send(new MessageEnvelope(from, to, message)); + } + + @Override + public void set( + Timer timer, Address destination, Duration minDuration, Duration maxDuration) { + // Clone timer on set + Timer t = Cloning.clone(timer); + inbox.set(new TimerEnvelope(destination, t, minDuration, maxDuration)); + } + + @Override + public boolean handleThrowable(Throwable throwable) { + // Log exceptions but ignore them during execution. + exceptionThrown = true; + return true; + } }, - true); + new Settings(true)); node.init(); // If we're already running in multi-threaded mode start the new node diff --git a/framework/tst/dslabs/framework/testing/search/SearchState.java b/framework/tst/dslabs/framework/testing/search/SearchState.java index 568ae7d7..b016e3ef 100644 --- a/framework/tst/dslabs/framework/testing/search/SearchState.java +++ b/framework/tst/dslabs/framework/testing/search/SearchState.java @@ -30,6 +30,8 @@ import dslabs.framework.Address; import dslabs.framework.Message; import dslabs.framework.Node; +import dslabs.framework.Node.Environment; +import dslabs.framework.Node.Settings; import dslabs.framework.Timer; import dslabs.framework.testing.AbstractState; import dslabs.framework.testing.ClientWorker; @@ -42,6 +44,7 @@ import dslabs.framework.testing.utils.Cloning; import java.io.PrintStream; import java.io.Serializable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -189,38 +192,49 @@ protected void cleanupNode(Address address) { private void configNode(final Address address) { node(address) .config( - me -> { - // Clone on message send - Message m = Cloning.clone(me.getRight()); - MessageEnvelope messageEnvelope = - new MessageEnvelope(me.getLeft(), me.getMiddle(), m); - network.add(messageEnvelope); - newMessages.add(messageEnvelope); - }, - me -> { - // Clone on message send - Message m = Cloning.clone(me.getRight()); - for (Address to : me.getMiddle()) { - MessageEnvelope messageEnvelope = new MessageEnvelope(me.getLeft(), to, m); + new Environment() { + @Override + public void send(Message message, Address from, Address to) { + // Clone on message send. + message = Cloning.clone(message); + MessageEnvelope messageEnvelope = new MessageEnvelope(from, to, message); network.add(messageEnvelope); newMessages.add(messageEnvelope); } + + @Override + public void broadcast(Message message, Address from, ImmutableList

to) { + // Clone message on send, but only clone it once to save resources. References will + // be re-cloned before delivery. + message = Cloning.clone(message); + for (Address destination : to) { + MessageEnvelope messageEnvelope = new MessageEnvelope(from, destination, message); + network.add(messageEnvelope); + newMessages.add(messageEnvelope); + } + } + + @Override + public void set( + Timer timer, Address destination, Duration minDuration, Duration maxDuration) { + // Clone on timer set. + timer = Cloning.clone(timer); + TimerEnvelope timerEnvelope = + new TimerEnvelope(destination, timer, minDuration, maxDuration); + timers.get(timerEnvelope.to().rootAddress()).add(timerEnvelope); + newTimers.add(timerEnvelope); + } + + @Override + public boolean handleThrowable(Throwable throwable) { + // Store the exception without logging it. + assert throwable != null; + assert thrownException == null; + thrownException = throwable; + return true; + } }, - te -> { - // Clone on timer set - Timer t = Cloning.clone(te.getMiddle()); - Pair bounds = te.getRight(); - TimerEnvelope timerEnvelope = - new TimerEnvelope(te.getLeft(), t, bounds.getLeft(), bounds.getRight()); - timers.get(timerEnvelope.to().rootAddress()).add(timerEnvelope); - newTimers.add(timerEnvelope); - }, - t -> { - assert t != null; - assert thrownException == null; - thrownException = t; - }, - false); + new Settings(false)); } Collection events(SearchSettings settings) { @@ -584,10 +598,9 @@ public boolean equals(final Object o) { if (o == this) { return true; } - if (!(o instanceof SearchEquivalenceWrappedSearchState)) { + if (!(o instanceof SearchEquivalenceWrappedSearchState other)) { return false; } - final SearchEquivalenceWrappedSearchState other = (SearchEquivalenceWrappedSearchState) o; if (!Objects.equals(state, other.state)) { return false; } diff --git a/framework/tst/dslabs/framework/testing/search/TimerQueue.java b/framework/tst/dslabs/framework/testing/search/TimerQueue.java index b92894e9..3ac7d478 100644 --- a/framework/tst/dslabs/framework/testing/search/TimerQueue.java +++ b/framework/tst/dslabs/framework/testing/search/TimerQueue.java @@ -69,7 +69,7 @@ Iterable deliverable() { @Nonnull public Iterator iterator() { return new Iterator<>() { - Integer minMaxTime = null; + Long minMaxTime = null; int i = 0; private void skip() { diff --git a/labs/lab2-primarybackup/tst/dslabs/primarybackup/ViewServerTest.java b/labs/lab2-primarybackup/tst/dslabs/primarybackup/ViewServerTest.java index 5e007ff6..8f9780b9 100644 --- a/labs/lab2-primarybackup/tst/dslabs/primarybackup/ViewServerTest.java +++ b/labs/lab2-primarybackup/tst/dslabs/primarybackup/ViewServerTest.java @@ -7,6 +7,9 @@ import dslabs.framework.Address; import dslabs.framework.Message; +import dslabs.framework.Node.Environment; +import dslabs.framework.Node.Settings; +import dslabs.framework.Timer; import dslabs.framework.testing.LocalAddress; import dslabs.framework.testing.MessageEnvelope; import dslabs.framework.testing.TimerEnvelope; @@ -17,6 +20,7 @@ import dslabs.framework.testing.junit.TestPointValue; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.time.Duration; import java.util.LinkedList; import java.util.Objects; import org.junit.Before; @@ -54,17 +58,19 @@ public void setup() // TODO: clone messages and timers!!! vs.config( - me -> messages.add(new MessageEnvelope(me.getLeft(), me.getMiddle(), me.getRight())), - null, - te -> - timers.add( - new TimerEnvelope( - te.getLeft(), - te.getMiddle(), - te.getRight().getLeft(), - te.getRight().getRight())), - null, - true); + new Environment() { + @Override + public void send(Message message, Address from, Address to) { + messages.add(new MessageEnvelope(from, to, message)); + } + + @Override + public void set( + Timer timer, Address destination, Duration minDuration, Duration maxDuration) { + timers.add(new TimerEnvelope(destination, timer, minDuration, maxDuration)); + } + }, + new Settings(true)); vs.init(); }