YMatrix 4.2 新特性解读之 UPSERT

2021-09-03 · YMatrix Team
#产品动态

1. 前言

MatrixDB 4.2 版本发布后,MatrixGate 推出了一个新特性 — UPSERT

本文将详细介绍 UPSERT 语义及 MatrixGate 新特性的使用方法。

什么是UPSERT?

UPSERT 就是 UPDATE 和 INSERT 的结合。

实现一条SQL内自动插入或者更新:如果记录不存在则插入,如果记录存在则更新。

这是应用开发中最常见的场景之一,通过UPSERT语法可以大幅简化开发代码的复杂度,提升效率。

在众多场景下,都免不了使用UPSERT功能,比如:

  • 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
  • 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入

而 UPSERT 语义则直接实现了该逻辑。下面演示一下UPSERT的用法。

2. UPSERT 的使用方法

准备数据表

DROP TABLE IF EXISTS upsert_demo;

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    float4
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

注意 ⚠️

为了数据库能够使用 UPSERT 功能,记得要在表的“设备 id +时间戳”上创建 UNIQUE 约束。

使用 UPSERT 语义接入数据

SQL 方式

先来演示下,如何通过直接执行 SQL 语句的方式进行 UPSERT,对于使用 libpq 或 JDBC 等来连接数据库的用户,可以直接参考如下 SQL 语法来操作。

假设设备指标 c1 和 c2 分两个批次发送,则使用 SQL 来进行数据填充。

INSERT INTO upsert_demo VALUES ('2020-11-11', 1, 10, NULL)
    ON CONFLICT (ts, tagid)
    DO UPDATE SET
        c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
        c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);

INSERT INTO upsert_demo VALUES ('2020-11-11', 1, NULL, 20.1)
    ON CONFLICT (ts, tagid)
    DO UPDATE SET
        c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
        c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);

如上两条 SQL 语句里都包含了唯一索引列:ts 和 tagid,并且值相同。SQL1 只包含了 c1 列,c2 列为空;SQL2 只包含了 c2 列,c1 列为空。

ON CONFLICT 子句指定了唯一索引包含的列,用来判断数据行是 INSERT 还是 UPDATE。

c1=coalesce(EXCLUDED.c1,upsert_demo.c1)的含义是取 EXCLUDED.c1 和 upsert_demo.c1 中,第一个不为 NULL 的值。

EXCLUDED.c1 是原行的值,upsert_demo.c1 是新插入的值。即如果原数据行存在,且列值不为空则使用原值;否则使用新插入的值。

通过这种方式,实现了相同设备、相同时间的指标数据合并。

test=# select * from upsert_demo ;
         ts          | tagid | c1 |  c2
---------------------+-------+----+------
 2020-11-11 00:00:00 |     1 | 10 | 20.1
(1 row)

查询 upsert_demo 可以看到,c1 和 c2 的数据是期望的结果。

通过如上演示可以看到,使用 UPSERT 语义,可以通过单条 SQL 实现根据数据是否存在来选择是插入还是更新,大大简化了开发人员的工作。

MatrixGate 方式

相比使用 SQL 方式来接入,MatrixGate 作为 MatrixDB 高性能数据接入工具,在 4.2 版本中也加入了对 UPSERT 语义的支持,所以在生产环境中,更加推荐用户选择使用 MatrixGate 方式接入,性能可以提升百倍。

下面演示如何使用 MatrixGate 的 UPSERT 语义:

1. 准备数据文件

文件1: /tmp/upsert_demo1.dat

ts|tagid|c1|c2
2020-11-11|1|10|

文件1: /tmp/upsert_demo2.dat

ts|tagid|c1|c2
2020-11-11|1||20.1
2020-11-11|2||100.5
2020-11-11|2|200|
2. 执行 Gate

目标数据库假设为 test,端口为 5432 重点在 --upsert-key 参数 载入第一个文件(为了验证结果,载入前已将表中原数据清空)

tail -n +2 /tmp/upsert_demo1.dat | mxgated --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

结果

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
---------------------+-------+----+----
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

载入第二个文件

tail -n +2 /tmp/upsert_demo2.dat | mxgated --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

结果

test=# select * from upsert_demo;
         ts          | tagid | c1  |  c2
---------------------+-------+-----+-------
 2020-11-11 00:00:00 |     1 |  10 |  20.1
 2020-11-11 00:00:00 |     2 | 200 | 100.5
(2 rows)

从结果可以看到,ts和tagid相同的行数据,进行了合并。

3. Q&A

3.总结

通过如上演示可以看到,UPSERT 语义对于时序数据指标分批发送的场景非常有用,会大大的减少开发人员的负担,而在 MatrixDB 4.2 版本中 MatrixGate 高性能数据接入工具也支持了该特性,欢迎大家下载使用。