
Using Kafka Shell Commands (Creating, Viewing, Modifying, and Deleting Topics; Simulating Producers and Consumers)


1. Using the Kafka Command to Create a Topic

Running ./kafka-topics.sh without arguments prints the following usage help:

Create, delete, describe, or change a topic.

Option                                    Description
------                                    -----------
--alter                                   Alter the number of partitions, replica assignment, and/or configuration for the topic.
--at-min-isr-partitions                   if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.
--bootstrap-server <String: server to connect to>  REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required.
--command-config <String: command config property file>  Property file containing configs to be passed to Admin Client. This is used only with the --bootstrap-server option for describing and altering broker configs.
--config <String: name=value>             A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
                                            cleanup.policy
                                            compression.type
                                            delete.retention.ms
                                            file.delete.delay.ms
                                            flush.messages
                                            flush.ms
                                            follower.replication.throttled.replicas
                                            index.interval.bytes
                                            leader.replication.throttled.replicas
                                            max.compaction.lag.ms
                                            max.message.bytes
                                            message.downconversion.enable
                                            message.format.version
                                            message.timestamp.difference.max.ms
                                            message.timestamp.type
                                            min.cleanable.dirty.ratio
                                            min.compaction.lag.ms
                                            min.insync.replicas
                                            preallocate
                                            retention.bytes
                                            retention.ms
                                            segment.bytes
                                            segment.index.bytes
                                            segment.jitter.ms
                                            segment.ms
                                            unclean.leader.election.enable
                                          See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create if the --bootstrap-server option is used.
--create                                  Create a new topic.
--delete                                  Delete a topic.
--delete-config <String: name>            A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
--describe                                List details for the given topics.
--disable-rack-aware                      Disable rack aware replica assignment.
--exclude-internal                        Exclude internal topics when running the list or describe command. The internal topics will be listed by default.
--force                                   Suppress console prompts.
--help                                    Print usage information.
--if-exists                               if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap-server option.
--if-not-exists                           if set when creating topics, the action will only execute if the topic does not already exist. Not supported with the --bootstrap-server option.
--list                                    List all available topics.
--partitions <Integer: # of partitions>   The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>  A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>  The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
--topic <String: topic>                   The topic to create, alter, describe or delete. It also accepts a regular expression, except for the --create option. Put the topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
--topics-with-overrides                   if set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                  if set when describing topics, only show partitions whose leader is not available.
--under-min-isr-partitions                if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.
--under-replicated-partitions             if set when describing topics, only show under-replicated partitions.
--version                                 Display Kafka version.
--zookeeper <String: hosts>               DEPRECATED. The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.

Command to create a topic (if a required parameter is missing, the command will tell you when it runs):

./kafka-topics.sh --create --zookeeper node1:2181,node2:2182,node3:2183 --topic test_kafka --partitions 3 --replication-factor 2

Parameter descriptions (a newer, bootstrap-server form of the same command follows the list):

  • --create: perform a create operation
  • --zookeeper: the ZooKeeper connection string
  • --topic: the name of the topic
  • --partitions: the number of partitions
  • --replication-factor: the number of replicas for each partition
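
Note that the help output above marks --zookeeper as DEPRECATED: on Kafka 2.2 and later the same topic can be created by pointing the tool at the brokers instead. A minimal sketch, assuming the brokers listen on node1:9091, node2:9092, and node3:9093 (the same addresses used for the producer and consumer examples later in this article):

./kafka-topics.sh --create --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka --partitions 3 --replication-factor 2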

2. Viewing Topics

2.1 Listing All Topics

./kafka-topics.sh --list --zookeeper node1:2181,node2:2182,node3:2183
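
Illustrative output, assuming only the topic from section 1 exists (Kafka's internal __consumer_offsets topic also shows up once any consumer has committed offsets):

__consumer_offsets
test_kafka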

2.2 Describing a Single Topic

./kafka-topics.sh --describe --zookeeper node1:2181,node2:2182,node3:2183 --topic test_kafka
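
Illustrative output for the 3-partition, 2-replica topic created above; the broker IDs shown under Leader, Replicas, and Isr are assumptions and will differ on your cluster:

Topic: test_kafka	PartitionCount: 3	ReplicationFactor: 2	Configs:
	Topic: test_kafka	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: test_kafka	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: test_kafka	Partition: 2	Leader: 3	Replicas: 3,1	Isr: 3,1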

All of the information above is read from ZooKeeper.

3. Modifying a Topic

The partition count can only be increased, never decreased, and the replication factor cannot be changed with this command.

./kafka-topics.sh --alter --zookeeper node1:2181,node2:2182,node3:2183 --topic test_kafka --partitions 5
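
Re-run the --describe command from section 2.2 to confirm the partition count is now 5. Running the same alter command with a smaller number is rejected by Kafka; an illustrative attempt (the exact error text varies by Kafka version):

./kafka-topics.sh --alter --zookeeper node1:2181,node2:2182,node3:2183 --topic test_kafka --partitions 3

This fails with an error along the lines of "The number of partitions for a topic can only be increased".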

4. Deleting a Topic

./kafka-topics.sh --delete --zookeeper node1:2181,node2:2182,node3:2183 --topic test_kafka

When the topic holds no data, it is first deleted logically (marked for deletion); after a short wait it is physically deleted.
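
An illustrative acknowledgement from the delete command (the wording varies by version, and physical removal also requires delete.topic.enable=true on the brokers, which is the default in recent Kafka releases):

Topic test_kafka is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.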

5. Simulating a Producer

Running ./kafka-console-producer.sh without arguments prints the following usage help:

This tool helps to read data from standard input and publish it to Kafka.

Option                                    Description
------                                    -----------
--batch-size <Integer: size>              Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>       REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]  The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without a value, it defaults to 'gzip'.
--help                                    Print usage information.
--line-reader <String: reader_class>      The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>  The max time that the producer will block for during a send request. (default: 60000)
--max-memory-bytes <Long: total memory in bytes>  The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>  The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>      Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>  The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>  A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>   Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>                 A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--request-required-acks <String: request required acks>  The required acks of the producer requests. (default: 1)
--request-timeout-ms <Integer: request timeout ms>  The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)
--retry-backoff-ms <Integer>              Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>      The size of the TCP receive buffer. (default: 102400)
--sync                                    If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>           If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>                   REQUIRED: The topic id to produce messages to.
--version                                 Display Kafka version.

Producer command:

./kafka-console-producer.sh --broker-list node1:9091,node2:9092,node3:9093 --topic test_kafka
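
The producer then waits on standard input and shows a > prompt; each line typed is sent to test_kafka as one message. An illustrative session (the message text is made up):

>first message
>second message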

6. Simulating a Consumer

Running ./kafka-console-consumer.sh without arguments prints the following usage help:

This tool helps to read data from Kafka topics and outputs it to standard output.

Option                                    Description
------                                    -----------
--bootstrap-server <String: server to connect to>  REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>  A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>   Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events                   Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>               The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                          If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>       The consumer group id of the consumer.
--help                                    Print usage information.
--isolation-level <String>                Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>    The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>         The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end. (default: latest)
--partition <Integer: partition>          The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>                 The properties to initialize the message formatter. Default properties include: print.timestamp=true|false, print.key=true|false, print.value=true|false, key.separator=<key.separator>, line.separator=<line.separator>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.' and 'value.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error                   If there is an error when processing a message, skip it instead of halting.
--timeout-ms <Integer: timeout_ms>        If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>                   The topic id to consume on.
--value-deserializer <String: deserializer for values>
--version                                 Display Kafka version.
--whitelist <String: whitelist>           Regular expression specifying a whitelist of topics to include for consumption.

Consumer command:

./kafka-console-consumer.sh --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka
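
By default this prints only messages produced after the consumer starts. Two flags from the help above are worth combining: --from-beginning replays from the earliest offset when the group has no committed offset, and --group joins a named consumer group. A hedged example (test_group is an arbitrary illustrative name):

./kafka-console-consumer.sh --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka --from-beginning --group test_group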

To make the consumer read from a specific partition:

./kafka-console-consumer.sh --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka --partition 0
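
--partition can also be combined with --offset (a non-negative number, 'earliest', or 'latest') to replay from a chosen position; for example, to re-read partition 0 from its first message:

./kafka-console-consumer.sh --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka --partition 0 --offset earliest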

7. Testing the Producer and Consumer
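
A minimal end-to-end check runs the two commands from sections 5 and 6 in separate terminals; the message text below is a made-up example:

# terminal 1: start the producer and type a message
./kafka-console-producer.sh --broker-list node1:9091,node2:9092,node3:9093 --topic test_kafka
>hello kafka

# terminal 2: the consumer prints what the producer sent
./kafka-console-consumer.sh --bootstrap-server node1:9091,node2:9092,node3:9093 --topic test_kafka
hello kafka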

If the messages typed into the producer console appear in the consumer console, the test succeeded!

Note: if the topic given to the producer or consumer command does not exist, it will be created automatically.
