001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.Matchers.any;
025import static org.mockito.Mockito.doAnswer;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.ByteArrayOutputStream;
030import java.io.File;
031import java.io.IOException;
032import java.io.PrintStream;
033import java.net.URL;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeepDeletedCells;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
061import org.apache.hadoop.hbase.filter.Filter;
062import org.apache.hadoop.hbase.filter.FilterBase;
063import org.apache.hadoop.hbase.filter.PrefixFilter;
064import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
065import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
066import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.LauncherSecurityManager;
071import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.apache.hadoop.hbase.wal.WALEdit;
074import org.apache.hadoop.hbase.wal.WALKey;
075import org.apache.hadoop.mapreduce.Mapper.Context;
076import org.apache.hadoop.util.GenericOptionsParser;
077import org.apache.hadoop.util.ToolRunner;
078import org.junit.After;
079import org.junit.AfterClass;
080import org.junit.Assert;
081import org.junit.Before;
082import org.junit.BeforeClass;
083import org.junit.ClassRule;
084import org.junit.Rule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.rules.TestName;
088import org.mockito.invocation.InvocationOnMock;
089import org.mockito.stubbing.Answer;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093/**
094 * Tests the table import and table export MR job functionality
095 */
096@Category({VerySlowMapReduceTests.class, MediumTests.class})
097public class TestImportExport {
098
099  @ClassRule
100  public static final HBaseClassTestRule CLASS_RULE =
101      HBaseClassTestRule.forClass(TestImportExport.class);
102
103  private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class);
104  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
105  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
106  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
107  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
108  private static final String FAMILYA_STRING = "a";
109  private static final String FAMILYB_STRING = "b";
110  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
111  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
112  private static final byte[] QUAL = Bytes.toBytes("q");
113  private static final String OUTPUT_DIR = "outputdir";
114  private static String FQ_OUTPUT_DIR;
115  private static final String EXPORT_BATCH_SIZE = "100";
116
117  private static final long now = System.currentTimeMillis();
118  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
119  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
120
121  @BeforeClass
122  public static void beforeClass() throws Throwable {
123    // Up the handlers; this test needs more than usual.
124    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
125    UTIL.startMiniCluster();
126    FQ_OUTPUT_DIR =
127      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
128  }
129
130  @AfterClass
131  public static void afterClass() throws Throwable {
132    UTIL.shutdownMiniCluster();
133  }
134
135  @Rule
136  public final TestName name = new TestName();
137
138  @Before
139  public void announce() {
140    LOG.info("Running " + name.getMethodName());
141  }
142
143  @After
144  public void cleanup() throws Throwable {
145    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
146    fs.delete(new Path(OUTPUT_DIR), true);
147    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
148      UTIL.deleteTable(EXPORT_TABLE);
149    }
150    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
151      UTIL.deleteTable(IMPORT_TABLE);
152    }
153  }
154
155  /**
156   * Runs an export job with the specified command line args
157   * @param args
158   * @return true if job completed successfully
159   * @throws IOException
160   * @throws InterruptedException
161   * @throws ClassNotFoundException
162   */
163  protected boolean runExport(String[] args) throws Throwable {
164    // need to make a copy of the configuration because to make sure different temp dirs are used.
165    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
166    return status == 0;
167  }
168
169  protected void runExportMain(String[] args) throws Throwable {
170    Export.main(args);
171  }
172
173  /**
174   * Runs an import job with the specified command line args
175   * @param args
176   * @return true if job completed successfully
177   * @throws IOException
178   * @throws InterruptedException
179   * @throws ClassNotFoundException
180   */
181  boolean runImport(String[] args) throws Throwable {
182    // need to make a copy of the configuration because to make sure different temp dirs are used.
183    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
184    return status == 0;
185  }
186
187  /**
188   * Test simple replication case with column mapping
189   * @throws Exception
190   */
191  @Test
192  public void testSimpleCase() throws Throwable {
193    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3)) {
194      Put p = new Put(ROW1);
195      p.addColumn(FAMILYA, QUAL, now, QUAL);
196      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
197      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
198      t.put(p);
199      p = new Put(ROW2);
200      p.addColumn(FAMILYA, QUAL, now, QUAL);
201      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
202      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
203      t.put(p);
204      p = new Put(ROW3);
205      p.addColumn(FAMILYA, QUAL, now, QUAL);
206      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
207      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
208      t.put(p);
209    }
210
211    String[] args = new String[] {
212      // Only export row1 & row2.
213      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
214      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
215      name.getMethodName(),
216      FQ_OUTPUT_DIR,
217      "1000", // max number of key versions per key to export
218    };
219    assertTrue(runExport(args));
220
221    final String IMPORT_TABLE = name.getMethodName() + "import";
222    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) {
223      args = new String[] {
224        "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
225        IMPORT_TABLE,
226        FQ_OUTPUT_DIR
227      };
228      assertTrue(runImport(args));
229
230      Get g = new Get(ROW1);
231      g.readAllVersions();
232      Result r = t.get(g);
233      assertEquals(3, r.size());
234      g = new Get(ROW2);
235      g.readAllVersions();
236      r = t.get(g);
237      assertEquals(3, r.size());
238      g = new Get(ROW3);
239      r = t.get(g);
240      assertEquals(0, r.size());
241    }
242  }
243
244  /**
245   * Test export hbase:meta table
246   *
247   * @throws Throwable
248   */
249  @Test
250  public void testMetaExport() throws Throwable {
251    String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(),
252      FQ_OUTPUT_DIR, "1", "0", "0" };
253    assertTrue(runExport(args));
254  }
255
256  /**
257   * Test import data from 0.94 exported file
258   * @throws Throwable
259   */
260  @Test
261  public void testImport94Table() throws Throwable {
262    final String name = "exportedTableIn94Format";
263    URL url = TestImportExport.class.getResource(name);
264    File f = new File(url.toURI());
265    if (!f.exists()) {
266      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
267      return;
268    }
269    assertTrue(f.exists());
270    LOG.info("FILE=" + f);
271    Path importPath = new Path(f.toURI());
272    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
273    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
274    String IMPORT_TABLE = name;
275    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) {
276      String[] args = new String[] {
277              "-Dhbase.import.version=0.94" ,
278              IMPORT_TABLE, FQ_OUTPUT_DIR
279      };
280      assertTrue(runImport(args));
281      /* exportedTableIn94Format contains 5 rows
282      ROW         COLUMN+CELL
283      r1          column=f1:c1, timestamp=1383766761171, value=val1
284      r2          column=f1:c1, timestamp=1383766771642, value=val2
285      r3          column=f1:c1, timestamp=1383766777615, value=val3
286      r4          column=f1:c1, timestamp=1383766785146, value=val4
287      r5          column=f1:c1, timestamp=1383766791506, value=val5
288      */
289     assertEquals(5, UTIL.countRows(t));
290    }
291  }
292
293  /**
294   * Test export scanner batching
295   */
296   @Test
297   public void testExportScannerBatching() throws Throwable {
298    TableDescriptor desc = TableDescriptorBuilder
299            .newBuilder(TableName.valueOf(name.getMethodName()))
300            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
301              .setMaxVersions(1)
302              .build())
303            .build();
304    UTIL.getAdmin().createTable(desc);
305    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
306      Put p = new Put(ROW1);
307      p.addColumn(FAMILYA, QUAL, now, QUAL);
308      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
309      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
310      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
311      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
312      t.put(p);
313
314      String[] args = new String[] {
315          "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
316          name.getMethodName(),
317          FQ_OUTPUT_DIR
318      };
319      assertTrue(runExport(args));
320
321      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
322      fs.delete(new Path(FQ_OUTPUT_DIR), true);
323    }
324  }
325
326  @Test
327  public void testWithDeletes() throws Throwable {
328    TableDescriptor desc = TableDescriptorBuilder
329            .newBuilder(TableName.valueOf(name.getMethodName()))
330            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
331              .setMaxVersions(5)
332              .setKeepDeletedCells(KeepDeletedCells.TRUE)
333              .build())
334            .build();
335    UTIL.getAdmin().createTable(desc);
336    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
337      Put p = new Put(ROW1);
338      p.addColumn(FAMILYA, QUAL, now, QUAL);
339      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
340      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
341      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
342      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
343      t.put(p);
344
345      Delete d = new Delete(ROW1, now+3);
346      t.delete(d);
347      d = new Delete(ROW1);
348      d.addColumns(FAMILYA, QUAL, now+2);
349      t.delete(d);
350    }
351
352    String[] args = new String[] {
353        "-D" + ExportUtils.RAW_SCAN + "=true",
354        name.getMethodName(),
355        FQ_OUTPUT_DIR,
356        "1000", // max number of key versions per key to export
357    };
358    assertTrue(runExport(args));
359
360    final String IMPORT_TABLE = name.getMethodName() + "import";
361    desc = TableDescriptorBuilder
362            .newBuilder(TableName.valueOf(IMPORT_TABLE))
363            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
364              .setMaxVersions(5)
365              .setKeepDeletedCells(KeepDeletedCells.TRUE)
366              .build())
367            .build();
368    UTIL.getAdmin().createTable(desc);
369    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
370      args = new String[] {
371          IMPORT_TABLE,
372          FQ_OUTPUT_DIR
373      };
374      assertTrue(runImport(args));
375
376      Scan s = new Scan();
377      s.readAllVersions();
378      s.setRaw(true);
379      ResultScanner scanner = t.getScanner(s);
380      Result r = scanner.next();
381      Cell[] res = r.rawCells();
382      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
383      assertEquals(now+4, res[1].getTimestamp());
384      assertEquals(now+3, res[2].getTimestamp());
385      assertTrue(CellUtil.isDelete(res[3]));
386      assertEquals(now+2, res[4].getTimestamp());
387      assertEquals(now+1, res[5].getTimestamp());
388      assertEquals(now, res[6].getTimestamp());
389    }
390  }
391
392
393  @Test
394  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
395    final TableName exportTable = TableName.valueOf(name.getMethodName());
396    TableDescriptor desc = TableDescriptorBuilder
397            .newBuilder(TableName.valueOf(name.getMethodName()))
398            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
399              .setMaxVersions(5)
400              .setKeepDeletedCells(KeepDeletedCells.TRUE)
401              .build())
402            .build();
403    UTIL.getAdmin().createTable(desc);
404
405    Table exportT = UTIL.getConnection().getTable(exportTable);
406
407    //Add first version of QUAL
408    Put p = new Put(ROW1);
409    p.addColumn(FAMILYA, QUAL, now, QUAL);
410    exportT.put(p);
411
412    //Add Delete family marker
413    Delete d = new Delete(ROW1, now+3);
414    exportT.delete(d);
415
416    //Add second version of QUAL
417    p = new Put(ROW1);
418    p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s"));
419    exportT.put(p);
420
421    //Add second Delete family marker
422    d = new Delete(ROW1, now+7);
423    exportT.delete(d);
424
425
426    String[] args = new String[] {
427        "-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(),
428        FQ_OUTPUT_DIR,
429        "1000", // max number of key versions per key to export
430    };
431    assertTrue(runExport(args));
432
433    final String importTable = name.getMethodName() + "import";
434    desc = TableDescriptorBuilder
435            .newBuilder(TableName.valueOf(importTable))
436            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
437              .setMaxVersions(5)
438              .setKeepDeletedCells(KeepDeletedCells.TRUE)
439              .build())
440            .build();
441    UTIL.getAdmin().createTable(desc);
442
443    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
444    args = new String[] {
445        importTable,
446        FQ_OUTPUT_DIR
447    };
448    assertTrue(runImport(args));
449
450    Scan s = new Scan();
451    s.readAllVersions();
452    s.setRaw(true);
453
454    ResultScanner importedTScanner = importT.getScanner(s);
455    Result importedTResult = importedTScanner.next();
456
457    ResultScanner exportedTScanner = exportT.getScanner(s);
458    Result  exportedTResult =  exportedTScanner.next();
459    try {
460      Result.compareResults(exportedTResult, importedTResult);
461    } catch (Throwable e) {
462      fail("Original and imported tables data comparision failed with error:"+e.getMessage());
463    } finally {
464      exportT.close();
465      importT.close();
466    }
467  }
468
469  /**
470   * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
471   * attempt with invalid values.
472   */
473  @Test
474  public void testWithFilter() throws Throwable {
475    // Create simple table to export
476    TableDescriptor desc = TableDescriptorBuilder
477            .newBuilder(TableName.valueOf(name.getMethodName()))
478            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
479              .setMaxVersions(5)
480              .build())
481            .build();
482    UTIL.getAdmin().createTable(desc);
483    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
484
485    Put p1 = new Put(ROW1);
486    p1.addColumn(FAMILYA, QUAL, now, QUAL);
487    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
488    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
489    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
490    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
491
492    // Having another row would actually test the filter.
493    Put p2 = new Put(ROW2);
494    p2.addColumn(FAMILYA, QUAL, now, QUAL);
495
496    exportTable.put(Arrays.asList(p1, p2));
497
498    // Export the simple table
499    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
500    assertTrue(runExport(args));
501
502    // Import to a new table
503    final String IMPORT_TABLE = name.getMethodName() + "import";
504    desc = TableDescriptorBuilder
505            .newBuilder(TableName.valueOf(IMPORT_TABLE))
506            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
507              .setMaxVersions(5)
508              .build())
509            .build();
510    UTIL.getAdmin().createTable(desc);
511
512    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
513    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
514        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
515        FQ_OUTPUT_DIR,
516        "1000" };
517    assertTrue(runImport(args));
518
519    // get the count of the source table for that time range
520    PrefixFilter filter = new PrefixFilter(ROW1);
521    int count = getCount(exportTable, filter);
522
523    Assert.assertEquals("Unexpected row count between export and import tables", count,
524      getCount(importTable, null));
525
526    // and then test that a broken command doesn't bork everything - easier here because we don't
527    // need to re-run the export job
528
529    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
530        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
531        FQ_OUTPUT_DIR, "1000" };
532    assertFalse(runImport(args));
533
534    // cleanup
535    exportTable.close();
536    importTable.close();
537  }
538
539  /**
540   * Count the number of keyvalues in the specified table with the given filter
541   * @param table the table to scan
542   * @return the number of keyvalues found
543   * @throws IOException
544   */
545  private int getCount(Table table, Filter filter) throws IOException {
546    Scan scan = new Scan();
547    scan.setFilter(filter);
548    ResultScanner results = table.getScanner(scan);
549    int count = 0;
550    for (Result res : results) {
551      count += res.size();
552    }
553    results.close();
554    return count;
555  }
556
557  /**
558   * test main method. Import should print help and call System.exit
559   */
560  @Test
561  public void testImportMain() throws Throwable {
562    PrintStream oldPrintStream = System.err;
563    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
564    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
565    System.setSecurityManager(newSecurityManager);
566    ByteArrayOutputStream data = new ByteArrayOutputStream();
567    String[] args = {};
568    System.setErr(new PrintStream(data));
569    try {
570      System.setErr(new PrintStream(data));
571      Import.main(args);
572      fail("should be SecurityException");
573    } catch (SecurityException e) {
574      assertEquals(-1, newSecurityManager.getExitCode());
575      assertTrue(data.toString().contains("Wrong number of arguments:"));
576      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
577      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
578      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
579      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
580    } finally {
581      System.setErr(oldPrintStream);
582      System.setSecurityManager(SECURITY_MANAGER);
583    }
584  }
585
586  @Test
587  public void testExportScan() throws Exception {
588    int version = 100;
589    long startTime = System.currentTimeMillis();
590    long endTime = startTime + 1;
591    String prefix = "row";
592    String label_0 = "label_0";
593    String label_1 = "label_1";
594    String[] args = {
595      "table",
596      "outputDir",
597      String.valueOf(version),
598      String.valueOf(startTime),
599      String.valueOf(endTime),
600      prefix
601    };
602    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
603    assertEquals(version, scan.getMaxVersions());
604    assertEquals(startTime, scan.getTimeRange().getMin());
605    assertEquals(endTime, scan.getTimeRange().getMax());
606    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
607    assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
608    String[] argsWithLabels = {
609      "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
610      "table",
611      "outputDir",
612      String.valueOf(version),
613      String.valueOf(startTime),
614      String.valueOf(endTime),
615      prefix
616    };
617    Configuration conf = new Configuration(UTIL.getConfiguration());
618    // parse the "-D" options
619    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
620    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
621    assertEquals(version, scanWithLabels.getMaxVersions());
622    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
623    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
624    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
625    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
626    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
627    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
628    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
629  }
630
631  /**
632   * test main method. Export should print help and call System.exit
633   */
634  @Test
635  public void testExportMain() throws Throwable {
636    PrintStream oldPrintStream = System.err;
637    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
638    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
639    System.setSecurityManager(newSecurityManager);
640    ByteArrayOutputStream data = new ByteArrayOutputStream();
641    String[] args = {};
642    System.setErr(new PrintStream(data));
643    try {
644      System.setErr(new PrintStream(data));
645      runExportMain(args);
646      fail("should be SecurityException");
647    } catch (SecurityException e) {
648      assertEquals(-1, newSecurityManager.getExitCode());
649      String errMsg = data.toString();
650      assertTrue(errMsg.contains("Wrong number of arguments:"));
651      assertTrue(errMsg.contains(
652              "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
653              "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
654      assertTrue(
655        errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
656      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
657      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
658      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
659      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
660    } finally {
661      System.setErr(oldPrintStream);
662      System.setSecurityManager(SECURITY_MANAGER);
663    }
664  }
665
666  /**
667   * Test map method of Importer
668   */
669  @SuppressWarnings({ "unchecked", "rawtypes" })
670  @Test
671  public void testKeyValueImporter() throws Throwable {
672    CellImporter importer = new CellImporter();
673    Configuration configuration = new Configuration();
674    Context ctx = mock(Context.class);
675    when(ctx.getConfiguration()).thenReturn(configuration);
676
677    doAnswer(new Answer<Void>() {
678
679      @Override
680      public Void answer(InvocationOnMock invocation) throws Throwable {
681        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
682        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
683        assertEquals("Key", Bytes.toString(writer.get()));
684        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
685        return null;
686      }
687    }).when(ctx).write(any(), any());
688
689    importer.setup(ctx);
690    Result value = mock(Result.class);
691    KeyValue[] keys = {
692        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
693            Bytes.toBytes("value")),
694        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
695            Bytes.toBytes("value1")) };
696    when(value.rawCells()).thenReturn(keys);
697    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
698
699  }
700
701  /**
702   * Test addFilterAndArguments method of Import This method set couple
703   * parameters into Configuration
704   */
705  @Test
706  public void testAddFilterAndArguments() throws IOException {
707    Configuration configuration = new Configuration();
708
709    List<String> args = new ArrayList<>();
710    args.add("param1");
711    args.add("param2");
712
713    Import.addFilterAndArguments(configuration, FilterBase.class, args);
714    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
715        configuration.get(Import.FILTER_CLASS_CONF_KEY));
716    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
717  }
718
719  @Test
720  public void testDurability() throws Throwable {
721    // Create an export table.
722    String exportTableName = name.getMethodName() + "export";
723    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) {
724      // Insert some data
725      Put put = new Put(ROW1);
726      put.addColumn(FAMILYA, QUAL, now, QUAL);
727      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
728      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
729      exportTable.put(put);
730
731      put = new Put(ROW2);
732      put.addColumn(FAMILYA, QUAL, now, QUAL);
733      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
734      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
735      exportTable.put(put);
736
737      // Run the export
738      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
739      assertTrue(runExport(args));
740
741      // Create the table for import
742      String importTableName = name.getMethodName() + "import1";
743      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
744
745      // Register the wal listener for the import table
746      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
747          .getRegions(importTable.getName()).get(0).getRegionInfo();
748      TableWALActionListener walListener = new TableWALActionListener(region);
749      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
750      wal.registerWALActionsListener(walListener);
751
752      // Run the import with SKIP_WAL
753      args =
754          new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
755              importTableName, FQ_OUTPUT_DIR };
756      assertTrue(runImport(args));
757      //Assert that the wal is not visisted
758      assertTrue(!walListener.isWALVisited());
759      //Ensure that the count is 2 (only one version of key value is obtained)
760      assertTrue(getCount(importTable, null) == 2);
761
762      // Run the import with the default durability option
763      importTableName = name.getMethodName() + "import2";
764      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
765      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
766          .getRegions(importTable.getName()).get(0).getRegionInfo();
767      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
768      walListener = new TableWALActionListener(region);
769      wal.registerWALActionsListener(walListener);
770      args = new String[] { importTableName, FQ_OUTPUT_DIR };
771      assertTrue(runImport(args));
772      //Assert that the wal is visisted
773      assertTrue(walListener.isWALVisited());
774      //Ensure that the count is 2 (only one version of key value is obtained)
775      assertTrue(getCount(importTable, null) == 2);
776    }
777  }
778
779  /**
780   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to
781   * identify that an entry is written to the Write Ahead Log for the given table.
782   */
783  private static class TableWALActionListener implements WALActionsListener {
784
785    private RegionInfo regionInfo;
786    private boolean isVisited = false;
787
788    public TableWALActionListener(RegionInfo region) {
789      this.regionInfo = region;
790    }
791
792    @Override
793    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
794      if (logKey.getTableName().getNameAsString().equalsIgnoreCase(
795          this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
796        isVisited = true;
797      }
798    }
799
800    public boolean isWALVisited() {
801      return isVisited;
802    }
803  }
804}