001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.doAnswer;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.ByteArrayOutputStream;
030import java.io.File;
031import java.io.IOException;
032import java.io.PrintStream;
033import java.net.URL;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.List;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeepDeletedCells;
046import org.apache.hadoop.hbase.KeyValue;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.Table;
059import org.apache.hadoop.hbase.client.TableDescriptor;
060import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
061import org.apache.hadoop.hbase.filter.Filter;
062import org.apache.hadoop.hbase.filter.FilterBase;
063import org.apache.hadoop.hbase.filter.PrefixFilter;
064import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
065import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
066import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
067import org.apache.hadoop.hbase.testclassification.MediumTests;
068import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.LauncherSecurityManager;
071import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
072import org.apache.hadoop.hbase.wal.WAL;
073import org.apache.hadoop.hbase.wal.WALEdit;
074import org.apache.hadoop.hbase.wal.WALKey;
075import org.apache.hadoop.mapreduce.Mapper.Context;
076import org.apache.hadoop.util.GenericOptionsParser;
077import org.apache.hadoop.util.ToolRunner;
078import org.junit.After;
079import org.junit.AfterClass;
080import org.junit.Assert;
081import org.junit.Before;
082import org.junit.BeforeClass;
083import org.junit.ClassRule;
084import org.junit.Rule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.rules.TestName;
088import org.mockito.invocation.InvocationOnMock;
089import org.mockito.stubbing.Answer;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093/**
094 * Tests the table import and table export MR job functionality
095 */
096@Category({VerySlowMapReduceTests.class, MediumTests.class})
097public class TestCellBasedImportExport2 {
098
  // HBase class-level test rule registered for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCellBasedImportExport2.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedImportExport2.class);
  // Shared testing utility; its mini cluster is started in beforeClass() and shut down in
  // afterClass().
  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  // Row keys, written in escaped binary notation and decoded by Bytes.toBytesBinary.
  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
  // Column family names, kept both as strings (for command line args) and as bytes.
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  // Qualifier used for every cell; the tests also reuse it as the cell value.
  private static final byte[] QUAL = Bytes.toBytes("q");
  // Relative export output directory, removed again by cleanup() after every test.
  private static final String OUTPUT_DIR = "outputdir";
  // OUTPUT_DIR qualified against the test file system; assigned in beforeClass().
  private static String FQ_OUTPUT_DIR;
  // Value passed for the export scanner batching property in testExportScannerBatching().
  private static final String EXPORT_BATCH_SIZE = "100";

  // Base timestamp for the test cells, captured once when the class is initialized.
  private static final long now = System.currentTimeMillis();
  // Tables that cleanup() deletes if they exist.
  // NOTE(review): the tests visible here create tables named after the test method instead,
  // and several declare a local IMPORT_TABLE that shadows the field below.
  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
120
121  @BeforeClass
122  public static void beforeClass() throws Throwable {
123    // Up the handlers; this test needs more than usual.
124    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
125    UTIL.startMiniCluster();
126    FQ_OUTPUT_DIR =
127      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
128  }
129
  /**
   * Shuts down the mini cluster that {@link #beforeClass()} started, once all tests have run.
   */
  @AfterClass
  public static void afterClass() throws Throwable {
    UTIL.shutdownMiniCluster();
  }
134
135  @Rule
136  public final TestName name = new TestName();
137
138  @Before
139  public void announce() {
140    LOG.info("Running " + name.getMethodName());
141  }
142
143  @After
144  public void cleanup() throws Throwable {
145    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
146    fs.delete(new Path(OUTPUT_DIR), true);
147    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
148      UTIL.deleteTable(EXPORT_TABLE);
149    }
150    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
151      UTIL.deleteTable(IMPORT_TABLE);
152    }
153  }
154
155  /**
156   * Runs an export job with the specified command line args
157   * @param args
158   * @return true if job completed successfully
159   * @throws IOException
160   * @throws InterruptedException
161   * @throws ClassNotFoundException
162   */
163  protected boolean runExport(String[] args) throws Throwable {
164    // need to make a copy of the configuration because to make sure different temp dirs are used.
165    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
166    return status == 0;
167  }
168
  /**
   * Invokes {@link Export#main(String[])} directly with the given arguments.
   * @param args command line arguments passed through unchanged
   * @throws Throwable whatever the main method throws
   */
  protected void runExportMain(String[] args) throws Throwable {
    Export.main(args);
  }
172
173  /**
174   * Runs an import job with the specified command line args
175   * @param args
176   * @return true if job completed successfully
177   * @throws IOException
178   * @throws InterruptedException
179   * @throws ClassNotFoundException
180   */
181  boolean runImport(String[] args) throws Throwable {
182    // need to make a copy of the configuration because to make sure different temp dirs are used.
183    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
184    return status == 0;
185  }
186
187  /**
188   * Test simple replication case with column mapping
189   * @throws Exception
190   */
191  @Test
192  public void testSimpleCase() throws Throwable {
193    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
194      Put p = new Put(ROW1);
195      p.addColumn(FAMILYA, QUAL, now, QUAL);
196      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
197      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
198      t.put(p);
199      p = new Put(ROW2);
200      p.addColumn(FAMILYA, QUAL, now, QUAL);
201      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
202      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
203      t.put(p);
204      p = new Put(ROW3);
205      p.addColumn(FAMILYA, QUAL, now, QUAL);
206      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
207      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
208      t.put(p);
209    }
210
211      String[] args = new String[] {
212          // Only export row1 & row2.
213          "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
214          "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
215          name.getMethodName(),
216          FQ_OUTPUT_DIR,
217          "1000", // max number of key versions per key to export
218      };
219      assertTrue(runExport(args));
220
221      final String IMPORT_TABLE = name.getMethodName() + "import";
222      try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
223        args = new String[] {
224            "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
225            IMPORT_TABLE,
226            FQ_OUTPUT_DIR
227        };
228        assertTrue(runImport(args));
229
230        Get g = new Get(ROW1);
231        g.setMaxVersions();
232        Result r = t.get(g);
233        assertEquals(3, r.size());
234        g = new Get(ROW2);
235        g.setMaxVersions();
236        r = t.get(g);
237        assertEquals(3, r.size());
238        g = new Get(ROW3);
239        r = t.get(g);
240        assertEquals(0, r.size());
241      }
242  }
243
244  /**
245   * Test export hbase:meta table
246   *
247   * @throws Throwable
248   */
249  @Test
250  public void testMetaExport() throws Throwable {
251    String[] args = new String[] { TableName.META_TABLE_NAME.getNameAsString(),
252      FQ_OUTPUT_DIR, "1", "0", "0" };
253    assertTrue(runExport(args));
254  }
255
256  /**
257   * Test import data from 0.94 exported file
258   * @throws Throwable
259   */
260  @Test
261  public void testImport94Table() throws Throwable {
262    final String name = "exportedTableIn94Format";
263    URL url = TestCellBasedImportExport2.class.getResource(name);
264    File f = new File(url.toURI());
265    if (!f.exists()) {
266      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
267      return;
268    }
269    assertTrue(f.exists());
270    LOG.info("FILE=" + f);
271    Path importPath = new Path(f.toURI());
272    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
273    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
274    String IMPORT_TABLE = name;
275    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
276      String[] args = new String[] {
277              "-Dhbase.import.version=0.94" ,
278              IMPORT_TABLE, FQ_OUTPUT_DIR
279      };
280      assertTrue(runImport(args));
281      /* exportedTableIn94Format contains 5 rows
282      ROW         COLUMN+CELL
283      r1          column=f1:c1, timestamp=1383766761171, value=val1
284      r2          column=f1:c1, timestamp=1383766771642, value=val2
285      r3          column=f1:c1, timestamp=1383766777615, value=val3
286      r4          column=f1:c1, timestamp=1383766785146, value=val4
287      r5          column=f1:c1, timestamp=1383766791506, value=val5
288      */
289     assertEquals(5, UTIL.countRows(t));
290    }
291  }
292
293  /**
294   * Test export scanner batching
295   */
296   @Test
297   public void testExportScannerBatching() throws Throwable {
298    TableDescriptor desc = TableDescriptorBuilder
299            .newBuilder(TableName.valueOf(name.getMethodName()))
300            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
301              .setMaxVersions(1)
302              .build())
303            .build();
304    UTIL.getAdmin().createTable(desc);
305    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
306
307      Put p = new Put(ROW1);
308      p.addColumn(FAMILYA, QUAL, now, QUAL);
309      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
310      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
311      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
312      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
313      t.put(p);
314
315      String[] args = new String[] {
316          "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
317          name.getMethodName(),
318          FQ_OUTPUT_DIR
319      };
320      assertTrue(runExport(args));
321
322      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
323      fs.delete(new Path(FQ_OUTPUT_DIR), true);
324    }
325  }
326
327  @Test
328  public void testWithDeletes() throws Throwable {
329    TableDescriptor desc = TableDescriptorBuilder
330            .newBuilder(TableName.valueOf(name.getMethodName()))
331            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
332              .setMaxVersions(5)
333              .setKeepDeletedCells(KeepDeletedCells.TRUE)
334              .build())
335            .build();
336    UTIL.getAdmin().createTable(desc);
337    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
338
339      Put p = new Put(ROW1);
340      p.addColumn(FAMILYA, QUAL, now, QUAL);
341      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
342      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
343      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
344      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
345      t.put(p);
346
347      Delete d = new Delete(ROW1, now+3);
348      t.delete(d);
349      d = new Delete(ROW1);
350      d.addColumns(FAMILYA, QUAL, now+2);
351      t.delete(d);
352    }
353
354    String[] args = new String[] {
355        "-D" + ExportUtils.RAW_SCAN + "=true",
356        name.getMethodName(),
357        FQ_OUTPUT_DIR,
358        "1000", // max number of key versions per key to export
359    };
360    assertTrue(runExport(args));
361
362    final String IMPORT_TABLE = name.getMethodName() + "import";
363    desc = TableDescriptorBuilder
364            .newBuilder(TableName.valueOf(IMPORT_TABLE))
365            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
366              .setMaxVersions(5)
367              .setKeepDeletedCells(KeepDeletedCells.TRUE)
368              .build())
369            .build();
370    UTIL.getAdmin().createTable(desc);
371    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
372      args = new String[] {
373          IMPORT_TABLE,
374          FQ_OUTPUT_DIR
375      };
376      assertTrue(runImport(args));
377
378      Scan s = new Scan();
379      s.setMaxVersions();
380      s.setRaw(true);
381      ResultScanner scanner = t.getScanner(s);
382      Result r = scanner.next();
383      Cell[] res = r.rawCells();
384      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
385      assertEquals(now+4, res[1].getTimestamp());
386      assertEquals(now+3, res[2].getTimestamp());
387      assertTrue(CellUtil.isDelete(res[3]));
388      assertEquals(now+2, res[4].getTimestamp());
389      assertEquals(now+1, res[5].getTimestamp());
390      assertEquals(now, res[6].getTimestamp());
391    }
392  }
393
394
395  @Test
396  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
397    final TableName exportTable = TableName.valueOf(name.getMethodName());
398    TableDescriptor desc = TableDescriptorBuilder
399            .newBuilder(TableName.valueOf(name.getMethodName()))
400            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
401              .setMaxVersions(5)
402              .setKeepDeletedCells(KeepDeletedCells.TRUE)
403              .build())
404            .build();
405    UTIL.getAdmin().createTable(desc);
406
407    Table exportT = UTIL.getConnection().getTable(exportTable);
408
409    //Add first version of QUAL
410    Put p = new Put(ROW1);
411    p.addColumn(FAMILYA, QUAL, now, QUAL);
412    exportT.put(p);
413
414    //Add Delete family marker
415    Delete d = new Delete(ROW1, now+3);
416    exportT.delete(d);
417
418    //Add second version of QUAL
419    p = new Put(ROW1);
420    p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes());
421    exportT.put(p);
422
423    //Add second Delete family marker
424    d = new Delete(ROW1, now+7);
425    exportT.delete(d);
426
427
428    String[] args = new String[] {
429        "-D" + ExportUtils.RAW_SCAN + "=true", exportTable.getNameAsString(),
430        FQ_OUTPUT_DIR,
431        "1000", // max number of key versions per key to export
432    };
433    assertTrue(runExport(args));
434
435    final String importTable = name.getMethodName() + "import";
436    desc = TableDescriptorBuilder
437            .newBuilder(TableName.valueOf(importTable))
438            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
439              .setMaxVersions(5)
440              .setKeepDeletedCells(KeepDeletedCells.TRUE)
441              .build())
442            .build();
443    UTIL.getAdmin().createTable(desc);
444
445    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
446    args = new String[] {
447        importTable,
448        FQ_OUTPUT_DIR
449    };
450    assertTrue(runImport(args));
451
452    Scan s = new Scan();
453    s.setMaxVersions();
454    s.setRaw(true);
455
456    ResultScanner importedTScanner = importT.getScanner(s);
457    Result importedTResult = importedTScanner.next();
458
459    ResultScanner exportedTScanner = exportT.getScanner(s);
460    Result  exportedTResult =  exportedTScanner.next();
461    try {
462      Result.compareResults(exportedTResult, importedTResult);
463    } catch (Throwable e) {
464      fail("Original and imported tables data comparision failed with error:"+e.getMessage());
465    } finally {
466      exportT.close();
467      importT.close();
468    }
469  }
470
471  /**
472   * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
473   * attempt with invalid values.
474   */
475  @Test
476  public void testWithFilter() throws Throwable {
477    // Create simple table to export
478    TableDescriptor desc = TableDescriptorBuilder
479            .newBuilder(TableName.valueOf(name.getMethodName()))
480            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
481              .setMaxVersions(5)
482              .build())
483            .build();
484    UTIL.getAdmin().createTable(desc);
485    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
486
487    Put p1 = new Put(ROW1);
488    p1.addColumn(FAMILYA, QUAL, now, QUAL);
489    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
490    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
491    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
492    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
493
494    // Having another row would actually test the filter.
495    Put p2 = new Put(ROW2);
496    p2.addColumn(FAMILYA, QUAL, now, QUAL);
497
498    exportTable.put(Arrays.asList(p1, p2));
499
500    // Export the simple table
501    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
502    assertTrue(runExport(args));
503
504    // Import to a new table
505    final String IMPORT_TABLE = name.getMethodName() + "import";
506    desc = TableDescriptorBuilder
507            .newBuilder(TableName.valueOf(IMPORT_TABLE))
508            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA)
509              .setMaxVersions(5)
510              .build())
511            .build();
512    UTIL.getAdmin().createTable(desc);
513
514    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
515    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
516        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
517        FQ_OUTPUT_DIR,
518        "1000" };
519    assertTrue(runImport(args));
520
521    // get the count of the source table for that time range
522    PrefixFilter filter = new PrefixFilter(ROW1);
523    int count = getCount(exportTable, filter);
524
525    Assert.assertEquals("Unexpected row count between export and import tables", count,
526      getCount(importTable, null));
527
528    // and then test that a broken command doesn't bork everything - easier here because we don't
529    // need to re-run the export job
530
531    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
532        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
533        FQ_OUTPUT_DIR, "1000" };
534    assertFalse(runImport(args));
535
536    // cleanup
537    exportTable.close();
538    importTable.close();
539  }
540
541  /**
542   * Count the number of keyvalues in the specified table for the given timerange
543   * @param table
544   * @return
545   * @throws IOException
546   */
547  private int getCount(Table table, Filter filter) throws IOException {
548    Scan scan = new Scan();
549    scan.setFilter(filter);
550    ResultScanner results = table.getScanner(scan);
551    int count = 0;
552    for (Result res : results) {
553      count += res.size();
554    }
555    results.close();
556    return count;
557  }
558
559  /**
560   * test main method. Import should print help and call System.exit
561   */
562  @Test
563  public void testImportMain() throws Throwable {
564    PrintStream oldPrintStream = System.err;
565    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
566    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
567    System.setSecurityManager(newSecurityManager);
568    ByteArrayOutputStream data = new ByteArrayOutputStream();
569    String[] args = {};
570    System.setErr(new PrintStream(data));
571    try {
572      System.setErr(new PrintStream(data));
573      Import.main(args);
574      fail("should be SecurityException");
575    } catch (SecurityException e) {
576      assertEquals(-1, newSecurityManager.getExitCode());
577      assertTrue(data.toString().contains("Wrong number of arguments:"));
578      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
579      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
580      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
581      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
582    } finally {
583      System.setErr(oldPrintStream);
584      System.setSecurityManager(SECURITY_MANAGER);
585    }
586  }
587
588  @Test
589  public void testExportScan() throws Exception {
590    int version = 100;
591    long startTime = System.currentTimeMillis();
592    long endTime = startTime + 1;
593    String prefix = "row";
594    String label_0 = "label_0";
595    String label_1 = "label_1";
596    String[] args = {
597      "table",
598      "outputDir",
599      String.valueOf(version),
600      String.valueOf(startTime),
601      String.valueOf(endTime),
602      prefix
603    };
604    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
605    assertEquals(version, scan.getMaxVersions());
606    assertEquals(startTime, scan.getTimeRange().getMin());
607    assertEquals(endTime, scan.getTimeRange().getMax());
608    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
609    assertEquals(0, Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
610    String[] argsWithLabels = {
611      "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1,
612      "table",
613      "outputDir",
614      String.valueOf(version),
615      String.valueOf(startTime),
616      String.valueOf(endTime),
617      prefix
618    };
619    Configuration conf = new Configuration(UTIL.getConfiguration());
620    // parse the "-D" options
621    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
622    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
623    assertEquals(version, scanWithLabels.getMaxVersions());
624    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
625    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
626    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
627    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
628    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
629    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
630    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
631  }
632
633  /**
634   * test main method. Export should print help and call System.exit
635   */
636  @Test
637  public void testExportMain() throws Throwable {
638    PrintStream oldPrintStream = System.err;
639    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
640    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
641    System.setSecurityManager(newSecurityManager);
642    ByteArrayOutputStream data = new ByteArrayOutputStream();
643    String[] args = {};
644    System.setErr(new PrintStream(data));
645    try {
646      System.setErr(new PrintStream(data));
647      runExportMain(args);
648      fail("should be SecurityException");
649    } catch (SecurityException e) {
650      assertEquals(-1, newSecurityManager.getExitCode());
651      String errMsg = data.toString();
652      assertTrue(errMsg.contains("Wrong number of arguments:"));
653      assertTrue(errMsg.contains(
654              "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
655              "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
656      assertTrue(
657        errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
658      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
659      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
660      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
661      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
662    } finally {
663      System.setErr(oldPrintStream);
664      System.setSecurityManager(SECURITY_MANAGER);
665    }
666  }
667
668  /**
669   * Test map method of Importer
670   */
671  @SuppressWarnings({ "unchecked", "rawtypes" })
672  @Test
673  public void testKeyValueImporter() throws Throwable {
674    CellImporter importer = new CellImporter();
675    Configuration configuration = new Configuration();
676    Context ctx = mock(Context.class);
677    when(ctx.getConfiguration()).thenReturn(configuration);
678
679    doAnswer(new Answer<Void>() {
680
681      @Override
682      public Void answer(InvocationOnMock invocation) throws Throwable {
683        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
684        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArguments()[1];
685        assertEquals("Key", Bytes.toString(writer.get()));
686        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
687        return null;
688      }
689    }).when(ctx).write(any(ImmutableBytesWritable.class), any(MapReduceExtendedCell.class));
690
691    importer.setup(ctx);
692    Result value = mock(Result.class);
693    KeyValue[] keys = {
694        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
695            Bytes.toBytes("value")),
696        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
697            Bytes.toBytes("value1")) };
698    when(value.rawCells()).thenReturn(keys);
699    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
700
701  }
702
703  /**
704   * Test addFilterAndArguments method of Import This method set couple
705   * parameters into Configuration
706   */
707  @Test
708  public void testAddFilterAndArguments() throws IOException {
709    Configuration configuration = new Configuration();
710
711    List<String> args = new ArrayList<>();
712    args.add("param1");
713    args.add("param2");
714
715    Import.addFilterAndArguments(configuration, FilterBase.class, args);
716    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
717        configuration.get(Import.FILTER_CLASS_CONF_KEY));
718    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
719  }
720
721  @Test
722  public void testDurability() throws Throwable {
723    // Create an export table.
724    String exportTableName = name.getMethodName() + "export";
725    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
726
727      // Insert some data
728      Put put = new Put(ROW1);
729      put.addColumn(FAMILYA, QUAL, now, QUAL);
730      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
731      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
732      exportTable.put(put);
733
734      put = new Put(ROW2);
735      put.addColumn(FAMILYA, QUAL, now, QUAL);
736      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
737      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
738      exportTable.put(put);
739
740      // Run the export
741      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
742      assertTrue(runExport(args));
743
744      // Create the table for import
745      String importTableName = name.getMethodName() + "import1";
746      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
747
748      // Register the wal listener for the import table
749      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
750          .getRegions(importTable.getName()).get(0).getRegionInfo();
751      TableWALActionListener walListener = new TableWALActionListener(region);
752      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
753      wal.registerWALActionsListener(walListener);
754
755      // Run the import with SKIP_WAL
756      args =
757          new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
758              importTableName, FQ_OUTPUT_DIR };
759      assertTrue(runImport(args));
760      //Assert that the wal is not visisted
761      assertTrue(!walListener.isWALVisited());
762      //Ensure that the count is 2 (only one version of key value is obtained)
763      assertTrue(getCount(importTable, null) == 2);
764
765      // Run the import with the default durability option
766      importTableName = name.getMethodName() + "import2";
767      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
768      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
769          .getRegions(importTable.getName()).get(0).getRegionInfo();
770      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
771      walListener = new TableWALActionListener(region);
772      wal.registerWALActionsListener(walListener);
773      args = new String[] { importTableName, FQ_OUTPUT_DIR };
774      assertTrue(runImport(args));
775      //Assert that the wal is visisted
776      assertTrue(walListener.isWALVisited());
777      //Ensure that the count is 2 (only one version of key value is obtained)
778      assertTrue(getCount(importTable, null) == 2);
779    }
780  }
781
782  /**
783   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to
784   * identify that an entry is written to the Write Ahead Log for the given table.
785   */
786  private static class TableWALActionListener implements WALActionsListener {
787
788    private RegionInfo regionInfo;
789    private boolean isVisited = false;
790
791    public TableWALActionListener(RegionInfo region) {
792      this.regionInfo = region;
793    }
794
795    @Override
796    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
797      if (logKey.getTableName().getNameAsString().equalsIgnoreCase(
798          this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
799        isVisited = true;
800      }
801    }
802
803    public boolean isWALVisited() {
804      return isVisited;
805    }
806  }
807}