YMatrix 文档
快速上手
集群部署
SQL参考
- ABORT
- ALTER_DATABASE
- ALTER_EXTENSION
- ALTER_EXTERNAL_TABLE
- ALTER_FOREIGN_DATA_WRAPPER
- ALTER_FOREIGN_TABLE
- ALTER_FUNCTION
- ALTER_INDEX
- ALTER_RESOURCE_GROUP
- ALTER_RESOURCE_QUEUE
- ALTER_ROLE
- ALTER_RULE
- ALTER_SCHEMA
- ALTER_SEQUENCE
- ALTER_SERVER
- ALTER_TABLE
- ALTER_TABLESPACE
- ALTER_TYPE
- ALTER_USER_MAPPING
- ALTER_VIEW
- ANALYZE
- BEGIN
- CHECKPOINT
- COMMIT
- COPY
- CREATE_DATABASE
- CREATE_EXTENSION
- CREATE_EXTERNAL_TABLE
- CREATE_FOREIGN_DATA_WRAPPER
- CREATE_FOREIGN_TABLE
- CREATE_FUNCTION
- CREATE_INDEX
- CREATE_RESOURCE_GROUP
- CREATE_RESOURCE_QUEUE
- CREATE_ROLE
- CREATE_RULE
- CREATE_SCHEMA
- CREATE_SEQUENCE
- CREATE_SERVER
- CREATE_TABLE
- CREATE_TABLE_AS
- CREATE_TABLESPACE
- CREATE_TYPE
- CREATE_USER_MAPPING
- CREATE_VIEW
- DELETE
- DROP_DATABASE
- DROP_EXTENSION
- DROP_EXTERNAL_TABLE
- DROP_FOREIGN_DATA_WRAPPER
- DROP_FOREIGN_TABLE
- DROP_FUNCTION
- DROP_INDEX
- DROP_RESOURCE_GROUP
- DROP_RESOURCE_QUEUE
- DROP_ROLE
- DROP_RULE
- DROP_SCHEMA
- DROP_SEQUENCE
- DROP_SERVER
- DROP_TABLE
- DROP_TABLESPACE
- DROP_TYPE
- DROP_USER_MAPPING
- DROP_VIEW
- END
- EXPLAIN
- GRANT
- INSERT
- LOAD
- LOCK
- REINDEX
- RELEASE_SAVEPOINT
- RESET
- REVOKE
- ROLLBACK_TO_SAVEPOINT
- ROLLBACK
- SAVEPOINT
- SELECT INTO
- SET ROLE
- SET TRANSACTION
- SET
- SHOW
- START TRANSACTION
- TRUNCATE
- UPDATE
- VACUUM
工具指南
Kafka接入
MatrixDB支持kafka无缝连接功能,可将kafka数据持续自动导入到MatrixDB表中,并支持图形化操作。
接入的数据格式包括CSV和JSON。本节将以一个最简单的例子,来介绍如何用MatrixDB管理平台来接入Kafka数据。
1. 准备工作
1.1 创建Kafka Topic
假设我们在本地9092端口启动了Kafka服务,通过下面命令创建一个测试Topic:
bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
1.2 写入测试数据
然后写入几个测试数据:
bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C
通过上面命令向新创建的csv_test写入了4条数据,分别为4条逗号分割的CSV行。
1.3 创建接入目标表
连接数据库mydb:
[mxadmin@mdw ~]$ psql mydb
psql (12)
Type "help" for help.
mydb=#
创建测试表:
CREATE TABLE csv_table (
id SERIAL,
no INT,
city VARCHAR(45),
score DOUBLE PRECISION,
create_time TIMESTAMPTZ
) DISTRIBUTED BY(id);
2. 创建接入过程
准备工作完毕,下面演示如何使用图形化界面做接入。
在浏览器里输入MatrixGate所在机器的ip(默认是mdw的ip)、端口和datastream后缀:
http://<IP>:8240/datastream
进入到如下页面,然后点击“下一步”:
从页面上边的导航栏可以看到整个过程分6步:连接、选择、解析、配置、映射、提交。
2.1 连接
当前进行的是第一步:连接。在数据源里输入Kafka服务器地址,然后点击“下一步”。
2.2 选择
选择界面选择Kafka的Topic,这里选择刚才创建的csv_test
Topic。同时右边选择Topic对应的格式,CSV格式还需要指定分隔符。
选择好Topic后,会看到Topic里的数据。默认会选前10条作为样例。因为我们只向Topic里写入4条,所以这里可以看到全部4条数据。选择其中一条来做后面的解析和映射。
2.3 解析
解析页面会对刚才选择的样例数据进行解析,在右边会看到通过分隔符分割后的索引号和对应的值。确认无误后点击“下一步”:
2.4 配置
配置页面中,选择该Topic的数据接入到哪个目标表,自行选择数据库、模式和表后,会展示出表结构以及选择插入模式。
2.5 映射
映射页面进行源数据与目标表的映射,即CSV列与数据库表列的映射。操作过程为从源数据中选择一列,然后在目标表中选择映射的列。这样,在右边的映射规则中会出现一条新的映射规则,确认后点击保存。
按照这个步骤将所有列完成映射即可。
这里需要以下注意几点:
一、当列映射类型不兼容时,会出现如下提示:
二、当目标表字段选择时间戳类型且源数据选择字段的值为Unix时间戳时,输出值右边会显示时间戳格式转换复选框,默认为选中且可用状态,输出值结果自动转换为数据库时间戳格式。
三、如果CSV列数与数据库表列数不一致,或者无法一一对应,则允许出现未被映射的源数据列和数据库表列。未被映射的源数据列会被丢弃;未被映射的目标表列则按照建表规则填充NULL或默认值。
2.6 提交
完成映射后,进入到最后一个页面“提交”。该页面汇总了刚才选择的所有结果,使用时间戳格式转换的映射字段规则箭头上方会出现“数据格式转换”的文字。确认无误后点“提交”。
最后进入到完成页面:
JSON数据格式
目前Kafka数据接入支持两种数据格式:CSV和JSON。刚才演示了CSV格式的接入,下面对比一下JSON格式在接入时的差别。
选择页面中,选择格式里选择JSON。
解析页面可以看出,JSON列的索引使用$
符号开始,.
加列名来索引,索引规则参照JSONPath。
因为JSON支持多层级,所以映射页面里可选择不同层级下的列:
3. 查看接入数据
按照上面流程创建接入过程并保存提交后,就完成了数据接入的过程,之前写入到Topic的数据和后面新接入的数据都会流向目标表中。退回到主界面会看到刚才创建的数据流信息:
点击详情则会进入到详细信息页面:
下面登录数据库验证数据已正确接入:
mydb=# select * from csv_table;
id | no | city | score | create_time
----+----+----------+--------+------------------------
1 | 1 | Beijing | 123.05 | 2022-04-27 07:13:03+00
2 | 2 | Shanghai | 112.95 | 2022-04-27 07:13:03+00
3 | 3 | Shenzhen | 100 | 2022-04-27 07:13:03+00
4 | 4 | Guangxi | 88.5 | 2022-04-27 07:13:03+00
(4 rows)