Skip to content
This repository was archived by the owner on Jan 29, 2022. It is now read-only.

Descending split key #121

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ public StandaloneMongoSplitter(final Configuration conf) {
public List<InputSplit> calculateSplits() throws SplitFailedException {
final DBObject splitKey = MongoConfigUtil.getInputSplitKey(getConfiguration());
final int splitSize = MongoConfigUtil.getSplitSize(getConfiguration());
final BasicDBObject splitMin = (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MIN_KEY);
final BasicDBObject splitMax = splitMin == null ? null : (BasicDBObject)MongoConfigUtil.getDBObject(getConfiguration(), MongoConfigUtil.SPLITS_MAX_KEY);
final MongoClientURI inputURI;

DBCollection inputCollection = null;
final ArrayList<InputSplit> returnVal;
final List<InputSplit> returnVal;
try {
inputURI = MongoConfigUtil.getInputURI(getConfiguration());
MongoClientURI authURI = MongoConfigUtil.getAuthURI(getConfiguration());
Expand All @@ -79,16 +82,20 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
inputCollection = MongoConfigUtil.getCollection(inputURI);
}

returnVal = new ArrayList<InputSplit>();
//returnVal = new ArrayList<InputSplit>();
final String ns = inputCollection.getFullName();

LOG.info("Running splitvector to check splits against " + inputURI);
final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns)
final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("splitVector", ns)
.add("keyPattern", splitKey)
// force:True is misbehaving it seems
.add("force", false)
.add("maxChunkSize", splitSize)
.get();
.add("maxChunkSize", splitSize);
if(splitMin != null) {
builder.add( "min", splitMin );
builder.add( "max", splitMax );
}
final DBObject cmd = builder.get();

CommandResult data;
boolean ok = true;
Expand Down Expand Up @@ -151,6 +158,8 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
}
}
if (data != null && !data.get("ok").equals(1.0)) {
System.out.println("Unable to calculate input splits for cmd:" );
System.out.println(""+cmd );
throw new SplitFailedException("Unable to calculate input splits: " + data.get("errmsg"));
}

Expand All @@ -164,18 +173,7 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
LOG.warn("WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too"
+ " small, try lowering 'mongo.input.split_size' if this is undesirable.");
}

BasicDBObject lastKey = null; // Lower boundary of the first min split

for (final Object aSplitData : splitData) {
final BasicDBObject currentKey = (BasicDBObject) aSplitData;
returnVal.add(createSplitFromBounds(lastKey, currentKey));
lastKey = currentKey;
}

// Last max split, with empty upper boundary
final MongoInputSplit lastSplit = createSplitFromBounds(lastKey, null);
returnVal.add(lastSplit);
returnVal = createSplits(splitData, splitMin, splitMax);
} finally {
if (inputCollection != null) {
MongoConfigUtil.close(inputCollection.getDB().getMongo());
Expand All @@ -185,4 +183,18 @@ public List<InputSplit> calculateSplits() throws SplitFailedException {
return returnVal;
}

/**
 * Turns an ordered list of split points into a contiguous chain of input splits.
 * <p>
 * The first split starts at {@code splitMin}; every split point then ends the
 * current split and begins the next one; the final split ends at {@code splitMax}.
 * The result therefore always holds exactly {@code splitData.size() + 1} splits.
 *
 * @param splitData the split points, in the order they should be chained; each
 *                  element is cast to {@link BasicDBObject}
 * @param splitMin  lower bound handed to the first split (may be {@code null})
 * @param splitMax  upper bound handed to the last split (may be {@code null})
 * @return the splits, in the same order as the split points
 * @throws SplitFailedException if creating any individual split fails
 */
protected List<InputSplit> createSplits(BasicDBList splitData,
                                        BasicDBObject splitMin, BasicDBObject splitMax) throws SplitFailedException {
    final List<InputSplit> splits = new ArrayList<InputSplit>();

    // The bound that opens the split currently being built.
    BasicDBObject lowerBound = splitMin;
    for (int i = 0; i < splitData.size(); i++) {
        final BasicDBObject upperBound = (BasicDBObject) splitData.get(i);
        splits.add(createSplitFromBounds(lowerBound, upperBound));
        // The end of this split is the start of the next one.
        lowerBound = upperBound;
    }

    // Close the chain with the configured (possibly absent) upper bound.
    splits.add(createSplitFromBounds(lowerBound, splitMax));
    return splits;
}
}
21 changes: 20 additions & 1 deletion core/src/main/java/com/mongodb/hadoop/util/MongoConfigUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,25 @@ public final class MongoConfigUtil {
public static final String SPLITS_USE_RANGEQUERY = "mongo.input.split.use_range_queries";

/**
* The lower bound of the split key range to use when creating the input splits.<p>
* Defaults to {@code null}. Values must be provided for both this key
* and {@link #SPLITS_MAX_KEY} in order for the range to be used. Remember
* that if your index is ordered descending then the value for this key
* will actually be greater than the value specified at {@link #SPLITS_MAX_KEY}.
*/
public static final String SPLITS_MIN_KEY = "mongo.input.split.split_key_min";

/**
* The upper bound of the split key range to use when creating the input splits.<p>
* Defaults to {@code null}. Values must be provided for both this key
* and {@link #SPLITS_MIN_KEY} in order for the range to be used. Remember
* that if your index is ordered descending then the value for this key
* will actually be less than the value specified at {@link #SPLITS_MIN_KEY}.
*/
public static final String SPLITS_MAX_KEY = "mongo.input.split.split_key_max";

/**
* Shared MongoClient instance cache.
* One client per thread
*/
private static final ThreadLocal<Map<MongoClientURI, MongoClient>> CLIENTS =
Expand Down Expand Up @@ -880,4 +899,4 @@ private static MongoClient getMongoClient(final MongoClientURI uri) throws Unkno
}
return mongoClient;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.mongodb.hadoop.splitter;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBList;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
Expand All @@ -16,6 +17,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;

public class StandaloneMongoSplitterTest {

Expand Down Expand Up @@ -79,4 +81,103 @@ public void testLowerUpperBounds() throws Exception {
assertEquals(0, split.getMin().get("a"));
assertEquals(10, split.getMax().get("a"));
}

/**
 * Checks {@code createSplits} when no explicit range is supplied: three split
 * points must yield four contiguous splits, with no "frame" lower bound on the
 * first split and no "frame" upper bound on the last one.
 */
@Test
public void testCreateSplitsNoRange() throws Exception {
    Configuration config = new Configuration();
    StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config);

    BasicDBList list = new BasicDBList();
    list.add(new BasicDBObject("frame", "1500"));
    list.add(new BasicDBObject("frame", "1000"));
    list.add(new BasicDBObject("frame", "500"));

    List<InputSplit> splits = splitter.createSplits(list, null, null);
    assertEquals(4, splits.size());

    // First split: open lower bound.
    MongoInputSplit s1 = (MongoInputSplit) splits.get(0);
    assertNull(s1.getMin().get("frame"));
    assertEquals("1500", s1.getMax().get("frame"));

    MongoInputSplit s2 = (MongoInputSplit) splits.get(1);
    assertEquals("1500", s2.getMin().get("frame"));
    assertEquals("1000", s2.getMax().get("frame"));

    // Interior split between the second and third split points.
    MongoInputSplit s3 = (MongoInputSplit) splits.get(2);
    assertEquals("1000", s3.getMin().get("frame"));
    assertEquals("500", s3.getMax().get("frame"));

    // Last split: open upper bound.
    MongoInputSplit s4 = (MongoInputSplit) splits.get(3);
    assertEquals("500", s4.getMin().get("frame"));
    assertNull(s4.getMax().get("frame"));
}

/**
 * Checks {@code createSplits} with an explicit ascending range: the first split
 * must start at the supplied minimum, the last split must end at the supplied
 * maximum, and the splits in between must chain the split points.
 */
@Test
public void testCreateSplitsRange() throws Exception {
    Configuration config = new Configuration();
    StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config);

    BasicDBList list = new BasicDBList();
    list.add(new BasicDBObject("frame", "1000"));
    list.add(new BasicDBObject("frame", "1500"));
    list.add(new BasicDBObject("frame", "2000"));

    BasicDBObject splitMin = new BasicDBObject("frame", "500");
    BasicDBObject splitMax = new BasicDBObject("frame", "2500");
    List<InputSplit> splits = splitter.createSplits(list, splitMin, splitMax);
    assertEquals(4, splits.size());

    // First split starts at the configured minimum.
    MongoInputSplit s1 = (MongoInputSplit) splits.get(0);
    assertEquals("500", s1.getMin().get("frame"));
    assertEquals("1000", s1.getMax().get("frame"));

    MongoInputSplit s2 = (MongoInputSplit) splits.get(1);
    assertEquals("1000", s2.getMin().get("frame"));
    assertEquals("1500", s2.getMax().get("frame"));

    // Interior split between the second and third split points.
    MongoInputSplit s3 = (MongoInputSplit) splits.get(2);
    assertEquals("1500", s3.getMin().get("frame"));
    assertEquals("2000", s3.getMax().get("frame"));

    // Last split ends at the configured maximum.
    MongoInputSplit s4 = (MongoInputSplit) splits.get(3);
    assertEquals("2000", s4.getMin().get("frame"));
    assertEquals("2500", s4.getMax().get("frame"));
}

/**
 * Checks {@code createSplits} with an explicit range over a descending key:
 * the supplied "min" is the larger value and the supplied "max" the smaller
 * one, and the split points must be chained in the order given.
 */
@Test
public void testCreateSplitsRangeDescending() throws Exception {
    Configuration config = new Configuration();
    StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(config);

    BasicDBList list = new BasicDBList();
    list.add(new BasicDBObject("frame", "2000"));
    list.add(new BasicDBObject("frame", "1500"));
    list.add(new BasicDBObject("frame", "1000"));

    BasicDBObject splitMin = new BasicDBObject("frame", "2500");
    BasicDBObject splitMax = new BasicDBObject("frame", "500");
    List<InputSplit> splits = splitter.createSplits(list, splitMin, splitMax);
    assertEquals(4, splits.size());

    // First split starts at the configured (numerically larger) minimum.
    MongoInputSplit s1 = (MongoInputSplit) splits.get(0);
    assertEquals("2500", s1.getMin().get("frame"));
    assertEquals("2000", s1.getMax().get("frame"));

    MongoInputSplit s2 = (MongoInputSplit) splits.get(1);
    assertEquals("2000", s2.getMin().get("frame"));
    assertEquals("1500", s2.getMax().get("frame"));

    // Interior split between the second and third split points.
    MongoInputSplit s3 = (MongoInputSplit) splits.get(2);
    assertEquals("1500", s3.getMin().get("frame"));
    assertEquals("1000", s3.getMax().get("frame"));

    // Last split ends at the configured (numerically smaller) maximum.
    MongoInputSplit s4 = (MongoInputSplit) splits.get(3);
    assertEquals("1000", s4.getMin().get("frame"));
    assertEquals("500", s4.getMax().get("frame"));
}

/**
 * Checks that an empty list of split points combined with an explicit range
 * produces exactly one split spanning the supplied minimum and maximum.
 */
@Test
public void testCreateSingleSplitAscending() throws Exception {
    final StandaloneMongoSplitter splitter = new StandaloneMongoSplitter(new Configuration());

    final BasicDBObject lower = new BasicDBObject("frame", "500");
    final BasicDBObject upper = new BasicDBObject("frame", "2500");
    final BasicDBList noSplitPoints = new BasicDBList();

    final List<InputSplit> result = splitter.createSplits(noSplitPoints, lower, upper);
    assertEquals(1, result.size());

    // The only split must cover the whole configured range.
    final MongoInputSplit only = (MongoInputSplit) result.get(0);
    assertEquals("500", only.getMin().get("frame"));
    assertEquals("2500", only.getMax().get("frame"));
}
}