日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Riak - 使用篇(1)

發布時間:2023/12/14 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Riak - 使用篇(1) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

分布式高可用鍵值對數據庫Riak - 使用篇(1)

請先參考Riak - 安裝運維篇(1)安裝部署并啟動Riak集群(3個Node)。
Riak默認有兩種端口,一種是protobuf端口,還有一種是HTTP Restful端口。
以前的Riak client java API會支持兩種端口。最新的Riak client Java API作了很多改造,比如說利用netty4作為網絡通信框架,簡化了API代碼,并且只支持Protobuf端口。理由如下:

  • 利用Protocol Buffers端口會快25%左右
  • HTTP接口不支持基于證書的認證
  • HTTP協議抽象不夠詳細
  • 我們之后主要使用riak client2.0.5和Riak的protobuf端口進行開發使用Riak客戶端,在某些情況下,會穿插一些Restful端口使用。因為某些功能在riak client2.0.5還未實現或者實現的不完整:

  • MapReduce
  • 二級索引查詢
  • 列出所有keys
  • 列出所有buckets
  • 1. 建立與Riak集群的連接

    建立maven項目,添加如下依賴:

    <properties><!-- NIO framework --><netty_version>5.0.0.Alpha2</netty_version><!-- riak --><riak-client_version>2.0.5</riak-client_version><!-- Encoding --><!-- fast json --><jackson-core_version>2.7.3</jackson-core_version><!-- log --><slf4j_version>1.7.2</slf4j_version> </properties> <dependencies><dependency><groupId>com.basho.riak</groupId><artifactId>riak-client</artifactId><version>${riak-client_version}</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty_version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j_version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson-core_version}</version></dependency> </dependencies>

    之后新建ClusterRiakClient類:

    package io.timberwolf.cache;import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.api.cap.Quorum; import com.basho.riak.client.api.commands.kv.FetchValue; import com.basho.riak.client.api.commands.kv.StoreValue; import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakNode; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace;import java.net.UnknownHostException; import java.util.LinkedList; import java.util.concurrent.ExecutionException;/** * The client class of Riak * * @author Hash Zhang * @version 0.0.0 * @see @https://github.com/basho/riak-java-client/wiki/RiakClient-%26-Cluster-Node-Builders-%28v2.0%29 */ //implements AutoCloseable是為了利用Java7的新特性,try-with-resources public class ClusterRiakClient implements AutoCloseable{private final static int DEFAULTMAXCONNECTIONS = 50;private final static int DEFAULTMINCONNECTIONS = 10;private final static int RETRIES = 5;private int maxConnections = DEFAULTMAXCONNECTIONS;private int minConnections = DEFAULTMINCONNECTIONS;private int retries = DEFAULTMINCONNECTIONS;private RiakClient riakClient;/** * 構建Riak集群的客戶端 * * @param hosts (hosts格式:127.0.0.1:10017,127.0.0.1:10027,127.0.0.1:10037) * @throws UnknownHostException */public ClusterRiakClient(String hosts) throws UnknownHostException {String[] addresses = hosts.split(",");//使用RiakNode Builder用于構建每個RiakNodeRiakNode.Builder riakNodeBuilder = new RiakNode.Builder().withMinConnections(minConnections).withMaxConnections(maxConnections);//構建每個RiakNode并保存在list中LinkedList<RiakNode> nodes = new LinkedList<RiakNode>();for (int i = 0; i < addresses.length; i++) {int j = addresses[i].indexOf(":");nodes.add(riakNodeBuilder.withRemoteAddress(addresses[i].substring(0, j)).withRemotePort(Integer.parseInt(addresses[i].substring(j + 1))).build());}//構建Riak集群并啟動,注意,必須調用start()RiakCluster riakCliuster = RiakCluster.builder(nodes).withExecutionAttempts(retries).build();riakCliuster.start();riakClient = new RiakClient(riakCliuster);}public void close() throws Exception {riakClient.shutdown();} }

    將控制臺(Console)日志級別設為debug,編寫測試main:

    public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");clusterRiakClient.close(); }

    運行,查看控制臺輸出

    04-21 08:38:40.061 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017 04-21 08:38:40.147 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027 04-21 08:38:40.173 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037 04-21 08:38:40.174 INFO [main] (RiakCluster.java:142) -RiakCluster is starting. 04-21 08:38:40.175 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down. 04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed 04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017 04-21 08:38:40.677 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down. 04-21 08:38:40.677 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017 04-21 08:38:40.683 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027 04-21 08:38:40.683 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027 04-21 08:38:40.685 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037 04-21 08:38:40.685 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037 04-21 08:38:40.688 INFO [pool-1-thread-1] (RiakCluster.java:305) -RiakCluster has shut down

    可以看出,Riak集群連接成功,并成功關閉連接。
    如果日志級別為Debug,你可以看出,Riak客戶端使用了Netty客戶端連接的Riak集群
    這里,Riakclient和RiakCluster還有實際的Riak集群之間的關系如下圖所示:


    2. 將值放在桶中

    Riak是一種鍵值的存儲方式,所以必須提供鍵值對才能保存數據。為了防止鍵沖突,Riak運用了桶的概念,用戶可以將鍵放入不同的桶中(相當于鍵的namespace)

    首先,我們建立一個簡單的POJO類,保存快遞員的基本信息。

    /** * User POJO * * @author Hash Zhang * @version 0.0.0 */ public class User {private String username;private String password;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;} }

    在ClusterRiakClient中添加如下方法:

    /** * * 插入POJO對象 * * @param bucket 桶 * @param key 鍵 * @param value 值 * @throws ExecutionException * @throws InterruptedException * @throws JsonProcessingException */ public void set(String bucket, String key, Object value) throws ExecutionException, InterruptedException, JsonProcessingException {Location location = new Location(new Namespace(bucket), key);RiakObject riakObject = new RiakObject();riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();StoreValue.Response svResponse = this.riakClient.execute(sv); }/** * 取得POJO對象 * * @param bucket 桶 * @param key 鍵 * @return FetchValue.Response * @throws ExecutionException * @throws InterruptedException */ public RiakObject get(String bucket, String key) throws ExecutionException, InterruptedException {Location location = new Location(new Namespace(bucket), key);FetchValue fv = new FetchValue.Builder(location).build();return this.riakClient.execute(fv).getValue(RiakObject.class); }

    Location通過Bucket和Key確定唯一位置。編寫測試main

    public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");User user1 = new User();user1.setUsername("zhxhash").setPassword("123456");clusterRiakClient.set("users", "user1", user1);RiakObject riakObject = clusterRiakClient.get("users", "user1");User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());clusterRiakClient.close(); }

    結果:

    04-21 09:31:03.213 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017 04-21 09:31:03.240 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027 04-21 09:31:03.264 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037 04-21 09:31:03.266 INFO [main] (RiakCluster.java:142) -RiakCluster is starting. zhxhash|123456 04-21 09:31:04.048 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down. 04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed 04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017 04-21 09:31:04.549 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017 04-21 09:31:04.555 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027 04-21 09:31:04.556 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027 04-21 09:31:04.558 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037 04-21 09:31:04.558 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037 04-21 09:31:04.562 INFO [pool-1-thread-2] (RiakCluster.java:305) -RiakCluster has shut down 04-21 09:31:04.564 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down.

    以上是比較簡單的增加和查詢的實現,還有異步實現的方式。

    /** * 異步插入POJO對象 * * @param bucket 桶 * @param key 鍵 * @param value 值 * @return RiakFuture<StoreValue.Response, Location> * @throws JsonProcessingException */ public RiakFuture<StoreValue.Response, Location> asyncSet(String bucket, String key, Object value) throws JsonProcessingException {Location location = new Location(new Namespace(bucket), key);RiakObject riakObject = new RiakObject();riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();return this.riakClient.executeAsync(sv); } /** * 異步取得POJO對象 * * @param bucket * @param key * @return RiakFuture<FetchValue.Response, Location> */ public RiakFuture<FetchValue.Response, Location> asyncGet(String bucket, String key) {Location location = new Location(new Namespace(bucket), key);FetchValue fv = new FetchValue.Builder(location).build();return this.riakClient.executeAsync(fv); }

    測試代碼:

    public static void main(String[] args) throws Exception {final ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");final CountDownLatch countDownLatch = new CountDownLatch(1);Thread thread = new Thread() {@Overridepublic void run() {User user1 = new User();user1.setUsername("zhxhash").setPassword("123456");RiakFuture<StoreValue.Response, Location> riakFuture = null;while(true) {try {riakFuture = clusterRiakClient.asyncSet("users", "user1", user1);riakFuture.await(100L, TimeUnit.MILLISECONDS);} catch (JsonProcessingException | InterruptedException e) {e.printStackTrace();}if (riakFuture.isDone() && riakFuture.isSuccess()){countDownLatch.countDown();break;}}}};thread.start();countDownLatch.await();RiakFuture<FetchValue.Response, Location> riakFuture = null;while(true) {try {riakFuture = clusterRiakClient.asyncGet("users", "user1");riakFuture.await(100L, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}if (riakFuture.isDone() && riakFuture.isSuccess()){RiakObject riakObject = riakFuture.get().getValue(RiakObject.class);User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());break;}}clusterRiakClient.close(); }

    輸出結果和之前同步的應該一樣。
    我們還可以刪除某個鍵值對,同樣的,有同步和異步兩種實現:

    /** * 刪除一個鍵值對 * * @param bucket 桶 * @param key 鍵 * @throws ExecutionException * @throws InterruptedException */ public void remove(String bucket, String key) throws ExecutionException, InterruptedException {Location location = new Location(new Namespace(bucket), key);DeleteValue dv = new DeleteValue.Builder(location).build();this.riakClient.execute(dv); }/** * 異步刪除一個鍵值對 * * @param bucket 桶 * @param key 鍵 * @return RiakFuture<Void, Location> */ public RiakFuture<Void, Location> asyncRemove(String bucket, String key){Location location = new Location(new Namespace(bucket), key);DeleteValue dv = new DeleteValue.Builder(location).build();return this.riakClient.executeAsync(dv); }

    測試:

    public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");clusterRiakClient.remove("users", "user1");RiakObject riakObject = clusterRiakClient.get("users", "user1");if (riakObject!=null) {User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(), User.class);System.out.println(getUser1.getUsername() + "|" + getUser1.getPassword());} else{System.out.println("Not Found");}clusterRiakClient.close(); }

    更新比較特殊,我們放到之后的章節去說
    取得一個bucket下的所有鍵:

    /** * 取得一個bucket下所有鍵 * * @param bucket 桶 * @return List<String> * @throws ExecutionException * @throws InterruptedException */ public List<String> getKeys(String bucket) throws ExecutionException, InterruptedException {Namespace ns = new Namespace(bucket);ListKeys lk = new ListKeys.Builder(ns).build();ListKeys.Response response = this.riakClient.execute(lk);List<String> keys = new LinkedList<>();for (Location l : response){keys.add(l.getKeyAsString());}return keys; }

    轉載于:https://my.oschina.net/u/3747772/blog/1588941

    總結

    以上是生活随笔為你收集整理的Riak - 使用篇(1)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。