All Projects → chubbyjiang → Spark_DB_Connector

chubbyjiang / Spark_DB_Connector

Licence: other
Use Scala API to read/write data from different databases,HBase,MySQL,etc.

Programming Languages

scala
5932 projects

Projects that are alternatives of or similar to Spark DB Connector

Stream Reactor
Streaming reference architecture for ETL with Kafka and Kafka-Connect. You can find more on http://lenses.io on how we provide a unified solution to manage your connectors, most advanced SQL engine for Kafka and Kafka Streams, cluster monitoring and alerting, and more.
Stars: ✭ 753 (+3037.5%)
Mutual labels:  connector, hbase
Connectors
Connectors simplify connecting to standalone and CloudFoundry services
Stars: ✭ 28 (+16.67%)
Mutual labels:  connector
NoSQLDataEngineering
NoSQL Data Engineering
Stars: ✭ 25 (+4.17%)
Mutual labels:  hbase
Real-time-log-analysis-system
🐧基于spark streaming+flume+kafka+hbase的实时日志处理分析系统(分为控制台版本和基于springboot、Echarts等的Web UI可视化版本)
Stars: ✭ 31 (+29.17%)
Mutual labels:  hbase
xdu-cloudcourse-web
西电云计算课程大作业Web端代码示例
Stars: ✭ 26 (+8.33%)
Mutual labels:  hbase
dpkb
大数据相关内容汇总,包括分布式存储引擎、分布式计算引擎、数仓建设等。关键词:Hadoop、HBase、ES、Kudu、Hive、Presto、Spark、Flink、Kylin、ClickHouse
Stars: ✭ 123 (+412.5%)
Mutual labels:  hbase
phoenix
Apache Phoenix / Hbase Spring Boot Microservices
Stars: ✭ 23 (-4.17%)
Mutual labels:  hbase
asyncmy
A fast asyncio MySQL/MariaDB driver with replication protocol support
Stars: ✭ 126 (+425%)
Mutual labels:  connector
DirectLineAndroidSample
Android Sample for Direct Line API - Microsoft Bot Framework
Stars: ✭ 21 (-12.5%)
Mutual labels:  connector
hdocdb
HBase as a JSON Document Database
Stars: ✭ 24 (+0%)
Mutual labels:  hbase
DataX-src
DataX 是异构数据广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
Stars: ✭ 21 (-12.5%)
Mutual labels:  hbase
replicator
MySQL Replicator. Replicates MySQL tables to Kafka and HBase, keeping the data changes history in HBase.
Stars: ✭ 41 (+70.83%)
Mutual labels:  hbase
hyper-proxy
A proxy connector for Hyper-based crates
Stars: ✭ 73 (+204.17%)
Mutual labels:  connector
Lidea
大型分布式系统实时监控平台
Stars: ✭ 28 (+16.67%)
Mutual labels:  hbase
hbase-prometheus-monitoring
No description or website provided.
Stars: ✭ 19 (-20.83%)
Mutual labels:  hbase
spark-connector
A connector for Apache Spark to access Exasol
Stars: ✭ 13 (-45.83%)
Mutual labels:  connector
OpenSCAD connectors
Simple, parametric APIs for connectors such as corner brackets and t-joints. Specializing on connectors for aluminum extrusion connectors.
Stars: ✭ 29 (+20.83%)
Mutual labels:  connector
apsconnect-cli
Warning: EOL for January 2021. CloudBlue Connect CLI tool to automate APS Package management in the CloudBlue Commerce instance (hub).
Stars: ✭ 13 (-45.83%)
Mutual labels:  connector
dockerfiles
Multi docker container images for main Big Data Tools. (Hadoop, Spark, Kafka, HBase, Cassandra, Zookeeper, Zeppelin, Drill, Flink, Hive, Hue, Mesos, ... )
Stars: ✭ 29 (+20.83%)
Mutual labels:  hbase
hbase-native-client
Apache HBase Native Client
Stars: ✭ 30 (+25%)
Mutual labels:  hbase

Spark Database Connector

New Feature

  • List写入HBase支持Kerberos认证
  • 升级HBase Client API为1.2.0版本

隐藏处理各种数据库的连接细节,使用Scala API在Spark中简易地处理数据库连接的读写操作。

相关测试环境信息:

  • Scala 2.11.8/2.10.5
  • Spark 1.6.0
  • HBase 0.98.4
  • Jdbc Driver 5.1.35

目前支持的有:

  • HBase
  • MySQL

添加Maven引用:

<dependency>
    <groupId>info.xiaohei.www</groupId>
    <artifactId>spark-database-connector_2.11</artifactId>
    <version>1.0.0</version>
</dependency>

Scala 2.10版本使用:

<dependency>
    <groupId>info.xiaohei.www</groupId>
    <artifactId>spark-database-connector_2.10</artifactId>
    <version>1.0.0</version>
</dependency>

HBase

设置HBase host

通过以下三种任意方式设置HBase host地址

1、在spark-submit中设置命令:

spark-submit --conf spark.hbase.host=your-hbase-host

2、在Scala代码中设置:

val sparkConf = new SparkConf()
sparkConf.set("spark.hbase.host", "your-hbase-host")
val sc = new SparkContext(sparkConf)

3、在JVM参数中设置:

java -Dspark.hbase.host=your-hbase-host -jar ....

设置hbase-site.xml文件读取路径(可选)

如果有读取hbase-site.xml文件的需求时,可以通过设置下面的选项进行指定:

spark.hbase.config=your-hbase-config-path

设置该选项的方式同上 注意:需要将hbase-site.xml文件添加到当前项目可识别的resource路径中,否则将无法读取,使用默认配置

向HBase写入数据

导入隐式转换:

import info.xiaohei.spark.connector.hbase._

Spark RDD写入HBase

任何Spark RDD对象都能直接操作写入HBase,例如:

val rdd = sc.parallelize(1 to 100)
            .map(i => (s"rowkey-${i.toString}", s"column1-${i.toString}", "column2"))

这个RDD包含了100个三元组类型的数据,写入HBase时,第一个元素为rowkey,剩下的元素依次为各个列的值:

rdd.toHBase("mytable")
      .insert("col1", "col2")
      .inColumnFamily("columnFamily")
      .save()

(1)使用RDD的toHBase函数传入要写入的表名
(2)insert函数传入要插入的各个列名
(3)inColumnFamily函数传入这些列所在的列族名
(4)最后save函数将该RDD保存在HBase中

如果col2和col1的列族不一样,可以在insert传入列名时单独指定:

rdd.toHBase("mytable")
      .insert("col1", "otherColumnFamily:col2")
      .inColumnFamily("defaultColumnFamily")
      .save()

列族名和列名之间要用冒号(:)隔开,其他列需要指定列名时使用的方式一致

Scala集合/序列写入HBase

val dataList  = Seq[(String, String)](
      ("00001475304346643896037", "kgJkm0euSbe"),
      ("00001475376619355219953", "kiaR40qzI8o"),
      ("00001475458728618943637", "kgCoW0hgzXO"),
      ("00001475838363931738019", "kqiHu0WNJC0")

    )

//创建隐式变量
implicit val hbaseConf = HBaseConf.createConf("hbase-host")
//如果实在spark程序操作可以通过以下的方式
implicit val hbaseConf = HBaseConf.createFromSpark(sc)

dataList.toHBase("mytable")
	.insert("col1", "col2")
	.inColumnFamily("columnFamily")
	.save()

使用方式和RDD写入HBase的操作类似,注意,隐式变量不能在spark的foreachPartition等算子中定义

以上的方式将使用HTable的put list批量将集合中的数据一次全部put到HBase中,如果写入HBase时想使用缓存区的方式,需要另外添加几个参数:

dataList.toHBase("mytable"
      //该参数指定写入时的autoFlush为false
      , Some(false, false)
      //该参数指定写入缓冲区的大小
      , Some(5 * 1024 * 1024))
      .insert("col1", "col2")
      .inColumnFamily("columnFamily")
      .save()

使用该方式时,集合中的每个数据都会被put一次,但是关闭了自动刷写,所以只有当缓冲区满了之后才会批量向HBase写入

写入时为Rowkey添加salt前缀

rdd.toHBase("mytable")
      .insert("col1", "otherColumnFamily:col2")
      .inColumnFamily("defaultColumnFamily")
      //添加salt
      .withSalt(saltArray)
      .save()

saltArray是一个字符串数组,简单的例如0-9的字符串表示,由使用者自己定义

使用withSalt函数之后,在写入HBase时会为rowkey添加一个saltArray中的随机串,注意:为了更好的支持HBase部分键扫描(rowkey左对齐),数组中的所有元素长度都应该相等

取随机串的方式有两种:

  • 1.计算当前的rowkey的hashCode的16进制表示并对saltArray的长度取余数,得到saltArray中的一个随机串作为salt前缀添加到rowkey
  • 2.使用随机数生成器获得不超过saltArray长度的数字作为下标取数组中的值

当前使用的是第一种方式

读取HBase数据

导入隐式转换:

import info.xiaohei.spark.connector.hbase._

读取HBase的数据操作需要通过sc来进行:

val hbaseRdd = sc.fromHBase[(String, String, String)]("mytable")
      .select("col1", "col2")
      .inColumnFamily("columnFamily")
      .withStartRow("startRow")
      .withEndRow("endRow")
      //当rowkey中有随机的salt前缀时,将salt数组传入即可自动解析
      //得到的rowkey将会是原始的,不带salt前缀的
      .withSalt(saltArray)

(1)使用sc的fromHBase函数传入要读取数据的表名,该函数需要指定读取数据的类型信息
(2)select函数传入要读取的各个列名
(3)inColumnFamily函数传入这些列所在的列族名
(4)withStartRow和withEndRow将设置rowkey的扫描范围,可选操作 (5)之后就可以在hbaseRdd上执行Spark RDD的各种算子操作

上面的例子中,fromHBase的泛型类型为三元组,但是select中只读取了两列值,因此,该三元组中第一个元素将是rowkey的值,其他元素按照列的顺序依次类推

当你不需要读取rowkey的值时,只需要将fromHBase的泛型类型改为二元组

即读取的列数为n,泛型类型为n元组时,列名和元组中的各个元素相对应 读取的列数为n,泛型类型为n+1元组时,元组的第一个元素为rowkey

当各个列位于不同列族时,设置列族的方式同写入HBase一致

SQL On HBase

借助SQLContext的DataFrame接口,在组件中可以轻易实现SQL On HBase的功能。

上例中的hbaseRdd是从HBase中读取出来的数据,在此RDD的基础上进行转换操作:

//创建org.apache.spark.sql.Row类型的RDD
val rowRdd = hbaseRdd.map(r => Row(r._1, r._2, r._3))
val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(
      rowRdd,
      StructType(Array(StructField("col1", StringType), StructField("col2", StringType), StructField("col3", StringType)))
    )
df.show()

df.registerTempTable("mytable")
sqlContext.sql("select col1 from mytable").show()

使用case class查询/读取HBase的数据

使用内置的隐式转换可以处理基本数据类型和元组数据,当有使用case class的需求时,需要额外做一些准备工作

定义如下的case class:

case class MyClass(name: String, age: Int)

如果想达到以下的效果:

val classRdd = sc.fromHBase[MyClass]("tableName")
    .select("name","age")
    .inColumnFamily("info")

classRdd.map{
    c =>
        (c.name,c.age)
}

或者以下的效果:

//classRdd的类型为RDD[MyClass]
classRdd.toHBase("tableName")
    .insert("name","age")
    .inColumnFamily("info")
    .save()

需要另外实现能够解析自定义case class的隐式方法:

implicit def myReaderConversion: DataReader[MyClass] = new CustomDataReader[(String, Int), MyClass] {
    override def convert(data: (String, Int)): MyClass = MyClass(data._1, data._2)
  }

implicit def myWriterConversion: DataWriter[MyClass] = new CustomDataWriter[MyClass, (String, Int)] {
    override def convert(data: MyClass): (String, Int) = (data.name, data.age)
  }

该隐式方法返回一个DataReader/DataWriter 重写CustomDataReader/CustomDataWriter中的convert方法 将case class转换为一个元组或者将元组转化为case class即可

带有Kerberos认证的HBase

除了上述过程中写HBase需要的配置外,还需要指定以下三个配置:

  • spark.hbase.krb.principal:认证的principal用户名
  • spark.hbase.krb.keytab:keytab文件路径(各个节点都存在且路径保持一致)
  • spark.hbase.config:hbase-site.xml文件路径

写入HBase时将会使用提供给的krb信息进行认证

当前仅支持无缝读取启用了Kerberos认证的HBase 写入时有一定限制,如要使用RDD的foreachPartition入库:

rdd.foreachPartition{
    data =>
        data.toList.toHBase("table").insert("columns")//...
}

注意,foreachPartition中的toList操作将会把分区中的所有数据加载到内存中,如果数据量过大可能会造成OOM,增加Executor的内存即可

TODO:RDD的读写接口目前还未实现Kerberos认证

MySQL

除了可以将RDD/集合写入HBase之外,还可以在普通的程序中进行MySQL的相关操作

在conf中设置相关信息

1、Spark程序中操作

在SparkConf中设置以下的信息:

sparkConf
  .set("spark.mysql.host", "your-host")
  .set("spark.mysql.username", "your-username")
  .set("spark.mysql.password", "your-passwd")
  .set("spark.mysql.port", "db-port")
  .set("spark.mysql.db", "database-name")

//创建MySqlConf的隐式变量
implicit val mysqlConf = MysqlConf.createFromSpark(sc)

关于这个隐式变量的说明:在RDD的foreachPartition或者mapPartitions等操作时,因为涉及到序列化的问题,默认的对MySqlConf的隐式转化操作会出现异常问题,所以需要显示的声明一下这个变量,其他不涉及网络序列化传输的操作可以省略这步

HBase小节中的设置属性的方法在这里也适用

2、普通程序中操作

创建MysqlConf,并设置相关属性:

//创建MySqlConf的隐式变量
implicit val mysqlConf = MysqlConf.createConf(
      "your-host",
      "username",
      "password",
      "port",
      "db-name"
    )

在普通程序中操作时一定要显示声明MysqlConf这个隐式变量

写入MySQL

导入隐式转换:

import info.xiaohei.spark.connector.mysql._

之后任何Iterable类型的数据都可以直接写入MySQL中:

list.toMysql("table-name")
  //插入的列名
  .insert("columns")
  //where条件,如age=1
  .where("where-conditions")
  .save()

在Spark程序中从MySQL读取数据

val res = sc.fromMysql[(Int,String,Int)]("table-name")
  .select("id","name","age")
  .where("where-conditions")
  .get

在普通程序中从MySQL读取数据

//普通程序读取关系型数据库入口
val dbEntry = new RelationalDbEntry

val res = dbEntry.fromMysql[(Int,String,Int)]("table-name")
  .select("id","name","age")
  .where("where-conditions")
  .get

创建数据库入口之后的操作和spark中的流程一致

case class解析

如果需要使用自定义的case class解析/写入MySQL,例如:

case class Model(id: Int, name: String, age: Int)

基本流程和hbase小节中差不多,定义隐式转换:

implicit def myExecutorConversion: DataExecutor[Model] = new CustomDataExecutor[Model, (Int, String, Int)]() {
    override def convert(data: Model): (Int, String, Int) = (data.id, data.name, data.age)
}

implicit def myMapperConversion: DataMapper[Model] = new CustomDataMapper[(Int, String, Int), Model]() {
    override def convert(data: (Int, String, Int)): Model = Model(data._1, data._2, data._3)
 }

之后可以直接使用:

val entry = new RelationalDbEntry
val res = entry.fromMysql[Model]("test")
  .select("id", "name", "age")
  .get
res.foreach(x => println(s"id:${x.id},name:${x.name},age:${x.age}"))
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].