Riak - 使用篇(1)
分布式高可用鍵值對數據庫Riak - 使用篇(1)
請先參考Riak - 安裝運維篇(1)安裝部署并啟動Riak集群(3個Node)。
Riak默認有兩種端口,一種是protobuf端口,還有一種是HTTP Restful端口。
以前的Riak client java API會支持兩種端口。最新的Riak client Java API作了很多改造,比如說利用netty4作為網絡通信框架,簡化了API代碼,并且只支持Protobuf端口。理由如下:
我們之后主要使用riak client2.0.5和Riak的protobuf端口進行開發使用Riak客戶端,在某些情況下,會穿插一些Restful端口使用。因為某些功能在riak client2.0.5還未實現或者實現的不完整:
1. 建立與Riak集群的連接
建立maven項目,添加如下依賴:
<properties><!-- NIO framework --><netty_version>5.0.0.Alpha2</netty_version><!-- riak --><riak-client_version>2.0.5</riak-client_version><!-- Encoding --><!-- fast json --><jackson-core_version>2.7.3</jackson-core_version><!-- log --><slf4j_version>1.7.2</slf4j_version> </properties> <dependencies><dependency><groupId>com.basho.riak</groupId><artifactId>riak-client</artifactId><version>${riak-client_version}</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty_version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j_version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson-core_version}</version></dependency> </dependencies>之后新建ClusterRiakClient類:
package io.timberwolf.cache;import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.api.cap.Quorum; import com.basho.riak.client.api.commands.kv.FetchValue; import com.basho.riak.client.api.commands.kv.StoreValue; import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakNode; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace;import java.net.UnknownHostException; import java.util.LinkedList; import java.util.concurrent.ExecutionException;/** * The client class of Riak * * @author Hash Zhang * @version 0.0.0 * @see @https://github.com/basho/riak-java-client/wiki/RiakClient-%26-Cluster-Node-Builders-%28v2.0%29 */ //implements AutoCloseable是為了利用Java7的新特性,try-with-resources public class ClusterRiakClient implements AutoCloseable{private final static int DEFAULTMAXCONNECTIONS = 50;private final static int DEFAULTMINCONNECTIONS = 10;private final static int RETRIES = 5;private int maxConnections = DEFAULTMAXCONNECTIONS;private int minConnections = DEFAULTMINCONNECTIONS;private int retries = DEFAULTMINCONNECTIONS;private RiakClient riakClient;/** * 構建Riak集群的客戶端 * * @param hosts (hosts格式:127.0.0.1:10017,127.0.0.1:10027,127.0.0.1:10037) * @throws UnknownHostException */public ClusterRiakClient(String hosts) throws UnknownHostException {String[] addresses = hosts.split(",");//使用RiakNode Builder用于構建每個RiakNodeRiakNode.Builder riakNodeBuilder = new RiakNode.Builder().withMinConnections(minConnections).withMaxConnections(maxConnections);//構建每個RiakNode并保存在list中LinkedList<RiakNode> nodes = new LinkedList<RiakNode>();for (int i = 0; i < addresses.length; i++) {int j = addresses[i].indexOf(":");nodes.add(riakNodeBuilder.withRemoteAddress(addresses[i].substring(0, j)).withRemotePort(Integer.parseInt(addresses[i].substring(j + 1))).build());}//構建Riak集群并啟動,注意,必須調用start()RiakCluster riakCliuster = RiakCluster.builder(nodes).withExecutionAttempts(retries).build();riakCliuster.start();riakClient = new RiakClient(riakCliuster);}public void close() throws Exception {riakClient.shutdown();} }將控制臺(Console)日志級別設為debug,編寫測試main:
public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");clusterRiakClient.close(); }運行,查看控制臺輸出
04-21 08:38:40.061 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017 04-21 08:38:40.147 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027 04-21 08:38:40.173 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037 04-21 08:38:40.174 INFO [main] (RiakCluster.java:142) -RiakCluster is starting. 04-21 08:38:40.175 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down. 04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed 04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017 04-21 08:38:40.677 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down. 04-21 08:38:40.677 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017 04-21 08:38:40.683 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027 04-21 08:38:40.683 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027 04-21 08:38:40.685 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037 04-21 08:38:40.685 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037 04-21 08:38:40.688 INFO [pool-1-thread-1] (RiakCluster.java:305) -RiakCluster has shut down可以看出,Riak集群連接成功,并成功關閉連接。
如果日志級別為Debug,你可以看出,Riak客戶端使用了Netty客戶端連接的Riak集群
這里,Riakclient和RiakCluster還有實際的Riak集群之間的關系如下圖所示:
2. 將值放在桶中
Riak是一種鍵值的存儲方式,所以必須提供鍵值對才能保存數據。為了防止鍵沖突,Riak運用了桶的概念,用戶可以將鍵放入不同的桶中(相當于鍵的namespace)
首先,我們建立一個簡單的POJO類,保存快遞員的基本信息。
/** * User POJO * * @author Hash Zhang * @version 0.0.0 */ public class User {private String username;private String password;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;} }在ClusterRiakClient中添加如下方法:
/** * * 插入POJO對象 * * @param bucket 桶 * @param key 鍵 * @param value 值 * @throws ExecutionException * @throws InterruptedException * @throws JsonProcessingException */ public void set(String bucket, String key, Object value) throws ExecutionException, InterruptedException, JsonProcessingException {Location location = new Location(new Namespace(bucket), key);RiakObject riakObject = new RiakObject();riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();StoreValue.Response svResponse = this.riakClient.execute(sv); }/** * 取得POJO對象 * * @param bucket 桶 * @param key 鍵 * @return FetchValue.Response * @throws ExecutionException * @throws InterruptedException */ public RiakObject get(String bucket, String key) throws ExecutionException, InterruptedException {Location location = new Location(new Namespace(bucket), key);FetchValue fv = new FetchValue.Builder(location).build();return this.riakClient.execute(fv).getValue(RiakObject.class); }Location通過Bucket和Key確定唯一位置。編寫測試main
public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");User user1 = new User();user1.setUsername("zhxhash").setPassword("123456");clusterRiakClient.set("users", "user1", user1);RiakObject riakObject = clusterRiakClient.get("users", "user1");User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());clusterRiakClient.close(); }結果:
04-21 09:31:03.213 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017 04-21 09:31:03.240 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027 04-21 09:31:03.264 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037 04-21 09:31:03.266 INFO [main] (RiakCluster.java:142) -RiakCluster is starting. zhxhash|123456 04-21 09:31:04.048 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down. 04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed 04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017 04-21 09:31:04.549 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017 04-21 09:31:04.555 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027 04-21 09:31:04.556 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027 04-21 09:31:04.558 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037 04-21 09:31:04.558 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037 04-21 09:31:04.562 INFO [pool-1-thread-2] (RiakCluster.java:305) -RiakCluster has shut down 04-21 09:31:04.564 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down.以上是比較簡單的增加和查詢的實現,還有異步實現的方式。
/** * 異步插入POJO對象 * * @param bucket 桶 * @param key 鍵 * @param value 值 * @return RiakFuture<StoreValue.Response, Location> * @throws JsonProcessingException */ public RiakFuture<StoreValue.Response, Location> asyncSet(String bucket, String key, Object value) throws JsonProcessingException {Location location = new Location(new Namespace(bucket), key);RiakObject riakObject = new RiakObject();riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();return this.riakClient.executeAsync(sv); } /** * 異步取得POJO對象 * * @param bucket * @param key * @return RiakFuture<FetchValue.Response, Location> */ public RiakFuture<FetchValue.Response, Location> asyncGet(String bucket, String key) {Location location = new Location(new Namespace(bucket), key);FetchValue fv = new FetchValue.Builder(location).build();return this.riakClient.executeAsync(fv); }測試代碼:
public static void main(String[] args) throws Exception {final ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");final CountDownLatch countDownLatch = new CountDownLatch(1);Thread thread = new Thread() {@Overridepublic void run() {User user1 = new User();user1.setUsername("zhxhash").setPassword("123456");RiakFuture<StoreValue.Response, Location> riakFuture = null;while(true) {try {riakFuture = clusterRiakClient.asyncSet("users", "user1", user1);riakFuture.await(100L, TimeUnit.MILLISECONDS);} catch (JsonProcessingException | InterruptedException e) {e.printStackTrace();}if (riakFuture.isDone() && riakFuture.isSuccess()){countDownLatch.countDown();break;}}}};thread.start();countDownLatch.await();RiakFuture<FetchValue.Response, Location> riakFuture = null;while(true) {try {riakFuture = clusterRiakClient.asyncGet("users", "user1");riakFuture.await(100L, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}if (riakFuture.isDone() && riakFuture.isSuccess()){RiakObject riakObject = riakFuture.get().getValue(RiakObject.class);User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());break;}}clusterRiakClient.close(); }輸出結果和之前同步的應該一樣。
我們還可以刪除某個鍵值對,同樣的,有同步和異步兩種實現:
測試:
public static void main(String[] args) throws Exception {ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");clusterRiakClient.remove("users", "user1");RiakObject riakObject = clusterRiakClient.get("users", "user1");if (riakObject!=null) {User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(), User.class);System.out.println(getUser1.getUsername() + "|" + getUser1.getPassword());} else{System.out.println("Not Found");}clusterRiakClient.close(); }更新比較特殊,我們放到之后的章節去說
取得一個bucket下的所有鍵:
轉載于:https://my.oschina.net/u/3747772/blog/1588941
總結
以上是生活随笔為你收集整理的Riak - 使用篇(1)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: crh寄存器_STM32的CRH、CRL
- 下一篇: 递归方法实现最大公约数