Documentation
¶
Index ¶
- func AddMembers(ctx context.Context, js jetstream.JetStream, streamName string, ...) ([]string, error)
- func DeleteElastic(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func DeleteMemberMappings(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func DeleteMembers(ctx context.Context, js jetstream.JetStream, streamName string, ...) ([]string, error)
- func DeleteStatic(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func ElasticGetPartitionFilters(config ElasticConsumerGroupConfig, memberName string) []string
- func ElasticIsInMembershipAndActive(ctx context.Context, js jetstream.JetStream, streamName string, ...) (bool, bool, error)
- func ElasticMemberStepDown(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func GeneratePartitionFilters(members []string, maxMembers uint, memberMappings []MemberMapping, ...) []string
- func ListElasticActiveMembers(ctx context.Context, js jetstream.JetStream, streamName string, ...) ([]string, error)
- func ListElasticConsumerGroups(ctx context.Context, js jetstream.JetStream, streamName string) ([]string, error)
- func ListStaticActiveMembers(ctx context.Context, js jetstream.JetStream, streamName string, ...) ([]string, error)
- func ListStaticConsumerGroups(ctx context.Context, js jetstream.JetStream, streamName string) ([]string, error)
- func SetMemberMappings(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- func StaticMemberStepDown(ctx context.Context, js jetstream.JetStream, streamName string, ...) error
- type ConsumerGroupConsumeContext
- type ConsumerGroupMsg
- func (scgMsg *ConsumerGroupMsg) Ack() error
- func (scgMsg *ConsumerGroupMsg) Data() []byte
- func (scgMsg *ConsumerGroupMsg) DoubleAck(ctx context.Context) error
- func (scgMsg *ConsumerGroupMsg) Headers() nats.Header
- func (scgMsg *ConsumerGroupMsg) InProgress() error
- func (scgMsg *ConsumerGroupMsg) Metadata() (*jetstream.MsgMetadata, error)
- func (scgMsg *ConsumerGroupMsg) Nak() error
- func (scgMsg *ConsumerGroupMsg) NakWithDelay(delay time.Duration) error
- func (scgMsg *ConsumerGroupMsg) Reply() string
- func (scgMsg *ConsumerGroupMsg) Subject() string
- func (scgMsg *ConsumerGroupMsg) Term() error
- func (scgMsg *ConsumerGroupMsg) TermWithReason(reason string) error
- type ElasticConsumerGroupConfig
- type ElasticConsumerGroupConsumerInstance
- type MemberMapping
- type StaticConsumerGroupConfig
- type StaticConsumerGroupConsumerInstance
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddMembers ¶
func AddMembers(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberNamesToAdd []string) ([]string, error)
AddMembers adds members to an elastic consumer group
func DeleteElastic ¶
func DeleteElastic(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) error
DeleteElastic Deletes an elastic consumer group
func DeleteMemberMappings ¶
func DeleteMemberMappings(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) error
DeleteMemberMappings deletes the custom member mappings for an elastic consumer group
func DeleteMembers ¶
func DeleteMembers(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberNamesToDrop []string) ([]string, error)
DeleteMembers drops members from an elastic consumer group
func DeleteStatic ¶
func DeleteStatic(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) error
DeleteStatic Deletes a static consumer group
func ElasticGetPartitionFilters ¶
func ElasticGetPartitionFilters(config ElasticConsumerGroupConfig, memberName string) []string
ElasticGetPartitionFilters For the given ElasticConsumerGroupConfig returns the list of partition filters for the given member
func ElasticIsInMembershipAndActive ¶
func ElasticIsInMembershipAndActive(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberName string) (bool, bool, error)
ElasticIsInMembershipAndActive checks if a member is included in the elastic consumer group and is active
func ElasticMemberStepDown ¶
func ElasticMemberStepDown(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberName string) error
ElasticMemberStepDown forces the current active (pinned) application instance for a member of an elastic consumer group to step down
func GeneratePartitionFilters ¶
func GeneratePartitionFilters(members []string, maxMembers uint, memberMappings []MemberMapping, memberName string) []string
GeneratePartitionFilters generates the partition filters for a particular member of a consumer group, according to the provided max number of members and the membership
func ListElasticActiveMembers ¶
func ListElasticActiveMembers(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) ([]string, error)
ListElasticActiveMembers lists the active members of an elastic consumer group
func ListElasticConsumerGroups ¶
func ListElasticConsumerGroups(ctx context.Context, js jetstream.JetStream, streamName string) ([]string, error)
ListElasticConsumerGroups lists the elastic consumer groups for a given stream
func ListStaticActiveMembers ¶
func ListStaticActiveMembers(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) ([]string, error)
ListStaticActiveMembers lists the active members of a static consumer group
func ListStaticConsumerGroups ¶
func ListStaticConsumerGroups(ctx context.Context, js jetstream.JetStream, streamName string) ([]string, error)
ListStaticConsumerGroups lists the static consumer groups for a given stream
func SetMemberMappings ¶
func SetMemberMappings(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberMappings []MemberMapping) error
SetMemberMappings sets the custom member mappings for an elastic consumer group
Types ¶
type ConsumerGroupConsumeContext ¶
type ConsumerGroupConsumeContext interface { Stop() Done() <-chan error }
func ElasticConsume ¶
func ElasticConsume(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberName string, messageHandler func(msg jetstream.Msg), config jetstream.ConsumerConfig) (ConsumerGroupConsumeContext, error)
ElasticConsume is the function that will start a go routine to consume messages from the stream (when active)
func StaticConsume ¶
func StaticConsume(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, memberName string, messageHandler func(msg jetstream.Msg), config jetstream.ConsumerConfig) (ConsumerGroupConsumeContext, error)
StaticConsume is the function that will start a go routine to consume messages from the stream (when active)
type ConsumerGroupMsg ¶
type ConsumerGroupMsg struct {
// contains filtered or unexported fields
}
func (*ConsumerGroupMsg) Ack ¶
func (scgMsg *ConsumerGroupMsg) Ack() error
Ack acknowledges a message This tells the server that the message was successfully processed, and it can move on to the next message
func (*ConsumerGroupMsg) Data ¶
func (scgMsg *ConsumerGroupMsg) Data() []byte
Data returns the message body
func (*ConsumerGroupMsg) DoubleAck ¶
func (scgMsg *ConsumerGroupMsg) DoubleAck(ctx context.Context) error
DoubleAck acknowledges a message and waits for ack from server
func (*ConsumerGroupMsg) Headers ¶
func (scgMsg *ConsumerGroupMsg) Headers() nats.Header
Headers returns a map of headers for a message
func (*ConsumerGroupMsg) InProgress ¶
func (scgMsg *ConsumerGroupMsg) InProgress() error
InProgress tells the server that this message is being worked on It resets the redelivery timer on the server
func (*ConsumerGroupMsg) Metadata ¶
func (scgMsg *ConsumerGroupMsg) Metadata() (*jetstream.MsgMetadata, error)
func (*ConsumerGroupMsg) Nak ¶
func (scgMsg *ConsumerGroupMsg) Nak() error
Nak negatively acknowledges a message This tells the server to redeliver the message
func (*ConsumerGroupMsg) NakWithDelay ¶
func (scgMsg *ConsumerGroupMsg) NakWithDelay(delay time.Duration) error
NakWithDelay negatively acknowledges a message This tells the server to redeliver the message after the given `delay` duration
func (*ConsumerGroupMsg) Reply ¶
func (scgMsg *ConsumerGroupMsg) Reply() string
Reply returns a reply subject for a message
func (*ConsumerGroupMsg) Subject ¶
func (scgMsg *ConsumerGroupMsg) Subject() string
Subject returns a subject on which a message is published
func (*ConsumerGroupMsg) Term ¶
func (scgMsg *ConsumerGroupMsg) Term() error
Term tells the server to not redeliver this message, regardless of the value of nats.MaxDeliver
func (*ConsumerGroupMsg) TermWithReason ¶
func (scgMsg *ConsumerGroupMsg) TermWithReason(reason string) error
type ElasticConsumerGroupConfig ¶
type ElasticConsumerGroupConfig struct { MaxMembers uint `json:"max_members"` // The maximum number of members the consumer group can have, i.e. the number of partitions Filter string `json:"filter"` // The filter, used to both filter the message and partition them, must include at least one "*" wildcard PartitioningWildcards []int `json:"partitioning_wildcards"` // The indexes of the wildcards in the filter that will be used for partitioning. For example, if the subject has the pattern `"foo.<key>", then the filter is "foo.*" and the partitioning wildcard is 1. MaxBufferedMsgs int64 `json:"max_buffered_msg,omitempty"` // The max number of messages buffered in the consumer group's stream MaxBufferedBytes int64 `json:"max_buffered_bytes,omitempty"` // The max number of bytes buffered in the consumer group's stream Members []string `json:"members,omitempty"` // The list of members in the consumer group MemberMappings []MemberMapping `json:"member_mappings,omitempty"` // Or the member mappings, which is a list of member names and the partitions that are assigned to them }
ElasticConsumerGroupConfig is the configuration of an elastic consumer group
func CreateElastic ¶
func CreateElastic(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, maxNumMembers uint, filter string, partitioningWildcards []int, maxBufferedMessages int64, maxBufferedBytes int64) (*ElasticConsumerGroupConfig, error)
CreateElastic creates an elastic consumer group Creates the sourcing work queue stream that is going to be used by the members to actually consume messages
func GetElasticConsumerGroupConfig ¶
func GetElasticConsumerGroupConfig(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) (*ElasticConsumerGroupConfig, error)
GetElasticConsumerGroupConfig gets the consumer group's config from the KV bucket
func (*ElasticConsumerGroupConfig) IsInMembership ¶
func (config *ElasticConsumerGroupConfig) IsInMembership(name string) bool
IsInMembership returns true if the member name is in the current membership of the elastic consumer group
type ElasticConsumerGroupConsumerInstance ¶
type ElasticConsumerGroupConsumerInstance struct { StreamName string ConsumerGroupName string MemberName string Config *ElasticConsumerGroupConfig MessageHandlerCB func(msg jetstream.Msg) // contains filtered or unexported fields }
func (*ElasticConsumerGroupConsumerInstance) Done ¶
func (instance *ElasticConsumerGroupConsumerInstance) Done() <-chan error
Done returns the error (if any) when the consumer group instance is done or an unrecoverable error occurs
func (*ElasticConsumerGroupConsumerInstance) Stop ¶
func (instance *ElasticConsumerGroupConsumerInstance) Stop()
Stop stops the consumer instance
type MemberMapping ¶
type StaticConsumerGroupConfig ¶
type StaticConsumerGroupConfig struct { MaxMembers uint `json:"max_members"` // The maximum number of members the consumer group can have, i.e. the number of partitions Filter string `json:"filter"` // Optional filter Members []string `json:"members,omitempty"` // The list of members in the consumer group (automatically mapped to partitions) MemberMappings []MemberMapping `json:"member_mappings,omitempty"` // Or the member mappings, which is a list of member names and the partitions that are assigned to them }
StaticConsumerGroupConfig is the configuration for a static consumer group
func CreateStatic ¶
func CreateStatic(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string, maxNumMembers uint, filter string, members []string, memberMappings []MemberMapping) (*StaticConsumerGroupConfig, error)
CreateStatic creates a static consumer group
func GetStaticConsumerGroupConfig ¶
func GetStaticConsumerGroupConfig(ctx context.Context, js jetstream.JetStream, streamName string, consumerGroupName string) (*StaticConsumerGroupConfig, error)
GetStaticConsumerGroupConfig gets the static consumer group's config from the KV bucket
func (*StaticConsumerGroupConfig) IsInMembership ¶
func (config *StaticConsumerGroupConfig) IsInMembership(name string) bool
IsInMembership returns true if the member is in the current membership of the static consumer group
type StaticConsumerGroupConsumerInstance ¶
type StaticConsumerGroupConsumerInstance struct { StreamName string ConsumerGroupName string MemberName string Config *StaticConsumerGroupConfig MessageHandlerCB func(msg jetstream.Msg) // contains filtered or unexported fields }
func (*StaticConsumerGroupConsumerInstance) Done ¶
func (instance *StaticConsumerGroupConsumerInstance) Done() <-chan error
Done returns the error (if any) when the consumer group instance is done or an unrecoverable error occurs
func (*StaticConsumerGroupConsumerInstance) Stop ¶
func (instance *StaticConsumerGroupConsumerInstance) Stop()
Stop stops the consumer group instance