From e9e323339121bf1078f2b20bb1d47de9c13b9496 Mon Sep 17 00:00:00 2001
From: "juke.mini666"
Date: Fri, 20 Dec 2019 14:56:14 +0900
Subject: [PATCH 1/2] Arrangement code

---
 .../hadoop/hbase/spark/HBaseContext.scala | 79 ++++++++++++++++++-
 1 file changed, 77 insertions(+), 2 deletions(-)

diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 890e67f8..066d5a84 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
 import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.util.{Bytes, RegionSplitter}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client._
 import scala.reflect.ClassTag
 import org.apache.spark.{SerializableWritable, SparkContext}
 import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
+TableInputFormat, IdentityTableMapper, TableSnapshotInputFormat}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.streaming.dstream.DStream
@@ -463,6 +463,81 @@ class HBaseContext(@transient val sc: SparkContext,
       (r: (ImmutableBytesWritable, Result)) => r)
   }
 
+  /**
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @return New RDD with results from scan
+   */
+  def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String):
+  RDD[(ImmutableBytesWritable, Result)]
+  = hbaseRDDForSnapshot(snapshotName, scans, restoreDir, null, 1)
+
+  /**
+   * This function will use the native HBase TableSnapshotInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
+   * @param numSplitsPerRegion how many input splits to generate per one region
+   * @return New RDD with results from scan
+   */
+  def hbaseRDDForSnapshot(snapshotName: String, scans: Scan, restoreDir: String,
+    splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int):
+  RDD[(ImmutableBytesWritable, Result)] = {
+    hbaseRDDForSnapshot[(ImmutableBytesWritable, Result)](
+      snapshotName,
+      scans,
+      restoreDir,
+      splitAlgo,
+      numSplitsPerRegion,
+      (r: (ImmutableBytesWritable, Result)) => r)
+  }
+
+  /**
+   * This function will use the native HBase TableSnapshotInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   * @param snapshotName the name of the snapshot to scan
+   * @param scans the HBase scan object to use to read data from HBase
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+   *                   have write permissions to this directory,
+   *                   and this should not be a subdirectory of rootdir.
+   *                   After the job is finished, restoreDir can be deleted.
+   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
+   * @param numSplitsPerRegion how many input splits to generate per one region
+   * @param f function to convert a Result object from HBase into
+   *          what the user wants in the final generated RDD
+   * @return new RDD with results from scan
+   */
+  def hbaseRDDForSnapshot[U: ClassTag](snapshotName: String, scans: Scan, restoreDir: String,
+    splitAlgo: RegionSplitter.SplitAlgorithm, numSplitsPerRegion: Int,
+    f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+    val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+    TableMapReduceUtil.initCredentials(job)
+    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scans,
+      classOf[IdentityTableMapper], null, null, job, true, new Path(restoreDir), splitAlgo, numSplitsPerRegion)
+
+    val jconf = new JobConf(job.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jconf)
+    new NewHBaseRDD(sc,
+      classOf[TableSnapshotInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result],
+      job.getConfiguration,
+      this).map(f)
+  }
+
   /**
    * underlining wrapper all foreach functions in HBaseContext
    */

From da13a6f0dd6e5a76c2416f1e429434fda735d72c Mon Sep 17 00:00:00 2001
From: "juke.mini666"
Date: Fri, 20 Dec 2019 15:05:43 +0900
Subject: [PATCH 2/2] Arrangement code

---
 .../scala/org/apache/hadoop/hbase/spark/HBaseContext.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
index 066d5a84..a29ea34d 100644
--- a/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
+++ b/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.UUID
 import javax.management.openmbean.KeyAlreadyExistsException
 
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience
 import org.apache.hadoop.hbase.fs.HFileSystem
 import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.io.compress.Compression
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
 import org.apache.hadoop.hbase.client._
+
 import scala.reflect.ClassTag
 import org.apache.spark.{SerializableWritable, SparkContext}
 import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
@@ -46,9 +47,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.streaming.dstream.DStream
 import java.io._
+
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}
+
 import scala.collection.mutable
 
 /**