Class LocalKafkaStreamsContainer
- java.lang.Object
-
- io.streamthoughts.azkarra.runtime.streams.LocalKafkaStreamsContainer
-
- All Implemented Interfaces:
LocalStoreAccessProvider
,QueryableKafkaStreams
,QueryCall.QueryCallFactory
,KafkaStreamsContainer
public class LocalKafkaStreamsContainer extends Object implements KafkaStreamsContainer, LocalStoreAccessProvider, QueryableKafkaStreams
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
KafkaStreamsContainer.KafkaMetricFilter, KafkaStreamsContainer.StateChangeWatcher
-
-
Field Summary
Fields Modifier and Type Field Description protected List<StreamsLifecycleInterceptor>
interceptors
protected Conf
streamsConfig
protected KafkaStreamsFactory
streamsFactory
protected TopologyDefinition
topologyDefinition
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
addStateChangeWatcher(KafkaStreamsContainer.StateChangeWatcher watcher)
Register a watcher to be notified ofKafkaStreams.State
change event.List<KafkaStreamsInstance>
allInstances()
List<LocalStatePartitionsInfo>
allLocalStorePartitionInfos()
Gets the partition restoration and lag for all local state store.String
applicationId()
Gets configuredStreamsConfig.APPLICATION_ID_CONFIG
for thisKafkaStreams
instance.boolean
checkEndpoint(Endpoint endpoint)
Checks whether the givenEndpoint
is the same as this container.void
close(boolean cleanUp, Duration timeout)
Closes thisKafkaStreams
instance and wait up to the timeout for the streams to be closed.void
close(Duration timeout)
Closes thisKafkaStreams
instance.String
containerId()
Gets the id for this container.org.apache.kafka.clients.producer.Producer<byte[],byte[]>
createNewProducer(Map<String,Object> overrides)
Creates a newProducer
instance using the same configs that the Kafka Streams instance.Optional<org.apache.kafka.common.serialization.Serde>
defaultKeySerde()
Gets the defaultSerde
configured for key.KafkaStreamsInstance
describe()
Describes the localKafkaStreams
instance.Optional<Endpoint>
endpoint()
Gets the endpoint configured for thisKafkaStreams
instance.Optional<Throwable>
exception()
Gets the last observed exception thrown theKafkaStreams
instance.Set<Endpoint>
findAllEndpointsForStore(String storeName)
<K> Optional<org.apache.kafka.streams.KeyQueryMetadata>
findMetadataForStoreAndKey(String storeName, K key, org.apache.kafka.common.serialization.Serializer<K> keySerializer)
org.apache.kafka.clients.admin.AdminClient
getAdminClient()
Gets a sharedAdminClient
instance for thisKafkaStreams
instance.<K,V>
EventStreamPublisher<K,V>getEventStreamPublisherForType(String eventType)
Gets a newEventStreamPublisher
for the given event-tye.org.apache.kafka.streams.KafkaStreams
getKafkaStreams()
Returns the wrappedKafkaStreams
instance.KafkaStreamsInstance
getLocalInstance(KafkaStreamsMetadata metadata)
org.apache.kafka.streams.Topology
getTopology()
Returns the wrappedTopology
instance.boolean
isRunning()
Checks if theKafkaStreams
is either RUNNING or REBALANCING.Set<String>
listRegisteredEventStreamTypes()
Gets the set of registered event streams.<K,V>
LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,V>>localKeyValueStore(String store)
Gets a read-only access to a local key-value store.<K,V>
LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlySessionStore<K,V>>localSessionStore(String storeName)
Gets a read-only access to a local session store.<K,V>
LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,org.apache.kafka.streams.state.ValueAndTimestamp<V>>>localTimestampedKeyValueStore(String storeName)
Gets a read-only access to the local timestamped key-value store.<K,V>
LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyWindowStore<K,org.apache.kafka.streams.state.ValueAndTimestamp<V>>>localTimestampedWindowStore(String storeName)
Gets a read-only access to a local window store.<K,V>
LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyWindowStore<K,V>>localWindowStore(String storeName)
Gets a read-only access to a local window store.Set<MetricGroup>
metrics(KafkaStreamsContainer.KafkaMetricFilter filter)
Gets allMetric
s for thisKafkaStreams
instance matching the specifiedKafkaStreamsContainer.KafkaMetricFilter
.static LocalKafkaStreamsContainerBuilder
newBuilder()
<K,V>
LocalQueryCall<K,V>newQueryCall(QueryRequest request)
Creates a newQueryCall
for specifiedQueryRequest
.ConsumerGroupOffsets
offsets()
Gets the offsets for the topic/partitions assigned to thisKafkaStreams
instance.<K,V>
voidregisterEventStream(EventStream<K,V> eventStream)
Registers a newEventStream
to this container.void
restart()
Restarts this container.void
setState(State newState)
Sets the current state of the streams.Future<State>
start(Executor executor)
Asynchronously start the underlyingKafkaStreams
instance.long
startedSince()
Gets the started epoch-time in milliseconds.TimestampedValue<State>
state()
Gets the current state of the streams container.Conf
streamsConfig()
Gets the configuration for thisKafkaStreams
instance.Set<org.apache.kafka.streams.processor.ThreadMetadata>
threadMetadata()
Gets the local thread metadata.StreamsTopologyGraph
topologyGraph()
Gets theStreamsTopologyGraph
for thisKafkaStreams
instance.TopologyMetadata
topologyMetadata()
Gets theTopologyMetadata
about the topology runs by thisKafkaStreams
instance.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer
metrics
-
-
-
-
Field Detail
-
streamsFactory
protected final KafkaStreamsFactory streamsFactory
-
streamsConfig
protected final Conf streamsConfig
-
topologyDefinition
protected final TopologyDefinition topologyDefinition
-
interceptors
protected final List<StreamsLifecycleInterceptor> interceptors
-
-
Method Detail
-
newBuilder
public static LocalKafkaStreamsContainerBuilder newBuilder()
- Returns:
- a new
LocalKafkaStreamsContainerBuilder
instance.
-
newQueryCall
public <K,V> LocalQueryCall<K,V> newQueryCall(QueryRequest request) throws InvalidQueryException
Creates a newQueryCall
for specifiedQueryRequest
.- Specified by:
newQueryCall
in interfaceQueryableKafkaStreams
- Specified by:
newQueryCall
in interfaceQueryCall.QueryCallFactory
- Type Parameters:
K
- the expected type for record-key.V
- the expected type for record-value.- Parameters:
request
- the query.- Returns:
- a new
QueryCall
object. - Throws:
InvalidQueryException
- if the given request is invalid.
-
start
public Future<State> start(Executor executor)
Asynchronously start the underlyingKafkaStreams
instance.- Parameters:
executor
- theExecutor
instance to be used for starting the streams.
-
getEventStreamPublisherForType
public <K,V> EventStreamPublisher<K,V> getEventStreamPublisherForType(String eventType)
Gets a newEventStreamPublisher
for the given event-tye.- Specified by:
getEventStreamPublisherForType
in interfaceKafkaStreamsContainer
- Parameters:
eventType
- theEventStream
type.- Returns:
- a new
EventStreamPublisher
instance.
-
registerEventStream
public <K,V> void registerEventStream(EventStream<K,V> eventStream)
Registers a newEventStream
to this container.- Specified by:
registerEventStream
in interfaceKafkaStreamsContainer
- Type Parameters:
K
- the record key-type.V
- the record value-type.- Parameters:
eventStream
- theEventStream
to register.
-
listRegisteredEventStreamTypes
public Set<String> listRegisteredEventStreamTypes()
Gets the set of registered event streams.- Specified by:
listRegisteredEventStreamTypes
in interfaceKafkaStreamsContainer
- Returns:
- the
Set
of event-streams.
-
setState
public void setState(State newState)
Sets the current state of the streams.- Parameters:
newState
- the new state of the Kafka Streams.
-
state
public TimestampedValue<State> state()
Gets the current state of the streams container.- Specified by:
state
in interfaceKafkaStreamsContainer
- Returns:
- a
TimestampedValue
instance.
-
defaultKeySerde
public Optional<org.apache.kafka.common.serialization.Serde> defaultKeySerde()
Gets the defaultSerde
configured for key.- Specified by:
defaultKeySerde
in interfaceKafkaStreamsContainer
- Returns:
- a optional
Serde
instance.
-
threadMetadata
public Set<org.apache.kafka.streams.processor.ThreadMetadata> threadMetadata()
Gets the local thread metadata.- Specified by:
threadMetadata
in interfaceKafkaStreamsContainer
- Returns:
- a set of
ThreadMetadata
instance.
-
startedSince
public long startedSince()
Gets the started epoch-time in milliseconds.- Specified by:
startedSince
in interfaceKafkaStreamsContainer
- Returns:
- a unix epoch-time in milliseconds.
-
streamsConfig
public Conf streamsConfig()
Gets the configuration for thisKafkaStreams
instance.- Specified by:
streamsConfig
in interfaceKafkaStreamsContainer
- Returns:
- a
Conf
instance.
-
containerId
public String containerId()
Gets the id for this container.- Specified by:
containerId
in interfaceKafkaStreamsContainer
- Returns:
- a string container id.
-
applicationId
public String applicationId()
Gets configuredStreamsConfig.APPLICATION_ID_CONFIG
for thisKafkaStreams
instance.- Specified by:
applicationId
in interfaceKafkaStreamsContainer
- Returns:
- a string
application.id
-
endpoint
public Optional<Endpoint> endpoint()
Gets the endpoint configured for thisKafkaStreams
instance.- Specified by:
endpoint
in interfaceKafkaStreamsContainer
- Returns:
- an optional
Endpoint
- See Also:
StreamsConfig.APPLICATION_SERVER_CONFIG
-
exception
public Optional<Throwable> exception()
Gets the last observed exception thrown theKafkaStreams
instance.- Specified by:
exception
in interfaceKafkaStreamsContainer
- Returns:
- a
Throwable
instance.
-
topologyMetadata
public TopologyMetadata topologyMetadata()
Gets theTopologyMetadata
about the topology runs by thisKafkaStreams
instance.- Specified by:
topologyMetadata
in interfaceKafkaStreamsContainer
- Returns:
- a
TopologyMetadata
instance.
-
topologyGraph
public StreamsTopologyGraph topologyGraph()
Gets theStreamsTopologyGraph
for thisKafkaStreams
instance.- Specified by:
topologyGraph
in interfaceKafkaStreamsContainer
- Returns:
- a new
TopologyDescription
instance.
-
metrics
public Set<MetricGroup> metrics(KafkaStreamsContainer.KafkaMetricFilter filter)
Gets allMetric
s for thisKafkaStreams
instance matching the specifiedKafkaStreamsContainer.KafkaMetricFilter
.- Specified by:
metrics
in interfaceKafkaStreamsContainer
- Parameters:
filter
- theKafkaStreamsContainer.KafkaMetricFilter
to be used.- Returns:
- a
Set
ofMetric
. - See Also:
KafkaStreams.metrics()
-
offsets
public ConsumerGroupOffsets offsets()
Gets the offsets for the topic/partitions assigned to thisKafkaStreams
instance. If theKafkaStreams
instance is not running then no offsets will be computed.- Specified by:
offsets
in interfaceKafkaStreamsContainer
- Returns:
- the
ConsumerGroupOffsets
.
-
createNewProducer
public org.apache.kafka.clients.producer.Producer<byte[],byte[]> createNewProducer(Map<String,Object> overrides)
Creates a newProducer
instance using the same configs that the Kafka Streams instance.- Specified by:
createNewProducer
in interfaceKafkaStreamsContainer
- Parameters:
overrides
- the producer configs to overrides.
-
getAdminClient
public org.apache.kafka.clients.admin.AdminClient getAdminClient()
Gets a sharedAdminClient
instance for thisKafkaStreams
instance.- Specified by:
getAdminClient
in interfaceKafkaStreamsContainer
- Returns:
- a
AdminClient
instance.
-
close
public void close(Duration timeout)
Closes thisKafkaStreams
instance.- Specified by:
close
in interfaceKafkaStreamsContainer
-
close
public void close(boolean cleanUp, Duration timeout)
Closes thisKafkaStreams
instance and wait up to the timeout for the streams to be closed. Atimeout
of 0 means to return immediately (i.eDuration.ZERO
- Specified by:
close
in interfaceKafkaStreamsContainer
- Parameters:
cleanUp
- flag to clean up the local streams states.timeout
- the duration to wait for the streams to shutdown.
-
restart
public void restart()
Restarts this container.- Specified by:
restart
in interfaceKafkaStreamsContainer
-
allLocalStorePartitionInfos
public List<LocalStatePartitionsInfo> allLocalStorePartitionInfos()
Gets the partition restoration and lag for all local state store.- Specified by:
allLocalStorePartitionInfos
in interfaceKafkaStreamsContainer
- Returns:
- the list of
LocalStatePartitionsInfo
.
-
allInstances
public List<KafkaStreamsInstance> allInstances()
-
describe
public KafkaStreamsInstance describe()
Describes the localKafkaStreams
instance.- Specified by:
describe
in interfaceKafkaStreamsContainer
- Returns:
- the
KafkaStreamsInstance
instance.
-
getLocalInstance
public KafkaStreamsInstance getLocalInstance(KafkaStreamsMetadata metadata)
-
findMetadataForStoreAndKey
public <K> Optional<org.apache.kafka.streams.KeyQueryMetadata> findMetadataForStoreAndKey(String storeName, K key, org.apache.kafka.common.serialization.Serializer<K> keySerializer)
- Specified by:
findMetadataForStoreAndKey
in interfaceLocalStoreAccessProvider
-
localKeyValueStore
public <K,V> LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,V>> localKeyValueStore(String store)
Gets a read-only access to a local key-value store.- Specified by:
localKeyValueStore
in interfaceLocalStoreAccessProvider
- Type Parameters:
K
- the type of the key.V
- the type of the value.- Parameters:
store
- the name of the store to access.- Returns:
- the
LocalStoreAccessor
instance.
-
localTimestampedKeyValueStore
public <K,V> LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyKeyValueStore<K,org.apache.kafka.streams.state.ValueAndTimestamp<V>>> localTimestampedKeyValueStore(String storeName)
Gets a read-only access to the local timestamped key-value store.- Specified by:
localTimestampedKeyValueStore
in interfaceLocalStoreAccessProvider
- Type Parameters:
K
- the type of the key.V
- the type of the value.- Parameters:
storeName
- the name of the store to access.- Returns:
- the
LocalStoreAccessor
instance.
-
localWindowStore
public <K,V> LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyWindowStore<K,V>> localWindowStore(String storeName)
Gets a read-only access to a local window store.- Specified by:
localWindowStore
in interfaceLocalStoreAccessProvider
- Type Parameters:
K
- the type of the key.V
- the type of the value.- Parameters:
storeName
- the name of the store to access.- Returns:
- the
LocalStoreAccessor
instance.
-
localTimestampedWindowStore
public <K,V> LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlyWindowStore<K,org.apache.kafka.streams.state.ValueAndTimestamp<V>>> localTimestampedWindowStore(String storeName)
Gets a read-only access to a local window store.- Specified by:
localTimestampedWindowStore
in interfaceLocalStoreAccessProvider
- Type Parameters:
K
- the type of the key.V
- the type of the value.- Parameters:
storeName
- the name of the store to access.- Returns:
- the
LocalStoreAccessor
instance.
-
localSessionStore
public <K,V> LocalStoreAccessor<org.apache.kafka.streams.state.ReadOnlySessionStore<K,V>> localSessionStore(String storeName)
Gets a read-only access to a local session store.- Specified by:
localSessionStore
in interfaceLocalStoreAccessProvider
- Type Parameters:
K
- the type of the key.V
- the type of the value.- Parameters:
storeName
- the name of the store to access.- Returns:
- the
LocalStoreAccessor
instance.
-
findAllEndpointsForStore
public Set<Endpoint> findAllEndpointsForStore(String storeName)
- Specified by:
findAllEndpointsForStore
in interfaceLocalStoreAccessProvider
-
isRunning
public boolean isRunning()
Checks if theKafkaStreams
is either RUNNING or REBALANCING.- Specified by:
isRunning
in interfaceKafkaStreamsContainer
- Returns:
false
if noKafkaStreams
is initialized.
-
addStateChangeWatcher
public void addStateChangeWatcher(KafkaStreamsContainer.StateChangeWatcher watcher)
Register a watcher to be notified ofKafkaStreams.State
change event.- Specified by:
addStateChangeWatcher
in interfaceKafkaStreamsContainer
- Parameters:
watcher
- theKafkaStreamsContainer.StateChangeWatcher
to be registered.
-
getKafkaStreams
public org.apache.kafka.streams.KafkaStreams getKafkaStreams()
Returns the wrappedKafkaStreams
instance. This method can throw anUnsupportedOperationException
if theKafkaStreamsContainer
implementation doesn't manage theKafkaStreams
locally.- Specified by:
getKafkaStreams
in interfaceKafkaStreamsContainer
- Returns:
- the
KafkaStreams
.
-
getTopology
public org.apache.kafka.streams.Topology getTopology()
Returns the wrappedTopology
instance. This method can throw anUnsupportedOperationException
if theKafkaStreamsContainer
implementation doesn't manage theKafkaStreams
locally.- Specified by:
getTopology
in interfaceKafkaStreamsContainer
- Returns:
- the
Topology
instance.
-
checkEndpoint
public boolean checkEndpoint(Endpoint endpoint)
Checks whether the givenEndpoint
is the same as this container.- Specified by:
checkEndpoint
in interfaceKafkaStreamsContainer
- Parameters:
endpoint
- theEndpoint
to verify.- Returns:
true
if the given host
-
-