Class LocalStreamsExecutionEnvironment
- java.lang.Object
-
- io.streamthoughts.azkarra.runtime.env.LocalStreamsExecutionEnvironment
-
- All Implemented Interfaces:
AzkarraContextAware
,HasName
,StreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
public class LocalStreamsExecutionEnvironment extends Object implements StreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>, AzkarraContextAware
AStreamsExecutionEnvironment
implementation that runs and managesKafkaStreams
instance locally.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface io.streamthoughts.azkarra.api.StreamsExecutionEnvironment
StreamsExecutionEnvironment.View
-
-
Field Summary
Fields Modifier and Type Field Description static String
STREAMS_CONFIG_PREFIX
static String
TYPE
-
Fields inherited from interface io.streamthoughts.azkarra.api.StreamsExecutionEnvironment
ENVIRONMENT_DEFAULT_NAME
-
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description LocalStreamsExecutionEnvironment
addConfiguration(Supplier<Conf> configuration)
Adds a new configuration to this environment.LocalStreamsExecutionEnvironment
addFallbackConfiguration(Conf fallback)
Adds settings to this environment that will be used in fallback if not present in the defined environment configuration.LocalStreamsExecutionEnvironment
addGlobalStateListener(org.apache.kafka.streams.processor.StateRestoreListener listener)
Adds aStateRestoreListener
instance that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
addStateListener(org.apache.kafka.streams.KafkaStreams.StateListener listener)
Adds aKafkaStreams.StateListener
instance that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
addStreamsConfiguration(Supplier<Conf> configuration)
Helper method to add a configuration prefixed with 'streams.'.LocalStreamsExecutionEnvironment
addStreamsLifecycleInterceptor(Supplier<StreamsLifecycleInterceptor> interceptor)
Adds a streams interceptor that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
.Optional<ApplicationId>
addTopology(Supplier<TopologyProvider> supplier)
Adds a newTopologyProvider
instance to thisStreamsExecutionEnvironment
.Optional<ApplicationId>
addTopology(Supplier<TopologyProvider> supplier, Executed executed)
Adds a newTopologyProvider
instance to thisStreamsExecutionEnvironment
.static LocalStreamsExecutionEnvironment
create()
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance using the empty configuration and a generated unique name.static LocalStreamsExecutionEnvironment
create(Conf settings)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specifiedConf
and using a generated env name.static LocalStreamsExecutionEnvironment
create(String name)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specified env name and using the configuration.static LocalStreamsExecutionEnvironment
create(String name, Conf settings)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specifiedConf
and env name.Optional<KafkaStreamsApplication>
getApplicationById(ApplicationId id)
Gets theKafkaStreamsApplication
for the specifiedapplication.id
Supplier<ApplicationIdBuilder>
getApplicationIdBuilder()
Gets theApplicationIdBuilder
.Set<ApplicationId>
getApplicationIds()
Returns all {ApplicationId
for active Kafka Streams applications.Conf
getConfiguration()
Gets this environment configuration.Set<ContainerId>
getContainerIds()
Returns allContainerId
for active Kafka Streams applications.Collection<KafkaStreamsContainer>
getContainers()
Returns all containers for active Kafka Streams applications.Supplier<StreamThreadExceptionHandler>
getStreamThreadExceptionHandler()
Gets theStreamThreadExceptionHandler
.boolean
isDefault()
Check whether thisStreamsExecutionEnvironment
is marked as default.LocalStreamsExecutionEnvironment
isDefault(boolean isDefault)
Sets whether this execution environment should be the default.String
name()
Gets the name of thisStreamsExecutionEnvironment
.StreamsTopologyExecution
newTopologyExecution(StreamsTopologyMeta meta, Executed executed)
Creates a newStreamsTopologyExecution
to be applied on thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
registerTopology(Supplier<TopologyProvider> supplier)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
registerTopology(Supplier<TopologyProvider> supplier, Executed executed)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
registerTopology(org.apache.kafka.streams.Topology topology, Version version, Executed executed)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
.LocalStreamsExecutionEnvironment
setApplicationIdBuilder(Supplier<ApplicationIdBuilder> supplier)
Sets theApplicationIdBuilder
that should be used for building streamsapplication.id
.void
setAzkarraContext(AzkarraContext context)
Set theAzkarraContext
that this object runs in.LocalStreamsExecutionEnvironment
setKafkaStreamsFactory(Supplier<KafkaStreamsFactory> factory)
Sets theKafkaStreamsFactory
that will be used to provide theKafkaStreams
to configure and start.LocalStreamsExecutionEnvironment
setStreamThreadExceptionHandler(Supplier<StreamThreadExceptionHandler> handler)
Sets theStreamThreadExceptionHandler
invoked when a StreamThread abruptly terminates due to an uncaught exception.void
start()
Starts thisStreamsExecutionEnvironment
instance.State
state()
Gets the state of thisStreamsExecutionEnvironment
.void
stop(boolean cleanUp)
Stops thisStreamsExecutionEnvironment
instance and all runningKafkaStreams
instance.void
stop(ApplicationId id, boolean cleanUp, Duration timeout)
Stops all the streams instance for the specified application id.void
stop(ContainerId id, boolean cleanUp, Duration timeout)
Stops the streams container for the specifiedapplication.id
.void
terminate(ApplicationId id, Duration timeout)
Terminates all streams container for the specifiedapplication.id
and remove the associated topology from this environment.void
terminate(ContainerId id, Duration timeout)
Terminates theKafkaStreams
instance for the specifiedcontainer.id
and remove the associated topology from this environment.String
type()
Gets the type of thisStreamsExecutionEnvironment
.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface io.streamthoughts.azkarra.api.StreamsExecutionEnvironment
addConfiguration, describe, getContainerById, stop, stop, stop, terminate, terminate
-
-
-
-
Field Detail
-
TYPE
public static final String TYPE
- See Also:
- Constant Field Values
-
STREAMS_CONFIG_PREFIX
public static final String STREAMS_CONFIG_PREFIX
- See Also:
- Constant Field Values
-
-
Method Detail
-
create
public static LocalStreamsExecutionEnvironment create()
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance using the empty configuration and a generated unique name.- Returns:
- a new
LocalStreamsExecutionEnvironment
instance.
-
create
public static LocalStreamsExecutionEnvironment create(String name)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specified env name and using the configuration.- Parameters:
name
- the name to be used for identifying this environment.- Returns:
- a new
LocalStreamsExecutionEnvironment
instance.
-
create
public static LocalStreamsExecutionEnvironment create(Conf settings)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specifiedConf
and using a generated env name.- Parameters:
settings
- theConf
instance.- Returns:
- a new
LocalStreamsExecutionEnvironment
instance.
-
create
public static LocalStreamsExecutionEnvironment create(String name, Conf settings)
Static helper that can be used to creates a newStreamsExecutionEnvironment
instance from the specifiedConf
and env name.- Parameters:
settings
- theConf
instance.name
- the name to be used for identifying this environment.- Returns:
- a new
LocalStreamsExecutionEnvironment
instance.
-
type
public String type()
Gets the type of thisStreamsExecutionEnvironment
.- Specified by:
type
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the string type.
-
name
public String name()
Gets the name of thisStreamsExecutionEnvironment
.- Specified by:
name
in interfaceHasName
- Specified by:
name
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the string name.
-
state
public State state()
Gets the state of thisStreamsExecutionEnvironment
.- Specified by:
state
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the
State
.
-
isDefault
public boolean isDefault()
Check whether thisStreamsExecutionEnvironment
is marked as default.- Specified by:
isDefault
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
true
if thisStreamsExecutionEnvironment
is marked as default.
-
isDefault
public LocalStreamsExecutionEnvironment isDefault(boolean isDefault)
Sets whether this execution environment should be the default.- Specified by:
isDefault
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
isDefault
-true
to set this environment as default.- Returns:
- this
StreamsExecutionEnvironment
instance.
-
newTopologyExecution
public StreamsTopologyExecution newTopologyExecution(StreamsTopologyMeta meta, Executed executed)
Creates a newStreamsTopologyExecution
to be applied on thisStreamsExecutionEnvironment
.- Specified by:
newTopologyExecution
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
meta
- theStreamsTopologyMeta
to executed.executed
- the execution options.- Returns:
- the new
StreamsTopologyExecution
instance.
-
addStateListener
public LocalStreamsExecutionEnvironment addStateListener(org.apache.kafka.streams.KafkaStreams.StateListener listener)
Adds aKafkaStreams.StateListener
instance that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
.- Parameters:
listener
- theKafkaStreams.StateListener
instance.- Returns:
- this
StreamsExecutionEnvironment
instance. - Throws:
IllegalStateException
- if thisStreamsExecutionEnvironment
instance is started.
-
addGlobalStateListener
public LocalStreamsExecutionEnvironment addGlobalStateListener(org.apache.kafka.streams.processor.StateRestoreListener listener)
Adds aStateRestoreListener
instance that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
.- Parameters:
listener
- theStateRestoreListener
instance.- Returns:
- this
StreamsExecutionEnvironment
instance. - Throws:
IllegalStateException
- if thisStreamsExecutionEnvironment
instance is started.- See Also:
.
-
addStreamsLifecycleInterceptor
public LocalStreamsExecutionEnvironment addStreamsLifecycleInterceptor(Supplier<StreamsLifecycleInterceptor> interceptor)
Adds a streams interceptor that will set to allKafkaStreams
instance created in thisStreamsExecutionEnvironment
. The interceptors will be executed in the order in which they were added.- Parameters:
interceptor
- the {@link {@link StreamsLifecycleInterceptor}}.- Returns:
- this
StreamsExecutionEnvironment
instance.
-
setStreamThreadExceptionHandler
public LocalStreamsExecutionEnvironment setStreamThreadExceptionHandler(Supplier<StreamThreadExceptionHandler> handler)
Sets theStreamThreadExceptionHandler
invoked when a StreamThread abruptly terminates due to an uncaught exception.- Parameters:
handler
- theStreamThreadExceptionHandler
.- Returns:
- this
StreamsExecutionEnvironment
instance. - See Also:
KafkaStreams.setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
-
getStreamThreadExceptionHandler
public Supplier<StreamThreadExceptionHandler> getStreamThreadExceptionHandler()
Gets theStreamThreadExceptionHandler
.- Returns:
- the
Supplier
, otherwisenull
if no handler is set.
-
getContainers
public Collection<KafkaStreamsContainer> getContainers()
Returns all containers for active Kafka Streams applications.- Specified by:
getContainers
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- a collection of
KafkaStreamsContainer
applications.
-
getContainerIds
public Set<ContainerId> getContainerIds()
Returns allContainerId
for active Kafka Streams applications.- Specified by:
getContainerIds
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the set of
ContainerId
.
-
getApplicationIds
public Set<ApplicationId> getApplicationIds()
Returns all {ApplicationId
for active Kafka Streams applications.- Specified by:
getApplicationIds
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the set of
ApplicationId
.
-
getApplicationById
public Optional<KafkaStreamsApplication> getApplicationById(ApplicationId id)
Gets theKafkaStreamsApplication
for the specifiedapplication.id
- Specified by:
getApplicationById
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
id
- theapplication.id
.- Returns:
- the
KafkaStreamsApplication
instance.
-
addConfiguration
public LocalStreamsExecutionEnvironment addConfiguration(Supplier<Conf> configuration)
Adds a new configuration to this environment. This method can be invoked multiple time. The supplied configuration will override all prior configurations.- Specified by:
addConfiguration
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- this
StreamsExecutionEnvironment
instance.
-
getConfiguration
public Conf getConfiguration()
Gets this environment configuration.- Specified by:
getConfiguration
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- the
Conf
instance.
-
addStreamsConfiguration
public LocalStreamsExecutionEnvironment addStreamsConfiguration(Supplier<Conf> configuration)
Helper method to add a configuration prefixed with 'streams.'.- Parameters:
configuration
- theConf
to supply.- Returns:
this
.
-
setApplicationIdBuilder
public LocalStreamsExecutionEnvironment setApplicationIdBuilder(Supplier<ApplicationIdBuilder> supplier)
Sets theApplicationIdBuilder
that should be used for building streamsapplication.id
.- Specified by:
setApplicationIdBuilder
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
supplier
- theApplicationIdBuilder
instance supplier.- Returns:
- this
StreamsExecutionEnvironment
instance.
-
getApplicationIdBuilder
public Supplier<ApplicationIdBuilder> getApplicationIdBuilder()
Gets theApplicationIdBuilder
.- Specified by:
getApplicationIdBuilder
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Returns:
- this
ApplicationIdBuilder
instance ornull
.
-
registerTopology
public LocalStreamsExecutionEnvironment registerTopology(Supplier<TopologyProvider> supplier)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
. A newKafkaStreams
instance will be created and started for this topology when the environment will start.If the
LocalStreamsExecutionEnvironment
is already started, then a newKafkaStreams
instance is immediately created.- Parameters:
supplier
- theTopologyProvider
supplier.- Returns:
this
.- See Also:
addTopology(Supplier)
-
registerTopology
public LocalStreamsExecutionEnvironment registerTopology(org.apache.kafka.streams.Topology topology, Version version, Executed executed)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
. A newKafkaStreams
instance will be created and started for this topology when the environment will start.If the
LocalStreamsExecutionEnvironment
is already started, then a newKafkaStreams
instance is immediately created.- Parameters:
topology
- theTopology
.version
- the topology'sVersion
.executed
- the topology's execution options.- Returns:
this
.- See Also:
addTopology(Supplier, Executed)
-
registerTopology
public LocalStreamsExecutionEnvironment registerTopology(Supplier<TopologyProvider> supplier, Executed executed)
Registers a newTopologyProvider
instance to thisStreamsExecutionEnvironment
. A newKafkaStreams
instance will be created and started for this topology when the environment will start.If the
LocalStreamsExecutionEnvironment
is already started, then a newKafkaStreams
instance is immediately created.- Parameters:
supplier
- theTopologyProvider
supplier.executed
- theExecuted
instance.- Returns:
this
.- See Also:
addTopology(Supplier, Executed)
-
addTopology
public Optional<ApplicationId> addTopology(Supplier<TopologyProvider> supplier)
Adds a newTopologyProvider
instance to thisStreamsExecutionEnvironment
. A newKafkaStreams
instance will be created and started for this topology when the environment will start.If the
LocalStreamsExecutionEnvironment
is already started, then a newKafkaStreams
instance is immediately created.- Parameters:
supplier
- theTopologyProvider
supplier.- Returns:
- the
ApplicationId
instance if the environment is already started, otherwiseOptional.empty()
.
-
addTopology
public Optional<ApplicationId> addTopology(Supplier<TopologyProvider> supplier, Executed executed)
Adds a newTopologyProvider
instance to thisStreamsExecutionEnvironment
. A newKafkaStreams
instance will be created and started for this topology when the environment will start.If the
LocalStreamsExecutionEnvironment
is already started, then a newKafkaStreams
instance is immediately created.- Parameters:
supplier
- theTopologyProvider
supplier.executed
- theExecuted
instance.- Returns:
- the
ApplicationId
instance if the environment is already started, otherwiseOptional.empty()
.
-
start
public void start() throws IllegalStateException, AzkarraException
Starts thisStreamsExecutionEnvironment
instance.- Specified by:
start
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Throws:
IllegalStateException
AzkarraException
-
stop
public void stop(boolean cleanUp)
Stops thisStreamsExecutionEnvironment
instance and all runningKafkaStreams
instance.- Specified by:
stop
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
cleanUp
- if local states of eachKafkaStreams
instance must be cleanup.- See Also:
.
-
stop
public void stop(ApplicationId id, boolean cleanUp, Duration timeout)
Stops all the streams instance for the specified application id.- Specified by:
stop
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
id
- theApplicationId
of the streams instance.cleanUp
- if local states of eachKafkaStreams
instance must be cleanup.timeout
- the duration to wait for the streams to shutdown.- See Also:
.
-
stop
public void stop(ContainerId id, boolean cleanUp, Duration timeout)
Stops the streams container for the specifiedapplication.id
.- Specified by:
stop
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
id
- theApplicationId
of the streams instance.cleanUp
- if local states of eachKafkaStreams
instance must be cleanup.timeout
- the duration to wait for the streams to shutdown.- See Also:
.
-
terminate
public void terminate(ContainerId id, Duration timeout)
Terminates theKafkaStreams
instance for the specifiedcontainer.id
and remove the associated topology from this environment.A clean up of the local state directory is performed when the instance is terminated.
- Specified by:
terminate
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
id
- theContainerId
of the streams instance.timeout
- the duration to wait for the streams to shutdown.
-
terminate
public void terminate(ApplicationId id, Duration timeout)
Terminates all streams container for the specifiedapplication.id
and remove the associated topology from this environment.A clean up of the local state directory of each container is performed when an application is terminated.
- Specified by:
terminate
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
id
- theApplicationId
of the streams instance.timeout
- the duration to wait for the streams to shutdown.
-
addFallbackConfiguration
public LocalStreamsExecutionEnvironment addFallbackConfiguration(Conf fallback)
Adds settings to this environment that will be used in fallback if not present in the defined environment configuration.- Specified by:
addFallbackConfiguration
in interfaceStreamsExecutionEnvironment<LocalStreamsExecutionEnvironment>
- Parameters:
fallback
- theConf
instance.- Returns:
- this
StreamsExecutionEnvironment
instance.
-
setKafkaStreamsFactory
public LocalStreamsExecutionEnvironment setKafkaStreamsFactory(Supplier<KafkaStreamsFactory> factory)
Sets theKafkaStreamsFactory
that will be used to provide theKafkaStreams
to configure and start.- Parameters:
factory
- theKafkaStreamsFactory
instance.- Returns:
- this
StreamsExecutionEnvironment
instance.
-
setAzkarraContext
public void setAzkarraContext(AzkarraContext context)
Set theAzkarraContext
that this object runs in.- Specified by:
setAzkarraContext
in interfaceAzkarraContextAware
- Parameters:
context
- theAzkarraContext
instance.
-
-