您当前的位置: 首页 >  Java

Bulut0907

暂无认证

  • 5浏览

    0关注

    346博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Apache Pulsar通过Java/Scala API操作tenant、namespace、topic

Bulut0907 发布时间:2022-05-04 07:20:32 ,浏览量:5

目录
  • 1. 添加pom.xml依赖
  • 2. tenant租户的Java/Scala API
  • 3. namespace命令空间的Java/Scala API
  • 4. topic的Java/Scala API

1. 添加pom.xml依赖
        
            org.apache.pulsar
            pulsar-client-all
            2.9.1
        
2. tenant租户的Java/Scala API

示例如下

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.common.policies.data.TenantInfo
import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
import scala.collection.convert.ImplicitConversions.`set AsJavaSet`

object tenantTest {

  def main(args: Array[String]): Unit = {
    // ===============创建Pulsar的Admin管理对象===================
    val serviceHttpUrl = "http://192.168.23.111:8086,192.168.23.112:8086,192.168.23.113:8086"
    val pulsarAdmin:PulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build()

    val tenantName = "my-tenant"
    // ===============基于Pulsar的Admin对象进行相关的操作================
    // 创建租户
    val allowedClusters = scala.collection.immutable.Set("pulsar-cluster")
    val config:TenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build()
    pulsarAdmin.tenants().createTenant(tenantName, config)

    // 查看当前有那些租户
    val tenants:Seq[String] = pulsarAdmin.tenants().getTenants().toSeq
    tenants.foreach(println)

    // 删除租户操作
    pulsarAdmin.tenants().deleteTenant(tenantName)

    // =========================关闭管理对象=============================
    pulsarAdmin.close()
  }

}

程序输出如下

my-tenant
public
pulsar
3. namespace命令空间的Java/Scala API

示例如下

import org.apache.pulsar.client.admin.PulsarAdmin
import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`

object namespaceTest {

  def main(args: Array[String]): Unit = {

    // ==========================创建Pulsar的Admin管理对象================
    val serviceHttpUrl = "http://192.168.23.111:8086,192.168.23.112:8086,192.168.23.113:8086"
    val pulsarAdmin:PulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build()

    val tenantName = "my-tenant"
    val namespaceName = s"${tenantName}/my-ns"
    // ===============基于Pulsar的Admin对象进行相关的操作================
    // 创建名称空间
    pulsarAdmin.namespaces().createNamespace(namespaceName)

    // 获取租户下的名称空间列表
    val namespaces:Seq[String] = pulsarAdmin.namespaces().getNamespaces(tenantName).toSeq
    namespaces.foreach(println)

    // 删除名称空间
    pulsarAdmin.namespaces().deleteNamespace(namespaceName)

    // ====================关闭admin对象========================
    pulsarAdmin.close()
  }

}

程序输出如下

my-tenant/my-ns
4. topic的Java/Scala API

示例如下

import org.apache.pulsar.client.admin.PulsarAdmin
import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`

object topicTest {

  def main(args: Array[String]): Unit = {

    // ===========================创建Pulsar的Admin管理对象====================
    val serviceHttpUrl = "http://192.168.23.111:8086,192.168.23.112:8086,192.168.23.113:8086"
    val pulsarAdmin:PulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build()

    val namespaceName = "my-tenant/my-ns"
    val nonPartitionedTopicName = "non-persistent://my-tenant/my-ns/my-non-partitioned-topic"
    val partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic"
    // ============================执行相关的操作=========================
    // 创建无分区的topic
    pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName)

    // 创建有分区的topic
    pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3)

    // 修改有分区的Topic的分区数量
    pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6)

    // 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
    val topicList:Seq[String] = pulsarAdmin.topics().getList(namespaceName).toSeq
    topicList.foreach(println)
    // 查询当前有分区的topic列表
    val partitionedTopicList:Seq[String] = pulsarAdmin.topics().getPartitionedTopicList(namespaceName).toSeq
    partitionedTopicList.foreach(println)


    // 查询有分区的Topic,有多少个分区
    val partitions:Int = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions
    println(partitions)

    // 删除无分区的Topic
    pulsarAdmin.topics().delete(nonPartitionedTopicName)
    // 删除有分区的Topic
    pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName)


    // =========================关闭admin对象=============================
    pulsarAdmin.close()

  }

}

程序输出如下

persistent://my-tenant/my-ns/my-partitioned-topic-partition-0
persistent://my-tenant/my-ns/my-partitioned-topic-partition-1
persistent://my-tenant/my-ns/my-partitioned-topic-partition-4
persistent://my-tenant/my-ns/my-partitioned-topic-partition-5
persistent://my-tenant/my-ns/my-partitioned-topic-partition-2
persistent://my-tenant/my-ns/my-partitioned-topic-partition-3
non-persistent://my-tenant/my-ns/my-non-partitioned-topic
persistent://my-tenant/my-ns/my-partitioned-topic
6
关注
打赏
1664501120
查看更多评论
立即登录/注册

微信扫码登录

0.0742s