001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.coprocessor; 020 021import com.google.protobuf.RpcCallback; 022import com.google.protobuf.RpcController; 023import com.google.protobuf.Service; 024 025import java.io.Closeable; 026import java.io.IOException; 027import java.security.PrivilegedExceptionAction; 028import java.util.ArrayList; 029import java.util.Collections; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Map; 033import java.util.TreeMap; 034 035import org.apache.commons.lang3.ArrayUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CoprocessorEnvironment; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseInterfaceAudience; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 051import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 052import org.apache.hadoop.hbase.ipc.RpcServer; 053import org.apache.hadoop.hbase.ipc.ServerRpcController; 054import org.apache.hadoop.hbase.mapreduce.ExportUtils; 055import org.apache.hadoop.hbase.mapreduce.Import; 056import org.apache.hadoop.hbase.mapreduce.ResultSerialization; 057import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 058import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; 059import org.apache.hadoop.hbase.protobuf.generated.ExportProtos; 060import org.apache.hadoop.hbase.regionserver.HRegion; 061import org.apache.hadoop.hbase.regionserver.InternalScanner; 062import org.apache.hadoop.hbase.regionserver.Region; 063import org.apache.hadoop.hbase.regionserver.RegionScanner; 064import org.apache.hadoop.hbase.security.User; 065import org.apache.hadoop.hbase.security.UserProvider; 066import org.apache.hadoop.hbase.security.token.FsDelegationToken; 067import org.apache.hadoop.hbase.util.ByteStringer; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.Triple; 070import org.apache.hadoop.io.SequenceFile; 071import org.apache.hadoop.io.Text; 072import org.apache.hadoop.io.compress.CompressionCodec; 073import org.apache.hadoop.io.compress.DefaultCodec; 074import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 075import org.apache.hadoop.security.token.Token; 076import org.apache.hadoop.util.GenericOptionsParser; 077import org.apache.hadoop.util.ReflectionUtils; 078import org.apache.yetus.audience.InterfaceAudience; 079import org.apache.yetus.audience.InterfaceStability; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 083 084/** 085 * Export an HBase table. Writes content to sequence files up in HDFS. Use 086 * {@link Import} to read it back in again. It is implemented by the endpoint 087 * technique. 088 * 089 * @see org.apache.hadoop.hbase.mapreduce.Export 090 */ 091@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) 092@InterfaceStability.Evolving 093public class Export extends ExportProtos.ExportService implements RegionCoprocessor { 094 private static final Logger LOG = LoggerFactory.getLogger(Export.class); 095 private static final Class<? extends CompressionCodec> DEFAULT_CODEC = DefaultCodec.class; 096 private static final SequenceFile.CompressionType DEFAULT_TYPE = 097 SequenceFile.CompressionType.RECORD; 098 private RegionCoprocessorEnvironment env = null; 099 private UserProvider userProvider; 100 101 public static void main(String[] args) throws Throwable { 102 Map<byte[], Response> response = run(HBaseConfiguration.create(), args); 103 System.exit(response == null ? -1 : 0); 104 } 105 106 @VisibleForTesting 107 static Map<byte[], Response> run(final Configuration conf, final String[] args) throws Throwable { 108 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 109 if (!ExportUtils.isValidArguements(args)) { 110 ExportUtils.usage("Wrong number of arguments: " + ArrayUtils.getLength(otherArgs)); 111 return null; 112 } 113 Triple<TableName, Scan, Path> arguments = 114 ExportUtils.getArgumentsFromCommandLine(conf, otherArgs); 115 return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird()); 116 } 117 118 public static Map<byte[], Response> run(final Configuration conf, TableName tableName, 119 Scan scan, Path dir) throws Throwable { 120 FileSystem fs = dir.getFileSystem(conf); 121 UserProvider userProvider = UserProvider.instantiate(conf); 122 checkDir(fs, dir); 123 FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); 124 fsDelegationToken.acquireDelegationToken(fs); 125 try { 126 final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir, 127 scan, fsDelegationToken.getUserToken()); 128 try (Connection con = ConnectionFactory.createConnection(conf); 129 Table table = con.getTable(tableName)) { 130 Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR); 131 table.coprocessorService(ExportProtos.ExportService.class, 132 scan.getStartRow(), 133 scan.getStopRow(), 134 (ExportProtos.ExportService service) -> { 135 ServerRpcController controller = new ServerRpcController(); 136 Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR); 137 CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse> 138 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); 139 service.export(controller, request, rpcCallback); 140 if (controller.failedOnException()) { 141 throw controller.getFailedOn(); 142 } 143 return rpcCallback.get(); 144 }).forEach((k, v) -> result.put(k, new Response(v))); 145 return result; 146 } catch (Throwable e) { 147 fs.delete(dir, true); 148 throw e; 149 } 150 } finally { 151 fsDelegationToken.releaseDelegationToken(); 152 } 153 } 154 155 private static boolean getCompression(final ExportProtos.ExportRequest request) { 156 if (request.hasCompressed()) { 157 return request.getCompressed(); 158 } else { 159 return false; 160 } 161 } 162 163 private static SequenceFile.CompressionType getCompressionType( 164 final ExportProtos.ExportRequest request) { 165 if (request.hasCompressType()) { 166 return SequenceFile.CompressionType.valueOf(request.getCompressType()); 167 } else { 168 return DEFAULT_TYPE; 169 } 170 } 171 172 private static CompressionCodec getCompressionCodec(final Configuration conf, 173 final ExportProtos.ExportRequest request) { 174 try { 175 Class<? extends CompressionCodec> codecClass; 176 if (request.hasCompressCodec()) { 177 codecClass = conf.getClassByName(request.getCompressCodec()) 178 .asSubclass(CompressionCodec.class); 179 } else { 180 codecClass = DEFAULT_CODEC; 181 } 182 return ReflectionUtils.newInstance(codecClass, conf); 183 } catch (ClassNotFoundException e) { 184 throw new IllegalArgumentException("Compression codec " 185 + request.getCompressCodec() + " was not found.", e); 186 } 187 } 188 189 private static SequenceFile.Writer.Option getOutputPath(final Configuration conf, 190 final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException { 191 Path file = new Path(request.getOutputPath(), "export-" + info.getEncodedName()); 192 FileSystem fs = file.getFileSystem(conf); 193 if (fs.exists(file)) { 194 throw new IOException(file + " exists"); 195 } 196 return SequenceFile.Writer.file(file); 197 } 198 199 private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf, 200 final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException { 201 List<SequenceFile.Writer.Option> rval = new LinkedList<>(); 202 rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class)); 203 rval.add(SequenceFile.Writer.valueClass(Result.class)); 204 rval.add(getOutputPath(conf, info, request)); 205 if (getCompression(request)) { 206 rval.add(SequenceFile.Writer.compression(getCompressionType(request), 207 getCompressionCodec(conf, request))); 208 } else { 209 rval.add(SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); 210 } 211 return rval; 212 } 213 214 private static ExportProtos.ExportResponse processData(final Region region, 215 final Configuration conf, final UserProvider userProvider, final Scan scan, 216 final Token userToken, final List<SequenceFile.Writer.Option> opts) throws IOException { 217 ScanCoprocessor cp = new ScanCoprocessor(region); 218 RegionScanner scanner = null; 219 try (RegionOp regionOp = new RegionOp(region); 220 SecureWriter out = new SecureWriter(conf, userProvider, userToken, opts)) { 221 scanner = cp.checkScannerOpen(scan); 222 ImmutableBytesWritable key = new ImmutableBytesWritable(); 223 long rowCount = 0; 224 long cellCount = 0; 225 List<Result> results = new ArrayList<>(); 226 List<Cell> cells = new ArrayList<>(); 227 boolean hasMore; 228 do { 229 boolean bypass = cp.preScannerNext(scanner, results, scan.getBatch()); 230 if (bypass) { 231 hasMore = false; 232 } else { 233 hasMore = scanner.nextRaw(cells); 234 if (cells.isEmpty()) { 235 continue; 236 } 237 Cell firstCell = cells.get(0); 238 for (Cell cell : cells) { 239 if (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), 240 firstCell.getRowLength(), cell.getRowArray(), cell.getRowOffset(), 241 cell.getRowLength()) != 0) { 242 throw new IOException("Why the RegionScanner#nextRaw returns the data of different" 243 + " rows?? first row=" 244 + Bytes.toHex(firstCell.getRowArray(), firstCell.getRowOffset(), 245 firstCell.getRowLength()) 246 + ", current row=" 247 + Bytes.toHex(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 248 } 249 } 250 results.add(Result.create(cells)); 251 cells.clear(); 252 cp.postScannerNext(scanner, results, scan.getBatch(), hasMore); 253 } 254 for (Result r : results) { 255 key.set(r.getRow()); 256 out.append(key, r); 257 ++rowCount; 258 cellCount += r.size(); 259 } 260 results.clear(); 261 } while (hasMore); 262 return ExportProtos.ExportResponse.newBuilder() 263 .setRowCount(rowCount) 264 .setCellCount(cellCount) 265 .build(); 266 } finally { 267 cp.checkScannerClose(scanner); 268 } 269 } 270 271 private static void checkDir(final FileSystem fs, final Path dir) throws IOException { 272 if (fs.exists(dir)) { 273 throw new RuntimeException("The " + dir + " exists"); 274 } 275 if (!fs.mkdirs(dir)) { 276 throw new IOException("Failed to create the " + dir); 277 } 278 } 279 280 private static ExportProtos.ExportRequest getConfiguredRequest(Configuration conf, 281 Path dir, final Scan scan, final Token<?> userToken) throws IOException { 282 boolean compressed = conf.getBoolean(FileOutputFormat.COMPRESS, false); 283 String compressionType = conf.get(FileOutputFormat.COMPRESS_TYPE, 284 DEFAULT_TYPE.toString()); 285 String compressionCodec = conf.get(FileOutputFormat.COMPRESS_CODEC, 286 DEFAULT_CODEC.getName()); 287 DelegationToken protoToken = null; 288 if (userToken != null) { 289 protoToken = DelegationToken.newBuilder() 290 .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) 291 .setPassword(ByteStringer.wrap(userToken.getPassword())) 292 .setKind(userToken.getKind().toString()) 293 .setService(userToken.getService().toString()).build(); 294 } 295 LOG.info("compressed=" + compressed 296 + ", compression type=" + compressionType 297 + ", compression codec=" + compressionCodec 298 + ", userToken=" + userToken); 299 ExportProtos.ExportRequest.Builder builder = ExportProtos.ExportRequest.newBuilder() 300 .setScan(ProtobufUtil.toScan(scan)) 301 .setOutputPath(dir.toString()) 302 .setCompressed(compressed) 303 .setCompressCodec(compressionCodec) 304 .setCompressType(compressionType); 305 if (protoToken != null) { 306 builder.setFsToken(protoToken); 307 } 308 return builder.build(); 309 } 310 311 @Override 312 public void start(CoprocessorEnvironment environment) throws IOException { 313 if (environment instanceof RegionCoprocessorEnvironment) { 314 env = (RegionCoprocessorEnvironment) environment; 315 userProvider = UserProvider.instantiate(env.getConfiguration()); 316 } else { 317 throw new CoprocessorException("Must be loaded on a table region!"); 318 } 319 } 320 321 @Override 322 public void stop(CoprocessorEnvironment env) throws IOException { 323 } 324 325 @Override 326 public Iterable<Service> getServices() { 327 return Collections.singleton(this); 328 } 329 330 @Override 331 public void export(RpcController controller, ExportProtos.ExportRequest request, 332 RpcCallback<ExportProtos.ExportResponse> done) { 333 Region region = env.getRegion(); 334 Configuration conf = HBaseConfiguration.create(env.getConfiguration()); 335 conf.setStrings("io.serializations", conf.get("io.serializations"), 336 ResultSerialization.class.getName()); 337 try { 338 Scan scan = validateKey(region.getRegionInfo(), request); 339 Token userToken = null; 340 if (userProvider.isHadoopSecurityEnabled() && !request.hasFsToken()) { 341 LOG.warn("Hadoop security is enable, but no found of user token"); 342 } else if (userProvider.isHadoopSecurityEnabled()) { 343 userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), 344 request.getFsToken().getPassword().toByteArray(), 345 new Text(request.getFsToken().getKind()), 346 new Text(request.getFsToken().getService())); 347 } 348 ExportProtos.ExportResponse response = processData(region, conf, userProvider, 349 scan, userToken, getWriterOptions(conf, region.getRegionInfo(), request)); 350 done.run(response); 351 } catch (IOException e) { 352 CoprocessorRpcUtils.setControllerException(controller, e); 353 LOG.error(e.toString(), e); 354 } 355 } 356 357 private Scan validateKey(final RegionInfo region, final ExportProtos.ExportRequest request) 358 throws IOException { 359 Scan scan = ProtobufUtil.toScan(request.getScan()); 360 byte[] regionStartKey = region.getStartKey(); 361 byte[] originStartKey = scan.getStartRow(); 362 if (originStartKey == null 363 || Bytes.compareTo(originStartKey, regionStartKey) < 0) { 364 scan.setStartRow(regionStartKey); 365 } 366 byte[] regionEndKey = region.getEndKey(); 367 byte[] originEndKey = scan.getStopRow(); 368 if (originEndKey == null 369 || Bytes.compareTo(originEndKey, regionEndKey) > 0) { 370 scan.setStartRow(regionEndKey); 371 } 372 return scan; 373 } 374 375 private static class RegionOp implements Closeable { 376 private final Region region; 377 378 RegionOp(final Region region) throws IOException { 379 this.region = region; 380 region.startRegionOperation(); 381 } 382 383 @Override 384 public void close() throws IOException { 385 region.closeRegionOperation(); 386 } 387 } 388 389 private static class ScanCoprocessor { 390 private final HRegion region; 391 392 ScanCoprocessor(final Region region) { 393 this.region = (HRegion) region; 394 } 395 396 RegionScanner checkScannerOpen(final Scan scan) throws IOException { 397 RegionScanner scanner; 398 if (region.getCoprocessorHost() == null) { 399 scanner = region.getScanner(scan); 400 } else { 401 region.getCoprocessorHost().preScannerOpen(scan); 402 scanner = region.getScanner(scan); 403 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); 404 } 405 if (scanner == null) { 406 throw new IOException("Failed to open region scanner"); 407 } 408 return scanner; 409 } 410 411 void checkScannerClose(final InternalScanner s) throws IOException { 412 if (s == null) { 413 return; 414 } 415 if (region.getCoprocessorHost() == null) { 416 s.close(); 417 return; 418 } 419 region.getCoprocessorHost().preScannerClose(s); 420 try { 421 s.close(); 422 } finally { 423 region.getCoprocessorHost().postScannerClose(s); 424 } 425 } 426 427 boolean preScannerNext(final InternalScanner s, 428 final List<Result> results, final int limit) throws IOException { 429 if (region.getCoprocessorHost() == null) { 430 return false; 431 } else { 432 Boolean bypass = region.getCoprocessorHost().preScannerNext(s, results, limit); 433 return bypass == null ? false : bypass; 434 } 435 } 436 437 boolean postScannerNext(final InternalScanner s, 438 final List<Result> results, final int limit, boolean hasMore) 439 throws IOException { 440 if (region.getCoprocessorHost() == null) { 441 return false; 442 } else { 443 return region.getCoprocessorHost().postScannerNext(s, results, limit, hasMore); 444 } 445 } 446 } 447 448 private static class SecureWriter implements Closeable { 449 private final PrivilegedWriter privilegedWriter; 450 451 SecureWriter(final Configuration conf, final UserProvider userProvider, 452 final Token userToken, final List<SequenceFile.Writer.Option> opts) 453 throws IOException { 454 User user = getActiveUser(userProvider, userToken); 455 try { 456 SequenceFile.Writer sequenceFileWriter = 457 user.runAs((PrivilegedExceptionAction<SequenceFile.Writer>) () -> 458 SequenceFile.createWriter(conf, 459 opts.toArray(new SequenceFile.Writer.Option[opts.size()]))); 460 privilegedWriter = new PrivilegedWriter(user, sequenceFileWriter); 461 } catch (InterruptedException e) { 462 throw new IOException(e); 463 } 464 } 465 466 void append(final Object key, final Object value) throws IOException { 467 privilegedWriter.append(key, value); 468 } 469 470 private static User getActiveUser(final UserProvider userProvider, final Token userToken) 471 throws IOException { 472 User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent()); 473 if (user == null && userToken != null) { 474 LOG.warn("No found of user credentials, but a token was got from user request"); 475 } else if (user != null && userToken != null) { 476 user.addToken(userToken); 477 } 478 return user; 479 } 480 481 @Override 482 public void close() throws IOException { 483 privilegedWriter.close(); 484 } 485 } 486 487 private static class PrivilegedWriter implements PrivilegedExceptionAction<Boolean>, 488 Closeable { 489 private final User user; 490 private final SequenceFile.Writer out; 491 private Object key; 492 private Object value; 493 494 PrivilegedWriter(final User user, final SequenceFile.Writer out) { 495 this.user = user; 496 this.out = out; 497 } 498 499 void append(final Object key, final Object value) throws IOException { 500 if (user == null) { 501 out.append(key, value); 502 } else { 503 this.key = key; 504 this.value = value; 505 try { 506 user.runAs(this); 507 } catch (InterruptedException ex) { 508 throw new IOException(ex); 509 } 510 } 511 } 512 513 @Override 514 public Boolean run() throws Exception { 515 out.append(key, value); 516 return true; 517 } 518 519 @Override 520 public void close() throws IOException { 521 out.close(); 522 } 523 } 524 525 public final static class Response { 526 private final long rowCount; 527 private final long cellCount; 528 529 private Response(ExportProtos.ExportResponse r) { 530 this.rowCount = r.getRowCount(); 531 this.cellCount = r.getCellCount(); 532 } 533 534 public long getRowCount() { 535 return rowCount; 536 } 537 538 public long getCellCount() { 539 return cellCount; 540 } 541 542 @Override 543 public String toString() { 544 StringBuilder builder = new StringBuilder(35); 545 return builder.append("rowCount=") 546 .append(rowCount) 547 .append(", cellCount=") 548 .append(cellCount) 549 .toString(); 550 } 551 } 552}