interpreter

package
v1.18.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2025 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const MaxOfAllVersions = SyncUpdateRPCUseLocalActivity
View Source
const StartingVersionContinueAsNewOnNoStates = 4

StartingVersionContinueAsNewOnNoStates Fix ContinueAsNew bug

View Source
const StartingVersionExecutingStateIdMode = 6

StartingVersionExecutingStateIdMode Changed default rule of upserting SAs

View Source
const StartingVersionNoIwfGlobalVersionSearchAttribute = 7

StartingVersionNoIwfGlobalVersionSearchAttribute Removed upserting IwfGlobalWorkflowVersion SA

View Source
const StartingVersionOptimizedUpsertSearchAttribute = 2

StartingVersionOptimizedUpsertSearchAttribute Optimized upserting of search attributes (SAs)

View Source
const StartingVersionRenamedStateApi = 3

StartingVersionRenamedStateApi Renamed the state API; see https://github.com/indeedeng/iwf/pull/242/files

View Source
const StartingVersionTemporal26SDK = 5

StartingVersionTemporal26SDK Upgraded the Temporal SDK version, which brought changes to the update handler; see https://github.com/indeedeng/iwf/releases/tag/v1.11.0

View Source
const StartingVersionUsingGlobalVersioning = 1

StartingVersionUsingGlobalVersioning First global version

View Source
const StartingVersionYieldOnConditionalComplete = 8

StartingVersionYieldOnConditionalComplete Bug fix for a case where published messages could be lost

View Source
const SyncUpdateRPCUseLocalActivity = 9

SyncUpdateRPCUseLocalActivity Always use local activities for sync update based RPC

Variables

This section is empty.

Functions

func DeterministicKeys added in v1.15.0

func DeterministicKeys[K constraints.Ordered, V any](m map[K]V) []K

DeterministicKeys returns the keys of a map in deterministic (sorted) order. To be used in for loops in workflows for deterministic iteration.

func DrainReceivedButUnprocessedInternalChannelsFromStateApis added in v1.14.0

func DrainReceivedButUnprocessedInternalChannelsFromStateApis(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner,
) error

func DumpWorkflowInternal added in v1.5.0

func DumpWorkflowInternal(
	ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest,
) (*iwfidl.WorkflowDumpResponse, error)

func InvokeWorkerRpc added in v1.8.0

func IsDeciderTriggerConditionMet added in v1.5.0

func IsDeciderTriggerConditionMet(
	commandReq iwfidl.CommandRequest,
	completedTimerCmds map[int]service.InternalTimerStatus,
	completedSignalCmds map[int]*iwfidl.EncodedObject,
	completedInterStateChannelCmds map[int]*iwfidl.EncodedObject,
) bool

func LastCaller added in v1.5.0

func LastCaller() string

func LoadInternalsFromPreviousRun added in v1.5.0

func LoadInternalsFromPreviousRun(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32,
) (*service.ContinueAsNewDumpResponse, error)

func SetQueryHandlers added in v1.5.0

func SetQueryHandlers(
	ctx interfaces.UnifiedContext,
	provider interfaces.WorkflowProvider,
	timerProcessor interfaces.TimerProcessor,
	persistenceManager *PersistenceManager,
	internalChannel *InternalChannel,
	signalReceiver *SignalReceiver,
	continueAsNewer *ContinueAsNewer,
	workflowConfiger *config.WorkflowConfiger,
	basicInfo service.BasicInfo,
) error

func StateApiExecute added in v1.5.0

func StateApiExecute(
	ctx context.Context,
	backendType service.BackendType,
	input service.StateDecideActivityInput,
	searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateDecideResponse, error)

func StateApiWaitUntil added in v1.5.0

func StateApiWaitUntil(
	ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateStartResponse, error)

func StateDecide

StateDecide is deprecated and will be removed in the next release.

func StateStart

func StateStart(
	ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateStartResponse, error)

StateStart is deprecated and will be removed in the next release.

func WaitForStateCompletionWorkflowImpl added in v1.8.0

func WaitForStateCompletionWorkflowImpl(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider,
) (*service.WaitForStateCompletionWorkflowOutput, error)

Types

type ContinueAsNewer added in v1.3.0

type ContinueAsNewer struct {
	StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo
	// contains filtered or unexported fields
}

func NewContinueAsNewer added in v1.3.0

func NewContinueAsNewer(
	provider interfaces.WorkflowProvider,
	interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
	persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector,
	timerProcessor interfaces.TimerProcessor,
) *ContinueAsNewer

func (*ContinueAsNewer) AddPotentialStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) AddPotentialStateExecutionToResume(
	stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue,
	commandRequest iwfidl.CommandRequest,
	completedTimerCommands map[int]service.InternalTimerStatus,
	completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
)

func (*ContinueAsNewer) DecreaseInflightOperation added in v1.8.0

func (c *ContinueAsNewer) DecreaseInflightOperation()

func (*ContinueAsNewer) DrainThreads added in v1.5.0

func (c *ContinueAsNewer) DrainThreads(ctx interfaces.UnifiedContext) error

func (*ContinueAsNewer) GetSnapshot added in v1.15.0

func (*ContinueAsNewer) HasAnyStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) HasAnyStateExecutionToResume() bool

func (*ContinueAsNewer) IncreaseInflightOperation added in v1.8.0

func (c *ContinueAsNewer) IncreaseInflightOperation()

func (*ContinueAsNewer) RemoveStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string)

func (*ContinueAsNewer) SetQueryHandlersForContinueAsNew added in v1.3.0

func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx interfaces.UnifiedContext) error

type GlobalVersioner added in v1.5.0

type GlobalVersioner struct {
	// contains filtered or unexported fields
}

GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api

func NewGlobalVersioner added in v1.5.0

func NewGlobalVersioner(
	workflowProvider interfaces.WorkflowProvider, ctx interfaces.UnifiedContext,
) (*GlobalVersioner, error)

func (*GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates added in v1.8.0

func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool

func (*GlobalVersioner) IsAfterVersionOfExecutingStateIdMode added in v1.11.1

func (p *GlobalVersioner) IsAfterVersionOfExecutingStateIdMode() bool

func (*GlobalVersioner) IsAfterVersionOfNoIwfGlobalVersionSearchAttribute added in v1.14.0

func (p *GlobalVersioner) IsAfterVersionOfNoIwfGlobalVersionSearchAttribute() bool

func (*GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool

func (*GlobalVersioner) IsAfterVersionOfRenamedStateApi added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool

func (*GlobalVersioner) IsAfterVersionOfSyncUpdateRPCUseLocalActivity added in v1.17.0

func (p *GlobalVersioner) IsAfterVersionOfSyncUpdateRPCUseLocalActivity() bool

func (*GlobalVersioner) IsAfterVersionOfTemporal26SDK added in v1.11.0

func (p *GlobalVersioner) IsAfterVersionOfTemporal26SDK() bool

func (*GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool

func (*GlobalVersioner) IsAfterVersionOfYieldOnConditionalComplete added in v1.14.0

func (p *GlobalVersioner) IsAfterVersionOfYieldOnConditionalComplete() bool

func (*GlobalVersioner) IsUsingGlobalVersionSearchAttribute added in v1.14.0

func (p *GlobalVersioner) IsUsingGlobalVersionSearchAttribute() bool

func (*GlobalVersioner) UpsertGlobalVersionSearchAttribute added in v1.5.0

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error

type InternalChannel added in v1.15.0

type InternalChannel struct {
	// contains filtered or unexported fields
}

func NewInternalChannel added in v1.15.0

func NewInternalChannel() *InternalChannel

func RebuildInternalChannel added in v1.15.0

func RebuildInternalChannel(refill map[string][]*iwfidl.EncodedObject) *InternalChannel

func (*InternalChannel) GetAllReceived added in v1.15.0

func (i *InternalChannel) GetAllReceived() map[string][]*iwfidl.EncodedObject

func (*InternalChannel) GetInfos added in v1.15.0

func (i *InternalChannel) GetInfos() map[string]iwfidl.ChannelInfo

func (*InternalChannel) HasData added in v1.15.0

func (i *InternalChannel) HasData(channelName string) bool

func (*InternalChannel) ProcessPublishing added in v1.15.0

func (i *InternalChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing)

func (*InternalChannel) Retrieve added in v1.15.0

func (i *InternalChannel) Retrieve(channelName string) *iwfidl.EncodedObject

type OutputCollector added in v1.5.0

type OutputCollector struct {
	// contains filtered or unexported fields
}

func NewOutputCollector added in v1.5.0

func NewOutputCollector(initOutputs []iwfidl.StateCompletionOutput) *OutputCollector

func (*OutputCollector) Add added in v1.5.0

func (*OutputCollector) GetAll added in v1.5.0

type PersistenceManager

type PersistenceManager struct {
	// contains filtered or unexported fields
}

func NewPersistenceManager

func NewPersistenceManager(
	provider interfaces.WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute,
	useMemo bool,
) *PersistenceManager

func RebuildPersistenceManager added in v1.2.0

func RebuildPersistenceManager(
	provider interfaces.WorkflowProvider,
	dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute,
	useMemo bool,
) *PersistenceManager

func (*PersistenceManager) CheckDataAndSearchAttributesKeysAreUnlocked added in v1.8.0

func (am *PersistenceManager) CheckDataAndSearchAttributesKeysAreUnlocked(dataAttrKeysToCheck, searchAttrKeysToCheck []string) bool

func (*PersistenceManager) GetAllDataAttributes added in v1.17.0

func (am *PersistenceManager) GetAllDataAttributes() []iwfidl.KeyValue

func (*PersistenceManager) GetAllSearchAttributes

func (am *PersistenceManager) GetAllSearchAttributes() []iwfidl.SearchAttribute

func (*PersistenceManager) GetDataAttributesByKey added in v1.17.0

func (*PersistenceManager) LoadDataAttributes added in v1.17.0

func (am *PersistenceManager) LoadDataAttributes(
	ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy,
) []iwfidl.KeyValue

func (*PersistenceManager) LoadSearchAttributes

func (am *PersistenceManager) LoadSearchAttributes(
	ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy,
) []iwfidl.SearchAttribute

func (*PersistenceManager) ProcessUpsertDataAttribute added in v1.17.0

func (am *PersistenceManager) ProcessUpsertDataAttribute(ctx interfaces.UnifiedContext, attributes []iwfidl.KeyValue) error

func (*PersistenceManager) ProcessUpsertSearchAttribute

func (am *PersistenceManager) ProcessUpsertSearchAttribute(
	ctx interfaces.UnifiedContext, attributes []iwfidl.SearchAttribute,
) error

func (*PersistenceManager) UnlockPersistence added in v1.5.1

func (am *PersistenceManager) UnlockPersistence(
	saPolicy *iwfidl.PersistenceLoadingPolicy, daPolicy *iwfidl.PersistenceLoadingPolicy,
)

type SignalReceiver added in v1.3.0

type SignalReceiver struct {
	// contains filtered or unexported fields
}

func NewSignalReceiver added in v1.3.0

func NewSignalReceiver(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel,
	stateRequestQueue *StateRequestQueue,
	persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *cont.ContinueAsNewCounter,
	workflowConfiger *config.WorkflowConfiger,
	initReceivedSignals map[string][]*iwfidl.EncodedObject,
) *SignalReceiver

func (*SignalReceiver) DrainAllReceivedButUnprocessedSignals added in v1.14.0

func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx interfaces.UnifiedContext)

DrainAllReceivedButUnprocessedSignals will process all the signals that have been received but not yet processed in the current workflow task. There are two cases where this is needed: (1) ContinueAsNew: retrieve signals that arrive after the signal handler threads are stopped, so that the signals can be carried over to the next run by continueAsNew. This includes both regular user signals and system signals. (2) Conditional close/complete of the workflow on a signal/internal channel: retrieve all signal/internal channel messages before checking the signal/internal channels.

func (*SignalReceiver) GetAllReceived added in v1.15.0

func (sr *SignalReceiver) GetAllReceived() map[string][]*iwfidl.EncodedObject

func (*SignalReceiver) GetInfos added in v1.15.0

func (sr *SignalReceiver) GetInfos() map[string]iwfidl.ChannelInfo

func (*SignalReceiver) HasSignal added in v1.3.0

func (sr *SignalReceiver) HasSignal(channelName string) bool

func (*SignalReceiver) IsFailWorkflowRequested added in v1.5.0

func (sr *SignalReceiver) IsFailWorkflowRequested() (bool, error)

func (*SignalReceiver) Retrieve added in v1.3.0

func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject

type StateExecutionCounter added in v1.3.0

type StateExecutionCounter struct {
	// contains filtered or unexported fields
}

func NewStateExecutionCounter added in v1.3.0

func NewStateExecutionCounter(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner,
	configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter,
) *StateExecutionCounter

func RebuildStateExecutionCounter added in v1.5.0

func RebuildStateExecutionCounter(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner,
	stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int,
	totalCurrentlyExecutingCount int,
	configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter,
) *StateExecutionCounter

func (*StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally added in v1.5.0

func (e *StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally()

ClearExecutingStateIdsSearchAttributeFinally should only be called at the end of workflow

func (*StateExecutionCounter) CreateNextExecutionId added in v1.3.0

func (e *StateExecutionCounter) CreateNextExecutionId(stateId string) string

func (*StateExecutionCounter) Dump added in v1.3.0

func (*StateExecutionCounter) GetTotalCurrentlyExecutingCount added in v1.5.0

func (e *StateExecutionCounter) GetTotalCurrentlyExecutingCount() int

func (*StateExecutionCounter) MarkStateExecutionCompleted added in v1.3.0

func (e *StateExecutionCounter) MarkStateExecutionCompleted(currentState iwfidl.StateMovement, nextStates []iwfidl.StateMovement) error

func (*StateExecutionCounter) MarkStateIdExecutingIfNotYet added in v1.5.0

func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRequest) error

type StateRequest added in v1.5.0

type StateRequest struct {
	// contains filtered or unexported fields
}

func NewStateResumeRequest added in v1.5.0

func NewStateResumeRequest(resumeRequest service.StateExecutionResumeInfo) StateRequest

func NewStateStartRequest added in v1.5.0

func NewStateStartRequest(movement iwfidl.StateMovement) StateRequest

func (StateRequest) GetStateId added in v1.5.0

func (sq StateRequest) GetStateId() string

func (StateRequest) GetStateMovement added in v1.8.0

func (sq StateRequest) GetStateMovement() iwfidl.StateMovement

func (StateRequest) GetStateResumeRequest added in v1.5.0

func (sq StateRequest) GetStateResumeRequest() service.StateExecutionResumeInfo

func (StateRequest) GetStateStartRequest added in v1.5.0

func (sq StateRequest) GetStateStartRequest() iwfidl.StateMovement

func (StateRequest) IsResumeRequest added in v1.5.0

func (sq StateRequest) IsResumeRequest() bool

type StateRequestQueue added in v1.5.0

type StateRequestQueue struct {
	// contains filtered or unexported fields
}

func NewStateRequestQueue added in v1.5.0

func NewStateRequestQueue() *StateRequestQueue

func NewStateRequestQueueWithResumeRequests added in v1.5.0

func NewStateRequestQueueWithResumeRequests(startReqs []iwfidl.StateMovement, resumeReqs map[string]service.StateExecutionResumeInfo) *StateRequestQueue

func (*StateRequestQueue) AddSingleStateStartRequest added in v1.8.0

func (srq *StateRequestQueue) AddSingleStateStartRequest(stateId string, input *iwfidl.EncodedObject, options *iwfidl.WorkflowStateOptions)

func (*StateRequestQueue) AddStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) AddStateStartRequests(reqs []iwfidl.StateMovement)

func (*StateRequestQueue) GetAllStateResumeRequests added in v1.11.0

func (srq *StateRequestQueue) GetAllStateResumeRequests() []service.StateExecutionResumeInfo

func (*StateRequestQueue) GetAllStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) GetAllStateStartRequests() []iwfidl.StateMovement

func (*StateRequestQueue) IsEmpty added in v1.5.0

func (srq *StateRequestQueue) IsEmpty() bool

func (*StateRequestQueue) TakeAll added in v1.5.0

func (srq *StateRequestQueue) TakeAll() []StateRequest

type WorkflowUpdater added in v1.8.0

type WorkflowUpdater struct {
	// contains filtered or unexported fields
}

func NewWorkflowUpdater added in v1.8.0

func NewWorkflowUpdater(
	ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager,
	stateRequestQueue *StateRequestQueue,
	continueAsNewer *ContinueAsNewer, continueAsNewCounter *cont.ContinueAsNewCounter, configer *config.WorkflowConfiger,
	internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo,
	globalVersioner *GlobalVersioner,
) (*WorkflowUpdater, error)

Directories

Path Synopsis
Package interpreter is a generated GoMock package.
Package interpreter is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL