BookKeeper 介绍(3)--API

news/2024/9/23 17:11:24

本文主要介绍 BookKeeper 的 API,文中所使用到的软件版本:Java 1.8.0_341、BookKeeper 4.16.5。

 1、引入依赖

<dependency><groupId>org.apache.bookkeeper</groupId><artifactId>bookkeeper-server</artifactId><version>4.16.5</version>
</dependency>

2、Ledger API

2.1、传统 API

2.1.1、创建客户端

public void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群configuration.setAddEntryTimeout(2000);bookKeeper = new BookKeeper(configuration);
}

2.1.2、创建 ledger 并添加 entry

//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());
LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes());
log.info("ledgerId={}", ledgerHandle.getId());
for (int i = 0; i < 10; i++) {long id = ledgerHandle.addEntry(("abc" + i).getBytes());log.info("id={}", id);
}ledgerHandle.close();

创建 ledger 时可以指定存储该 ledger 的节点个数、副本个数、几个副本 ack 表示写入成功。

2.1.3、从 ledger 中读取 entry

public void readLedger() throws Exception {LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());while (ledgerEntrys.hasMoreElements()) {LedgerEntry ledgerEntry = ledgerEntrys.nextElement();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));}ledgerHandle.close();
}

2.1.4、删除 ledger

public void deleteLedger() throws Exception {bookKeeper.deleteLedger(0L);
}

2.1.5、完整代码

package com.abc.demo.bk;import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.Enumeration;@Slf4j
public class LedgerCase {private BookKeeper bookKeeper;@Beforepublic void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setZkServers("10.49.196.33:2181");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);bookKeeper = new BookKeeper(configuration);}@Afterpublic void closeBookKeeper() throws Exception {bookKeeper.close();}@Testpublic void createLedger() throws Exception {//LedgerHandle ledgerHandle = bookKeeper.createLedger(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());LedgerHandle ledgerHandle = bookKeeper.createLedger(BookKeeper.DigestType.MAC, "123".getBytes());log.info("ledgerId={}", ledgerHandle.getId());for (int i = 0; i < 10; i++) {long id = ledgerHandle.addEntry(("abc" + i).getBytes());log.info("id={}", id);}ledgerHandle.close();}@Testpublic void readLedger() throws Exception {LedgerHandle ledgerHandle = bookKeeper.openLedger(0L, BookKeeper.DigestType.MAC, "123".getBytes());Enumeration<LedgerEntry> ledgerEntrys = ledgerHandle.readEntries(0, ledgerHandle.getLastAddConfirmed());while (ledgerEntrys.hasMoreElements()) {LedgerEntry ledgerEntry = ledgerEntrys.nextElement();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntry()));}ledgerHandle.close();}@Testpublic void deleteLedger() throws Exception {bookKeeper.deleteLedger(0L);}
}
LedgerCase.java

2.2、新 API

自 4.6 版本开始,BookKeeper 提供了一个新的客户端 API,利用了Java 8 的 CompletableFuture 功能。引入了 WriteHandle、WriteAdvHandle 和 ReadHandle 来替换通用的 LedgerHandle。新的 API 在 org.apache.bookkeeper.client.api 包中,应该只使用该包中定义的接口。4.6 版本的新 API 仍然是实验性的,并可能在后续的次要版本中进行更改。

2.2.1、创建客户端

public void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");//单机//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");//集群configuration.setAddEntryTimeout(2000);bookKeeper = BookKeeper.newBuilder(configuration).build();
}

2.2.2、创建 ledger 并添加 entry

public void write() throws Exception {WriteHandle writeHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).execute().get();log.info("ledgerId={}", writeHandle.getId());for (int i = 0; i < 10; i++) {long id = writeHandle.append(("bcd" + i).getBytes());log.info("id={}", id);}writeHandle.close();
}

2.2.3、从 ledger 中读取 entry

public void read() throws Exception {ReadHandle readHandle = bookKeeper.newOpenLedgerOp().withLedgerId(1L).withDigestType(DigestType.MAC).withPassword("123".getBytes()).execute().get();LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());Iterator<LedgerEntry> iterator = ledgerEntries.iterator();while (iterator.hasNext()) {LedgerEntry ledgerEntry = iterator.next();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));}readHandle.close();
}

2.2.4、删除 ledger

public void delete() throws Exception {bookKeeper.newDeleteLedgerOp().withLedgerId(101L).execute().get();
}

2.2.5、完整代码

package com.abc.demo.bk;import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.*;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.Iterator;@Slf4j
public class LedgerNewCase {private BookKeeper bookKeeper;@Beforepublic void createBookKeeper() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setZkServers("10.49.196.33:2181");//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);bookKeeper = BookKeeper.newBuilder(configuration).build();}@Afterpublic void closeBookKeeper() throws Exception {bookKeeper.close();}@Testpublic void write() throws Exception {WriteHandle writeHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).execute().get();log.info("ledgerId={}", writeHandle.getId());for (int i = 0; i < 10; i++) {long id = writeHandle.append(("bcd" + i).getBytes());log.info("id={}", id);}writeHandle.close();}@Testpublic void read() throws Exception {ReadHandle readHandle = bookKeeper.newOpenLedgerOp().withLedgerId(1L).withDigestType(DigestType.MAC).withPassword("123".getBytes()).execute().get();LedgerEntries ledgerEntries = readHandle.read(0, readHandle.getLastAddConfirmed());Iterator<LedgerEntry> iterator = ledgerEntries.iterator();while (iterator.hasNext()) {LedgerEntry ledgerEntry = iterator.next();log.info("ledgerId={},entryId={},entry={}", ledgerEntry.getLedgerId(), ledgerEntry.getEntryId(), new String(ledgerEntry.getEntryBytes()));}readHandle.close();}@Testpublic void delete() throws Exception {bookKeeper.newDeleteLedgerOp().withLedgerId(101L).execute().get();}
}
LedgerNewCase.java

3、Advanced Ledger API

在 4.5.0 版本中,BookKeeper 引入了一些高级 API 用于高级功能。高级 API 和普通 API 主要区别在写 entry,读 entry 是一致的。

3.1、LedgerHandleAdv

LedgerHandleAdv 是 LedgerHandle 的高级扩展,在创建时可以指定 LedgerId,在添加 entry 时需要传入 entryId。

public void ledgerHandleAdv() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");configuration.setAddEntryTimeout(2000);org.apache.bookkeeper.client.BookKeeper bookKeeper = new org.apache.bookkeeper.client.BookKeeper(configuration);//LedgerHandleAdv ledgerHandle = (LedgerHandleAdv) bookKeeper.createLedgerAdv(3, 2, 2, BookKeeper.DigestType.MAC, "123".getBytes());LedgerHandleAdv ledgerHandleAdv = (LedgerHandleAdv) bookKeeper.createLedgerAdv(100L, 3, 2, 2, org.apache.bookkeeper.client.BookKeeper.DigestType.MAC, "123".getBytes(), Collections.emptyMap());log.info("ledgerId={}", ledgerHandleAdv.getId());for (int i = 0; i < 10; i++) {long id = ledgerHandleAdv.addEntry(i, ("abc" + i).getBytes()); //entry id 需从 0 开始log.info("id={}", id);}ledgerHandleAdv.close();bookKeeper.close();
}

3.2、4.6 版本新 API 的 LedgerHandleAdv

public void writeAdvHandle() throws Exception {ClientConfiguration configuration = new ClientConfiguration();//configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.33:2181/ledgers");configuration.setMetadataServiceUri("zk+hierarchical://10.49.196.30:2181;10.49.196.31:2181;10.49.196.32:2181/ledgers");org.apache.bookkeeper.client.api.BookKeeper bookKeeper = org.apache.bookkeeper.client.api.BookKeeper.newBuilder(configuration).build();WriteAdvHandle writeAdvHandle = bookKeeper.newCreateLedgerOp().withDigestType(DigestType.MAC).withPassword("123".getBytes()).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2).makeAdv() //CreateBuilder 转为 CreateAdvBuilder.withLedgerId(101L).execute().get();log.info("ledgerId={}", writeAdvHandle.getId());for (int i = 0; i < 10; i++) {long id = writeAdvHandle.write(i, ("bcd" + i).getBytes());log.info("id={}", id);}writeAdvHandle.close();bookKeeper.close();
}

 

 

参考:
https://bookkeeper.apache.org/docs/api/overview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/46748.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

Phi-3 模型手机部署教程(微软发布的可与GPT-3.5媲美的小模型)

Phi 是微软AI研究院新推出的开源小型语言模型,适用于商业使用,其优势在于体积小、资源需求少,被称为“一个能跑在手机上的大模型”,我们通过Termux和Ollama工具,在我们手机上部署和体验……前面几篇博文,老牛同学和大家一起在个人电脑部署了Qwen2、GLM4、Llama3、ChatTTS…

龙哥量化:通达信近5日涨幅计算

如果您需要代写技术指标公式, 请联系我。 龙哥QQ:591438821 龙哥微信:Long622889 近5日涨幅计算: 一般的涨幅计算公式:涨幅:(C-REF(C,5))/REF(C,5)*100; 计算的有偏差,看截图, 涨幅只有15%,但实际振幅相当猛, 所以我认为计算近多少日涨幅,还是用振幅比较实际些 HH:…

寄存器

一、 1、通用寄存器32位寄存器可以分为后面那个16位寄存器的,16位的还可以分成两个八位的 2、mov的语法3、指令 (1)mov指令(详见2) (2)add指令 (3)sub指令 (4)and指令 (5)or指令 (6)xor指令 (7)not指令

嵌入式技术

嵌入式微处理体系结构 冯诺依曼结构 传统计算机采用冯诺依曼结构,也称普林斯顿结构。是一种将程序指令存储器和数据存储器合并在一起的存储结构。 冯诺依曼结构的计算机程序和数据共用一个存储空间,程序指令存储地址和数据存储地址指向同一个存储器的不同物理位置。 采用单一…

【HDC 2024】华为云开发者联盟驱动应用创新,赋能开发者成长

6月22日,华为云开发者联盟带来了精彩的专题论坛和圆桌讨论活动,更有扫地僧见面会、开发者体验官、极客挑战赛等形式丰富、高频高能的交流互动,让开发者面对面畅聊,迸发思想火花,探索技术的无限可能。本文分享自华为云社区《【HDC 2025】华为云开发者联盟驱动应用创新,赋能…

从零开始学习树莓派(一)

收到raspberrypi zero 2w 后进行的一些初始配置,如何使用无网线,无外接屏幕的方法远程连接树莓派。准备工作 树莓派的型号:RaspberryPi Zero 2w (自带的其他配件:亚克力外壳,HDMI线,GPIO拓展套件,Micro电源,32G内存卡,读卡器,散热片) (一)烧录系统 首先,使用官方…

审听训练与音质主观评价——笔记

Chap01. 声音与人耳听觉 生理听觉 人耳的结构分为外耳、中耳和内耳,如下图:外耳的耳廓首先会对到来的声音进行一定程度的反射,这些细微的变化能够辅助人即使在利用单耳时识别声音的方向(耳廓效应) 由于鼓膜的阻挡,耳道是一个封闭的管子,这种结构具有管共振效应,最佳共振…

鸿蒙生态伙伴SDK市场正式发布,驱动千行百业鸿蒙原生应用开发

6月21-23日,华为开发者大会(HDC 2024)在东莞举办。在22日举办的【鸿蒙生态伙伴SDK】论坛中,正式发布了【鸿蒙生态伙伴SDK市场】(以下简称:伙伴SDK市场),伙伴SDK市场是为开发者提供各类优质、安全SDK的聚合平台,帮助开发者便捷搜索、选取和使用能力丰富的SDK,轻松、高…