001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.ByteArrayOutputStream;
032import java.io.File;
033import java.io.IOException;
034import java.io.PrintStream;
035import java.net.URL;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ArrayBackedTag;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellScanner;
046import org.apache.hadoop.hbase.CellUtil;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtil;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeepDeletedCells;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.PrivateCellUtil;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Tag;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Delete;
059import org.apache.hadoop.hbase.client.Durability;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Mutation;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.RegionInfo;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.Scan;
067import org.apache.hadoop.hbase.client.Table;
068import org.apache.hadoop.hbase.client.TableDescriptor;
069import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
070import org.apache.hadoop.hbase.coprocessor.ObserverContext;
071import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
072import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
073import org.apache.hadoop.hbase.coprocessor.RegionObserver;
074import org.apache.hadoop.hbase.filter.Filter;
075import org.apache.hadoop.hbase.filter.FilterBase;
076import org.apache.hadoop.hbase.filter.PrefixFilter;
077import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
078import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
079import org.apache.hadoop.hbase.regionserver.HRegion;
080import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
081import org.apache.hadoop.hbase.regionserver.RegionScanner;
082import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
083import org.apache.hadoop.hbase.testclassification.MediumTests;
084import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.LauncherSecurityManager;
088import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
089import org.apache.hadoop.hbase.wal.WAL;
090import org.apache.hadoop.hbase.wal.WALEdit;
091import org.apache.hadoop.hbase.wal.WALKey;
092import org.apache.hadoop.mapreduce.Mapper.Context;
093import org.apache.hadoop.util.GenericOptionsParser;
094import org.apache.hadoop.util.ToolRunner;
095import org.junit.After;
096import org.junit.AfterClass;
097import org.junit.Assert;
098import org.junit.Before;
099import org.junit.BeforeClass;
100import org.junit.ClassRule;
101import org.junit.Rule;
102import org.junit.Test;
103import org.junit.experimental.categories.Category;
104import org.junit.rules.TestName;
105import org.mockito.invocation.InvocationOnMock;
106import org.mockito.stubbing.Answer;
107import org.slf4j.Logger;
108import org.slf4j.LoggerFactory;
109
110/**
111 * Tests the table import and table export MR job functionality
112 */
113@Category({ VerySlowMapReduceTests.class, MediumTests.class })
114public class TestImportExport {
115
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestImportExport.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestImportExport.class);
  // Shared testing utility; its mini cluster is started in beforeClass() and shut down in
  // afterClass().
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // Row keys are built with Bytes.toBytesBinary, which decodes the \x32 escape into one byte.
  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  private static final byte[] QUAL = Bytes.toBytes("q");
  // Relative export output directory; deleted after every test by cleanup().
  private static final String OUTPUT_DIR = "outputdir";
  // OUTPUT_DIR qualified against the cluster FileSystem; assigned in beforeClass().
  private static String FQ_OUTPUT_DIR;
  // Scanner batch size handed to Export through ExportUtils.EXPORT_BATCHING.
  private static final String EXPORT_BATCH_SIZE = "100";

  // Base timestamp for the cells written by the tests; captured once when the class is loaded.
  private static final long now = EnvironmentEdgeManager.currentTime();
  // Tables dropped by cleanup() when they exist. NOTE(review): the tests visible in this part of
  // the file name their tables after the test method instead, so cleanup() does not drop those.
  private final TableName EXPORT_TABLE = TableName.valueOf("export_table");
  private final TableName IMPORT_TABLE = TableName.valueOf("import_table");
  // Presumably used by the tag-propagation tests (e.g. testTagsAddition) further down the file;
  // their usage is not visible here - verify before changing.
  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
  public static final String TEST_ATTR = "source_op";
  public static final String TEST_TAG = "test_tag";
140
141  @BeforeClass
142  public static void beforeClass() throws Throwable {
143    // Up the handlers; this test needs more than usual.
144    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
145    UTIL.startMiniCluster();
146    FQ_OUTPUT_DIR =
147      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
148  }
149
150  @AfterClass
151  public static void afterClass() throws Throwable {
152    UTIL.shutdownMiniCluster();
153  }
154
  // Exposes the running test method's name; most tests use it to name the tables they create.
  @Rule
  public final TestName name = new TestName();
157
158  @Before
159  public void announce() {
160    LOG.info("Running " + name.getMethodName());
161  }
162
163  @After
164  public void cleanup() throws Throwable {
165    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
166    fs.delete(new Path(OUTPUT_DIR), true);
167    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
168      UTIL.deleteTable(EXPORT_TABLE);
169    }
170    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
171      UTIL.deleteTable(IMPORT_TABLE);
172    }
173  }
174
175  /**
176   * Runs an export job with the specified command line args
177   * @return true if job completed successfully
178   */
179  protected boolean runExport(String[] args) throws Throwable {
180    // need to make a copy of the configuration because to make sure different temp dirs are used.
181    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
182    return status == 0;
183  }
184
185  protected void runExportMain(String[] args) throws Throwable {
186    Export.main(args);
187  }
188
189  /**
190   * Runs an import job with the specified command line args
191   * @return true if job completed successfully
192   */
193  boolean runImport(String[] args) throws Throwable {
194    // need to make a copy of the configuration because to make sure different temp dirs are used.
195    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
196    return status == 0;
197  }
198
199  /**
200   * Test simple replication case with column mapping
201   */
202  @Test
203  public void testSimpleCase() throws Throwable {
204    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3)) {
205      Put p = new Put(ROW1);
206      p.addColumn(FAMILYA, QUAL, now, QUAL);
207      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
208      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
209      t.put(p);
210      p = new Put(ROW2);
211      p.addColumn(FAMILYA, QUAL, now, QUAL);
212      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
213      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
214      t.put(p);
215      p = new Put(ROW3);
216      p.addColumn(FAMILYA, QUAL, now, QUAL);
217      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
218      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
219      t.put(p);
220    }
221
222    String[] args = new String[] {
223      // Only export row1 & row2.
224      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
225      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name.getMethodName(), FQ_OUTPUT_DIR,
226      "1000", // max number of key versions per key to export
227    };
228    assertTrue(runExport(args));
229
230    final String IMPORT_TABLE = name.getMethodName() + "import";
231    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) {
232      args =
233        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
234          IMPORT_TABLE, FQ_OUTPUT_DIR };
235      assertTrue(runImport(args));
236
237      Get g = new Get(ROW1);
238      g.readAllVersions();
239      Result r = t.get(g);
240      assertEquals(3, r.size());
241      g = new Get(ROW2);
242      g.readAllVersions();
243      r = t.get(g);
244      assertEquals(3, r.size());
245      g = new Get(ROW3);
246      r = t.get(g);
247      assertEquals(0, r.size());
248    }
249  }
250
251  /**
252   * Test export hbase:meta table
253   */
254  @Test
255  public void testMetaExport() throws Throwable {
256    String[] args =
257      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
258    assertTrue(runExport(args));
259  }
260
261  /**
262   * Test import data from 0.94 exported file
263   */
264  @Test
265  public void testImport94Table() throws Throwable {
266    final String name = "exportedTableIn94Format";
267    URL url = TestImportExport.class.getResource(name);
268    File f = new File(url.toURI());
269    if (!f.exists()) {
270      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
271      return;
272    }
273    assertTrue(f.exists());
274    LOG.info("FILE=" + f);
275    Path importPath = new Path(f.toURI());
276    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
277    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
278    String IMPORT_TABLE = name;
279    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) {
280      String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR };
281      assertTrue(runImport(args));
282      // @formatter:off
283      // exportedTableIn94Format contains 5 rows
284      // ROW         COLUMN+CELL
285      // r1          column=f1:c1, timestamp=1383766761171, value=val1
286      // r2          column=f1:c1, timestamp=1383766771642, value=val2
287      // r3          column=f1:c1, timestamp=1383766777615, value=val3
288      // r4          column=f1:c1, timestamp=1383766785146, value=val4
289      // r5          column=f1:c1, timestamp=1383766791506, value=val5
290      // @formatter:on
291      assertEquals(5, UTIL.countRows(t));
292    }
293  }
294
295  /**
296   * Test export scanner batching
297   */
298  @Test
299  public void testExportScannerBatching() throws Throwable {
300    TableDescriptor desc = TableDescriptorBuilder
301      .newBuilder(TableName.valueOf(name.getMethodName()))
302      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
303      .build();
304    UTIL.getAdmin().createTable(desc);
305    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
306      Put p = new Put(ROW1);
307      p.addColumn(FAMILYA, QUAL, now, QUAL);
308      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
309      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
310      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
311      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
312      t.put(p);
313      // added scanner batching arg.
314      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
315        name.getMethodName(), FQ_OUTPUT_DIR };
316      assertTrue(runExport(args));
317
318      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
319      fs.delete(new Path(FQ_OUTPUT_DIR), true);
320    }
321  }
322
323  @Test
324  public void testWithDeletes() throws Throwable {
325    TableDescriptor desc =
326      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
327        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
328          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
329        .build();
330    UTIL.getAdmin().createTable(desc);
331    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
332      Put p = new Put(ROW1);
333      p.addColumn(FAMILYA, QUAL, now, QUAL);
334      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
335      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
336      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
337      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
338      t.put(p);
339
340      Delete d = new Delete(ROW1, now + 3);
341      t.delete(d);
342      d = new Delete(ROW1);
343      d.addColumns(FAMILYA, QUAL, now + 2);
344      t.delete(d);
345    }
346
347    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name.getMethodName(),
348      FQ_OUTPUT_DIR, "1000", // max number of key versions per key to export
349    };
350    assertTrue(runExport(args));
351
352    final String IMPORT_TABLE = name.getMethodName() + "import";
353    desc = TableDescriptorBuilder
354      .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder
355        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
356      .build();
357    UTIL.getAdmin().createTable(desc);
358    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
359      args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR };
360      assertTrue(runImport(args));
361
362      Scan s = new Scan();
363      s.readAllVersions();
364      s.setRaw(true);
365      ResultScanner scanner = t.getScanner(s);
366      Result r = scanner.next();
367      Cell[] res = r.rawCells();
368      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
369      assertEquals(now + 4, res[1].getTimestamp());
370      assertEquals(now + 3, res[2].getTimestamp());
371      assertTrue(CellUtil.isDelete(res[3]));
372      assertEquals(now + 2, res[4].getTimestamp());
373      assertEquals(now + 1, res[5].getTimestamp());
374      assertEquals(now, res[6].getTimestamp());
375    }
376  }
377
378  @Test
379  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
380    final TableName exportTable = TableName.valueOf(name.getMethodName());
381    TableDescriptor desc =
382      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
383        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
384          .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
385        .build();
386    UTIL.getAdmin().createTable(desc);
387
388    Table exportT = UTIL.getConnection().getTable(exportTable);
389
390    // Add first version of QUAL
391    Put p = new Put(ROW1);
392    p.addColumn(FAMILYA, QUAL, now, QUAL);
393    exportT.put(p);
394
395    // Add Delete family marker
396    Delete d = new Delete(ROW1, now + 3);
397    exportT.delete(d);
398
399    // Add second version of QUAL
400    p = new Put(ROW1);
401    p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s"));
402    exportT.put(p);
403
404    // Add second Delete family marker
405    d = new Delete(ROW1, now + 7);
406    exportT.delete(d);
407
408    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
409      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
410                                                            // export
411    };
412    assertTrue(runExport(args));
413
414    final String importTable = name.getMethodName() + "import";
415    desc = TableDescriptorBuilder
416      .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder
417        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
418      .build();
419    UTIL.getAdmin().createTable(desc);
420
421    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
422    args = new String[] { importTable, FQ_OUTPUT_DIR };
423    assertTrue(runImport(args));
424
425    Scan s = new Scan();
426    s.readAllVersions();
427    s.setRaw(true);
428
429    ResultScanner importedTScanner = importT.getScanner(s);
430    Result importedTResult = importedTScanner.next();
431
432    ResultScanner exportedTScanner = exportT.getScanner(s);
433    Result exportedTResult = exportedTScanner.next();
434    try {
435      Result.compareResults(exportedTResult, importedTResult);
436    } catch (Throwable e) {
437      fail("Original and imported tables data comparision failed with error:" + e.getMessage());
438    } finally {
439      exportT.close();
440      importT.close();
441    }
442  }
443
444  /**
445   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
446   * attempt with invalid values.
447   */
448  @Test
449  public void testWithFilter() throws Throwable {
450    // Create simple table to export
451    TableDescriptor desc = TableDescriptorBuilder
452      .newBuilder(TableName.valueOf(name.getMethodName()))
453      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
454      .build();
455    UTIL.getAdmin().createTable(desc);
456    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
457
458    Put p1 = new Put(ROW1);
459    p1.addColumn(FAMILYA, QUAL, now, QUAL);
460    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
461    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
462    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
463    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
464
465    // Having another row would actually test the filter.
466    Put p2 = new Put(ROW2);
467    p2.addColumn(FAMILYA, QUAL, now, QUAL);
468
469    exportTable.put(Arrays.asList(p1, p2));
470
471    // Export the simple table
472    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
473    assertTrue(runExport(args));
474
475    // Import to a new table
476    final String IMPORT_TABLE = name.getMethodName() + "import";
477    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
478      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
479      .build();
480    UTIL.getAdmin().createTable(desc);
481
482    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
483    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
484      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
485      "1000" };
486    assertTrue(runImport(args));
487
488    // get the count of the source table for that time range
489    PrefixFilter filter = new PrefixFilter(ROW1);
490    int count = getCount(exportTable, filter);
491
492    Assert.assertEquals("Unexpected row count between export and import tables", count,
493      getCount(importTable, null));
494
495    // and then test that a broken command doesn't bork everything - easier here because we don't
496    // need to re-run the export job
497
498    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
499      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
500      FQ_OUTPUT_DIR, "1000" };
501    assertFalse(runImport(args));
502
503    // cleanup
504    exportTable.close();
505    importTable.close();
506  }
507
508  /**
509   * Count the number of keyvalues in the specified table with the given filter
510   * @param table the table to scan
511   * @return the number of keyvalues found
512   */
513  private int getCount(Table table, Filter filter) throws IOException {
514    Scan scan = new Scan();
515    scan.setFilter(filter);
516    ResultScanner results = table.getScanner(scan);
517    int count = 0;
518    for (Result res : results) {
519      count += res.size();
520    }
521    results.close();
522    return count;
523  }
524
525  /**
526   * test main method. Import should print help and call System.exit
527   */
528  @Test
529  public void testImportMain() throws Throwable {
530    PrintStream oldPrintStream = System.err;
531    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
532    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
533    System.setSecurityManager(newSecurityManager);
534    ByteArrayOutputStream data = new ByteArrayOutputStream();
535    String[] args = {};
536    System.setErr(new PrintStream(data));
537    try {
538      System.setErr(new PrintStream(data));
539      Import.main(args);
540      fail("should be SecurityException");
541    } catch (SecurityException e) {
542      assertEquals(-1, newSecurityManager.getExitCode());
543      assertTrue(data.toString().contains("Wrong number of arguments:"));
544      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
545      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
546      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
547      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
548    } finally {
549      System.setErr(oldPrintStream);
550      System.setSecurityManager(SECURITY_MANAGER);
551    }
552  }
553
554  @Test
555  public void testExportScan() throws Exception {
556    int version = 100;
557    long startTime = EnvironmentEdgeManager.currentTime();
558    long endTime = startTime + 1;
559    String prefix = "row";
560    String label_0 = "label_0";
561    String label_1 = "label_1";
562    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
563      String.valueOf(endTime), prefix };
564    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
565    assertEquals(version, scan.getMaxVersions());
566    assertEquals(startTime, scan.getTimeRange().getMin());
567    assertEquals(endTime, scan.getTimeRange().getMax());
568    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
569    assertEquals(0,
570      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
571    String[] argsWithLabels =
572      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
573        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
574        prefix };
575    Configuration conf = new Configuration(UTIL.getConfiguration());
576    // parse the "-D" options
577    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
578    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
579    assertEquals(version, scanWithLabels.getMaxVersions());
580    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
581    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
582    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
583    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
584      Bytes.toBytesBinary(prefix)));
585    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
586    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
587    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
588  }
589
590  /**
591   * test main method. Export should print help and call System.exit
592   */
593  @Test
594  public void testExportMain() throws Throwable {
595    PrintStream oldPrintStream = System.err;
596    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
597    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
598    System.setSecurityManager(newSecurityManager);
599    ByteArrayOutputStream data = new ByteArrayOutputStream();
600    String[] args = {};
601    System.setErr(new PrintStream(data));
602    try {
603      System.setErr(new PrintStream(data));
604      runExportMain(args);
605      fail("should be SecurityException");
606    } catch (SecurityException e) {
607      assertEquals(-1, newSecurityManager.getExitCode());
608      String errMsg = data.toString();
609      assertTrue(errMsg.contains("Wrong number of arguments:"));
610      assertTrue(
611        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
612          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
613      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
614      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
615      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
616      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
617      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
618    } finally {
619      System.setErr(oldPrintStream);
620      System.setSecurityManager(SECURITY_MANAGER);
621    }
622  }
623
624  /**
625   * Test map method of Importer
626   */
627  @SuppressWarnings({ "unchecked", "rawtypes" })
628  @Test
629  public void testKeyValueImporter() throws Throwable {
630    CellImporter importer = new CellImporter();
631    Configuration configuration = new Configuration();
632    Context ctx = mock(Context.class);
633    when(ctx.getConfiguration()).thenReturn(configuration);
634
635    doAnswer(new Answer<Void>() {
636
637      @Override
638      public Void answer(InvocationOnMock invocation) throws Throwable {
639        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
640        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
641        assertEquals("Key", Bytes.toString(writer.get()));
642        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
643        return null;
644      }
645    }).when(ctx).write(any(), any());
646
647    importer.setup(ctx);
648    Result value = mock(Result.class);
649    KeyValue[] keys = {
650      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
651        Bytes.toBytes("value")),
652      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
653        Bytes.toBytes("value1")) };
654    when(value.rawCells()).thenReturn(keys);
655    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
656
657  }
658
659  /**
660   * Test addFilterAndArguments method of Import This method set couple parameters into
661   * Configuration
662   */
663  @Test
664  public void testAddFilterAndArguments() throws IOException {
665    Configuration configuration = new Configuration();
666
667    List<String> args = new ArrayList<>();
668    args.add("param1");
669    args.add("param2");
670
671    Import.addFilterAndArguments(configuration, FilterBase.class, args);
672    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
673      configuration.get(Import.FILTER_CLASS_CONF_KEY));
674    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
675  }
676
677  @Test
678  public void testDurability() throws Throwable {
679    // Create an export table.
680    String exportTableName = name.getMethodName() + "export";
681    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) {
682      // Insert some data
683      Put put = new Put(ROW1);
684      put.addColumn(FAMILYA, QUAL, now, QUAL);
685      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
686      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
687      exportTable.put(put);
688
689      put = new Put(ROW2);
690      put.addColumn(FAMILYA, QUAL, now, QUAL);
691      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
692      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
693      exportTable.put(put);
694
695      // Run the export
696      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
697      assertTrue(runExport(args));
698
699      // Create the table for import
700      String importTableName = name.getMethodName() + "import1";
701      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
702
703      // Register the wal listener for the import table
704      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
705        .getRegions(importTable.getName()).get(0).getRegionInfo();
706      TableWALActionListener walListener = new TableWALActionListener(region);
707      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
708      wal.registerWALActionsListener(walListener);
709
710      // Run the import with SKIP_WAL
711      args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
712        importTableName, FQ_OUTPUT_DIR };
713      assertTrue(runImport(args));
714      // Assert that the wal is not visisted
715      assertTrue(!walListener.isWALVisited());
716      // Ensure that the count is 2 (only one version of key value is obtained)
717      assertTrue(getCount(importTable, null) == 2);
718
719      // Run the import with the default durability option
720      importTableName = name.getMethodName() + "import2";
721      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
722      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
723        .getRegions(importTable.getName()).get(0).getRegionInfo();
724      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
725      walListener = new TableWALActionListener(region);
726      wal.registerWALActionsListener(walListener);
727      args = new String[] { importTableName, FQ_OUTPUT_DIR };
728      assertTrue(runImport(args));
729      // Assert that the wal is visisted
730      assertTrue(walListener.isWALVisited());
731      // Ensure that the count is 2 (only one version of key value is obtained)
732      assertTrue(getCount(importTable, null) == 2);
733    }
734  }
735
736  /**
737   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
738   * that an entry is written to the Write Ahead Log for the given table.
739   */
740  private static class TableWALActionListener implements WALActionsListener {
741
742    private RegionInfo regionInfo;
743    private boolean isVisited = false;
744
745    public TableWALActionListener(RegionInfo region) {
746      this.regionInfo = region;
747    }
748
749    @Override
750    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
751      if (
752        logKey.getTableName().getNameAsString()
753          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
754      ) {
755        isVisited = true;
756      }
757    }
758
759    public boolean isWALVisited() {
760      return isVisited;
761    }
762  }
763
764  /**
765   * Add cell tags to delete mutations, run export and import tool and verify that tags are present
766   * in import table also.
767   * @throws Throwable throws Throwable.
768   */
769  @Test
770  public void testTagsAddition() throws Throwable {
771    final TableName exportTable = TableName.valueOf(name.getMethodName());
772    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
773      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
774        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
775      .setCoprocessor(MetadataController.class.getName()).build();
776    UTIL.getAdmin().createTable(desc);
777
778    Table exportT = UTIL.getConnection().getTable(exportTable);
779
780    // Add first version of QUAL
781    Put p = new Put(ROW1);
782    p.addColumn(FAMILYA, QUAL, now, QUAL);
783    exportT.put(p);
784
785    // Add Delete family marker
786    Delete d = new Delete(ROW1, now + 3);
787    // Add test attribute to delete mutation.
788    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
789    exportT.delete(d);
790
791    // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool
792    // will use KeyValueCodecWithTags.
793    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
794      // This will make sure that codec will encode and decode tags in rpc call.
795      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
796      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
797                                                            // export
798    };
799    assertTrue(runExport(args));
800    // Assert tag exists in exportTable
801    checkWhetherTagExists(exportTable, true);
802
803    // Create an import table with MetadataController.
804    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
805    TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable)
806      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
807        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
808      .setCoprocessor(MetadataController.class.getName()).build();
809    UTIL.getAdmin().createTable(importTableDesc);
810
811    // Run import tool.
812    args = new String[] {
813      // This will make sure that codec will encode and decode tags in rpc call.
814      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
815      importTable.getNameAsString(), FQ_OUTPUT_DIR };
816    assertTrue(runImport(args));
817    // Make sure that tags exists in imported table.
818    checkWhetherTagExists(importTable, true);
819  }
820
821  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
822    List<Cell> values = new ArrayList<>();
823    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
824      Scan scan = new Scan();
825      // Make sure to set rawScan to true so that we will get Delete Markers.
826      scan.setRaw(true);
827      scan.readAllVersions();
828      scan.withStartRow(ROW1);
829      // Need to use RegionScanner instead of table#getScanner since the latter will
830      // not return tags since it will go through rpc layer and remove tags intentionally.
831      RegionScanner scanner = region.getScanner(scan);
832      scanner.next(values);
833      if (!values.isEmpty()) {
834        break;
835      }
836    }
837    boolean deleteFound = false;
838    for (Cell cell : values) {
839      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
840        deleteFound = true;
841        List<Tag> tags = PrivateCellUtil.getTags(cell);
842        // If tagExists flag is true then validate whether tag contents are as expected.
843        if (tagExists) {
844          Assert.assertEquals(1, tags.size());
845          for (Tag tag : tags) {
846            Assert.assertEquals(TEST_TAG, Tag.getValueAsString(tag));
847          }
848        } else {
849          // If tagExists flag is disabled then check for 0 size tags.
850          assertEquals(0, tags.size());
851        }
852      }
853    }
854    Assert.assertTrue(deleteFound);
855  }
856
857  /*
858   * This co-proc will add a cell tag to delete mutation.
859   */
860  public static class MetadataController implements RegionCoprocessor, RegionObserver {
861    @Override
862    public Optional<RegionObserver> getRegionObserver() {
863      return Optional.of(this);
864    }
865
866    @Override
867    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
868      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
869      if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
870        return;
871      }
872      for (int i = 0; i < miniBatchOp.size(); i++) {
873        Mutation m = miniBatchOp.getOperation(i);
874        if (!(m instanceof Delete)) {
875          continue;
876        }
877        byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
878        if (sourceOpAttr == null) {
879          continue;
880        }
881        Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
882        List<Cell> updatedCells = new ArrayList<>();
883        for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
884          Cell cell = cellScanner.current();
885          List<Tag> tags = PrivateCellUtil.getTags(cell);
886          tags.add(sourceOpTag);
887          Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
888          updatedCells.add(updatedCell);
889        }
890        m.getFamilyCellMap().clear();
891        // Clear and add new Cells to the Mutation.
892        for (Cell cell : updatedCells) {
893          Delete d = (Delete) m;
894          d.add(cell);
895        }
896      }
897    }
898  }
899
900  /**
901   * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means
902   * it will use no Codec. Make sure that we don't return Tags in response.
903   * @throws Exception Exception
904   */
905  @Test
906  public void testTagsWithEmptyCodec() throws Exception {
907    TableName tableName = TableName.valueOf(name.getMethodName());
908    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
909      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
910        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
911      .setCoprocessor(MetadataController.class.getName()).build();
912    UTIL.getAdmin().createTable(tableDesc);
913    Configuration conf = new Configuration(UTIL.getConfiguration());
914    conf.set(RPC_CODEC_CONF_KEY, "");
915    conf.set(DEFAULT_CODEC_CLASS, "");
916    try (Connection connection = ConnectionFactory.createConnection(conf);
917      Table table = connection.getTable(tableName)) {
918      // Add first version of QUAL
919      Put p = new Put(ROW1);
920      p.addColumn(FAMILYA, QUAL, now, QUAL);
921      table.put(p);
922
923      // Add Delete family marker
924      Delete d = new Delete(ROW1, now + 3);
925      // Add test attribute to delete mutation.
926      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
927      table.delete(d);
928
929      // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use
930      // empty Codec and it shouldn't encode/decode tags.
931      Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
932      ResultScanner scanner = table.getScanner(scan);
933      int count = 0;
934      Result result;
935      while ((result = scanner.next()) != null) {
936        List<Cell> cells = result.listCells();
937        assertEquals(2, cells.size());
938        Cell cell = cells.get(0);
939        assertTrue(CellUtil.isDelete(cell));
940        List<Tag> tags = PrivateCellUtil.getTags(cell);
941        assertEquals(0, tags.size());
942        count++;
943      }
944      assertEquals(1, count);
945    } finally {
946      UTIL.deleteTable(tableName);
947    }
948  }
949}