追逐繁星的孩子

お帰りなさい

首页 标签 归档 分类 关于
KSQL语法文档
日期 2018-05-18   |    标签 Kafka   |    评论

最近研究了Kafka及针对Kafka的一个开源的KSQL引擎。KSQL旨在利用类SQL语法进行kafka流计算处理。关于KSQL的具体语法进行了如下整理。PS:主要是文档的翻译加上自己的一些实践。具体请参考官方文档:https://docs.confluent.io/current/ksql/docs/syntax-reference.html

注意:该篇只整理KSQL语法,另外的关于KSQL的介绍使用或者具体实践后续看情况整理。

kafka topic(根据以下2个kafka topic的格式进行演示)

  1. topic名称: clickstream

    • 演示数据

      • key: "222.203.236.146"
      • value
{
        "remote_user":"-",
        "request":"GET /index.html HTTP/1.1",
        "referrer":"-",
        "agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36",
        "bytes":"4096",
        "ip":"111.203.236.146",
        "time":"17/五月/2018:09:00:57 +0000",
        "userid":36,
        "_time":1526547657671,
        "status":"404"
    }

  1. topic名称: clickstream_users

    • 演示数据

      • key: "1"
      • value
{
        "registered_at":"1464669154810",
        "user_id":1,
        "city":"Frankfurt",
        "level":"Silver",
        "last_name":"Garrity",
        "first_name":"Ferd",
        "username":"AlanGreta_GG66"
    }

KSQL支持的字段类型

  • BOOLEAN
  • INTEGER
  • BIGINT
  • DOUBLE
  • VARCHAR(STRING)
  • ARRAY(JSON and AVRO only)
  • MAP (JSON and AVRO only) ]

注意:创建stream或者table时KSQL都会默认添加以下两个字段

  1. ROWKEY:
    • 对应kafka的message key
  2. ROWTIME:
    • 对应kafka的message timestamp,时间窗口根据这个值计算时间

根据topic 'clickstream' 创建stream 'click'

格式:

CREATE STREAM stream_name (  column_name data_type  [, ...] )
  WITH ( property_name = expression [, ...] );

具体案例:

CREATE STREAM click(remote_user VARCHAR,request VARCHAR,referrer VARCHAR,agent VARCHAR,bytes VARCHAR,ip VARCHAR,time VARCHAR,userid BIGINT,_time BIGINT,status VARCHAR) WITH(KAFKA_TOPIC='clickstream',VALUE_FORMAT='JSON')

根据topic 'clickstream_users' 创建table 'users'

格式:

CREATE TABLE table_name (  column_name data_type  [, ...] )
  WITH ( property_name = expression [, ...] );

具体案例:

CREATE TABLE users(registered_at BIGINT,user_id INTEGER,city VARCHAR,level VARCHAR,last_name VARCHAR,first_name VARCHAR,username VARCHAR) WITH(KAFKA_TOPIC='clickstream_users',VALUE_FORMAT='JSON',KEY='user_id')
关键字参数 描述
KAFKA_TOPIC 指定对应topic名字
VALUE_FORMAT 存在以下三种格式: JSON(主要)、DELIMITED(逗号分隔)、AVRO
KEY(VARCHAR类型) 对应kafka的message key,创建table时必填,创建stream是可选的(最好不填)
TIMESTAMP 覆盖默认生成的ROWTIME

根据现有的'click'和'users'创建stream 'click_users'

格式:

CREATE STREAM stream_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT  select_expr [, ...]
  FROM from_item
  [ LEFT JOIN join_table ON join_criteria ]
  [ WHERE condition ]
  [PARTITION BY column_name];

具体案例:

CREATE STREAM click_users WITH (KAFKA_TOPIC='click_users',VALUE_FORMAT='JSON',PARTITIONS=4,REPLICAS=1) AS SELECT u.username,c.* FROM click c LEFT JOIN users u ON c.userid=u.user_id WHERE 1=1 PARTITION BY username

根据现有的'click'创建table 'click_user_count'

格式:

CREATE TABLE table_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT  select_expr [, ...]
  FROM from_item
  [ WINDOW window_expression ]
  [ WHERE condition ]
  [ GROUP BY grouping_expression ]
  [ HAVING having_expression ];

具体案例:

CREATE TABLE click_user_count WITH (KAFKA_TOPIC='click_user_count',VALUE_FORMAT='JSON',PARTITIONS=4,REPLICAS=1) AS SELECT userid,count(*) FROM click WINDOW TUMBLING (size 10 seconds) WHERE 1=1 GROUP BY userid HAVING count(*)>1
关键字参数 描述
KAFKA_TOPIC 指定创建的topic名字,如果不指定则默认取stream名字的大写
VALUE_FORMAT 存在以下三种格式: JSON(主要)、DELIMITED(逗号分隔)、AVRO
PARTITIONS 创建的topic的partition数量,不指定则默认取ksql.sink.partitions=4,可通过配置文件或者命令行SET命令自行配置
REPLICAS 创建的topic的replica的数量,不指定则默认取input stream/table的replica值
TIMESTAMP 覆盖默认生成的ROWTIME
PARTITION BY 指定click_users的ROWKEY,这个语句不是在select语法里面的,所以指定的列名是select之后的列名

执行sql查询

格式:

SELECT select_expr [, ...]
  FROM from_item
  [ LEFT JOIN join_table ON join_criteria ]
  [ WINDOW window_expression ]
  [ WHERE condition ]
  [ GROUP BY grouping_expression ]
  [ HAVING having_expression ];

具体案例

SELECT c.userid,count(*) FROM click c LEFT JOIN users u ON c.userid=u.user_id WINDOW TUMBLING (size 10 seconds) WHERE 1=1 GROUP BY c.userid HAVING count(*)>1
  • WINDOW(时间窗口支持以下三种):
    • HOPPING: 比如(SIZE 20 SECONDS,ADVANCE BY 5 SECONDS)表示每5秒钟统计之前20秒的内容
    • TUMBLING: 特殊的HOPPING(SIZE 20 SECONDS) SIZE与ADVANCE的值一样
    • SESSION: 比如(20 SECONDS) 表示某个key 20秒不活动则完成一个session,之后的记录都计算在下一个session中

一些常用操作

操作 代码
删除stream DROP STREAM click;
删除table DROP TABLE users;
显示stream/table的列名和类型 DESCRIBE click;
显示stream/table的列名和类型以及kafka topic详细信息 DESCRIBE EXTENDED click;
显示某个查询的执行计划 query_id通过show queries获取 EXPLAIN query_id;
显示kafka topic内容(从头取数据) PRINT 'click' FROM BEGINNING;
显示kafka topic内容(取最新数据) PRINT 'click';
打印topics SHOW/LIST TOPICS;
打印streams SHOW/LIST STREAMS;
打印tables SHOW/LIST TABLES;
打印queries SHOW QUERIES;
打印KSQL 配置 SHOW PROPERTIES;

内置SQL方法

方法名 举例 描述
ABS ABS(col1) 绝对值
ARRAYCONTAINS ARRAYCONTAINS('[1, 2, 3]', col1) 给定的列的值是否在给定的json或者数组中,返回true/false
CEIL CEIL(col1) 取上限
FLOOR FLOOR(col1) 取下限
CONCAT CONCAT(col1, '_hello') 字符串拼接
EXTRACTJSONFIELD EXTRACTJSONFIELD(message, '$.log.cloud') 提取json字符串内部的value
LCASE LCASE(col1) 字符串小写
UCASE UCASE(col1) 字符串大写
LEN LEN(coll) 字符串长度
RANDOM RANDOM() 0.0~1.0之间的随即DOUBLE
ROUND ROUND(col1) 四舍五入
TIMESTAMPTOSTRING STRINGTOTIMESTAMP(col1, 'yyyy-MM-dd HH:mm:ss.SSS') 根据给定的format格式将string形式的timestamp转换成BIGINT表示
TIMESTAMPTOSTRING TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') 根据给定的format格式将BIGINT形式的timestamp转换成string表示
SUBSTRING SUBSTRING(col1, 2, 5) 字符串截取
TRIM TRIM(col1) 去除字符串两边空格

内置聚合方法

方法名 例子 描述
COUNT COUNT(col1) 计算总数
MAX MAX(col1) 最大值
MIN MIN(col1) 最小值
SUM SUM(col1) 总数
TOPK TOPK(col1, k) 返回给定字段的top k个value
TOPKDISTINCT TOPKDISTINCT(col1, k) 返回给定字段的top k个不重复value