/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

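/**
 * Tests for {@link TableSnapshotInputFormat}: split generation (split counts, multiple splits
 * per region, scan-bounded splits, split locality), per-split row limits, and end-to-end reads
 * through both a mocked MapReduce harness and real jobs on the mini cluster.
 *
 * <p>Typical client setup, mirroring the calls exercised below (the mapper and key/value
 * classes here are placeholders, not part of this test):
 *
 * <pre>
 * Job job = Job.getInstance(conf);
 * TableMapReduceUtil.initTableSnapshotMapperJob("snapshotName", new Scan(), MyMapper.class,
 *   MyKey.class, MyValue.class, job, true, restoreDir);
 * </pre>
 */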
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotInputFormat.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotInputFormat.class);

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");
  private static final byte[] bbc = Bytes.toBytes("bbc");
  private static final byte[] yya = Bytes.toBytes("yya");

  @Rule
  public TestName name = new TestName();

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

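  /**
   * getBestLocations ranks hosts by block weight and keeps only hosts close to the heaviest
   * one: below, h2 at weight 7 against h1's 10 is excluded, but is included once it reaches 9.
   * The final assertion also shows the result is trimmed to a few top hosts, since h1
   * (weight 10) no longer makes the cut alongside h2, h3 and h4.
   */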
  @Test
  public void testGetBestLocations() throws IOException {
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertNull(TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);

    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4"),
      TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
  }

  public enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

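  /**
   * Mapper used by the verification jobs: validates each row read from the snapshot and emits
   * its key. Paired with {@link TestTableSnapshotReducer}, whose SeenRowTracker asserts in
   * cleanup that every row in the expected [bbb, yyy) range was seen.
   */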
  public static class TestTableSnapshotMapper
    extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      // Validate a single row coming from the snapshot, and emit the row key
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

  public static class TestTableSnapshotReducer
    extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }

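  /**
   * Asserts the cache configuration that initTableSnapshotMapperJob writes into the job: the
   * HFile block cache is left at the LruBlockCache default and "hbase.bucketcache.size" is
   * forced to 0, so the snapshot job does not use BucketCache.
   */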
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      // TODO: would be better to examine directly the cache instance that results from this
      // config. Currently this is not possible because BlockCache initialization is static.
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = Job.getInstance(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

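  /**
   * Driven by the base class with varying region counts, splits per region, and locality
   * settings: snapshots the table, configures the job, and checks the resulting splits via
   * {@link #verifyWithMockedMapReduce} without running a real job.
   */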
  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
      throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Configuration conf = util.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, false, tmpTableDir);
      }

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
    } finally {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }

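  /**
   * 10 regions with 5 splits per region would give 50 splits; with the scan bounded to
   * [bbc, yya), sub-splits falling entirely outside that range are dropped, so only 40 splits
   * are expected.
   */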
  @Test
  public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan with startRow and stopRow
      Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

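  /**
   * Same setup as above but with an unbounded scan: nothing is trimmed, so all
   * 10 regions x 5 splits per region = 50 splits are expected.
   */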
  @Test
  public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
    String snapshotName = "testWithMockedMapReduceMultiRegion";
    final TableName tableName = TableName.valueOf(name.getMethodName());
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);

      Configuration conf = UTIL.getConfiguration();
      conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
      Job job = Job.getInstance(conf);
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // test scan without startRow and stopRow
      Scan scan = new Scan();

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir, new RegionSplitter.UniformSplit(), 5);

      verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

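  /**
   * Exercises SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT: with the limit set to 10 and one
   * split per region, the RowCounter job should count exactly 10 rows for each region.
   */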
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    Table table = null;
    try {
      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

      Admin admin = UTIL.getAdmin();

      int regionNum = admin.getRegions(tableName).size();
      // put some stuff in the table
      table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan();
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
        tmpTableDir);
      Assert.assertTrue(job.waitForCompletion(true));
      Assert.assertEquals(10 * regionNum,
        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
    } finally {
      if (table != null) {
        table.close();
      }
      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

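  /**
   * Snapshots a table immediately after splitting it into two regions, then loads new values
   * and flushes. Reading the snapshot back must not return duplicate rows, and none of the
   * post-snapshot values may appear.
   */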
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Job job = Job.getInstance(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      // limit the scan
      Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
        tmpTableDir);

      verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

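  /**
   * Checks the splits without running a job: asserts the split count, that locations are
   * present exactly when locality is enabled, that each split's scan stays within
   * [startRow, stopRow), and that a record reader over every split returns all expected rows.
   */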
  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow,
        stopRow.length > 0 ? stopRow : Bytes.toBytes("\uffff"));

    boolean localityEnabled =
        job.getConfiguration().getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
          SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    for (int i = 0; i < splits.size(); i++) {
      // validate input split
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
      TableSnapshotRegionSplit snapshotRegionSplit = (TableSnapshotRegionSplit) split;
      if (localityEnabled) {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length != 0);
      } else {
        Assert.assertTrue(split.getLocations() != null && split.getLocations().length == 0);
      }

      Scan scan =
          TableMapReduceUtil.convertStringToScan(snapshotRegionSplit.getDelegate().getScan());
      if (startRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(startRow) + " should be <= "
            + Bytes.toStringBinary(scan.getStartRow()),
          Bytes.compareTo(startRow, scan.getStartRow()) <= 0);
      }
      if (stopRow.length > 0) {
        Assert.assertTrue(
          Bytes.toStringBinary(stopRow) + " should be >= "
            + Bytes.toStringBinary(scan.getStopRow()),
          Bytes.compareTo(stopRow, scan.getStopRow()) >= 0);
      }
      Assert.assertTrue("startRow should be < stopRow",
        Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) < 0);

      // validate record reader
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

      // validate we can read all the data back
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
  }

  // this is also called by the IntegrationTestTableSnapshotInputFormat
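  /**
   * Runs a real MapReduce job over the snapshot on the mini cluster (optionally shut down
   * first, demonstrating the job needs no live cluster) and asserts it completes, with the
   * reducer validating that every row in [startRow, endRow) was seen.
   */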
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
    LOG.info("testing with MapReduce");

    LOG.info("create the table and snapshot");
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      LOG.info("shutting down hbase cluster.");
      util.shutdownMiniHBaseCluster();
    }

    try {
      // create the job
      Job job = Job.getInstance(util.getConfiguration());
      // limit the scan
      Scan scan = new Scan().withStartRow(startRow).withStopRow(endRow);

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      if (numSplitsPerRegion > 1) {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
          numSplitsPerRegion);
      } else {
        TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
          scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
          NullWritable.class, job, true, tableDir);
      }

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

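  /**
   * End-to-end run with multiple mappers per region: 10 regions x 5 splits per region should
   * yield 50 input splits.
   */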
  @Test
  public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
    testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
  }
}