Background: when Kafka is hit by an unexpected burst of traffic, the cluster machines end up under high load with disk I/O saturated. Replica synchronization then falls badly behind and cannot catch up. Adding brokers does not help on its own: replicas cannot be migrated while the cluster is in this state, and newly added partitions are by default spread across all brokers, so they do not noticeably offload the traffic. The cluster is stuck in a vicious circle with no way out.
However, if we specify which brokers the new partitions' replicas go to, all of the new partitions can be placed on the newly added brokers. No data has to be migrated and the traffic gets spread out, which solves the problem neatly.
The tool adapts Kafka's original replica-assignment algorithm: the assignment logic itself is unchanged, but the partition range and the brokers can be specified explicitly. It computes the replica lists for the new partitions and then creates them through the admin API with those explicit assignments.
Usage: go run AssignReplicasToBrokers.go -broker=xxx:9092 -broker_list=3,4,5 -topic=test1 -partitions=7 -factor=3 (a worked example of the computed assignment follows the flag descriptions below)
-startPartition can be omitted; the id of the first new partition is computed automatically from the topic's existing partitions.
-partitions is the target total number of partitions for the topic; the number of new partitions is derived from it, so it does not need to be given separately.
-factor is the replication factor and can be set to 2 or 3; if not specified it defaults to 3.
-broker_list is the list of broker ids that the new replicas should be placed on; -topic is the topic to expand.
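To make the assignment concrete, here is a minimal sketch of the same round-robin replica placement the tool performs. The inputs are hypothetical (broker ids 3, 4 and 5, replication factor 3, two new partitions with ids 5 and 6) and the start index is fixed at 0 for readability, whereas the tool picks it at random from -broker_list:

package main

import "fmt"

// replicaIndex mirrors the tool's shift formula: the k-th follower is placed
// 1 + (shift+k) % (n-1) slots after the leader's slot.
func replicaIndex(first, shift, k, n int32) int32 {
	return (first + 1 + (shift+k)%(n-1)) % n
}

func main() {
	brokers := []int32{3, 4, 5} // hypothetical broker ids from -broker_list
	factor := 3                 // replication factor
	n := int32(len(brokers))
	var startIndex, shift int32     // fixed at 0 here; the tool chooses them randomly
	for p := int32(5); p < 7; p++ { // pretend partitions 5 and 6 are the new ones
		// Rotate the follower shift each time the partition id wraps around the broker list.
		if p > 0 && p%n == 0 {
			shift++
		}
		first := (p + startIndex) % n
		replicas := []int32{brokers[first]} // leader first
		for k := int32(0); k < int32(factor-1); k++ {
			replicas = append(replicas, brokers[replicaIndex(first, shift, k, n)])
		}
		fmt.Printf("partition %d -> replicas %v\n", p, replicas)
	}
}

With these inputs the sketch prints partition 5 -> replicas [5 3 4] and partition 6 -> replicas [3 5 4]: every replica of the new partitions sits on one of the specified brokers.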
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"strconv"
	"strings"

	"github.com/Shopify/sarama"
)

// conf holds the command-line configuration for the partition expansion.
type conf struct {
	broker          []string // bootstrap broker addresses
	broker_list     []int32  // broker ids that the new replicas are assigned to
	topic           string
	startPartition  int32 // id of the first new partition, derived from the topic
	partitions      int32 // target total number of partitions
	factor          int   // replication factor, 2 or 3
	fixedStartIndex int32 // >= 0 makes the assignment deterministic, -1 means random
}

func main() {
	broker := flag.String("broker", "", "bootstrap broker address, e.g. host:9092")
	broker_list := flag.String("broker_list", "", "comma-separated broker ids for the new replicas")
	topic := flag.String("topic", "", "topic to expand")
	startPartition := flag.Int("startPartition", 0, "first new partition id (computed automatically if omitted)")
	partitions := flag.Int("partitions", 0, "target total number of partitions")
	factor := flag.Int("factor", 0, "replication factor, 2 or 3 (default 3)")
	flag.Parse()

	if *factor > 3 || *factor < 2 {
		fmt.Println("factor must be 2 or 3, falling back to the default of 3")
		*factor = 3
	}

	br_list := stringToIntSlice(*broker_list, ",")
	c := conf{
		broker:          strings.Split(*broker, ","),
		broker_list:     br_list,
		topic:           *topic,
		startPartition:  int32(*startPartition),
		partitions:      int32(*partitions),
		factor:          *factor,
		fixedStartIndex: -1,
	}

	client, err := sarama.NewClient(c.broker, nil)
	if err != nil {
		fmt.Println("client get err", err)
		return
	}
	defer client.Close()

	ntopics, err := client.Topics()
	if err != nil {
		fmt.Println("get cluster topics err", err)
		return
	}
	if !strInSlice(ntopics, c.topic) {
		panic("topic not in this cluster")
	}

	pars, err := client.Partitions(c.topic)
	if err != nil {
		fmt.Println("get topic partition error", err)
		return
	}
	topic_pars := len(pars)

	// The first new partition id is always the current partition count.
	c.startPartition = int32(topic_pars)
	if c.partitions <= int32(topic_pars) {
		panic("target partition count must be greater than the current partition count")
	}
	new_pars := c.partitions - int32(topic_pars)

	// Compute the replica assignment for the new partitions only.
	newReplicas := assignReplicas(c.broker_list, c.startPartition, new_pars, &c)
	fmt.Println(newReplicas)

	admin, err := sarama.NewClusterAdmin(c.broker, nil)
	if err != nil {
		fmt.Println("clusteradmin connect error", err)
		return
	}
	defer admin.Close()

	err = admin.CreatePartitions(c.topic, c.partitions, newReplicas, false)
	if err != nil {
		fmt.Println("create partition failed", err)
	} else {
		fmt.Println("create partition success")
	}
}

// assignReplicas follows Kafka's round-robin replica assignment, restricted to
// the brokers in broker_list, and returns one replica list per new partition.
func assignReplicas(broker_list []int32, startPartition int32, newpartition int32, c *conf) [][]int32 {
	ret := [][]int32{}
	var startIndex, nextReplicaShift, currentPartitonId int32
	if c.fixedStartIndex >= 0 {
		startIndex = c.fixedStartIndex
		nextReplicaShift = c.fixedStartIndex
	} else {
		startIndex = broker_list[rand.Intn(len(broker_list))]
		nextReplicaShift = broker_list[rand.Intn(len(broker_list))]
	}
	if startPartition > 0 {
		currentPartitonId = startPartition
	}
	fmt.Println(startIndex, nextReplicaShift, currentPartitonId)

	for i := int32(0); i < newpartition; i++ {
		replicaBuff := []int32{}
		// Rotate the follower shift each time the partition id wraps around the broker list.
		if currentPartitonId > 0 && currentPartitonId%int32(len(broker_list)) == 0 {
			nextReplicaShift++
		}
		// The leader goes to the slot determined by the partition id and the start index.
		firstReplicaIndex := (currentPartitonId + startIndex) % int32(len(broker_list))
		replicaBuff = append(replicaBuff, broker_list[firstReplicaIndex])
		// Followers are spread over the remaining brokers.
		for j := 0; j < c.factor-1; j++ {
			replicaBuff = append(replicaBuff, broker_list[replicaIndex(firstReplicaIndex, nextReplicaShift, int32(j), int32(len(broker_list)))])
		}
		ret = append(ret, replicaBuff)
		currentPartitonId++
	}
	return ret
}

// replicaIndex places the j-th follower 1 + (shift+j) % (n-1) slots after the leader.
func replicaIndex(firstReplicaIndex int32, secondReplicaShift int32, replicaIndex int32, nBrokers int32) int32 {
	shift := 1 + (secondReplicaShift+replicaIndex)%(nBrokers-1)
	return (firstReplicaIndex + shift) % nBrokers
}

// stringToIntSlice parses a comma-separated id list, skipping empty and invalid entries.
func stringToIntSlice(str string, sep string) (res []int32) {
	for _, i := range strings.Split(str, sep) {
		if i == "" {
			continue
		}
		val, err := strconv.ParseInt(i, 10, 32)
		if err != nil {
			continue
		}
		res = append(res, int32(val))
	}
	return
}

// strInSlice reports whether m is one of the strings in sl.
func strInSlice(sl []string, m string) bool {
	set := make(map[string]struct{}, len(sl))
	for _, v := range sl {
		set[v] = struct{}{}
	}
	_, ok := set[m]
	return ok
}
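Once the new partitions are created, it is easy to double-check that their replicas really sit on the brokers passed via -broker_list. A minimal verification sketch with the same sarama admin client follows; the broker address and topic name are placeholders taken from the usage example:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder address; point this at the cluster that was just expanded.
	admin, err := sarama.NewClusterAdmin([]string{"xxx:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	// DescribeTopics returns per-partition metadata, including the replica lists.
	metas, err := admin.DescribeTopics([]string{"test1"})
	if err != nil {
		panic(err)
	}
	for _, meta := range metas {
		for _, p := range meta.Partitions {
			// The new partitions should list only the brokers given via -broker_list.
			fmt.Printf("partition %d leader %d replicas %v\n", p.ID, p.Leader, p.Replicas)
		}
	}
}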
Usage: python create_new_assign_partitions.py --broker=xxx:9092 --broker_list=3,4,5 --topic=test1 --partitions=10
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import random
import argparse

from kafka.admin import KafkaAdminClient, NewPartitions
from kafka import KafkaConsumer


class AssignReplicasToBrokers:
    """Round-robin replica assignment restricted to the given broker ids."""

    def __init__(self, nPartitions, brokerlist, replicationFactor, startPartition=0):
        self.Partitions = nPartitions    # number of new partitions to assign
        self.brokers = brokerlist        # broker ids for the new replicas
        self.factor = replicationFactor  # replication factor, 2 or 3
        self.fixedStartIndex = -1        # >= 0 makes the assignment deterministic
        self.startPartitionId = startPartition if startPartition else -1
        self.ret = {}

    def assignReplicas(self):
        startIndex = self.fixedStartIndex if self.fixedStartIndex >= 0 else random.choice(self.brokers)
        nextReplicaShift = self.fixedStartIndex if self.fixedStartIndex >= 0 else random.choice(self.brokers)
        currentPartitionId = max(0, self.startPartitionId)
        for i in range(self.Partitions):
            replicaBuffer = []
            # Rotate the follower shift each time the partition id wraps around the broker list.
            if currentPartitionId > 0 and (currentPartitionId % len(self.brokers) == 0):
                nextReplicaShift += 1
            firstReplicaIndex = (currentPartitionId + startIndex) % len(self.brokers)
            replicaBuffer.append(self.brokers[firstReplicaIndex])
            for j in range(self.factor - 1):
                replicaBuffer.append(self.brokers[self.replicaIndex(firstReplicaIndex, nextReplicaShift, j, len(self.brokers))])
            self.ret[currentPartitionId] = replicaBuffer
            currentPartitionId += 1
        return self.ret

    def replicaIndex(self, firstReplicaIndex, secondReplicaShift, replicaIndex, nBrokers):
        # The j-th follower is placed 1 + (shift + j) % (n - 1) slots after the leader.
        shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
        return (firstReplicaIndex + shift) % nBrokers


parser = argparse.ArgumentParser(description='broker,broker_list,topic,partitions')
parser.add_argument('--broker', type=str, required=True, default='')
parser.add_argument('--broker_list', type=str, required=True)
parser.add_argument('--partitions', type=int, required=True)
parser.add_argument('--topic', type=str, required=True)
parser.add_argument('--factor', type=int, default=3)
args = parser.parse_args()

broker = args.broker
broker_list = [int(i) for i in args.broker_list.split(',') if i]
topic = args.topic
partitions = args.partitions
factor = args.factor
if factor > 3 or factor < 2:
    print("resetting factor to 3")
    factor = 3

admin = KafkaAdminClient(bootstrap_servers=broker)
consumer = KafkaConsumer(bootstrap_servers=broker)

# Current partition count; the new partitions start right after the existing ones.
pars = consumer.partitions_for_topic(topic)
num_pars = len(pars)
print(num_pars)
startPartition = num_pars
newPartition = partitions - num_pars

assigns = AssignReplicasToBrokers(newPartition, broker_list, factor, startPartition)
reps = list(assigns.assignReplicas().values())

# NewPartitions takes the new total count plus one replica list per new partition.
new_partitions = {topic: NewPartitions(partitions, reps)}
res = admin.create_partitions(new_partitions)
print(reps)
print(res)
With this approach you can, in a single step, assign the replicas of a topic's new partitions to brokers you choose, which takes the pressure off when a sudden traffic spike drives the machines to high load and blocks both replica synchronization and data migration. The Go version can be compiled with go build into a standalone binary and kept in a package so it is ready whenever it is needed; the Python version works the same way and is run with python plus the script name and arguments. It effectively becomes another stock Kafka tool and solves this class of problem cleanly.
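For example, one possible way to package and run the Go tool (the binary name assign_replicas is arbitrary):
go build -o assign_replicas AssignReplicasToBrokers.go
./assign_replicas -broker=xxx:9092 -broker_list=3,4,5 -topic=test1 -partitions=7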