/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import java.io.Closeable;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.mapreduce.ExportUtils;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
import org.apache.hadoop.hbase.protobuf.generated.ExportProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Export an HBase table. Writes the table content to SequenceFiles in HDFS. Use
 * {@link Import} to read it back in again. It is implemented as a coprocessor
 * endpoint.
 *
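 * <p>
 * A minimal client-side sketch (the table name and output path below are
 * hypothetical, and the output directory must not already exist):
 * </p>
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * Map&lt;byte[], Export.Response&gt; results = Export.run(conf,
 *     TableName.valueOf("exampleTable"), new Scan(), new Path("/tmp/example-export"));
 * </pre>
 *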
 * @see org.apache.hadoop.hbase.mapreduce.Export
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class Export extends ExportProtos.ExportService implements RegionCoprocessor {
  private static final Logger LOG = LoggerFactory.getLogger(Export.class);
  private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class;
  private static final SequenceFile.CompressionType DEFAULT_TYPE =
      SequenceFile.CompressionType.RECORD;
  private RegionCoprocessorEnvironment env = null;
  private UserProvider userProvider;

  public static void main(String[] args) throws Throwable {
    Map<byte[], Response> response = run(HBaseConfiguration.create(), args);
    System.exit(response == null ? -1 : 0);
  }

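  /**
   * Parses the command line (generic Hadoop options first) and runs the export;
   * returns null when the remaining arguments are invalid.
   */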
  @VisibleForTesting
  static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable {
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (!ExportUtils.isValidArguements(otherArgs)) {
      ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.getLength(otherArgs));
      return null;
    }
    Triple<TableName, Scan, Path> arguments =
        ExportUtils.getArgumentsFromCommandLine(conf, otherArgs);
    return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
  }

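  /**
   * Invokes the Export endpoint on every region covered by the scan range and writes
   * one SequenceFile per region under {@code dir}. The output directory must not
   * pre-exist, and it is deleted again if any region fails.
   */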
  public static Map<byte[], Response> run(final Configuration conf, TableName tableName,
      Scan scan, Path dir) throws Throwable {
    FileSystem fs = dir.getFileSystem(conf);
    UserProvider userProvider = UserProvider.instantiate(conf);
    checkDir(fs, dir);
    FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    fsDelegationToken.acquireDelegationToken(fs);
    try {
      final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
        scan, fsDelegationToken.getUserToken());
      try (Connection con = ConnectionFactory.createConnection(conf);
              Table table = con.getTable(tableName)) {
        Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        table.coprocessorService(ExportProtos.ExportService.class,
          scan.getStartRow(),
          scan.getStopRow(),
          (ExportProtos.ExportService service) -> {
            ServerRpcController controller = new ServerRpcController();
            CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
              rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
            service.export(controller, request, rpcCallback);
            if (controller.failedOnException()) {
              throw controller.getFailedOn();
            }
            return rpcCallback.get();
          }).forEach((k, v) -> result.put(k, new Response(v)));
        return result;
      } catch (Throwable e) {
        fs.delete(dir, true);
        throw e;
      }
    } finally {
      fsDelegationToken.releaseDelegationToken();
    }
  }

  private static boolean getCompression(final ExportProtos.ExportRequest request) {
    if (request.hasCompressed()) {
      return request.getCompressed();
    } else {
      return false;
    }
  }

  private static SequenceFile.CompressionType getCompressionType(
      final ExportProtos.ExportRequest request) {
    if (request.hasCompressType()) {
      return SequenceFile.CompressionType.valueOf(request.getCompressType());
    } else {
      return DEFAULT_TYPE;
    }
  }

  private static CompressionCodec getCompressionCodec(final Configuration conf,
      final ExportProtos.ExportRequest request) {
    try {
      Class<? extends CompressionCodec> codecClass;
      if (request.hasCompressCodec()) {
        codecClass = conf.getClassByName(request.getCompressCodec())
            .asSubclass(CompressionCodec.class);
      } else {
        codecClass = DEFAULT_CODEC;
      }
      return ReflectionUtils.newInstance(codecClass, conf);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec "
              + request.getCompressCodec() + " was not found.", e);
    }
  }

  private static SequenceFile.Writer.Option getOutputPath(final Configuration conf,
          final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
    Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName());
    FileSystem fs = file.getFileSystem(conf);
    if (fs.exists(file)) {
      throw new IOException(file + " already exists");
    }
    return SequenceFile.Writer.file(file);
  }

  private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
          final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
    List<SequenceFile.Writer.Option> rval = new LinkedList<>();
    rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
    rval.add(SequenceFile.Writer.valueClass(Result.class));
    rval.add(getOutputPath(conf, info, request));
    if (getCompression(request)) {
      rval.add(SequenceFile.Writer.compression(getCompressionType(request),
          getCompressionCodec(conf, request)));
    } else {
      rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
    }
    return rval;
  }

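  /**
   * Drains the region scanner row by row, invoking the scanner-related coprocessor
   * hooks around each batch, and appends every row as an (ImmutableBytesWritable,
   * Result) pair to the SequenceFile writer.
   */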
  private static ExportProtos.ExportResponse processData(final Region region,
      final Configuration conf, final UserProvider userProvider, final Scan scan,
      final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException {
    ScanCoprocessor cp = new ScanCoprocessor(region);
    RegionScanner scanner = null;
    try (RegionOp regionOp = new RegionOp(region);
            SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) {
      scanner = cp.checkScannerOpen(scan);
      ImmutableBytesWritable key = new ImmutableBytesWritable();
      long rowCount = 0;
      long cellCount = 0;
      List<Result> results = new ArrayList<>();
      List<Cell> cells = new ArrayList<>();
      boolean hasMore;
      do {
        boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch());
        if (bypass) {
          hasMore = false;
        } else {
          hasMore = scanner.nextRaw(cells);
          if (cells.isEmpty()) {
            continue;
          }
          Cell firstCell = cells.get(0);
          for (Cell cell : cells) {
            if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
                firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()) != 0) {
              throw new IOException("RegionScanner#nextRaw should not return cells from"
                  + " multiple rows. first row="
                  + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(),
                    firstCell.getRowLength())
                  + ", current row="
                  + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
            }
          }
          results.add(Result.create(cells));
          cells.clear();
          cp.postScannerNext(scanner, results, scan.getBatch(), hasMore);
        }
        for (Result r : results) {
          key.set(r.getRow());
          out.append(key, r);
          ++rowCount;
          cellCount += r.size();
        }
        results.clear();
      } while (hasMore);
      return ExportProtos.ExportResponse.newBuilder()
              .setRowCount(rowCount)
              .setCellCount(cellCount)
              .build();
    } finally {
      cp.checkScannerClose(scanner);
    }
  }

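  /** Fails if the output directory already exists; otherwise creates it. */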
  private static void checkDir(final FileSystem fs, final Path dir) throws IOException {
    if (fs.exists(dir)) {
      throw new IOException("The output directory " + dir + " already exists");
    }
    if (!fs.mkdirs(dir)) {
      throw new IOException("Failed to create the output directory " + dir);
    }
  }

  private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf,
          Path dir, final Scan scan, final Token<?> userToken) throws IOException {
    boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false);
    String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE,
            DEFAULT_TYPE.toString());
    String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC,
            DEFAULT_CODEC.getName());
    DelegationToken protoToken = null;
    if (userToken != null) {
      protoToken = DelegationToken.newBuilder()
              .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
              .setPassword(ByteStringer.wrap(userToken.getPassword()))
              .setKind(userToken.getKind().toString())
              .setService(userToken.getService().toString()).build();
    }
    LOG.info("compressed=" + compressed
            + ", compression type=" + compressionType
            + ", compression codec=" + compressionCodec
            + ", userToken=" + userToken);
    ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder()
            .setScan(ProtobufUtil.toScan(scan))
            .setOutputPath(dir.toString())
            .setCompressed(compressed)
            .setCompressCodec(compressionCodec)
            .setCompressType(compressionType);
    if (protoToken != null) {
      builder.setFsToken(protoToken);
    }
    return builder.build();
  }

  @Override
  public void start(CoprocessorEnvironment environment) throws IOException {
    if (environment instanceof RegionCoprocessorEnvironment) {
      env = (RegionCoprocessorEnvironment) environment;
      userProvider = UserProvider.instantiate(env.getConfiguration());
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
  }

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  @Override
  public void export(RpcController controller, ExportProtos.ExportRequest request,
          RpcCallback<ExportProtos.ExportResponse> done) {
    Region region = env.getRegion();
    Configuration conf = HBaseConfiguration.create(env.getConfiguration());
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        ResultSerialization.class.getName());
    try {
      Scan scan = validateKey(region.getRegionInfo(), request);
      Token userToken = null;
      if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) {
        LOG.warn("Hadoop security is enabled, but no user token was found in the request");
      } else if (userProvider.isHadoopSecurityEnabled()) {
        userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
                request.getFsToken().getPassword().toByteArray(),
                new Text(request.getFsToken().getKind()),
                new Text(request.getFsToken().getService()));
      }
      ExportProtos.ExportResponse response = processData(region, conf, userProvider,
        scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request));
      done.run(response);
    } catch (IOException e) {
      CoprocessorRpcUtils.setControllerException(controller, e);
      LOG.error(e.toString(), e);
    }
  }

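  /**
   * Clamps the requested scan to this region's key range so that each region exports
   * only its own rows.
   */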
  private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request)
      throws IOException {
    Scan scan = ProtobufUtil.toScan(request.getScan());
    byte[] regionStartKey = region.getStartKey();
    byte[] originStartKey = scan.getStartRow();
    if (originStartKey == null
            || Bytes.compareTo(originStartKey, regionStartKey) < 0) {
      scan.setStartRow(regionStartKey);
    }
    byte[] regionEndKey = region.getEndKey();
    byte[] originEndKey = scan.getStopRow();
    if (originEndKey == null
            || Bytes.compareTo(originEndKey, regionEndKey) > 0) {
      scan.setStopRow(regionEndKey);
    }
    return scan;
  }

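  /** Holds a region operation open for the lifetime of the export scan. */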
  private static class RegionOp implements Closeable {
    private final Region region;

    RegionOp(final Region region) throws IOException {
      this.region = region;
      region.startRegionOperation();
    }

    @Override
    public void close() throws IOException {
      region.closeRegionOperation();
    }
  }

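  /** Wraps scanner open/next/close with the region's coprocessor hooks, when present. */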
  private static class ScanCoprocessor {
    private final HRegion region;

    ScanCoprocessor(final Region region) {
      this.region = (HRegion) region;
    }

    RegionScanner checkScannerOpen(final Scan scan) throws IOException {
      RegionScanner scanner;
      if (region.getCoprocessorHost() == null) {
        scanner = region.getScanner(scan);
      } else {
        region.getCoprocessorHost().preScannerOpen(scan);
        scanner = region.getScanner(scan);
        scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
      }
      if (scanner == null) {
        throw new IOException("Failed to open region scanner");
      }
      return scanner;
    }

    void checkScannerClose(final InternalScanner s) throws IOException {
      if (s == null) {
        return;
      }
      if (region.getCoprocessorHost() == null) {
        s.close();
        return;
      }
      region.getCoprocessorHost().preScannerClose(s);
      try {
        s.close();
      } finally {
        region.getCoprocessorHost().postScannerClose(s);
      }
    }

    boolean preScannerNext(final InternalScanner s,
            final List<Result> results, final int limit) throws IOException {
      if (region.getCoprocessorHost() == null) {
        return false;
      } else {
        Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit);
        return bypass == null ? false : bypass;
      }
    }

    boolean postScannerNext(final InternalScanner s,
            final List<Result> results, final int limit, boolean hasMore)
            throws IOException {
      if (region.getCoprocessorHost() == null) {
        return false;
      } else {
        return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore);
      }
    }
  }

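  /**
   * Creates the SequenceFile writer as the request user when credentials are
   * available, so the export files are written with the caller's identity.
   */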
  private static class SecureWriter implements Closeable {
    private final PrivilegedWriter privilegedWriter;

    SecureWriter(final Configuration conf, final UserProvider userProvider,
        final Token userToken, final List<SequenceFile.Writer.Option> opts)
        throws IOException {
      User user = getActiveUser(userProvider, userToken);
      try {
        SequenceFile.Writer sequenceFileWriter =
            user.runAs((PrivilegedExceptionAction<SequenceFile.Writer>) () ->
                SequenceFile.createWriter(conf,
                    opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
        privilegedWriter = new PrivilegedWriter(user, sequenceFileWriter);
      } catch (InterruptedException e) {
        throw new IOException(e);
      }
    }

    void append(final Object key, final Object value) throws IOException {
      privilegedWriter.append(key, value);
    }

    private static User getActiveUser(final UserProvider userProvider, final Token userToken)
        throws IOException {
      User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
      if (user == null && userToken != null) {
        LOG.warn("No user credentials were found, but a user token was supplied with the request");
      } else if (user != null && userToken != null) {
        user.addToken(userToken);
      }
      return user;
    }

    @Override
    public void close() throws IOException {
      privilegedWriter.close();
    }
  }

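  /** Appends key/value pairs to the underlying writer, running as the given user when set. */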
  private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>,
          Closeable {
    private final User user;
    private final SequenceFile.Writer out;
    private Object key;
    private Object value;

    PrivilegedWriter(final User user, final SequenceFile.Writer out) {
      this.user = user;
      this.out = out;
    }

    void append(final Object key, final Object value) throws IOException {
      if (user == null) {
        out.append(key, value);
      } else {
        this.key = key;
        this.value = value;
        try {
          user.runAs(this);
        } catch (InterruptedException ex) {
          throw new IOException(ex);
        }
      }
    }

    @Override
    public Boolean run() throws Exception {
      out.append(key, value);
      return true;
    }

    @Override
    public void close() throws IOException {
      out.close();
    }
  }

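  /** Per-region export summary (row and cell counts) returned to the client. */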
  public static final class Response {
    private final long rowCount;
    private final long cellCount;

    private Response(ExportProtos.ExportResponse r) {
      this.rowCount = r.getRowCount();
      this.cellCount = r.getCellCount();
    }

    public long getRowCount() {
      return rowCount;
    }

    public long getCellCount() {
      return cellCount;
    }

    @Override
    public String toString() {
      StringBuilder builder = new StringBuilder(35);
      return builder.append("rowCount=")
             .append(rowCount)
             .append(", cellCount=")
             .append(cellCount)
             .toString();
    }
  }
}