Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion stresso/bin/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ mkdir -p lib
# populate lib dir used by fluo init
rm -f lib/*
cp target/stresso-0.0.1-SNAPSHOT.jar ./lib/
mvn dependency:copy-dependencies -DincludeArtifactIds=fluo-recipes-core -DoutputDirectory=./lib
mvn dependency:copy-dependencies -Dfluo.version=$FLUO_VERSION -Daccumulo.version=$ACCUMULO_VERSION -DincludeArtifactIds=fluo-recipes-core,commons-collections -DoutputDirectory=./lib
34 changes: 30 additions & 4 deletions stresso/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
<description>This repo contains an example application designed to stress Apache Fluo</description>
<url>https://github.com/astralway/stresso</url>
<properties>
<accumulo.version>2.0.1</accumulo.version>
<accumulo-plugin.version>1.0.0</accumulo-plugin.version>
<accumulo.version>2.1.0</accumulo.version>
<accumulo2-plugin.version>1.0.0</accumulo2-plugin.version>
<!-- Prevent findbugs from runnning because it does not work with Java 11 and is configured to run by parent pom. -->
<findbugs.skip>true</findbugs.skip>
<fluo-recipes.version>1.2.0</fluo-recipes.version>
<fluo.version>1.2.0</fluo.version>
<hadoop.version>3.1.1</hadoop.version>
<slf4j.version>1.7.12</slf4j.version>
<fluo.version>2.0.0</fluo.version>
<hadoop.version>3.3.5</hadoop.version>
<slf4j.version>1.7.36</slf4j.version>
<zookeeper.version>3.8.1</zookeeper.version>
</properties>
<!--
The provided scope is used for dependencies that should not end up in
Expand Down Expand Up @@ -205,6 +209,28 @@
<instanceName>it-instance-maven</instanceName>
<rootPassword>ITSecret</rootPassword>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${accumulo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>run-plugin</id>
Expand Down
7 changes: 4 additions & 3 deletions stresso/src/main/java/stresso/trie/AccumuloUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ public static <T> T getTableOp(FluoConfiguration fc, TableOp<T> tableOp) {
try (FluoAdmin fadmin = FluoFactory.newAdmin(fc)) {
FluoConfiguration appCfg = new FluoConfiguration(fadmin.getApplicationConfig());
appCfg.setApplicationName(fc.getApplicationName());
AccumuloClient client =
try (AccumuloClient client =
Accumulo.newClient().to(appCfg.getAccumuloInstance(), appCfg.getAccumuloZookeepers())
.as(appCfg.getAccumuloUser(), appCfg.getAccumuloPassword()).build();
return tableOp.run(client.tableOperations(), appCfg.getAccumuloTable());
.as(appCfg.getAccumuloUser(), appCfg.getAccumuloPassword()).build()) {
return tableOp.run(client.tableOperations(), appCfg.getAccumuloTable());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion stresso/src/main/java/stresso/trie/Generate.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public int run(String[] args) throws Exception {
if (args.length != 4) {
log.error("Usage: " + this.getClass().getSimpleName()
+ " <numMappers> <numbersPerMapper> <max> <output dir>");
System.exit(-1);
return -1;
}

int numMappers = Integer.parseInt(args[0]);
Expand Down
2 changes: 1 addition & 1 deletion stresso/src/main/java/stresso/trie/Init.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public int run(String[] args) throws Exception {
if (args.length != 4) {
System.err.println("Usage: " + this.getClass().getSimpleName()
+ " <fluo conn props> <app name> <input dir> <tmp dir>");
System.exit(-1);
return -1;
}

FluoConfiguration props = new FluoConfiguration(new File(args[0]));
Expand Down
6 changes: 3 additions & 3 deletions stresso/src/main/java/stresso/trie/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ protected void map(LongWritable key, NullWritable val, Context context)
public int run(String[] args) throws Exception {

if (args.length != 3) {
log.error(
"Usage: " + this.getClass().getSimpleName() + "<fluo conn props> <app name> <input dir>");
System.exit(-1);
log.error("Usage: " + this.getClass().getSimpleName()
+ " <fluo conn props> <app name> <input dir>");
return -1;
}

FluoConfiguration props = new FluoConfiguration(new File(args[0]));
Expand Down
5 changes: 5 additions & 0 deletions stresso/src/main/java/stresso/trie/Print.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public boolean equals(Object o) {

return false;
}

@Override
public int hashCode() {
throw new UnsupportedOperationException();
}
}

public static Stats getStats(SimpleConfiguration config) throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion stresso/src/main/java/stresso/trie/Unique.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public int run(String[] args) throws Exception {

if (args.length < 1) {
log.error("Usage: " + this.getClass().getSimpleName() + "<input dir>{ <input dir>}");
System.exit(-1);
return -1;
}

JobConf job = new JobConf(getConf());
Expand Down
2 changes: 1 addition & 1 deletion stresso/src/test/java/stresso/ITBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ITBase {
@BeforeClass
public static void setUpAccumulo() throws Exception {
instanceName = System.getProperty(IT_INSTANCE_NAME_PROP, "it-instance-default");
File instanceDir = new File("target/accumulo-maven-plugin/" + instanceName);
File instanceDir = new File("target/accumulo2-maven-plugin/" + instanceName);
boolean instanceClear =
System.getProperty(IT_INSTANCE_CLEAR_PROP, "true").equalsIgnoreCase("true");
if (instanceDir.exists() && instanceClear) {
Expand Down
4 changes: 3 additions & 1 deletion stresso/src/test/java/stresso/TrieBasicIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import stresso.trie.Node;
import stresso.trie.NodeObserver;
import stresso.trie.NumberLoader;
import stresso.trie.StressoObserverProvider;

import static stresso.trie.Constants.COUNT_SEEN_COL;
import static stresso.trie.Constants.TYPEL;
Expand All @@ -49,7 +50,7 @@ public class TrieBasicIT extends ITBase {

@Override
protected void preInit(FluoConfiguration conf) {
conf.addObserver(new ObserverSpecification(NodeObserver.class.getName()));
conf.setObserverProvider(StressoObserverProvider.class);
conf.getAppConfiguration().setProperty(Constants.STOP_LEVEL_PROP, 0);
}

Expand Down Expand Up @@ -111,6 +112,7 @@ private void runTrieTest(int ingestNum, int maxValue, int nodeSize) throws Excep
if (result == null) {
log.error("Could not find root node");
FluoITHelper.printFluoTable(client);
result = 0;
} else if (!result.equals(uniqueNum)) {
log.error(
"Count (" + result + ") at root node does not match expected (" + uniqueNum + "):");
Expand Down
27 changes: 16 additions & 11 deletions stresso/src/test/java/stresso/TrieMapRedIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import stresso.trie.Load;
import stresso.trie.NodeObserver;
import stresso.trie.Print;
import stresso.trie.StressoObserverProvider;
import stresso.trie.Unique;

/**
Expand All @@ -46,7 +47,7 @@ public class TrieMapRedIT extends ITBase {

@Override
protected void preInit(FluoConfiguration conf) {
conf.addObserver(new ObserverSpecification(NodeObserver.class.getName()));
conf.setObserverProvider(StressoObserverProvider.class);

SimpleConfiguration appCfg = conf.getAppConfiguration();
appCfg.setProperty(Constants.STOP_LEVEL_PROP, 0);
Expand All @@ -60,16 +61,19 @@ static void generate(int numMappers, int numPerMapper, int max, File out1) throw
Assert.assertEquals(0, ret);
}

static void load(int nodeSize, File fluoPropsFile, File input) throws Exception {
int ret = ToolRunner.run(new Load(), new String[] {"-D", "mapred.job.tracker=local", "-D",
"fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(), input.toURI().toString()});
static void load(int nodeSize, File fluoPropsFile, String appName, File input) throws Exception {
int ret = ToolRunner.run(new Load(),
new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
fluoPropsFile.getAbsolutePath(), appName, input.toURI().toString()});
Assert.assertEquals(0, ret);
}

static void init(int nodeSize, File fluoPropsFile, File input, File tmp) throws Exception {
static void init(int nodeSize, File fluoPropsFile, String appName, File input, File tmp)
throws Exception {
int ret = ToolRunner.run(new Init(),
new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
fluoPropsFile.getAbsolutePath(), input.toURI().toString(), tmp.toURI().toString()});
fluoPropsFile.getAbsolutePath(), appName, input.toURI().toString(),
tmp.toURI().toString()});
Assert.assertEquals(0, ret);
}

Expand All @@ -90,15 +94,16 @@ static int unique(File... dirs) throws Exception {
public void testEndToEnd() throws Exception {
File testDir = new File("target/MRIT");
FileUtils.deleteQuietly(testDir);
testDir.mkdirs();
Assert.assertTrue(testDir.mkdirs());
File fluoPropsFile = new File(testDir, "fluo.props");

config.save(fluoPropsFile);
String appName = config.getApplicationName();

File out1 = new File(testDir, "nums-1");

generate(2, 100, 500, out1);
init(8, fluoPropsFile, out1, new File(testDir, "initTmp"));
init(8, fluoPropsFile, appName, out1, new File(testDir, "initTmp"));
int ucount = unique(out1);

Assert.assertTrue(ucount > 0);
Expand All @@ -108,7 +113,7 @@ public void testEndToEnd() throws Exception {
Assert.assertEquals(new Print.Stats(0, ucount, false), Print.getStats(config));

// reload same data
load(8, fluoPropsFile, out1);
load(8, fluoPropsFile, appName, out1);

miniFluo.waitForObservers();

Expand All @@ -117,7 +122,7 @@ public void testEndToEnd() throws Exception {
// load some new data
File out2 = new File(testDir, "nums-2");
generate(2, 100, 500, out2);
load(8, fluoPropsFile, out2);
load(8, fluoPropsFile, appName, out2);
int ucount2 = unique(out1, out2);
Assert.assertTrue(ucount2 > ucount); // used > because the probability that no new numbers are
// chosen is exceedingly small
Expand All @@ -128,7 +133,7 @@ public void testEndToEnd() throws Exception {

File out3 = new File(testDir, "nums-3");
generate(2, 100, 500, out3);
load(8, fluoPropsFile, out3);
load(8, fluoPropsFile, appName, out3);
int ucount3 = unique(out1, out2, out3);
Assert.assertTrue(ucount3 > ucount2); // used > because the probability that no new numbers are
// chosen is exceedingly small
Expand Down
3 changes: 2 additions & 1 deletion stresso/src/test/java/stresso/TrieStopLevelIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import stresso.trie.Constants;
import stresso.trie.Node;
import stresso.trie.NodeObserver;
import stresso.trie.StressoObserverProvider;

public class TrieStopLevelIT extends TrieMapRedIT {

@Override
protected void preInit(FluoConfiguration conf) {
conf.addObserver(new ObserverSpecification(NodeObserver.class.getName()));
conf.setObserverProvider(StressoObserverProvider.class);

SimpleConfiguration appCfg = conf.getAppConfiguration();
appCfg.setProperty(Constants.STOP_LEVEL_PROP, 7);
Expand Down