代码示例

前面介绍了如何创建表并插入、查询数据,所有操作都是使用psql在命令行终端下进行的。在实际开发过程中,开发者大都是用编程语言连接数据库并进行DML与DDL操作。本节将介绍如何使用Java、Python、Golang和C连接数据库并进行查询。

MatrixDB在通信协议上与PostgreSQL兼容,所以使用与PostgreSQL相同的方式连接即可。


1. JAVA

使用Java连接MatrixDB,首先要下载JDBC包,https://jdbc.postgresql.org/download.html

示例代码如下:

package main;
import java.sql.*;

public class Main {
    public static void main(String []args) {
        Connection connection=null;
        Statement statement =null;
        try{
            String url="jdbc:postgresql://127.0.0.1:5432/mxadmin";
            String user="mxadmin";
            String password = "abcd";
            Class.forName("org.postgresql.Driver");
            connection= DriverManager.getConnection(url, user, password);

            // 创建表
            String sql="CREATE TABLE data ("
                    + "time timestamp,"
                    + "tag_id int,"
                    + "metrics1 float8,"
                    + "metrics2 float8,"
                    + "metrics3 float8"
                    + ")Distributed by(tag_id)";
            statement=connection.createStatement();
            statement.execute(sql);

            // 插入数据
            sql = "INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)";
            statement.execute(sql);

            // 查询结果
            sql = "SELECT * FROM data";
            ResultSet resultSet=statement.executeQuery(sql);
            while(resultSet.next()){
                String ts=resultSet.getString(1);
                int tag_id = resultSet.getInt(2);
                double metrics1 = resultSet.getDouble(3);
                double metrics2 = resultSet.getDouble(4);
                double metrics3 = resultSet.getDouble(5);
                System.out.printf("%s %d %f %f %f\n", ts, tag_id, metrics1, metrics2, metrics3);
            }
        }catch(Exception e){
            throw new RuntimeException(e);
        }finally{
            try{
                statement.close();
            }
            catch(SQLException e){
                e.printStackTrace();
                throw new RuntimeException(e);
            }finally{
                try{
                    connection.close();
                }
                catch(SQLException e){
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }

}


2. Python

使用python连接MatrixDB,首先要安装psycopg2:

pip3 install psycopg2

示例代码如下:

# -*- coding: utf-8 -*-
import psycopg2

class MatrixDB(object):
    def __init__(self):
        self.host = "127.0.0.1"
        self.user = "mxadmin"
        self.database = "mxadmin"
        self.port = "5432"
        self.password = "abcd"

    def get_conn(self):
        conn = psycopg2.connect(database=self.database,
                                user=self.user,
                                password=self.password,
                                host=self.host,
                                port=self.port)
        return conn

    def create_table(self):
        conn = self.get_conn()
        cursor = conn.cursor()
        sql = "CREATE TABLE data(" \
                "time timestamp," \
                "tag_id int," \
                "metrics1 float8," \
                "metrics2 float8," \
                "metrics3 float8" \
                ")Distributed by(tag_id)"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def insert(self):
        conn = self.get_conn()
        cursor = conn.cursor()
        sql = "INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)"
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def select(self):
        conn = self.get_conn()
        cursor = conn.cursor()
        sql = "SELECT * FROM data"
        cursor.execute(sql)
        data = cursor.fetchone()
        conn.commit()
        conn.close()
        return data

if __name__ == '__main__':
    mxdb = MatrixDB()
    mxdb.create_table()
    mxdb.insert()
    print (mxdb.select())


3. Golang

Golang连接MatrixDB依赖github.com/lib/pq

示例代码如下:

package main

import (
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq"
)

type Data struct {
    Time time.Time
    TagId int32
    Metrics1 float64
    Metrics2 float64
    Metrics3 float64
}

func getConn() (*sql.DB, error) {
    psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "127.0.0.1", 5432, "mxadmin", "", "mxadmin")

    db, err := sql.Open("postgres", psqlInfo)
    if err != nil {
        return nil, err
    }

    return db, nil
}

func createTable() error {
    db, _ := getConn()
    defer db.Close()

    _, err := db.Exec(`
CREATE TABLE data(
    time timestamp,
    tag_id int,
    metrics1 float8,
    metrics2 float8,
    metrics3 float8
)Distributed by(tag_id)
`)
    return err
}

func insertData() error {
    db, _ := getConn()
    defer db.Close()

    _, err := db.Exec(`INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)`)
    return err
}

func selectData() ([]Data, error) {
    db, _ := getConn()
    defer db.Close()

    datas := make([]Data, 0)
    rows, err := db.Query(`SELECT * FROM data`)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    for rows.Next() {
        data := Data{}
        err = rows.Scan(&data.Time, &data.TagId, &data.Metrics1, &data.Metrics2, &data.Metrics3)
        if err != nil {
            return nil, err
        }
        datas = append(datas, data)
    }

    return datas, err
}

func main() {
    createTable()
    insertData()
    datas, _ := selectData()
    fmt.Println(datas)
}


4. C

使用C语言连接MatrixDB,需要依赖libpq库,该库随MatrixDB一起安装,在$GPHOME目录下。 在编译的时候要指定头文件路径和库路径:

gcc -I $GPHOME/include -L $GPHOME/lib -lpq main.c -o main

示例代码如下:

#include <stdio.h>
#include "libpq-fe.h"

void PQresultPrint(PGresult *res)
{
    int nFields = PQnfields(res);
    int nTuples = PQntuples(res);
    int i, j;
    for (i=0; i<nTuples; i++)
    {
        for (j=0; j<nFields; j++)
        {
            printf("%s ", PQgetvalue(res, i, j));
        }
        printf("\n");
    }
}

PGconn *GetConn()
{
    PGconn *conn;
    char str[128];

    sprintf(str, "host=%s port=%d dbname=%s user=%s password=%s",
            "127.0.0.1",
            5432,
            "mxadmin",
            "mxadmin",
            "abcd"
            );

    // 建立连接
    conn = PQconnectdb(str);
    if(PQstatus(conn) == CONNECTION_BAD)
    {
        fprintf(stderr,"数据库连接失败!");
        fprintf(stderr,"%s",PQerrorMessage(conn));
        return NULL;
    }
    return conn;
}

PGresult *ExecuteQuery(const char *query)
{

    PGconn *conn;
    PGresult *res;

    conn = GetConn();
    if (conn == NULL)
        return NULL;

    // 执行SQL
    res = PQexec(conn, query);
    if (PQresultStatus(res) == PGRES_FATAL_ERROR)
    {
        fprintf(stderr, "%s", PQerrorMessage(conn));
        return NULL;
    }

    PQfinish(conn);
    return res;
}

PGresult *CreateTable()
{
    const char *sql =
"CREATE TABLE data \
( \
    time timestamp, \
    tag_id int, \
    metrics1 float8, \
    metrics2 float8, \
    metrics3 float8 \
)Distributed by(tag_id)";
    return ExecuteQuery(sql);
}

PGresult *InsertData()
{
    const char *sql = "INSERT INTO data VALUES(now(), 1, 1.1, 1.2, 1.3)";
    return ExecuteQuery(sql);
}

PGresult *SelectData()
{
    const char *sql = "SELECT * FROM data";
    return ExecuteQuery(sql);
}

int main()
{
    PGresult *res;
    res = CreateTable();
    if (res != NULL)
        PQclear(res);
    res = InsertData();
    if (res != NULL)
        PQclear(res);
    res = SelectData();
    if (res != NULL) {
        PQresultPrint(res);
        PQclear(res);
    }
    return 0;
}