赞
踩
- package org.sunny.two;
-
- import kafka.admin.AdminClient;
- import kafka.admin.AdminUtils;
- import kafka.admin.TopicCommand;
- import kafka.server.ConfigType;
- import kafka.utils.ZkUtils;
-
- import org.apache.kafka.common.security.JaasUtils;
- import scala.collection.JavaConversions;
-
- import java.util.*;
-
-
- public class TopicsController {
-
- /*
- Create a topic. Equivalent CLI command:
- kafka-topics.sh --zookeeper localhost:2181 --create
- --topic kafka-action --replication-factor 2 --partitions 3
- */
- public static void createTopic(TopicConfig config){
- ZkUtils zkUtils = null;
- try {
- zkUtils = ZkUtils.apply(config.getZookeeper(),30000,
- 30000, JaasUtils.isZkSecurityEnabled());
- System.out.println(config);
- if (!AdminUtils.topicExists(zkUtils,config.getTopicName())){
- AdminUtils.createTopic(zkUtils,config.getTopicName(),config.getPartitions(),
- config.getReplica

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。