Streams

Endpoints for retrieving information about streams instances running locally.

Base URL: /api/v1/streams

GET /api/v1/streams

Get a list of streams instances currently active on the local JVM.

Example Request

GET /api/v1/streams
Host: localhost:8080

Example Response

[
  "word-count-topology-1-0"
]
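
As a rough illustration of calling this endpoint from a client, the sketch below uses only Python's standard library. The helper names `streams_url` and `get_json` are hypothetical, and `localhost:8080` is simply the host used in the examples on this page.

```python
import json
import urllib.parse
import urllib.request

BASE_PATH = "/api/v1/streams"  # base URL documented on this page


def streams_url(host, *segments):
    """Build a URL under the streams base path, percent-encoding each segment."""
    path = "/".join(urllib.parse.quote(s, safe="") for s in segments)
    return f"http://{host}{BASE_PATH}" + (f"/{path}" if path else "")


def get_json(url, timeout=5.0):
    """Send a GET request and decode the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)
```

With an instance running, `get_json(streams_url("localhost:8080"))` would be expected to return a list of application ids like the one in the example response.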

GET /api/v1/streams/(string: applicationId)

Get information about the local streams instance.

Example Request

GET /api/v1/streams/word-count-topology-1-0
Host: localhost:8080

Example Response

{
  "since": "2019-10-04T14:06:06.137+02:00[Europe/Paris]",
  "state": {
    "since": "2019-10-04T14:06:06.461+02:00[Europe/Paris]",
    "state": "RUNNING"
  },
  "version": "1.0",
  "description": null,
  "config": {
    "bootstrap.servers": "localhost:9092",
    "application.server": "localhost:8080",
    "log.maxFileSize": 104857600,
    "default.value.serde": "org.apache.kafka.common.serialization.Serdes$StringSerde",
    "rocksdb.config.setter": "io.streamthoughts.azkarra.api.streams.rocksdb.DefaultRocksDBConfigSetter",
    "state.dir": "/tmp/kafka-streams/",
    "stats.enable": false,
    "default.key.serde": "org.apache.kafka.common.serialization.Serdes$StringSerde",
    "log.dir": "/var/log/kafka-streams/rocksdb",
    "stats.dumpPeriodSec": 30,
    "application.id": "word-count-topology-1-0"
  },
  "name": "WordCountTopology"
}
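
The `since` fields in the example response carry a bracketed zone id after the UTC offset, which Python's `datetime.fromisoformat` does not accept as-is. The sketch below shows one way a client might split the two parts. `parse_since` is a hypothetical helper, and it assumes every timestamp follows the shape shown in the example. Offsets written as `Z` are only accepted by `fromisoformat` from Python 3.11 onward.

```python
from datetime import datetime


def parse_since(value):
    """Split '2019-...+02:00[Europe/Paris]' into (aware datetime, zone id or None)."""
    zone_id = None
    if value.endswith("]") and "[" in value:
        value, _, bracket = value.partition("[")
        zone_id = bracket[:-1]  # drop the trailing ']'
    return datetime.fromisoformat(value), zone_id


# Trimmed copy of the example response above.
info = {
    "since": "2019-10-04T14:06:06.137+02:00[Europe/Paris]",
    "state": {
        "since": "2019-10-04T14:06:06.461+02:00[Europe/Paris]",
        "state": "RUNNING",
    },
}
started, zone = parse_since(info["since"])
print(started.isoformat(), zone)
```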

GET /api/v1/streams/(string: applicationId)/status

Get the current status of the running tasks for the streams application.

Example Request

GET /api/v1/streams/word-count-topology-1-0/status
Host: localhost:8080

Example Response


GET /api/v1/streams/(string: applicationId)/config

Get the configuration for the streams application.

Example Request

GET /api/v1/streams/word-count-topology-1-0/config
Host: localhost:8080

Example Response

{
  "stats.dumpPeriodSec": 30,
  "default.value.serde": "org.apache.kafka.common.serialization.Serdes$StringSerde",
  "rocksdb.config.setter": "io.streamthoughts.azkarra.api.streams.rocksdb.DefaultRocksDBConfigSetter",
  "bootstrap.servers": "localhost:9092",
  "application.server": "localhost:8080",
  "state.dir": "/tmp/kafka-streams/",
  "stats.enable": false,
  "log.maxFileSize": 104857600,
  "default.key.serde": "org.apache.kafka.common.serialization.Serdes$StringSerde",
  "log.dir": "/var/log/kafka-streams/rocksdb",
  "application.id": "word-count-topology-1-0"
}

GET /api/v1/streams/(string: applicationId)/offsets

Get the consumer offset positions for the streams application.

Example Request

GET /api/v1/streams/word-count-topology/offsets
Host: localhost:8080

Example Response

{
  "group": "word-count-topology",
  "consumers": [
    {
      "client_id": "word-count-topology-babe9079-fc6e-4b9e-a518-3067c899e692-StreamThread-1-consumer",
      "stream_thread": "word-count-topology-babe9079-fc6e-4b9e-a518-3067c899e692-StreamThread-1",
      "positions": [
        {
          "topic": "streams-plaintext-input",
          "partition": 0,
          "consumed_offset": 5,
          "consumed_timestamp": 1589230351903,
          "committed_offset": 6,
          "committed_timestamp": 1589230354458,
          "log_end_offset": 6,
          "log_start_offset": 0,
          "lag": 0
        },
        {
          "topic": "word-count-topology-count-repartition",
          "partition": 0,
          "consumed_offset": 11,
          "consumed_timestamp": 1589230351903,
          "committed_offset": 12,
          "committed_timestamp": 1589230354525,
          "log_end_offset": 12,
          "log_start_offset": 12,
          "lag": 0
        }
      ]
    }
  ]
}
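
A client that monitors consumption might aggregate the `lag` values of this response per topic. The following is a minimal sketch, assuming the response has the shape shown in the example. `lag_by_topic` is a hypothetical helper, and the sample is a trimmed copy of the example response.

```python
from collections import defaultdict


def lag_by_topic(offsets):
    """Sum the reported `lag` of every partition position, grouped by topic."""
    totals = defaultdict(int)
    for consumer in offsets.get("consumers", []):
        for position in consumer.get("positions", []):
            totals[position["topic"]] += position["lag"]
    return dict(totals)


sample = {
    "group": "word-count-topology",
    "consumers": [
        {
            "positions": [
                {"topic": "streams-plaintext-input", "partition": 0, "lag": 0},
                {"topic": "word-count-topology-count-repartition", "partition": 0, "lag": 0},
            ]
        }
    ],
}
print(lag_by_topic(sample))
```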

GET /api/v1/streams/(string: applicationId)/metrics

Get current metrics for the streams application.

Accepted query parameters:

Parameter/Value      Description
format=prometheus    Get streams metrics for Prometheus scraping
filter_empty         Filter out all streams metrics with an empty value

Example Request

GET /api/v1/streams/word-count-topology-1-0/metrics
Host: localhost:8080

Example Response

[
  {
    "name": "consumer-fetch-manager-metrics",
    "metrics": [
      {
        "name": "bytes-consumed-rate",
        "description": "The average number of bytes consumed per second for a topic",
        "tags": {
          "client-id": "word-count-topology-1-0-5f27b08e-f7a2-408e-b8d1-72ddf4d9adc6-StreamThread-1-consumer",
          "topic": "word-count-topology-1-0-count-repartition"
        },
        "value": 0
      },
      {
        "name": "bytes-consumed-total",
        "description": "The total number of bytes consumed",
        "tags": {
          "client-id": "word-count-topology-1-0-5f27b08e-f7a2-408e-b8d1-72ddf4d9adc6-StreamThread-1-consumer"
        },
        "value": 0
      },
      {
        "name": "fetch-latency-avg",
        "description": "The average time taken for a fetch request.",
        "tags": {
          "client-id": "word-count-topology-1-0-5f27b08e-f7a2-408e-b8d1-72ddf4d9adc6-StreamThread-1-consumer"
        },
        "value": 501.46875
      },
      ...
    ]
  }
]
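
To work with such a response, a client might flatten the nested groups into one row per metric. The sketch below assumes the body decodes to a list of group objects, each with a `name` and a `metrics` array, as the example suggests. `flatten_metrics` and its `skip_zero` option are hypothetical and are applied client-side, independently of the `filter_empty` query parameter.

```python
def flatten_metrics(groups, skip_zero=False):
    """Yield (group, metric name, tags, value) for each metric in the response."""
    for group in groups:
        for metric in group.get("metrics", []):
            if skip_zero and not metric.get("value"):
                continue  # drop metrics whose value is 0 or missing
            yield (group["name"], metric["name"],
                   metric.get("tags", {}), metric.get("value"))


# Trimmed copy of the example response, without tags and descriptions.
sample = [
    {
        "name": "consumer-fetch-manager-metrics",
        "metrics": [
            {"name": "bytes-consumed-total", "value": 0},
            {"name": "fetch-latency-avg", "value": 501.46875},
        ],
    }
]
for row in flatten_metrics(sample, skip_zero=True):
    print(row)
```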

GET /api/v1/streams/(string: applicationId)/metrics/group/{group}

Get current metrics for the streams application and metric group.

Accepted query parameters:

Parameter/Value      Description
format=prometheus    Get streams metrics for Prometheus scraping
filter_empty         Filter out all streams metrics with an empty value

Example Request

GET /api/v1/streams/word-count-topology-1-0/metrics/group/app-info
Host: localhost:8080

GET /api/v1/streams/(string: applicationId)/metrics/group/{group}/name/{name}

Get current metrics for the streams application, metric group and name.

Accepted query parameters:

Parameter/Value      Description
format=prometheus    Get streams metrics for Prometheus scraping
filter_empty         Filter out all streams metrics with an empty value

Example Request

GET /api/v1/streams/word-count-topology-1-0/metrics/group/app-info/name/version
Host: localhost:8080

GET /api/v1/streams/(string: applicationId)/metrics/group/{group}/name/{name}/value

Get the current value of the metric identified by the streams application, metric group and name.

Example Request

GET /api/v1/streams/word-count-topology-1-0/metrics/group/app-info/name/version/value
Host: localhost:8080
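
The four metrics endpoints share one path hierarchy, so a client could build them with a single helper. This is a sketch following the path templates in the endpoint headings above. `metrics_path` and its argument names are hypothetical. Only the `format` query parameter is handled, since the page does not show how `filter_empty` is written in a URL.

```python
import urllib.parse


def _seg(value):
    """Percent-encode one path segment."""
    return urllib.parse.quote(value, safe="")


def metrics_path(application_id, group=None, name=None, value_only=False, fmt=None):
    """Build the path (and optional `format` query) for the metrics endpoints."""
    path = f"/api/v1/streams/{_seg(application_id)}/metrics"
    if group is not None:
        path += f"/group/{_seg(group)}"
        if name is not None:
            path += f"/name/{_seg(name)}"
            if value_only:
                path += "/value"
    if fmt is not None:
        path += "?" + urllib.parse.urlencode({"format": fmt})
    return path


print(metrics_path("word-count-topology-1-0", "app-info", "version", value_only=True))
print(metrics_path("word-count-topology-1-0", fmt="prometheus"))
```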

POST /api/v1/streams/(string: applicationId)/restart

Restart the local active streams instance.

Example Request

POST /api/v1/streams/word-count-topology-1-0/restart
Host: localhost:8080

Example Response

RESPONSE 200/OK

DELETE /api/v1/streams/(string: applicationId)/stop

Stop the local active streams instance.

Request JSON Object:

  • cleanup (boolean): Flag to indicate if the local streams states should be cleaned up.

Example Request

DELETE /api/v1/streams/word-count-topology-1-0/stop
Host: localhost:8080

{
  "cleanup": false
}

Example Response

RESPONSE 200/OK
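
The restart and stop calls could be issued from Python roughly as follows. This is a sketch: `restart_request` and `stop_request` are hypothetical helpers that only build the requests, and the `Content-Type` header is an assumption, since the page does not state which headers the server expects.

```python
import json
import urllib.request


def restart_request(host, application_id):
    """Build (but do not send) the POST request that restarts an instance."""
    url = f"http://{host}/api/v1/streams/{application_id}/restart"
    return urllib.request.Request(url, method="POST")


def stop_request(host, application_id, cleanup=False):
    """Build (but do not send) the DELETE request carrying the `cleanup` flag."""
    url = f"http://{host}/api/v1/streams/{application_id}/stop"
    body = json.dumps({"cleanup": cleanup}).encode("utf-8")
    # Assumed header; not documented on this page.
    headers = {"Content-Type": "application/json"}
    return urllib.request.Request(url, data=body, headers=headers, method="DELETE")
```

Passing either request to `urllib.request.urlopen` sends it. An error status from the server is raised as `urllib.error.HTTPError`.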

GET /api/v1/streams/(string: applicationId)/subscribe/(string: eventType)

Establish a new Server-sent events (SSE) connection to receive a stream of updates from a local Kafka Streams instance.
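
On the client side, an SSE stream is a sequence of text lines in which `event:` and `data:` fields are grouped into events separated by blank lines. The sketch below parses that generic wire format. `parse_sse` is a hypothetical helper, the event name and payload in the sample are made up for illustration, and the `id` and `retry` fields are ignored.

```python
def parse_sse(lines):
    """Yield one dict per event from an iterable of decoded SSE text lines."""
    event, data = None, []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if data:
                yield {"event": event or "message", "data": "\n".join(data)}
            event, data = None, []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)


# Hypothetical stream content; real event types are not listed on this page.
stream = ["event: demo\n", 'data: {"a": 1}\n', "\n", ": ping\n", "data: x\n", "data: y\n", "\n"]
for evt in parse_sse(stream):
    print(evt)
```

Against a live instance, the lines could come from iterating over the response returned by `urllib.request.urlopen` and decoding each line as UTF-8.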

Last modified September 25, 2020: update(site): add docs for SSE (69182c5)