001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
021import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertFalse;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026import static org.mockito.ArgumentMatchers.any;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.mock;
029import static org.mockito.Mockito.when;
030
031import java.io.ByteArrayOutputStream;
032import java.io.File;
033import java.io.IOException;
034import java.io.PrintStream;
035import java.net.URL;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Optional;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ArrayBackedTag;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellUtil;
046import org.apache.hadoop.hbase.ExtendedCell;
047import org.apache.hadoop.hbase.ExtendedCellScanner;
048import org.apache.hadoop.hbase.HBaseTestingUtil;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeepDeletedCells;
051import org.apache.hadoop.hbase.KeyValue;
052import org.apache.hadoop.hbase.PrivateCellUtil;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Tag;
055import org.apache.hadoop.hbase.client.ClientInternalHelper;
056import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.Delete;
060import org.apache.hadoop.hbase.client.Durability;
061import org.apache.hadoop.hbase.client.Get;
062import org.apache.hadoop.hbase.client.Mutation;
063import org.apache.hadoop.hbase.client.Put;
064import org.apache.hadoop.hbase.client.RegionInfo;
065import org.apache.hadoop.hbase.client.Result;
066import org.apache.hadoop.hbase.client.ResultScanner;
067import org.apache.hadoop.hbase.client.Scan;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.client.TableDescriptor;
070import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
071import org.apache.hadoop.hbase.coprocessor.ObserverContext;
072import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
073import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
074import org.apache.hadoop.hbase.coprocessor.RegionObserver;
075import org.apache.hadoop.hbase.filter.Filter;
076import org.apache.hadoop.hbase.filter.FilterBase;
077import org.apache.hadoop.hbase.filter.PrefixFilter;
078import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
079import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
080import org.apache.hadoop.hbase.regionserver.HRegion;
081import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
082import org.apache.hadoop.hbase.regionserver.RegionScanner;
083import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
084import org.apache.hadoop.hbase.util.Bytes;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.LauncherSecurityManager;
087import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
088import org.apache.hadoop.hbase.wal.WAL;
089import org.apache.hadoop.hbase.wal.WALEdit;
090import org.apache.hadoop.hbase.wal.WALKey;
091import org.apache.hadoop.mapreduce.Mapper.Context;
092import org.apache.hadoop.util.GenericOptionsParser;
093import org.apache.hadoop.util.ToolRunner;
094import org.junit.jupiter.api.AfterEach;
095import org.junit.jupiter.api.BeforeEach;
096import org.junit.jupiter.api.Test;
097import org.junit.jupiter.api.TestInfo;
098import org.mockito.invocation.InvocationOnMock;
099import org.mockito.stubbing.Answer;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103/**
104 * Base class for testing Import/Export. Shared logic without @BeforeAll/@AfterAll to allow
105 * subclasses to manage their own lifecycle.
106 */
public class TestImportExportBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestImportExportBase.class);
  // Shared mini-cluster helper; started/stopped by subclasses (see setUpBeforeClass()).
  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  // Row keys begin with an escaped binary byte (\x32) to exercise Bytes.toBytesBinary
  // handling of the scan start/stop-row command-line arguments.
  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  private static final byte[] QUAL = Bytes.toBytes("q");
  // Relative export output directory; FQ_OUTPUT_DIR is its fully-qualified form, assigned
  // once the mini cluster's FileSystem is available in setUpBeforeClass().
  protected static final String OUTPUT_DIR = "outputdir";
  protected static String FQ_OUTPUT_DIR;
  private static final String EXPORT_BATCH_SIZE = "100";

  // Base timestamp for all cells written by the tests; versions are written at now, now+1, ...
  private static final long now = EnvironmentEdgeManager.currentTime();
  protected final TableName EXPORT_TABLE = TableName.valueOf("export_table");
  protected final TableName IMPORT_TABLE = TableName.valueOf("import_table");
  // A custom cell tag type (one past CUSTOM_TAG_TYPE_RANGE) and its associated attribute/value.
  public static final byte TEST_TAG_TYPE = (byte) (Tag.CUSTOM_TAG_TYPE_RANGE + 1);
  public static final String TEST_ATTR = "source_op";
  public static final String TEST_TAG = "test_tag";

  // Name of the currently running test method, captured in announce(); most tests use it
  // as the table name so tests do not collide.
  protected String name;
131
132  @BeforeEach
133  public void announce(TestInfo testInfo) {
134    name = testInfo.getTestMethod().get().getName();
135    LOG.info("Running {}", name);
136  }
137
138  @AfterEach
139  public void cleanup() throws Throwable {
140    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
141    fs.delete(new Path(OUTPUT_DIR), true);
142    if (UTIL.getAdmin().tableExists(EXPORT_TABLE)) {
143      UTIL.deleteTable(EXPORT_TABLE);
144    }
145    if (UTIL.getAdmin().tableExists(IMPORT_TABLE)) {
146      UTIL.deleteTable(IMPORT_TABLE);
147    }
148  }
149
150  protected static void setUpBeforeClass() throws Exception {
151    // Up the handlers; this test needs more than usual.
152    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
153    UTIL.startMiniCluster();
154    FQ_OUTPUT_DIR =
155      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
156  }
157
158  /**
159   * Runs an export job with the specified command line args
160   * @return true if job completed successfully
161   */
162  protected boolean runExport(String[] args) throws Throwable {
163    // need to make a copy of the configuration because to make sure different temp dirs are used.
164    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
165    return status == 0;
166  }
167
  /**
   * Invokes {@link Export#main(String[])} directly. Export.main is expected to call System.exit
   * on bad arguments; tests intercept that via a LauncherSecurityManager.
   */
  protected void runExportMain(String[] args) throws Throwable {
    Export.main(args);
  }
171
172  /**
173   * Runs an import job with the specified command line args
174   * @return true if job completed successfully
175   */
176  boolean runImport(String[] args) throws Throwable {
177    // need to make a copy of the configuration because to make sure different temp dirs are used.
178    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
179    return status == 0;
180  }
181
182  /**
183   * Test simple replication case with column mapping
184   */
185  @Test
186  public void testSimpleCase() throws Throwable {
187    try (Table t = UTIL.createTable(TableName.valueOf(name), FAMILYA, 3)) {
188      Put p = new Put(ROW1);
189      p.addColumn(FAMILYA, QUAL, now, QUAL);
190      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
191      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
192      t.put(p);
193      p = new Put(ROW2);
194      p.addColumn(FAMILYA, QUAL, now, QUAL);
195      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
196      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
197      t.put(p);
198      p = new Put(ROW3);
199      p.addColumn(FAMILYA, QUAL, now, QUAL);
200      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
201      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
202      t.put(p);
203    }
204
205    String[] args = new String[] {
206      // Only export row1 & row2.
207      "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
208      "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", name, FQ_OUTPUT_DIR, "1000", // max
209                                                                                         // number
210                                                                                         // of key
211                                                                                         // versions
212                                                                                         // per key
213                                                                                         // to
214                                                                                         // export
215    };
216    assertTrue(runExport(args));
217
218    final String IMPORT_TABLE = name + "import";
219    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3)) {
220      args =
221        new String[] { "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
222          IMPORT_TABLE, FQ_OUTPUT_DIR };
223      assertTrue(runImport(args));
224
225      Get g = new Get(ROW1);
226      g.readAllVersions();
227      Result r = t.get(g);
228      assertEquals(3, r.size());
229      g = new Get(ROW2);
230      g.readAllVersions();
231      r = t.get(g);
232      assertEquals(3, r.size());
233      g = new Get(ROW3);
234      r = t.get(g);
235      assertEquals(0, r.size());
236    }
237  }
238
239  /**
240   * Test export hbase:meta table
241   */
242  @Test
243  public void testMetaExport() throws Throwable {
244    String[] args =
245      new String[] { TableName.META_TABLE_NAME.getNameAsString(), FQ_OUTPUT_DIR, "1", "0", "0" };
246    assertTrue(runExport(args));
247  }
248
249  /**
250   * Test import data from 0.94 exported file
251   */
252  @Test
253  public void testImport94Table() throws Throwable {
254    final String name = "exportedTableIn94Format";
255    URL url = TestImportExportBase.class.getResource(name);
256    File f = new File(url.toURI());
257    if (!f.exists()) {
258      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
259      return;
260    }
261    assertTrue(f.exists());
262    LOG.info("FILE=" + f);
263    Path importPath = new Path(f.toURI());
264    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
265    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
266    String IMPORT_TABLE = name;
267    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3)) {
268      String[] args = new String[] { "-Dhbase.import.version=0.94", IMPORT_TABLE, FQ_OUTPUT_DIR };
269      assertTrue(runImport(args));
270      // @formatter:off
271      // exportedTableIn94Format contains 5 rows
272      // ROW         COLUMN+CELL
273      // r1          column=f1:c1, timestamp=1383766761171, value=val1
274      // r2          column=f1:c1, timestamp=1383766771642, value=val2
275      // r3          column=f1:c1, timestamp=1383766777615, value=val3
276      // r4          column=f1:c1, timestamp=1383766785146, value=val4
277      // r5          column=f1:c1, timestamp=1383766791506, value=val5
278      // @formatter:on
279      assertEquals(5, UTIL.countRows(t));
280    }
281  }
282
283  /**
284   * Test export scanner batching
285   */
286  @Test
287  public void testExportScannerBatching() throws Throwable {
288    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
289      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(1).build())
290      .build();
291    UTIL.getAdmin().createTable(desc);
292    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
293      Put p = new Put(ROW1);
294      p.addColumn(FAMILYA, QUAL, now, QUAL);
295      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
296      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
297      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
298      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
299      t.put(p);
300      // added scanner batching arg.
301      String[] args = new String[] { "-D" + ExportUtils.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
302        name, FQ_OUTPUT_DIR };
303      assertTrue(runExport(args));
304
305      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
306      fs.delete(new Path(FQ_OUTPUT_DIR), true);
307    }
308  }
309
310  @Test
311  public void testWithDeletes() throws Throwable {
312    TableDescriptor desc = TableDescriptorBuilder
313      .newBuilder(TableName.valueOf(name)).setColumnFamily(ColumnFamilyDescriptorBuilder
314        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
315      .build();
316    UTIL.getAdmin().createTable(desc);
317    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
318      Put p = new Put(ROW1);
319      p.addColumn(FAMILYA, QUAL, now, QUAL);
320      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
321      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
322      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
323      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
324      t.put(p);
325
326      Delete d = new Delete(ROW1, now + 3);
327      t.delete(d);
328      d = new Delete(ROW1);
329      d.addColumns(FAMILYA, QUAL, now + 2);
330      t.delete(d);
331    }
332
333    String[] args =
334      new String[] { "-D" + ExportUtils.RAW_SCAN + "=true", name, FQ_OUTPUT_DIR, "1000", // max
335                                                                                         // number
336                                                                                         // of key
337                                                                                         // versions
338                                                                                         // per key
339                                                                                         // to
340                                                                                         // export
341      };
342    assertTrue(runExport(args));
343
344    final String IMPORT_TABLE = name + "import";
345    desc = TableDescriptorBuilder
346      .newBuilder(TableName.valueOf(IMPORT_TABLE)).setColumnFamily(ColumnFamilyDescriptorBuilder
347        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
348      .build();
349    UTIL.getAdmin().createTable(desc);
350    try (Table t = UTIL.getConnection().getTable(desc.getTableName())) {
351      args = new String[] { IMPORT_TABLE, FQ_OUTPUT_DIR };
352      assertTrue(runImport(args));
353
354      Scan s = new Scan();
355      s.readAllVersions();
356      s.setRaw(true);
357      ResultScanner scanner = t.getScanner(s);
358      Result r = scanner.next();
359      ExtendedCell[] res = ClientInternalHelper.getExtendedRawCells(r);
360      assertTrue(PrivateCellUtil.isDeleteFamily(res[0]));
361      assertEquals(now + 4, res[1].getTimestamp());
362      assertEquals(now + 3, res[2].getTimestamp());
363      assertTrue(CellUtil.isDelete(res[3]));
364      assertEquals(now + 2, res[4].getTimestamp());
365      assertEquals(now + 1, res[5].getTimestamp());
366      assertEquals(now, res[6].getTimestamp());
367    }
368  }
369
370  @Test
371  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Throwable {
372    final TableName exportTable = TableName.valueOf(name);
373    TableDescriptor desc = TableDescriptorBuilder
374      .newBuilder(TableName.valueOf(name)).setColumnFamily(ColumnFamilyDescriptorBuilder
375        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
376      .build();
377    UTIL.getAdmin().createTable(desc);
378
379    Table exportT = UTIL.getConnection().getTable(exportTable);
380
381    // Add first version of QUAL
382    Put p = new Put(ROW1);
383    p.addColumn(FAMILYA, QUAL, now, QUAL);
384    exportT.put(p);
385
386    // Add Delete family marker
387    Delete d = new Delete(ROW1, now + 3);
388    exportT.delete(d);
389
390    // Add second version of QUAL
391    p = new Put(ROW1);
392    p.addColumn(FAMILYA, QUAL, now + 5, Bytes.toBytes("s"));
393    exportT.put(p);
394
395    // Add second Delete family marker
396    d = new Delete(ROW1, now + 7);
397    exportT.delete(d);
398
399    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
400      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
401                                                            // export
402    };
403    assertTrue(runExport(args));
404
405    final String importTable = name + "import";
406    desc = TableDescriptorBuilder
407      .newBuilder(TableName.valueOf(importTable)).setColumnFamily(ColumnFamilyDescriptorBuilder
408        .newBuilder(FAMILYA).setMaxVersions(5).setKeepDeletedCells(KeepDeletedCells.TRUE).build())
409      .build();
410    UTIL.getAdmin().createTable(desc);
411
412    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
413    args = new String[] { importTable, FQ_OUTPUT_DIR };
414    assertTrue(runImport(args));
415
416    Scan s = new Scan();
417    s.readAllVersions();
418    s.setRaw(true);
419
420    ResultScanner importedTScanner = importT.getScanner(s);
421    Result importedTResult = importedTScanner.next();
422
423    ResultScanner exportedTScanner = exportT.getScanner(s);
424    Result exportedTResult = exportedTScanner.next();
425    try {
426      Result.compareResults(exportedTResult, importedTResult);
427    } catch (Throwable e) {
428      fail("Original and imported tables data comparision failed with error:" + e.getMessage());
429    } finally {
430      exportT.close();
431      importT.close();
432    }
433  }
434
435  /**
436   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
437   * attempt with invalid values.
438   */
439  @Test
440  public void testWithFilter() throws Throwable {
441    // Create simple table to export
442    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
443      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
444      .build();
445    UTIL.getAdmin().createTable(desc);
446    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
447
448    Put p1 = new Put(ROW1);
449    p1.addColumn(FAMILYA, QUAL, now, QUAL);
450    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
451    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
452    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
453    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
454
455    // Having another row would actually test the filter.
456    Put p2 = new Put(ROW2);
457    p2.addColumn(FAMILYA, QUAL, now, QUAL);
458
459    exportTable.put(Arrays.asList(p1, p2));
460
461    // Export the simple table
462    String[] args = new String[] { name, FQ_OUTPUT_DIR, "1000" };
463    assertTrue(runExport(args));
464
465    // Import to a new table
466    final String IMPORT_TABLE = name + "import";
467    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
468      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
469      .build();
470    UTIL.getAdmin().createTable(desc);
471
472    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
473    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
474      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
475      "1000" };
476    assertTrue(runImport(args));
477
478    // get the count of the source table for that time range
479    PrefixFilter filter = new PrefixFilter(ROW1);
480    int count = getCount(exportTable, filter);
481
482    assertEquals(count, getCount(importTable, null),
483      "Unexpected row count between export and import tables");
484
485    // and then test that a broken command doesn't bork everything - easier here because we don't
486    // need to re-run the export job
487
488    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
489      "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name, FQ_OUTPUT_DIR,
490      "1000" };
491    assertFalse(runImport(args));
492
493    // cleanup
494    exportTable.close();
495    importTable.close();
496  }
497
498  /**
499   * Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult
500   */
501  @Test
502  public void testBulkImportAndLargeResult() throws Throwable {
503    // Create simple table to export
504    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
505      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
506      .build();
507    UTIL.getAdmin().createTable(desc);
508    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
509
510    Put p1 = new Put(ROW1);
511    p1.addColumn(FAMILYA, QUAL, now, QUAL);
512
513    // Having another row would actually test the filter.
514    Put p2 = new Put(ROW2);
515    p2.addColumn(FAMILYA, QUAL, now, QUAL);
516
517    exportTable.put(Arrays.asList(p1, p2));
518
519    // Export the simple table
520    String[] args = new String[] { name, FQ_OUTPUT_DIR, "1000" };
521    assertTrue(runExport(args));
522
523    // Import to a new table
524    final String IMPORT_TABLE = name + "import";
525    desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
526      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
527      .build();
528    UTIL.getAdmin().createTable(desc);
529
530    String O_OUTPUT_DIR =
531      new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
532
533    args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
534      "-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
535    assertTrue(runImport(args));
536  }
537
538  /**
539   * Count the number of keyvalues in the specified table with the given filter
540   * @param table the table to scan
541   * @return the number of keyvalues found
542   */
543  private int getCount(Table table, Filter filter) throws IOException {
544    Scan scan = new Scan();
545    scan.setFilter(filter);
546    ResultScanner results = table.getScanner(scan);
547    int count = 0;
548    for (Result res : results) {
549      count += res.size();
550    }
551    results.close();
552    return count;
553  }
554
555  /**
556   * test main method. Import should print help and call System.exit
557   */
558  @Test
559  public void testImportMain() throws Throwable {
560    PrintStream oldPrintStream = System.err;
561    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
562    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
563    System.setSecurityManager(newSecurityManager);
564    ByteArrayOutputStream data = new ByteArrayOutputStream();
565    String[] args = {};
566    System.setErr(new PrintStream(data));
567    try {
568      System.setErr(new PrintStream(data));
569      Import.main(args);
570      fail("should be SecurityException");
571    } catch (SecurityException e) {
572      assertEquals(-1, newSecurityManager.getExitCode());
573      assertTrue(data.toString().contains("Wrong number of arguments:"));
574      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
575      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
576      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
577      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
578    } finally {
579      System.setErr(oldPrintStream);
580      System.setSecurityManager(SECURITY_MANAGER);
581    }
582  }
583
584  @Test
585  public void testExportScan() throws Exception {
586    int version = 100;
587    long startTime = EnvironmentEdgeManager.currentTime();
588    long endTime = startTime + 1;
589    String prefix = "row";
590    String label_0 = "label_0";
591    String label_1 = "label_1";
592    String[] args = { "table", "outputDir", String.valueOf(version), String.valueOf(startTime),
593      String.valueOf(endTime), prefix };
594    Scan scan = ExportUtils.getScanFromCommandLine(UTIL.getConfiguration(), args);
595    assertEquals(version, scan.getMaxVersions());
596    assertEquals(startTime, scan.getTimeRange().getMin());
597    assertEquals(endTime, scan.getTimeRange().getMax());
598    assertEquals(true, (scan.getFilter() instanceof PrefixFilter));
599    assertEquals(0,
600      Bytes.compareTo(((PrefixFilter) scan.getFilter()).getPrefix(), Bytes.toBytesBinary(prefix)));
601    String[] argsWithLabels =
602      { "-D " + ExportUtils.EXPORT_VISIBILITY_LABELS + "=" + label_0 + "," + label_1, "table",
603        "outputDir", String.valueOf(version), String.valueOf(startTime), String.valueOf(endTime),
604        prefix };
605    Configuration conf = new Configuration(UTIL.getConfiguration());
606    // parse the "-D" options
607    String[] otherArgs = new GenericOptionsParser(conf, argsWithLabels).getRemainingArgs();
608    Scan scanWithLabels = ExportUtils.getScanFromCommandLine(conf, otherArgs);
609    assertEquals(version, scanWithLabels.getMaxVersions());
610    assertEquals(startTime, scanWithLabels.getTimeRange().getMin());
611    assertEquals(endTime, scanWithLabels.getTimeRange().getMax());
612    assertEquals(true, (scanWithLabels.getFilter() instanceof PrefixFilter));
613    assertEquals(0, Bytes.compareTo(((PrefixFilter) scanWithLabels.getFilter()).getPrefix(),
614      Bytes.toBytesBinary(prefix)));
615    assertEquals(2, scanWithLabels.getAuthorizations().getLabels().size());
616    assertEquals(label_0, scanWithLabels.getAuthorizations().getLabels().get(0));
617    assertEquals(label_1, scanWithLabels.getAuthorizations().getLabels().get(1));
618  }
619
620  /**
621   * test main method. Export should print help and call System.exit
622   */
623  @Test
624  public void testExportMain() throws Throwable {
625    PrintStream oldPrintStream = System.err;
626    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
627    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
628    System.setSecurityManager(newSecurityManager);
629    ByteArrayOutputStream data = new ByteArrayOutputStream();
630    String[] args = {};
631    System.setErr(new PrintStream(data));
632    try {
633      System.setErr(new PrintStream(data));
634      runExportMain(args);
635      fail("should be SecurityException");
636    } catch (SecurityException e) {
637      assertEquals(-1, newSecurityManager.getExitCode());
638      String errMsg = data.toString();
639      assertTrue(errMsg.contains("Wrong number of arguments:"));
640      assertTrue(
641        errMsg.contains("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> "
642          + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
643      assertTrue(errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
644      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
645      assertTrue(errMsg.contains("-D hbase.client.scanner.caching=100"));
646      assertTrue(errMsg.contains("-D hbase.export.scanner.batch=10"));
647      assertTrue(errMsg.contains("-D hbase.export.scanner.caching=100"));
648    } finally {
649      System.setErr(oldPrintStream);
650      System.setSecurityManager(SECURITY_MANAGER);
651    }
652  }
653
654  /**
655   * Test map method of Importer
656   */
657  @SuppressWarnings({ "unchecked", "rawtypes" })
658  @Test
659  public void testKeyValueImporter() throws Throwable {
660    CellImporter importer = new CellImporter();
661    Configuration configuration = new Configuration();
662    Context ctx = mock(Context.class);
663    when(ctx.getConfiguration()).thenReturn(configuration);
664
665    doAnswer(new Answer<Void>() {
666
667      @Override
668      public Void answer(InvocationOnMock invocation) throws Throwable {
669        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
670        MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
671        assertEquals("Key", Bytes.toString(writer.get()));
672        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
673        return null;
674      }
675    }).when(ctx).write(any(), any());
676
677    importer.setup(ctx);
678    KeyValue[] keys = {
679      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
680        Bytes.toBytes("value")),
681      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
682        Bytes.toBytes("value1")) };
683    Result value = Result.create(keys);
684    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
685
686  }
687
688  /**
689   * Test addFilterAndArguments method of Import This method set couple parameters into
690   * Configuration
691   */
692  @Test
693  public void testAddFilterAndArguments() throws IOException {
694    Configuration configuration = new Configuration();
695
696    List<String> args = new ArrayList<>();
697    args.add("param1");
698    args.add("param2");
699
700    Import.addFilterAndArguments(configuration, FilterBase.class, args);
701    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
702      configuration.get(Import.FILTER_CLASS_CONF_KEY));
703    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
704  }
705
706  @Test
707  public void testDurability() throws Throwable {
708    // Create an export table.
709    String exportTableName = name + "export";
710    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3)) {
711      // Insert some data
712      Put put = new Put(ROW1);
713      put.addColumn(FAMILYA, QUAL, now, QUAL);
714      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
715      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
716      exportTable.put(put);
717
718      put = new Put(ROW2);
719      put.addColumn(FAMILYA, QUAL, now, QUAL);
720      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
721      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
722      exportTable.put(put);
723
724      // Run the export
725      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
726      assertTrue(runExport(args));
727
728      // Create the table for import
729      String importTableName = name + "import1";
730      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
731
732      // Register the wal listener for the import table
733      RegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
734        .getRegions(importTable.getName()).get(0).getRegionInfo();
735      TableWALActionListener walListener = new TableWALActionListener(region);
736      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
737      wal.registerWALActionsListener(walListener);
738
739      // Run the import with SKIP_WAL
740      args = new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
741        importTableName, FQ_OUTPUT_DIR };
742      assertTrue(runImport(args));
743      // Assert that the wal is not visisted
744      assertTrue(!walListener.isWALVisited());
745      // Ensure that the count is 2 (only one version of key value is obtained)
746      assertTrue(getCount(importTable, null) == 2);
747
748      // Run the import with the default durability option
749      importTableName = name + "import2";
750      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
751      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
752        .getRegions(importTable.getName()).get(0).getRegionInfo();
753      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
754      walListener = new TableWALActionListener(region);
755      wal.registerWALActionsListener(walListener);
756      args = new String[] { importTableName, FQ_OUTPUT_DIR };
757      assertTrue(runImport(args));
758      // Assert that the wal is visisted
759      assertTrue(walListener.isWALVisited());
760      // Ensure that the count is 2 (only one version of key value is obtained)
761      assertTrue(getCount(importTable, null) == 2);
762    }
763  }
764
765  /**
766   * This listens to the {@link #visitLogEntryBeforeWrite(RegionInfo, WALKey, WALEdit)} to identify
767   * that an entry is written to the Write Ahead Log for the given table.
768   */
769  private static class TableWALActionListener implements WALActionsListener {
770
771    private RegionInfo regionInfo;
772    private boolean isVisited = false;
773
774    public TableWALActionListener(RegionInfo region) {
775      this.regionInfo = region;
776    }
777
778    @Override
779    public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
780      if (
781        logKey.getTableName().getNameAsString()
782          .equalsIgnoreCase(this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())
783      ) {
784        isVisited = true;
785      }
786    }
787
788    public boolean isWALVisited() {
789      return isVisited;
790    }
791  }
792
793  /**
794   * Add cell tags to delete mutations, run export and import tool and verify that tags are present
795   * in import table also.
796   * @throws Throwable throws Throwable.
797   */
798  @Test
799  public void testTagsAddition() throws Throwable {
800    final TableName exportTable = TableName.valueOf(name);
801    TableDescriptor desc = TableDescriptorBuilder.newBuilder(exportTable)
802      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
803        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
804      .setCoprocessor(MetadataController.class.getName()).build();
805    UTIL.getAdmin().createTable(desc);
806
807    Table exportT = UTIL.getConnection().getTable(exportTable);
808
809    // Add first version of QUAL
810    Put p = new Put(ROW1);
811    p.addColumn(FAMILYA, QUAL, now, QUAL);
812    exportT.put(p);
813
814    // Add Delete family marker
815    Delete d = new Delete(ROW1, now + 3);
816    // Add test attribute to delete mutation.
817    d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
818    exportT.delete(d);
819
820    // Run export tool with KeyValueCodecWithTags as Codec. This will ensure that export tool
821    // will use KeyValueCodecWithTags.
822    String[] args = new String[] { "-D" + ExportUtils.RAW_SCAN + "=true",
823      // This will make sure that codec will encode and decode tags in rpc call.
824      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
825      exportTable.getNameAsString(), FQ_OUTPUT_DIR, "1000", // max number of key versions per key to
826                                                            // export
827    };
828    assertTrue(runExport(args));
829    // Assert tag exists in exportTable
830    checkWhetherTagExists(exportTable, true);
831
832    // Create an import table with MetadataController.
833    final TableName importTable = TableName.valueOf("importWithTestTagsAddition");
834    TableDescriptor importTableDesc = TableDescriptorBuilder.newBuilder(importTable)
835      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
836        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
837      .setCoprocessor(MetadataController.class.getName()).build();
838    UTIL.getAdmin().createTable(importTableDesc);
839
840    // Run import tool.
841    args = new String[] {
842      // This will make sure that codec will encode and decode tags in rpc call.
843      "-Dhbase.client.rpc.codec=org.apache.hadoop.hbase.codec.KeyValueCodecWithTags",
844      importTable.getNameAsString(), FQ_OUTPUT_DIR };
845    assertTrue(runImport(args));
846    // Make sure that tags exists in imported table.
847    checkWhetherTagExists(importTable, true);
848  }
849
850  private void checkWhetherTagExists(TableName table, boolean tagExists) throws IOException {
851    List<ExtendedCell> values = new ArrayList<>();
852    for (HRegion region : UTIL.getHBaseCluster().getRegions(table)) {
853      Scan scan = new Scan();
854      // Make sure to set rawScan to true so that we will get Delete Markers.
855      scan.setRaw(true);
856      scan.readAllVersions();
857      scan.withStartRow(ROW1);
858      // Need to use RegionScanner instead of table#getScanner since the latter will
859      // not return tags since it will go through rpc layer and remove tags intentionally.
860      RegionScanner scanner = region.getScanner(scan);
861      scanner.next(values);
862      if (!values.isEmpty()) {
863        break;
864      }
865    }
866    boolean deleteFound = false;
867    for (ExtendedCell cell : values) {
868      if (PrivateCellUtil.isDelete(cell.getType().getCode())) {
869        deleteFound = true;
870        List<Tag> tags = PrivateCellUtil.getTags(cell);
871        // If tagExists flag is true then validate whether tag contents are as expected.
872        if (tagExists) {
873          assertEquals(1, tags.size());
874          for (Tag tag : tags) {
875            assertEquals(TEST_TAG, Tag.getValueAsString(tag));
876          }
877        } else {
878          // If tagExists flag is disabled then check for 0 size tags.
879          assertEquals(0, tags.size());
880        }
881      }
882    }
883    assertTrue(deleteFound);
884  }
885
886  /*
887   * This co-proc will add a cell tag to delete mutation.
888   */
889  public static class MetadataController implements RegionCoprocessor, RegionObserver {
890    @Override
891    public Optional<RegionObserver> getRegionObserver() {
892      return Optional.of(this);
893    }
894
895    @Override
896    public void preBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c,
897      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
898      if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
899        return;
900      }
901      for (int i = 0; i < miniBatchOp.size(); i++) {
902        Mutation m = miniBatchOp.getOperation(i);
903        if (!(m instanceof Delete)) {
904          continue;
905        }
906        byte[] sourceOpAttr = m.getAttribute(TEST_ATTR);
907        if (sourceOpAttr == null) {
908          continue;
909        }
910        Tag sourceOpTag = new ArrayBackedTag(TEST_TAG_TYPE, sourceOpAttr);
911        List<Cell> updatedCells = new ArrayList<>();
912        for (ExtendedCellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
913          ExtendedCell cell = cellScanner.current();
914          List<Tag> tags = PrivateCellUtil.getTags(cell);
915          tags.add(sourceOpTag);
916          Cell updatedCell = PrivateCellUtil.createCell(cell, tags);
917          updatedCells.add(updatedCell);
918        }
919        m.getFamilyCellMap().clear();
920        // Clear and add new Cells to the Mutation.
921        for (Cell cell : updatedCells) {
922          Delete d = (Delete) m;
923          d.add(cell);
924        }
925      }
926    }
927  }
928
929  /**
930   * Set hbase.client.rpc.codec and hbase.client.default.rpc.codec both to empty string This means
931   * it will use no Codec. Make sure that we don't return Tags in response.
932   * @throws Exception Exception
933   */
934  @Test
935  public void testTagsWithEmptyCodec() throws Exception {
936    TableName tableName = TableName.valueOf(name);
937    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
938      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5)
939        .setKeepDeletedCells(KeepDeletedCells.TRUE).build())
940      .setCoprocessor(MetadataController.class.getName()).build();
941    UTIL.getAdmin().createTable(tableDesc);
942    Configuration conf = new Configuration(UTIL.getConfiguration());
943    conf.set(RPC_CODEC_CONF_KEY, "");
944    conf.set(DEFAULT_CODEC_CLASS, "");
945    try (Connection connection = ConnectionFactory.createConnection(conf);
946      Table table = connection.getTable(tableName)) {
947      // Add first version of QUAL
948      Put p = new Put(ROW1);
949      p.addColumn(FAMILYA, QUAL, now, QUAL);
950      table.put(p);
951
952      // Add Delete family marker
953      Delete d = new Delete(ROW1, now + 3);
954      // Add test attribute to delete mutation.
955      d.setAttribute(TEST_ATTR, Bytes.toBytes(TEST_TAG));
956      table.delete(d);
957
958      // Since RPC_CODEC_CONF_KEY and DEFAULT_CODEC_CLASS is set to empty, it will use
959      // empty Codec and it shouldn't encode/decode tags.
960      Scan scan = new Scan().withStartRow(ROW1).setRaw(true);
961      ResultScanner scanner = table.getScanner(scan);
962      int count = 0;
963      Result result;
964      while ((result = scanner.next()) != null) {
965        List<ExtendedCell> cells = Arrays.asList(ClientInternalHelper.getExtendedRawCells(result));
966        assertEquals(2, cells.size());
967        ExtendedCell cell = cells.get(0);
968        assertTrue(CellUtil.isDelete(cell));
969        List<Tag> tags = PrivateCellUtil.getTags(cell);
970        assertEquals(0, tags.size());
971        count++;
972      }
973      assertEquals(1, count);
974    } finally {
975      UTIL.deleteTable(tableName);
976    }
977  }
978}