Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions simulator/config/dasprotocol.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
# ::::: GLOBAL ::::::

# Network size
SIZE 5000
SIZE 100

# Random seed
K 5

MINDELAY 10
MAXDELAY 200
MINDELAY 100
MAXDELAY 100

#Simulation time in ms
SIM_TIME 1000*60*9
Expand Down Expand Up @@ -88,7 +88,7 @@ control.0traffic.dasprotocol 4dasprotocol
control.0traffic.step TRAFFIC_STEP
control.0traffic.mapping_fn 2
control.0traffic.sample_copy_per_node 2
control.0traffic.block_dim_size 512
control.0traffic.block_dim_size 100

# turbulence
#control.2turbolenceAdd peersim.kademlia.Turbulence
Expand Down
2 changes: 1 addition & 1 deletion simulator/src/main/java/peersim/kademlia/das/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public BigInteger computeRegionRadius(int numberOfCopiesPerSample) {
MAX_KEY
.divide(BigInteger.valueOf(Network.size()))
.multiply(BigInteger.valueOf(numberOfCopiesPerSample));
radius = radius.shiftRight(1);
// radius = radius.shiftRight(1);
return radius;
}

Expand Down
165 changes: 84 additions & 81 deletions simulator/src/main/java/peersim/kademlia/das/DASProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import peersim.kademlia.operations.Operation;
import peersim.transport.UnreliableTransport;

public class DASProtocol implements Cloneable, EDProtocol, KademliaEvents, MissingNode {
public class DASProtocol implements EDProtocol, KademliaEvents, MissingNode {

private static final String PAR_TRANSPORT = "transport";
// private static final String PAR_DASPROTOCOL = "dasprotocol";
Expand All @@ -46,9 +46,9 @@ public class DASProtocol implements Cloneable, EDProtocol, KademliaEvents, Missi
private static String prefix = null;
private UnreliableTransport transport;
private int tid;
private int kademliaId;
protected int kademliaId;

private KademliaProtocol kadProtocol;
protected KademliaProtocol kadProtocol;
/** allow to call the service initializer only once */
private static boolean _ALREADY_INSTALLED = false;

Expand All @@ -60,25 +60,25 @@ public class DASProtocol implements Cloneable, EDProtocol, KademliaEvents, Missi

private boolean isValidator;

private KeyValueStore kv;
protected KeyValueStore kv;

private Block currentBlock;
protected Block currentBlock;

private LinkedHashMap<Long, SamplingOperation> samplingOp;
protected LinkedHashMap<Long, SamplingOperation> samplingOp;

private LinkedHashMap<Operation, SamplingOperation> kadOps;
protected LinkedHashMap<Operation, SamplingOperation> kadOps;

private boolean samplingStarted;
protected boolean samplingStarted;

private SearchTable searchTable;
protected SearchTable searchTable;

private int[] row, column;
protected int[] row, column;

private int samplesRequested;
protected int samplesRequested;

private BigInteger[] validatorsList;
protected BigInteger[] validatorsList;

private HashSet<BigInteger> queried;
protected HashSet<BigInteger> queried;

protected int dasID;

Expand Down Expand Up @@ -408,69 +408,72 @@ protected void handleGetSampleResponse(Message m, int myPid) {
row[s.getRow()]++;
}

SamplingOperation op = (SamplingOperation) samplingOp.get(m.operationId);
SamplingOperation sop = (SamplingOperation) samplingOp.get(m.operationId);
// We continue an existing operation
if (op != null) {
if (sop != null) {
// keeping track of received samples
op.elaborateResponse(samples);
sop.elaborateResponse(samples);
logger.warning(
"Continue operation "
+ op.getId()
+ sop.getId()
+ " "
+ op.getAvailableRequests()
+ sop.getAvailableRequests()
+ " "
+ op.nrHops
+ sop.nrHops
+ " "
+ searchTable.nodesIndexed().size()
+ " "
+ ((SamplingOperation) op).samplesCount());
+ ((SamplingOperation) sop).samplesCount());

if (!op.completed() && op.nrHops < KademliaCommonConfigDas.MAX_HOPS) {
BigInteger[] nextNodes = op.doSampling();
if (!sop.completed() && sop.nrHops < KademliaCommonConfigDas.MAX_HOPS) {
BigInteger[] missingSamples = sop.getMissingSamples();
BigInteger[] nodesToAsk = searchTable.getNodesForSamples(missingSamples, sop.getRadius());

for (BigInteger nextNode : nextNodes) {
for (BigInteger nextNode : nodesToAsk) {
logger.warning("sending to node " + nextNode);
BigInteger[] reqSamples = op.getSamples();
Message msg = generateGetSampleMessage(reqSamples);
msg.operationId = op.getId();

Message msg = generateGetSampleMessage(missingSamples);
msg.operationId = sop.getId();
msg.src = this.kadProtocol.getKademliaNode();

msg.dst = kadProtocol.nodeIdtoNode(nextNode).getKademliaProtocol().getKademliaNode();
/*if (nextNode.compareTo(builderAddress) == 0) {
logger.warning("Error sending to builder or 0 samples assigned");
continue;
}*/
op.AddMessage(msg.id);
sop.AddMessage(msg.id);
sendMessage(msg, nextNode, myPid);
op.nrHops++;
sop.nrHops++;
}
if (nextNodes.length == 0) {
if (nodesToAsk.length == 0) {
logger.warning(
"No left nodes to ask "
+ op.getAvailableRequests()
+ sop.getAvailableRequests()
+ " "
+ kadOps.size()
+ " "
+ op.getSamples().length);
if (op.getAvailableRequests() == KademliaCommonConfigDas.ALPHA) {
for (BigInteger sample : op.getSamples()) logger.warning("Missing sample " + sample);
while (!doSampling(op)) {
op.increaseRadius(2);
logger.warning("Increasing radius " + op.getId());
+ sop.getMissingSamples().length);
if (sop.getAvailableRequests() == KademliaCommonConfigDas.ALPHA) {
for (BigInteger sample : sop.getMissingSamples())
logger.warning("Missing sample " + sample);
while (!doSampling(sop)) {
if (sop.increaseRadius(2)) {
samplingOp.remove(m.operationId);
logger.warning("Sampling operation finished");
KademliaObserver.reportOperation(sop);
break;
}
logger.warning("Increasing radius " + sop.getId());
}
/*samplingOp.remove(m.operationId);
logger.warning("Sampling operation finished");
KademliaObserver.reportOperation(op);*/

}
}
} else {
logger.warning("Operation completed");
samplingOp.remove(m.operationId);
if (op instanceof ValidatorSamplingOperation)
logger.warning("Sampling operation finished validator completed " + op.getId());
else logger.warning("Sampling operation finished random completed " + op.getId());
KademliaObserver.reportOperation(op);
if (sop instanceof ValidatorSamplingOperation)
logger.warning("Sampling operation finished validator completed " + sop.getId());
else logger.warning("Sampling operation finished random completed " + sop.getId());
KademliaObserver.reportOperation(sop);
}
// We start a new operation
// we start the actual sampling when the last sample from the builder is received
Expand All @@ -497,7 +500,7 @@ protected void handleGetSampleResponse(Message m, int myPid) {
* @param destId the Id of the destination node
* @param myPid the sender Pid
*/
private void sendMessage(Message m, BigInteger destId, int myPid) {
protected void sendMessage(Message m, BigInteger destId, int myPid) {

// int destpid;
assert m.src != null;
Expand Down Expand Up @@ -526,7 +529,7 @@ private void sendMessage(Message m, BigInteger destId, int myPid) {
*
* @return Message
*/
private Message generateGetSampleMessage(BigInteger[] sampleId) {
protected Message generateGetSampleMessage(BigInteger[] sampleId) {

Message m = new Message(Message.MSG_GET_SAMPLE, sampleId);
m.timestamp = CommonState.getTime();
Expand Down Expand Up @@ -569,7 +572,7 @@ public void addKnownValidator(BigInteger[] ids) {
* @param m initial message
* @param myPid protocol pid
*/
private void startRandomSampling(Message m, int myPid) {
protected void startRandomSampling(Message m, int myPid) {

logger.warning("Starting random sampling");
RandomSamplingOperation op =
Expand All @@ -594,7 +597,7 @@ private void startRandomSampling(Message m, int myPid) {
* @param m initial message
* @param myPid protocol pid
*/
private void startRowsandColumnsSampling(Message m, int myPid) {
protected void startRowsandColumnsSampling(Message m, int myPid) {
logger.warning(
"Starting rows and columns fetch "
+ rowWithHighestNumSamples()
Expand All @@ -615,7 +618,7 @@ private void startRowsandColumnsSampling(Message m, int myPid) {
0, CommonState.r.nextInt(KademliaCommonConfigDas.BLOCK_DIM_SIZE) + 1, m.timestamp);
}

private boolean doSampling(SamplingOperation sop) {
protected boolean doSampling(SamplingOperation sop) {

if (sop.completed()) {
samplingOp.remove(sop.getId());
Expand All @@ -625,38 +628,38 @@ private boolean doSampling(SamplingOperation sop) {
logger.warning("Sampling operation finished validator dosampling " + sop.getId());
else logger.warning("Sampling operation finished random dosampling " + sop.getId());
return true;
} else {
boolean success = false;
logger.warning("Dosampling " + sop.getAvailableRequests());
BigInteger[] nextNodes = sop.doSampling();
for (BigInteger nextNode : nextNodes) {
BigInteger[] reqSamples = sop.getSamples();
logger.warning(
"sending to node "
+ nextNode
+ " "
+ reqSamples.length
+ " "
+ sop.getAvailableRequests()
+ " "
+ sop.getId());

Message msg = generateGetSampleMessage(reqSamples);
msg.operationId = sop.getId();
msg.src = this.kadProtocol.getKademliaNode();
success = true;
msg.dst = kadProtocol.nodeIdtoNode(nextNode).getKademliaProtocol().getKademliaNode();
/*if (nextNode.compareTo(builderAddress) == 0) {
logger.warning("Error sending to builder or 0 samples assigned");
continue;
}*/
sop.AddMessage(msg.id);
// logger.warning("Send message " + dasID + " " + this);
sendMessage(msg, nextNode, dasID);
sop.nrHops++;
}
return success;
}
BigInteger[] missingSamples = sop.getMissingSamples();
BigInteger[] nodesToAsk = searchTable.getNodesForSamples(missingSamples, sop.getRadius());

boolean success = false;
logger.warning("Dosampling " + sop.getAvailableRequests());
for (BigInteger nextNode : nodesToAsk) {
logger.warning(
"sending to node "
+ nextNode
+ " "
+ missingSamples.length
+ " "
+ sop.getAvailableRequests()
+ " "
+ sop.getId());

Message msg = generateGetSampleMessage(missingSamples);
msg.operationId = sop.getId();
msg.src = this.kadProtocol.getKademliaNode();
success = true;
msg.dst = kadProtocol.nodeIdtoNode(nextNode).getKademliaProtocol().getKademliaNode();
/*if (nextNode.compareTo(builderAddress) == 0) {
logger.warning("Error sending to builder or 0 samples assigned");
continue;
}*/
sop.AddMessage(msg.id);
// logger.warning("Send message " + dasID + " " + this);
sendMessage(msg, nextNode, dasID);
sop.nrHops++;
}
return success;
}

private void createValidatorSamplingOperation(int row, int column, long timestamp) {
Expand All @@ -676,7 +679,7 @@ private void createValidatorSamplingOperation(int row, int column, long timestam
op.elaborateResponse(kv.getAll().toArray(new Sample[0]));
op.setAvailableRequests(KademliaCommonConfigDas.ALPHA);
while (!doSampling(op)) {
op.increaseRadius(2);
if (op.increaseRadius(2)) break;
}
}

Expand Down
19 changes: 11 additions & 8 deletions simulator/src/main/java/peersim/kademlia/das/Sample.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import peersim.kademlia.Util;

public class Sample {

Expand Down Expand Up @@ -95,22 +96,24 @@ public boolean isInRegion(BigInteger peerID, BigInteger radius) {

/** Given the peerID of a node, determine if this sample falls within the region of the node. */
public boolean isInRegionByColumn(BigInteger peerID, BigInteger radius) {
/** (peerID - radius) < this.id < (peerID + radius) */
if ((this.idByColumn.compareTo(peerID.subtract(radius)) == 1)
&& (this.idByColumn.compareTo(peerID.add(radius)) == -1)) {
// if radius is larger or equal than the distance between the node and the sample ID, the peerID
// is in the region
if (radius.compareTo(Util.xorDistance(this.idByColumn, peerID)) > -1) {
return true;
} else {
}
{
return false;
}
}

/** Given the peerID of a node, determine if this sample falls within the region of the node. */
public boolean isInRegionByRow(BigInteger peerID, BigInteger radius) {
/** (peerID - radius) < this.id < (peerID + radius) */
if ((this.idByRow.compareTo(peerID.subtract(radius)) == 1)
&& (this.idByRow.compareTo(peerID.add(radius)) == -1)) {
// if radius is larger or equal than the distance between the node and the sample ID, the peerID
// is in the region
if (radius.compareTo(Util.xorDistance(this.idByRow, peerID)) > -1) {
return true;
} else {
}
{
return false;
}
}
Expand Down
Loading