001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.ByteArrayOutputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.stream.Collectors;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.Cell.Type;
030import org.apache.hadoop.hbase.CellBuilderFactory;
031import org.apache.hadoop.hbase.CellBuilderType;
032import org.apache.hadoop.hbase.ClientMetaTableAccessor;
033import org.apache.hadoop.hbase.ClientMetaTableAccessor.QueryType;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.MetaTableAccessor;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
046import org.apache.hadoop.hbase.master.RegionState;
047import org.apache.hadoop.hbase.master.RegionState.State;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.yetus.audience.InterfaceAudience;
051
052/**
053 * Helper class for storing replication barriers in family 'rep_barrier' of meta table.
054 * <p/>
055 * See SerialReplicationChecker on how to make use of the barriers.
056 */
057@InterfaceAudience.Private
058public final class ReplicationBarrierFamilyFormat {
059
060  public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
061
062  private static final byte ESCAPE_BYTE = (byte) 0xFF;
063
064  private static final byte SEPARATED_BYTE = 0x00;
065
066  private ReplicationBarrierFamilyFormat() {
067  }
068
069  public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
070    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
071      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(HConstants.SEQNUM_QUALIFIER)
072      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(Bytes.toBytes(openSeqNum))
073      .build());
074  }
075
076  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
077    for (byte b : regionName) {
078      if (b == ESCAPE_BYTE) {
079        out.write(ESCAPE_BYTE);
080      }
081      out.write(b);
082    }
083  }
084
085  public static byte[] getParentsBytes(List<RegionInfo> parents) {
086    ByteArrayOutputStream bos = new ByteArrayOutputStream();
087    Iterator<RegionInfo> iter = parents.iterator();
088    writeRegionName(bos, iter.next().getRegionName());
089    while (iter.hasNext()) {
090      bos.write(ESCAPE_BYTE);
091      bos.write(SEPARATED_BYTE);
092      writeRegionName(bos, iter.next().getRegionName());
093    }
094    return bos.toByteArray();
095  }
096
097  private static List<byte[]> parseParentsBytes(byte[] bytes) {
098    List<byte[]> parents = new ArrayList<>();
099    ByteArrayOutputStream bos = new ByteArrayOutputStream();
100    for (int i = 0; i < bytes.length; i++) {
101      if (bytes[i] == ESCAPE_BYTE) {
102        i++;
103        if (bytes[i] == SEPARATED_BYTE) {
104          parents.add(bos.toByteArray());
105          bos.reset();
106          continue;
107        }
108        // fall through to append the byte
109      }
110      bos.write(bytes[i]);
111    }
112    if (bos.size() > 0) {
113      parents.add(bos.toByteArray());
114    }
115    return parents;
116  }
117
118  public static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
119    byte[] value = getParentsBytes(parents);
120    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
121      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
122      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
123  }
124
125  public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
126    throws IOException {
127    Put put = new Put(regionInfo.getRegionName(), ts);
128    addReplicationBarrier(put, openSeqNum);
129    return put;
130  }
131
132  public static final class ReplicationBarrierResult {
133    private final long[] barriers;
134    private final RegionState.State state;
135    private final List<byte[]> parentRegionNames;
136
137    ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
138      this.barriers = barriers;
139      this.state = state;
140      this.parentRegionNames = parentRegionNames;
141    }
142
143    public long[] getBarriers() {
144      return barriers;
145    }
146
147    public RegionState.State getState() {
148      return state;
149    }
150
151    public List<byte[]> getParentRegionNames() {
152      return parentRegionNames;
153    }
154
155    @Override
156    public String toString() {
157      return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" + state
158        + ", parentRegionNames="
159        + parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", "))
160        + "]";
161    }
162  }
163
164  private static long getReplicationBarrier(Cell c) {
165    return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
166  }
167
168  public static long[] getReplicationBarriers(Result result) {
169    return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
170      .stream().mapToLong(ReplicationBarrierFamilyFormat::getReplicationBarrier).sorted().distinct()
171      .toArray();
172  }
173
174  private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
175    long[] barriers = getReplicationBarriers(result);
176    byte[] stateBytes = result.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
177    RegionState.State state =
178      stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
179    byte[] parentRegionsBytes =
180      result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
181    List<byte[]> parentRegionNames =
182      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
183    return new ReplicationBarrierResult(barriers, state, parentRegionNames);
184  }
185
186  public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
187    TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
188    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
189    byte[] metaStopKey =
190      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
191    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
192      .addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)
193      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
194      .setCaching(10);
195    try (Table table = conn.getTable(TableName.META_TABLE_NAME);
196      ResultScanner scanner = table.getScanner(scan)) {
197      for (Result result;;) {
198        result = scanner.next();
199        if (result == null) {
200          return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
201        }
202        byte[] regionName = result.getRow();
203        // TODO: we may look up a region which has already been split or merged so we need to check
204        // whether the encoded name matches. Need to find a way to quit earlier when there is no
205        // record for the given region, for now it will scan to the end of the table.
206        if (
207          !Bytes.equals(encodedRegionName, Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))
208        ) {
209          continue;
210        }
211        return getReplicationBarrierResult(result);
212      }
213    }
214  }
215
216  public static long[] getReplicationBarriers(Connection conn, byte[] regionName)
217    throws IOException {
218    try (Table table = conn.getTable(TableName.META_TABLE_NAME)) {
219      Result result = table.get(new Get(regionName)
220        .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
221        .readAllVersions());
222      return getReplicationBarriers(result);
223    }
224  }
225
226  public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
227    TableName tableName) throws IOException {
228    List<Pair<String, Long>> list = new ArrayList<>();
229    MetaTableAccessor.scanMeta(conn,
230      ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION),
231      ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION),
232      QueryType.REPLICATION, r -> {
233        byte[] value =
234          r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
235        if (value == null) {
236          return true;
237        }
238        long lastBarrier = Bytes.toLong(value);
239        String encodedRegionName = RegionInfo.encodeRegionName(r.getRow());
240        list.add(Pair.newPair(encodedRegionName, lastBarrier));
241        return true;
242      });
243    return list;
244  }
245
246  public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
247    TableName tableName) throws IOException {
248    List<String> list = new ArrayList<>();
249    MetaTableAccessor.scanMeta(conn,
250      ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION),
251      ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION),
252      QueryType.REPLICATION, new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
253        list.add(RegionInfo.encodeRegionName(r.getRow()));
254        return true;
255      });
256    return list;
257  }
258}