9、框架整合
9.1、Java 读写 ClickHouse API
Maven 依赖
<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.6</version>
</dependency>
Java 读取 ClickHouse 数据
ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node01:8123,node02:8123,node03:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
ClickHouseStatement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("select id, name from users");
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
System.out.println("id = " + id + ",name = " + name);
}
rs.close();
statement.close();
conn.close();
Java 向 ClickHouse 写入数据
ClickHouseProperties props = new ClickHouseProperties();
props.setUser("");
props.setPassword("");
BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://192.168.110.150:8123/default", props);
ClickHouseConnection conn = dataSource.getConnection();
PreparedStatement statement = conn.prepareStatement("insert into test values(?, ?, ?)");
statement.setInt(1, 100);
statement.setString(2, "张三");
statement.setInt(3, 30);
boolean success = statement.execute();
System.out.println(success);
statement.close();
conn.close();
9.2、Spark 写入 ClickHouse API
SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse 对应的表中。在 ClickHouse 中需要预先创建好对应的结果表。
Maven 依赖
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.6</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
Spark 代码
import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import java.util.Properties
object ClickHouse {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList: Seq[String] = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将jsonList数据转换成DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往ClickHouse
val url = "jdbc:clickhouse://node01:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
}
}
9.3、Flink 写入 ClickHouse API
可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink在 1.11.0 版本对其 JDBC Connnector 进行了重构:
- 重构之前(1.10.x 及之前版本),包名为
flink-jdbc
。 - 重构之后(1.11.x 及之后版本),包名为
flink-connector-jdbc
。
二者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:
API名称 | flink-jdbc | flink-connector-jdbc |
---|---|---|
DataStream API | 不支持 | 支持 |
Table API & SQL | 支持 | 不支持 |
flink-jdbc
Maven 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
Flink 代码
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到ClickHouse中,只支持Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为1,后期每个并行度满批次需要的条数时,会插入click中
env.setParallelism(1)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取Socket中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将table 对象写入ClickHouse中
//需要在ClickHouse中创建表:create table flink_result(id Int,name String,age Int) engine = MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认5000条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册ClickHouse table Sink,设置sink 数据的字段及Schema信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}
flink-connector-jdbc
Maven 依赖
<!-- Flink1.11 后需要 Flink-client包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
Flink 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入ClickHouse ,目前只支持DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向ClickHouse中插入数据的sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据SQL
insetIntoCkSql,
//设置插入ClickHouse数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接ClickHouse的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}
10、可视化工具
TABiX 支持通过浏览器直接连接 ClickHouse,不需要安装其他软件,就可以访问 ClickHouse。有两种使用方式,一种是直接浏览器访问配置。另一种是使用ClickHouse 内嵌方式。
Tabix具有以下特点:
- ⾼亮语法的编辑器。
- ⾃动命令补全。
- 查询命令执⾏的图形分析⼯具。
- 配⾊⽅案选项。
直接浏览器访问
打开http://ui.tabix.io/,配置 ClickHouse。
连接 ClickHouse 的用户名默认为 default
,密码默认为空。
访问 ClickHouse 端口默认为 8123
。
ClickHouse内嵌方式
ClickHouse 自带了配置连接 TABiX,这里通过 ClickHouse Server 节点访问http://ui.tabix.io/网址进行 ClickHouse 界面化操作,可以进入 ClickHouse Server 节点路径 /etc/clickhouse-server
,配置 config.xml
解开“<http_server_default_response>”标签。
重启 ClickHouse Server,可以通过http://node01:8123来访问 TABiX。
发表评论: