/*
*
*
* Copyright 1997-2007 BBNT Solutions, LLC
* under sponsorship of the Defense Advanced Research Projects
* Agency (DARPA).
*
* You can redistribute this software and/or modify it under the
* terms of the Cougaar Open Source License as published on the
* Cougaar Open Source Website (www.cougaar.org).
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*
*/
package org.cougaar.demo.mesh;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.cougaar.bootstrap.SystemProperties;
import org.cougaar.core.agent.service.alarm.AlarmBase;
import org.cougaar.core.blackboard.IncrementalSubscription;
import org.cougaar.core.blackboard.TodoSubscription;
import org.cougaar.core.mts.MessageAddress;
import org.cougaar.core.plugin.ComponentPlugin;
import org.cougaar.core.relay.SimpleRelay;
import org.cougaar.core.relay.SimpleRelaySource;
import org.cougaar.core.service.BlackboardService;
import org.cougaar.core.service.LoggingService;
import org.cougaar.core.service.UIDService;
import org.cougaar.util.Arguments;
import org.cougaar.util.UnaryPredicate;
/**
* This plugin is a relay scalability test that creates an arbitrarily large
* "mesh" of relays.
*
* For example, this plugin can be configured to create a fully-connected
* network formation, in which every agent sends to every other agent:
* agent "Peer0" sends to "Peer1" and "Peer2"
* agent "Peer1" sends to "Peer0" and "Peer2"
* agent "Peer2" sends to "Peer0" and "Peer1"
*
*
* Other topologies can be created, e.g. chains, rings, trees, etc. The
* only requirement is that, if agent "A" lists agent "B" as a target,
* then "B" must also list "A" as a target.
*
* Each relay iteration waits until the prior iteration has completed, making
* it easy to identify bottlenecks and dropped/duplicate relays. Once all
* maxIterations have succeeded, each agent logs a SHOUT message of the form
*   "Completed all N iterations in an initial X plus subsequent Y milliseconds"
*
* Plugin parameters:
* - targets=
*     Required comma-separated targets list, which supports range expressions
*     for easy scalability testing. For example, "Peer[0..3]" is expanded
*     to:
*       Peer0, Peer1, Peer2
*     (the upper bound of a range is exclusive). Note that this agent must
*     be listed as one of the targets, otherwise it will not send any relays.
*     This "self" requirement makes it easy to enable/disable many agents
*     using a global-replace in the configuration file.
* - verbose=true
*     Enable verbose SHOUT logging.
* - bloatSize=-1
*     Number of extra bytes to bloat each message, or -1 for no added size.
* - maxIterations=-1
*     Maximum number of relay iterations, or -1 for no limit.
* - delayMillis=5000
*     Added delay between iterations, in milliseconds, or -1 for no
*     extra delay.
* - timeoutMillis=30000
*     How long to wait into an iteration before logging a warning that
*     the iteration is taking a long time, or -1 for no warnings.
* - exitWhenDone=false
*     Call {@link System#exit} when all maxIterations have been
*     completed.
*
*
*/
public class MeshPlugin extends ComponentPlugin {
private LoggingService log;
private UIDService uids;
private List targets;
private boolean verbose;
private int bloatSize;
private long maxIterations;
private long delayMillis;
private long timeoutMillis;
private boolean exitWhenDone;
private long counter = -1;
private final Map sent =
new HashMap();
private final Map received =
new HashMap();
private int num_pending = 0;
// time at end of our "load()" method
private long loadTime;
// time when we completed our second iteration
private long activeTime = -2;
// time when our delay-alarm will expire
private long delayTime = -1;
// time when we called "setTimeout()", or -1 if cancelled
private long setTime = -1;
// time when our timeout-alarm will expire
private long timeoutTime = -1;
private IncrementalSubscription sub;
private TodoSubscription expiredAlarms;
/** This method is called when the agent is created */
public void load() {
super.load();
// get our required Cougaar services
log = (LoggingService)
getServiceBroker().getService(this, LoggingService.class, null);
uids = (UIDService)
getServiceBroker().getService(this, UIDService.class, null);
// parse our plugin parameters
Arguments args = new Arguments(getParameters(), getClass());
String targets_string = args.getString("targets");
targets = parseTargets(targets_string, agentId.getAddress());
verbose = args.getBoolean("verbose", true);
bloatSize = args.getInt("bloatSize", -1);
maxIterations = args.getLong("maxIterations", -1);
delayMillis = args.getLong("delayMillis", 5000);
timeoutMillis = args.getLong("timeoutMillis", 30000);
exitWhenDone = args.getBoolean("exitWhenDone", false);
if (verbose && log.isShoutEnabled()) {
log.shout(
"Parsed "+targets.size()+" target"+(targets.size() == 1 ? "" : "s")+
": "+targets);
}
loadTime = System.currentTimeMillis();
}
/** This method is called when the agent starts. */
protected void setupSubscriptions() {
if (targets.isEmpty()) {
// we're not in the targets set, so don't send anything
return;
}
// initialize our received table
for (String target : targets) {
received.put(target, new Long(0));
}
// subscribe to all relays sent to our agent
UnaryPredicate pred =
new UnaryPredicate() {
public boolean execute(Object o) {
return
((o instanceof SimpleRelay) &&
(agentId.equals(((SimpleRelay) o).getTarget())));
}
};
sub = (IncrementalSubscription) blackboard.subscribe(
new IncrementalSubscription(pred, new HashSet()) {
// we need our added list to be in order!
protected Set createAddedSet() {
return new LinkedHashSet(5);
}
});
// create a queue for expired alarms
expiredAlarms = (TodoSubscription) blackboard.subscribe(
new TodoSubscription("myAlarms"));
if (blackboard.didRehydrate()) {
// restarting from an agent move or persistence snapshot
restoreState();
return;
}
// send our first round of relays to our targets
sendNow();
// when any target publishes a relay or one of our alarms fires,
// our "execute()" method will be called.
}
/** This method is called whenever a subscription changes or alarm fires. */
protected void execute() {
if (sub == null) {
// Our "execute()" is always called at least once, even if we
// don't have any subscriptions.
assert targets.isEmpty();
return;
}
// check for expired alarms (delays & timeouts)
if (expiredAlarms.hasChanged()) {
for (Object oi : expiredAlarms.getAddedCollection()) {
handleAlarm((MyAlarm) oi);
}
}
// check for incoming relays
if (sub.hasChanged()) {
for (Object oi : sub.getAddedCollection()) {
handleRelay((SimpleRelay) oi);
}
}
}
// handle an expired alarm
private void handleAlarm(MyAlarm alarm) {
assert (delayMillis > 0 || timeoutMillis > 0);
long now = System.currentTimeMillis();
if (delayTime > 0 && delayTime <= now) {
// send our next round of relays to our targets
delayTime = -1;
sendNow();
}
// check for an input timeout
if (timeoutTime > 0 && timeoutTime <= now) {
checkTimeout();
}
}
// handle an incoming relay
private void handleRelay(SimpleRelay relay) {
String target = relay.getSource().getAddress();
Object new_obj = relay.getQuery();
if (new_obj instanceof Payload) {
new_obj = ((Payload) new_obj).getData();
}
Long new_value = (Long) new_obj;
long value = new_value.longValue();
if (verbose && log.isShoutEnabled()) {
log.shout("Received "+value+" from "+target);
}
if (value != (counter+1) && value != (counter+2)) {
log.error(
"Expecting "+(counter+1)+" or "+(counter+2)+" from "+target+
", not "+value+", relay is "+relay);
return;
}
Long old_obj = received.get(target);
if (old_obj == null) {
log.error("Unexpected relay from "+target+" "+relay);
return;
}
long old_value = old_obj.longValue();
if (value == (old_value + 1)) {
received.put(target, new_value);
} else {
// this is an error, unless we're restarting from an agent move or
// persistence snapshot.
log.warn(
"Unexpected value "+value+" from "+target+", expecting "+(old_value+1)+
", relay is "+relay+". Was this agent moved or restarted?");
if (value > old_value) {
received.put(target, new_value);
}
}
if (value != (counter+1)) {
return;
}
num_pending--;
if (num_pending > 0) {
// keep waiting for more relays
return;
}
cancelTimeout();
// send next relay
if (delayMillis > 0) {
// set an alarm to call our "execute()" method in the future
sendLater();
} else {
// send our relay now
sendNow();
}
}
/** Send our next relay iteration now */
private void sendNow() {
// record the timestamp of our second iteration
//
// we ignore the first iteration because it includes the naming and
// messaging startup costs.
if (activeTime < 0) {
if (activeTime < -1) {
activeTime = -1;
} else {
activeTime = System.currentTimeMillis();
}
}
// increment counter
counter++;
if (maxIterations >= 0 && counter >= maxIterations) {
long now = System.currentTimeMillis();
log.shout(
"Completed all "+counter+" iterations in an initial "+
(activeTime - loadTime) + " plus subsequent " +
(now - activeTime) + " milliseconds");
if (exitWhenDone) {
try {
System.exit(0);
} catch (Exception e) {
log.error("Unable to exit", e);
}
}
return;
}
// delete old sent relays
for (SimpleRelay priorRelay : sent.values()) {
blackboard.publishRemove(priorRelay);
}
sent.clear();
// send new relays
Object content = new Long(counter+1);
if (bloatSize > 0) {
content = new Payload(content, bloatSize);
}
if (verbose && log.isShoutEnabled()) {
log.shout(
"Sending counter "+content+" to "+targets.size()+" target"+
(targets.size() == 1 ? "" : "s"));
}
for (String target : targets) {
SimpleRelay relay = new SimpleRelaySource(
uids.nextUID(),
agentId,
MessageAddress.getMessageAddress(target),
content);
sent.put(target, relay);
blackboard.publishAdd(relay);
}
updateNumPending();
if (num_pending <= 0) {
// if our delay is relatively large compared to the comms time, then
// all our inputs may have already arrived.
if (delayMillis > 0) {
sendLater();
} else {
log.error("None pending?");
}
return;
}
// set timeout alarm
setTimeout();
}
/** Send our next relay iteration after the non-zero delayMillis */
private void sendLater() {
if (verbose && log.isShoutEnabled()) {
log.shout(
"Will send counter "+(counter+1)+" in "+(delayMillis/1000)+" seconds");
}
delayTime = System.currentTimeMillis() + delayMillis;
getAlarmService().addRealTimeAlarm(new MyAlarm(delayTime));
}
private void updateNumPending() {
// update num_pending
num_pending = targets.size();
for (Long vi : received.values()) {
long value = vi.longValue();
if (value > counter) {
num_pending--;
}
}
}
// The following use of alarms has been optimized!
//
// The naive solution is to create an alarm in "setTimeout()", cancel it in
// "cancelTimeout()", and "checkTimeout()" simply calls "handleTimeout()".
// The downside to this solution is that we expect our mesh iteration period
// to be *much* smaller than the timeout period, which will result in lots of
// created-then-cancelled alarms. This is wasteful.
//
// Instead, it's more efficient to keep an alarm around and not cancel it.
// This makes the "setTimeout()" and "cancelTimeout()" operations very fast.
// The minor downside is that the "checkTimeout()" method is a bit more
// complicated.
private void setTimeout() {
if (timeoutMillis <= 0) return;
setTime = System.currentTimeMillis();
if (timeoutTime < 0) {
timeoutTime = setTime + timeoutMillis;
getAlarmService().addRealTimeAlarm(new MyAlarm(timeoutTime));
}
}
private void cancelTimeout() {
if (timeoutMillis <= 0) return;
setTime = -1;
}
private void checkTimeout() {
assert timeoutMillis > 0;
long expirationTime = timeoutTime;
timeoutTime = -1;
if (setTime >= 0) {
long t = setTime + timeoutMillis;
if (expirationTime < t) {
timeoutTime = t;
getAlarmService().addRealTimeAlarm(new MyAlarm(timeoutTime));
} else {
handleTimeout();
}
}
}
private void handleTimeout() {
reportTimeout();
setTimeout();
}
private void reportTimeout() {
if (!log.isWarnEnabled()) return;
StringBuffer buf =
new StringBuffer(
"Waiting for "+num_pending+" of "+sent.size()+" relays {");
for (String target : targets) {
buf.append("\n ").append(target).append(" ");
long value = received.get(target).longValue();
buf.append(value);
if (value <= counter) {
buf.append(" PENDING");
}
}
buf.append("\n}");
log.warn(buf.toString());
}
private void restoreState() {
// figure out our counter by looking at our sent relays
UnaryPredicate pred = new UnaryPredicate() {
public boolean execute(Object o) {
return
((o instanceof SimpleRelay) &&
(agentId.equals(((SimpleRelay) o).getSource())));
}
};
long sent_value = -1;
Collection sent_col = blackboard.query(pred);
for (Object si : sent_col) {
SimpleRelay relay = (SimpleRelay) si;
String target = relay.getTarget().getAddress();
Object obj = relay.getQuery();
if (obj instanceof Payload) {
obj = ((Payload) obj).getData();
}
long value = ((Long) obj).longValue();
if (verbose && log.isShoutEnabled()) {
log.shout("Sent "+value+" to "+target+" "+relay);
}
if (value >= sent_value) {
sent_value = value;
}
}
counter = sent_value - 1;
if (log.isShoutEnabled()) {
log.shout("Restored counter at "+counter);
}
// remove any old sent relays
for (Object si : sent_col) {
SimpleRelay relay = (SimpleRelay) si;
String target = relay.getTarget().getAddress();
Object obj = relay.getQuery();
if (obj instanceof Payload) {
obj = ((Payload) obj).getData();
}
long value = ((Long) obj).longValue();
if (value == (counter + 1)) {
sent.put(target, relay);
} else {
log.warn(
"Found stale relay to "+target+" with value "+value+
" instead of expected "+(counter+1)+", removing "+relay);
blackboard.publishRemove(relay);
}
}
// figure out what we've received
for (Object oi : sub) {
SimpleRelay relay = (SimpleRelay) oi;
String target = relay.getSource().getAddress();
Object obj = relay.getQuery();
if (obj instanceof Payload) {
obj = ((Payload) obj).getData();
}
Long longV = (Long) obj;
long value = longV.longValue();
Long old_value = received.get(target);
if (verbose && log.isShoutEnabled()) {
log.shout("Received "+value+" from "+target+" "+relay);
}
if (old_value == null || old_value.longValue() < value) {
received.put(target, longV);
}
}
updateNumPending();
if (num_pending <= 0) {
// we must have been delaying between iterations.
if (delayMillis > 0) {
sendLater();
} else {
log.error("Restore with zero delay found no pending relays?");
}
return;
}
// we're still waiting for input, set timeout alarm
if (verbose && log.isShoutEnabled()) {
reportTimeout();
}
setTimeout();
}
// parse our target list, e.g. "Peer[0..20]"
private static List parseTargets(String s, String thisAgent) {
if (s == null) {
throw new IllegalArgumentException("Must specify targets");
}
boolean containsThisAgent = false;
List ret = new ArrayList();
s = s.replace('\n', ' ');
String[] sa = s.split(",");
ret = new ArrayList();
for (String si : sa) {
si = si.trim();
if (si.length() <= 0) continue;
try {
int j = si.indexOf('[');
int k = (j >= 0 ? si.indexOf(']', j) : -1);
if (j <= 0 || k <= 0) {
// specific target name, e.g. "Foo"
String target = si;
if (target.equals(thisAgent)) {
containsThisAgent = true;
} else {
ret.add(target);
}
continue;
}
// expand pattern, e.g. "X[0..3]Y" becomes "X0Y, X1Y, X2Y"
//
// could use a regex here
int q = si.indexOf("..", j);
if (q >= k) q = -1;
int seq_begin;
int seq_end;
if (q < 0) {
String x = si.substring(j+1, k).trim();
seq_begin = Integer.parseInt(x);
seq_end = seq_begin+1;
} else {
String x = si.substring(j+1, q).trim();
String y = si.substring(q+2, k).trim();
seq_begin = Integer.parseInt(x);
seq_end = Integer.parseInt(y);
}
for (int index = seq_begin; index < seq_end; index++) {
String target =
si.substring(0, j).trim() +
index +
si.substring(k+1).trim();
if (target.equals(thisAgent)) {
containsThisAgent = true;
} else {
ret.add(target);
}
}
} catch (Exception e) {
throw new RuntimeException("Invalid target: "+si, e);
}
}
if (!containsThisAgent) {
ret = Collections.emptyList();
}
return ret;
}
private class MyAlarm extends AlarmBase {
public MyAlarm(long futureTime) {
super(futureTime);
}
// Put this alarm on the "expiredAlarms" queue and request an "execute()"
public void onExpire() {
expiredAlarms.add(this);
}
}
}