controllers/flinkcluster/sharder.go (64 lines of code) (raw):

package flinkcluster

import (
	"errors"
	"hash/fnv"
	"regexp"
	"strconv"
)

// bigPrime is the multiplier applied to a resource's hash before the value is
// reduced modulo the number of shards.
const bigPrime = 31393

// podNameRegex splits a pod name into a prefix ending in "-" and a trailing
// decimal ordinal, e.g. "flink-operator-2" yields "flink-operator-" and "2".
//
// MustCompile is used so that an invalid pattern fails loudly at start-up
// rather than leaving a nil *regexp.Regexp that panics on first use.
var podNameRegex = regexp.MustCompile("(.*-)([0-9]+)$")

// Sharder decides which resources belong to this instance when the work is
// split across several instances. Each resource is mapped to exactly one shard
// index, and an instance owns the resources whose index equals its own Shard.
type Sharder struct {
	TotalShards int // Total number of shards
	Shard       int // Shard ordinal for this instance
}

// getResource builds the "namespace/name" key that is hashed to pick a shard.
func getResource(namespace, name string) string {
	return namespace + "/" + name
}

// IsOwnedByMe reports whether the resource identified by namespace and name
// maps to this instance's shard.
func (s *Sharder) IsOwnedByMe(namespace, name string) bool {
	return s.GetShard(namespace, name) == s.Shard
}

// GetShard returns the index of the shard responsible for the resource
// identified by namespace and name. For a positive TotalShards the result is
// always in the range [0, TotalShards).
//
// A non-positive TotalShards is treated as a single shard and yields 0, so a
// zero-value Sharder owns every resource instead of panicking with a division
// by zero.
func (s *Sharder) GetShard(namespace, name string) int {
	if s.TotalShards <= 0 {
		return 0
	}

	h := fnv1aHash(getResource(namespace, name))

	// Do the arithmetic in uint64: a 32-bit hash times bigPrime always fits,
	// so the result is the same on every platform and can never be negative.
	// With a platform-sized int the product overflows where int is 32 bits,
	// which could produce a negative index that no shard owns.
	return int(uint64(h) * bigPrime % uint64(s.TotalShards))
}

// fnv1aHash returns the 32-bit FNV-1a hash of s.
func fnv1aHash(s string) uint32 {
	h := fnv.New32a()
	// hash.Hash documents that Write never returns an error, so the result is
	// deliberately ignored.
	_, _ = h.Write([]byte(s))
	return h.Sum32()
}

// getShardFromPod extracts the trailing ordinal from a pod name of the form
// "<prefix>-<number>". It returns an error when the name does not have that
// form or when the number does not fit in an int.
func getShardFromPod(podName string) (int, error) {
	// A successful match always has three entries: the whole match, the
	// prefix and the ordinal. A failed match is nil, whose length is 0.
	m := podNameRegex.FindStringSubmatch(podName)
	if len(m) != 3 {
		return 0, errors.New("pod-name is not in the expected format")
	}
	return strconv.Atoi(m[2])
}

// NewSharderFromEnv builds a Sharder from string settings: the total number of
// shards and the name of the pod this instance runs in, whose trailing ordinal
// is used as the shard index.
//
// If either value is empty, sharding is assumed to be disabled and a single
// shard with ordinal 0 is configured. An error is returned when totalShardsStr
// is not an integer, when it is smaller than 1, or when podName does not end
// in "-<number>".
func NewSharderFromEnv(totalShardsStr string, podName string) (*Sharder, error) {
	// If totalShards or podName is not set, assume sharding is disabled.
	if totalShardsStr == "" || podName == "" {
		totalShardsStr = "1"
		podName = "flink-operator-0"
	}

	totalShards, err := strconv.Atoi(totalShardsStr)
	if err != nil {
		return nil, err
	}
	// Reject the value here rather than letting every later shard lookup
	// operate on a meaningless shard count.
	if totalShards < 1 {
		return nil, errors.New("total shards must be at least 1, got " + strconv.Itoa(totalShards))
	}

	// Extract index from statefulset pod name
	shard, err := getShardFromPod(podName)
	if err != nil {
		return nil, err
	}

	return NewSharder(totalShards, shard), nil
}

// NewSharder returns a Sharder for the instance with ordinal shard out of
// totalShards instances. The arguments are stored as given, without validation.
func NewSharder(totalShards, shard int) *Sharder {
	return &Sharder{
		TotalShards: totalShards,
		Shard:       shard,
	}
}