Cloud Computing Technology

Tags: Spark, HBase, Spark 2.1.1

Test conditions

The tests were run locally on my PC.

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>
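
All three code samples below write to an HBase table named word with a single column family f1, and they assume the table already exists. For reference, a minimal sketch for creating it with the HBase 1.x Admin API; only the table name, column family, and ZooKeeper quorum are taken from the examples below, everything else is an assumption:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateWordTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    // Same ZooKeeper quorum as in the write examples below
    conf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      val tableName = TableName.valueOf("word")
      if (!admin.tableExists(tableName)) {
        // One column family "f1", matching the Put calls in the examples
        val descriptor = new HTableDescriptor(tableName)
        descriptor.addFamily(new HColumnDescriptor("f1"))
        admin.createTable(descriptor)
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}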

1. Method 1: one record at a time
Each record is written with a separate call to the following API:

/**
 * Puts some data in the table.
 *
 * @param put The data to put.
 * @throws IOException if a remote or network exception occurs.
 * @since 0.20.0
 */
void put(Put put) throws IOException

My code:

import">
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    // 1,000,001 integers (0 to 1000000) spread across the partitions
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.foreachPartition(partition => {
      // One connection per partition; Connection is not serializable, so it is created on the executor
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConf.set("hbase.defaults.for.version.skip", "true")
      val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
      val table = hbaseConn.getTable(TableName.valueOf("word"))
      try {
        // One put() call (one RPC) per record
        partition.foreach(value => {
          val put = new Put(Bytes.toBytes(value.toString))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
          table.put(put)
        })
      } finally {
        table.close()
        hbaseConn.close()
      }
    })
  }
}

Timestamp of the first record: 1497973306787

Timestamp of the last record: 1497973505273

Timestamp difference: 1497973505273 - 1497973306787 = 198486 ms
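
Note that the elapsed time here is inferred from the HBase cell timestamps of the first and last rows, which are assigned by the region servers. A rough alternative (not what was used for the numbers in this post) is to time the Spark action on the driver; a minimal sketch, assuming an rdd and a per-partition write function like the foreachPartition body above:

// Hypothetical helper: `writePartition` stands in for the per-partition write logic above
def timeWrite(rdd: org.apache.spark.rdd.RDD[Int])(writePartition: Iterator[Int] => Unit): Long = {
  // foreachPartition is an action, so it blocks until the whole write job finishes
  val start = System.currentTimeMillis()
  rdd.foreachPartition(writePartition)
  System.currentTimeMillis() - start // elapsed wall-clock time in milliseconds
}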

2. Method 2: batch writes
Write to HBase in batches, using the following API:

/**
 * {@inheritDoc}
 * @throws IOException
 */
@Override
public void put(final List<Put> puts) throws IOException {
    getBufferedMutator().mutate(puts);
    if (autoFlush) {
        flushCommits();
    }
}

My code:

import">
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.map(value => {
      val put = new Put(Bytes.toBytes(value.toString))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
      put
    }).foreachPartition(iterator => {
      val jobConf = new JobConf(HBaseConfiguration.create())
      jobConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      val table = new HTable(jobConf, TableName.valueOf("word"))
      try {
        // Hand the whole partition to put(List<Put>) as a single batch
        import scala.collection.JavaConversions._
        table.put(seqAsJavaList(iterator.toSeq))
      } finally {
        table.close()
      }
    })
  }
}

Timestamp of the first record:

 0       column=f1:c1, timestamp=1498013677545, value=0
 1       column=f1:c1, timestamp=1498013677545, value=1
 10      column=f1:c1, timestamp=1498013677545, value=10
 100     column=f1:c1, timestamp=1498013677545, value=100
 1000    column=f1:c1, timestamp=1498013677545, value=1000

Timestamp of the 9,999th record written to HBase:

 108993  column=f1:c1, timestamp=1498013680244, value=108993
 108994  column=f1:c1, timestamp=1498013680244, value=108994
 108995  column=f1:c1, timestamp=1498013680244, value=108995
9999 row(s) in 1.2730 seconds

Timestamp difference: t = 1498013680244 - 1498013677545 = 2699 ms
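
The put(List<Put>) implementation quoted above simply hands the batch to a BufferedMutator. A closely related variant, not timed in this post, is to write through a BufferedMutator directly so the client buffers and flushes mutations itself; a minimal sketch against the same word/f1 table, with everything else an assumption:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object WordBufferedMutator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.foreachPartition(partition => {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
      val connection = ConnectionFactory.createConnection(hbaseConf)
      // BufferedMutator buffers Puts client-side and sends them to the region servers in batches
      val mutator = connection.getBufferedMutator(TableName.valueOf("word"))
      try {
        partition.foreach(value => {
          val put = new Put(Bytes.toBytes(value.toString))
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(value.toString))
          mutator.mutate(put)
        })
        mutator.flush() // push anything still sitting in the client-side buffer
      } finally {
        mutator.close()
        connection.close()
      }
    })
  }
}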

3. Method 3: write as a MapReduce output job

Convert the HBase write into a MapReduce output job (saveAsHadoopDataset with TableOutputFormat):

My code:

import">
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object Word {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val conf = HBaseConfiguration.create()
    val jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
    jobConf.set("zookeeper.znode.parent", "/hbase")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "word")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.map(x => {
      val put = new Put(Bytes.toBytes(x.toString))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
      // TableOutputFormat only uses the value, so an empty ImmutableBytesWritable is enough as the key
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)
  }
}

Timestamp of the first record:

 0       column=f1:c1, timestamp=1498014877635, value=0

Timestamp of the last record:

 108993  column=f1:c1, timestamp=1498014879526, value=108993
 108994  column=f1:c1, timestamp=1498014879526, value=108994
 108995  column=f1:c1, timestamp=1498014879526, value=108995

Timestamp difference: t = 1498014879526 - 1498014877635 = 1891 ms
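
For completeness, the same idea also works with the new-API org.apache.hadoop.hbase.mapreduce.TableOutputFormat and saveAsNewAPIHadoopDataset. This variant was not measured in this post; a minimal sketch, reusing the same table and ZooKeeper quorum:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object WordNewApi {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hbase"))
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.17.11.85,172.17.11.86,172.17.11.87")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "word")
    // The new-API output format is configured through a mapreduce Job
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    val rdd = sc.makeRDD(Array(1)).flatMap(_ => 0 to 1000000)
    rdd.map(x => {
      val put = new Put(Bytes.toBytes(x.toString))
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x.toString))
      (new ImmutableBytesWritable, put)
    }).saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}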

4. Summary

Under otherwise identical conditions, the measured elapsed times are:

Method 3: 1498014879526 - 1498014877635 = 1891 ms
Method 2: 1498013680244 - 1498013677545 = 2699 ms
Method 1: 1497973505273 - 1497973306787 = 198486 ms

Method 3 is the fastest and Method 1 is by far the slowest, which is expected: Method 1 issues one RPC per Put, while Methods 2 and 3 batch mutations per partition and make far fewer round trips to the region servers. The best approach is Method 3.