Cassandra開發入門文檔第四部分(集合類型、元組類型、時間序列、計數列)

Cassandra 提供了三種集合類型,分別是Set,List,Map
Set: 非重複集,存儲了一組類型相同的不重複元素,當被查詢時會返回排好序的結果,可是內部構成是無序的值,應該是在查詢時對結果進行了排序。
List: 列表,查詢時會按照元素在list中的index順序來返回結果,能夠存儲多個重複的值。
Map:哈希Key-Value鍵值對,提供了名字到值的映射java

-- 開始工做:
bin/cqlsh localhost
-- 查看全部的鍵空間:
DESCRIBE keyspaces
-- 使用建立的鍵空間:
USE myks;
-- 查看已有表:
describe tables;
-- 查看錶結構:
describe table user_status_updates;

 

Set

-- 修改表結構,增長一個列,用於存儲評星用戶記錄

ALTER TABLE "user_status_updates"
ADD "starred_by_users" text;

-- 查詢出一個空記錄
SELECT "starred_by_users"
FROM "user_status_updates"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 修改記錄,增長評星用戶
UPDATE "user_status_updates"
SET "starred_by_users" = '["bob"]'
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 事實上,能夠直接定義列的類型爲集合列,而不是定義爲Text類型
ALTER TABLE "user_status_updates"
DROP "starred_by_users";
-- 注意一下:SET<text>類型
ALTER TABLE "user_status_updates"
ADD "starred_by_userss" SET<text>;

-- 修改記錄方法1,增長評星用戶,此次是集合,使用{}來存儲多條數據
UPDATE "user_status_updates"
SET "starred_by_userss" = {'bob'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 修改記錄方法2,用+
UPDATE "user_status_updates"
SET "starred_by_userss" = "starred_by_userss" + {'carol'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "starred_by_userss" = "starred_by_userss" + {'dave'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 修改記錄方法2,用-
UPDATE "user_status_updates"
SET "starred_by_userss" = "starred_by_users" - {'dave'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "starred_by_userss" = "starred_by_userss" + {'carol'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 多加幾個爲了測試排序
UPDATE "user_status_updates"
SET "starred_by_userss" = "starred_by_userss" + {'alice'}
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

SELECT "starred_by_userss"
FROM "user_status_updates"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

 

查詢結果發現,是通過了排序:
starred_by_userss
-----------------------------------
{'alice', 'bob', 'carol', 'dave'}git

集合列表List

和上面的差很少,區別是容許重複,而且沒有排序。github

ALTER TABLE "user_status_updates"
ADD "shared_by" LIST<text>;

UPDATE "user_status_updates"
SET "shared_by" = ['bob']
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by" = "shared_by" + ['carol']
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by" = ['dave'] + "shared_by"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by"[1] = 'robert'
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by"[3] = 'maurice'
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by" = "shared_by" - ['carol']
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;


--刪除記錄的方法是按照index順序下標進行刪除
DELETE "shared_by"[0]
FROM "user_status_updates"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

UPDATE "user_status_updates"
SET "shared_by" = "shared_by" + ['arol']
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

-- 查詢
SELECT "shared_by"
FROM "user_status_updates"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

 

查詢結果發現,沒有排序:
shared_by
----------------------------
['dave', 'robert', 'arol']web

Map

存儲鍵值對,鍵是惟一和無序的。數據庫

ALTER TABLE "users"
ADD social_identities MAP<text,bigint>;

UPDATE "users"
SET "social_identities" = {'twitter': 353637}
WHERE "username" = 'alice';

UPDATE "users"
SET "social_identities"['instagram'] = 9839025,
"social_identities"['yo'] = 25
WHERE "username" = 'alice';

UPDATE "users"
SET "social_identities"['twitter'] = 2725634
WHERE "username" = 'alice';

DELETE "social_identities"['instagram']
FROM "users"
WHERE "username" = 'alice';


INSERT INTO "users" (
"username", "email", "encrypted_password",
"social_identities", "version"
) VALUES (
'ivan',
'ivan@gmail.com',
0x48acb738ece5780f37b626a0cb64928b,
{'twitter': 875958, 'instagram': 109550},
NOW()
);

 

使用TTL

UPDATE users USING TTL <computed_ttl>
SET todo['2012-10-1'] = 'find water' WHERE user_id = 'frodo';
INSERT INTO users
(user_name, password)
VALUES ('cbrown', 'ch@ngem4a') USING TTL 86400;

在設定的computed_ttl數值秒後,數據會自動刪除。api

使用集合類型要注意:
1.集合的每一項最大是64K。
2.保持集合內的數據不要太大,省得Cassandra 查詢延時過長,只因Cassandra 查詢時會讀出整個集合內的數據,集合在內部不會進行分頁,集合的目的是存儲小量數據。
3.不要向集合插入大於64K的數據,不然只有查詢到前64K數據,其它部分會丟失。數據結構

正確的查詢姿式分佈式

若是查詢條件where跟隨集合列的時候會報錯,是由於沒有創建索引
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"ide

-- 正確的查詢姿式,先建立索引
CREATE INDEX ON "user_status_updates" ("starred_by_userss");

SELECT * FROM "user_status_updates"
WHERE "starred_by_userss" CONTAINS 'alice';

-- map類型也是

CREATE INDEX ON "users" (KEYS("social_identities"));

SELECT "username", "social_identities"
FROM users
WHERE "social_identities" CONTAINS KEY 'twitter';

SELECT "shared_by"[2]
FROM "user_status_updates"
WHERE "username" = 'alice'
AND "id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02;

SELECT "social_identities"['twitter']
FROM "users"
WHERE "username" = 'alice';

SELECT * FROM "user_status_updates"
WHERE "username" = 'alice'
ORDER BY "id" ASC
LIMIT 2;


DROP INDEX user_social_identities_idx;
ALTER TABLE "users" DROP social_identities;

ALTER TABLE "users" ADD social_identities set<text>;

 

元組和自定義類型

 
 
-- 元組

ALTER
TABLE "users" ADD "education" frozen <tuple<text, int>>; ALTER TABLE "users" DROP "education"; ALTER TABLE "users" ADD "education" tuple<text, int>; UPDATE "users" SET "education" = ('Big Data University', 2019) WHERE "username" = 'alice'; UPDATE "users" SET "education" = ('Cassandra College', null, null) WHERE "username" = 'bob'; UPDATE "users" SET "education" = ('BDU') WHERE "username" = 'alice'; UPDATE "users" SET "education" = ('Big Data University', 2003) WHERE "username" = 'alice'; CREATE INDEX ON "users" ("education"); SELECT "username", "education" FROM users; SELECT "username", "education" FROM users WHERE "education" = ('Big Data University', 2003); -- 自定義類型 CREATE TYPE "education_information" ( "school_name" text, "graduation_year" int ); ALTER TABLE "users" DROP "education"; ALTER TABLE "users" ADD "education" frozen <"education_information">; UPDATE "users" SET "education" = { "school_name": 'Big Data University', "graduation_year": 2003 } WHERE "username" = 'alice'; CREATE INDEX ON "users" ("education"); SELECT "username", "education" FROM "users" WHERE "education" = { "school_name": 'Big Data University', "graduation_year": 2003 }; SELECT "username", "education"."school_name" FROM "users" WHERE "username" = 'alice'; ALTER TABLE "users" ADD "telephone_numbers" map<text, set<text>>; ALTER TABLE "users" ADD "telephone_numbers" map<text, frozen<set<text>>>; UPDATE "users" SET "telephone_numbers"['home'] = {'123456789', '123789456'} WHERE "username" = 'alice'; UPDATE "users" SET "telephone_numbers"['office'] = {'123654789', '123987456'} WHERE "username" = 'alice'; ALTER TABLE "users" ADD "education_history" set<frozen<"education_information">>; UPDATE "users" SET "education_history" = {{ "school_name": 'Big Data University', "graduation_year": 2003 },{ "school_name": 'Cassandra College', "graduation_year": 2005 }} WHERE "username" = 'alice';

 

時間序列數據庫


目前業界時間序列數據庫能夠分紅兩類,基於現有的數據庫或者專門爲時間序列數據寫的數據庫。
有不少時間序列數據庫是基於 Cassandra 的, KairosDB 是其中比較早的一個。 InfluxDB 是專用於時間序列的數據庫。
另外還有十幾種時間序列數據庫,都是基於Cassandra,見https://xephonhq.github.io/awesome-time-series-database/?language=All&backend=Cassandra函數

一個簡單的時間序列數據結構

CREATE TABLE IF NOT EXISTS naive.metrics (
metric_name text, metric_timestamp timestamp, value int,
PRIMARY KEY (metric_name, metric_timestamp))
INSERT INTO naive.metrics (metric_name, metric_timestamp, value) VALUES (cpu, 2017/03/17:13:24:00:20, 10.2) 
INSERT INTO naive.metrics (metric_name, metric_timestamp, value) VALUES (mem, 2017/03/17:13:24:00:20, 80.3)

上圖顯示了使用 Cassandra 存儲時間序列數據時 naive 的表結構, Cluster Key 存儲時間戳,列的值存儲實際的數值。 它 naive 之處在於序列和 Cassandra 的物理行是一一對應的。 當單一序列的數據點超過 Cassandra 的限制(20億)時就會崩潰。
一個更加成熟的表結構是把一個時間序列按時間範圍分區,(KairosDB 按照 3 周來劃分,可是能夠根據數據量進行不定長的劃分)。 爲了存儲分區的信息,須要一張額外的表。 同時在 naive 裏序列的名稱只是一個簡單的字符串,若是須要按照多種條件進行篩選的話,須要存儲更多的鍵值對,而且對於這些鍵值對須要創建索引以提升查詢速度。

更復雜的例子:

一個雙分區列的例子,("status_update_username", "status_update_id")是聯合分區列,observed_at是簇分區列,也是時間序列,類型爲timeuuid

CREATE TABLE "status_update_views" (

"status_update_username" text,
"status_update_id" timeuuid,
"observed_at" timeuuid,
"client_type" text,
PRIMARY KEY (
("status_update_username", "status_update_id"),
"observed_at"
)
);

-- 插入數據
INSERT INTO "status_update_views" (
"status_update_username", "status_update_id",
"observed_at", "client_type"
) VALUES (
'alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02,
85a53d10-4cc3-11e4-a7ff-5f98e903bf02,
'web'
);
-- 查詢
SELECT "observed_at", "client_type"
FROM "status_update_views"
WHERE "status_update_username" = 'alice'
AND "status_update_id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02
AND "observed_at" >= MINTIMEUUID('2014-10-05 00:00:00+0000')
AND "observed_at" < MINTIMEUUID('2014-10-06 00:00:00+0000');
-- 查詢計數
SELECT COUNT(1)
FROM "status_update_views"
WHERE "status_update_username" = 'alice'
AND "status_update_id" = 76e7a4d0-e796-11e3-90ce-5f98e903bf02
AND "observed_at" >= MINTIMEUUID('2014-10-05 00:00:00+0000')
AND "observed_at" < MINTIMEUUID('2014-10-06 00:00:00+0000');

 

計數表counter

有一些計數類型的應用,好比某個頁面被點擊了多少次,或9月的每一天,狀態更新了多少次。通常地說,咱們但願將每日整體視圖計數存儲在一個結構中,該結構容許咱們在給定的時間範圍內輕鬆檢索計數。咱們不須要存儲關於每一個視圖事件的離散信息;只需知道天天發生了多少視圖就足夠了。Cassandra很是擅長作這個。


我我的認爲這種高性能、低存儲空間的計數應用交給Redis會更好,Cassandra要處理分佈式鎖,有比較多的侷限(http://rockthecode.io/blog/highly-available-counters-using-cassandra/),Cassandra仍是作它擅長的列存儲、時間序列就行了。

-- 注意,counter類型
-- year是分區列,date爲簇列

CREATE TABLE "daily_status_update_views" (
"year" int,
"date" timestamp,
"total_views" counter,
"web_views" counter,
"mobile_views" counter,
"api_views" counter,
PRIMARY KEY (("year"), "date")
);

SELECT "date", "total_views"
FROM "daily_status_update_views"
WHERE "year" = 2014
AND "date" >= '2014-09-01'
AND "date" < '2014-09-30';

UPDATE "daily_status_update_views"
SET "total_views" = "total_views" + 1,
"web_views" = "web_views" + 1
WHERE "year" = 2014
AND "date" = '2014-10-05 00:00:00+0000';

SELECT * FROM "daily_status_update_views";

-- 在嘗試添加的時候會報錯,緣由是counter表只容許update,不許insert
-- InvalidRequest: Error from server: code=2200 [Invalid query] message="INSERT statements are not allowed on counter tables, use UPDATE instead"

INSERT INTO "daily_status_update_views"
("year", "date", "total_views")
VALUES (2014, '2014-02-01 00:00:00+0000', 500);

-- 正確的姿式
UPDATE "daily_status_update_views"
SET "total_views" = "total_views" + 500
WHERE "year" = 2014
AND "date" = '2014-02-01 00:00:00+0000';

DELETE FROM "daily_status_update_views"
WHERE "year" = 2014
AND "date" = '2014-02-01 00:00:00+0000';

UPDATE "daily_status_update_views"
SET "total_views" = "total_views" + 100
WHERE "year" = 2014
AND "date" = '2014-02-01 00:00:00+0000';

-- 在嘗試修改表定義的時候會報錯,只能增長counter類型的列
-- ConfigurationException: Cannot add a non counter column (last_view_time) in a counter column family

ALTER TABLE "daily_status_update_views"
ADD "last_view_time" timestamp;

 

用戶定義函數

比較簡單,很少說了。感受應用的地方很少。

CREATE OR REPLACE FUNCTION selectCity(location text) 
CALLED ON NULL INPUT 
RETURNS text 
LANGUAGE java 
AS ' 
if (location == null)
return null;
else
return location.split(",")[0];
';

SELECT username, selectCity(location) FROM "users";

CREATE OR REPLACE FUNCTION selectCity(location text) 
RETURNS NULL ON NULL INPUT 
RETURNS text 
LANGUAGE java 
AS ' 
return location.split(",")[0];
';

INSERT INTO "status_update_views" ("status_update_username", "status_update_id", "observed_at", "client_type") VALUES ('alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02, NOW(), 'web');
INSERT INTO "status_update_views" ("status_update_username", "status_update_id", "observed_at", "client_type") VALUES ('alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02, NOW(), 'web');
INSERT INTO "status_update_views" ("status_update_username", "status_update_id", "observed_at", "client_type") VALUES ('alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02, NOW(), 'mobile');
INSERT INTO "status_update_views" ("status_update_username", "status_update_id", "observed_at", "client_type") VALUES ('alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02, NOW(), 'mobile');
INSERT INTO "status_update_views" ("status_update_username", "status_update_id", "observed_at", "client_type") VALUES ('alice', 76e7a4d0-e796-11e3-90ce-5f98e903bf02, NOW(), 'api');


CREATE OR REPLACE FUNCTION state_group_and_count (state map<text, int>, client_type text)
CALLED ON NULL INPUT
RETURNS map<text, int>
LANGUAGE java AS '
Integer count = (Integer) state.get(client_type); 
if (count == null) 
count = 1; 
else 
count++; 
state.put(client_type, count); 
return state; 
';

CREATE OR REPLACE AGGREGATE group_and_count (text) 
SFUNC state_group_and_count
STYPE map<text, int> 
INITCOND {};

 

SELECT status_update_username, status_update_id, group_and_count(client_type) 
FROM status_update_views 
WHERE status_update_username='alice' AND status_update_id=76e7a4d0-e796-11e3-90ce-5f98e903bf02;


SELECT status_update_username, status_update_id, group_and_count(client_type) 
FROM status_update_views 
WHERE status_update_username='alice' AND status_update_id=76e7a4d0-e796-11e3-90ce-5f98e903bf02 
AND "observed_at" >= MINTIMEUUID('2016-12-21 00:00:00+0000') 
AND "observed_at" < MINTIMEUUID('2016-12-22 00:00:00+0000');
相關文章
相關標籤/搜索