http-kafka proxy
http-kafka proxy
The http-kafka proxy binding for adapting http request-response streams to kafka topic streams.
http_kafka_proxy:
type: http-kafka
kind: proxy
routes:
- when:
- method: GET
path: /items
exit: kafka_cache_client
with:
capability: fetch
topic: items-snapshots
merge:
content-type: application/json
- when:
- method: GET
path: /items/{id}
exit: kafka_cache_client
with:
capability: fetch
topic: items-snapshots
filters:
- key: ${params.id}
- when:
- method: PUT
path: /items/{id}
- method: GET
path: /items/{id};cid={correlationId}
exit: kafka_cache_client
with:
capability: produce
topic: items-requests
acks: leader_only
key: ${params.id}
reply-to: items-responses
async:
location: /items/${params.id};cid=${correlationId}Configuration (* required)
options
object
http-kafka specific options.
options:
idempotency:
header: idempotency-key
correlation:
headers:
reply-to: zilla:reply-to
correlation-id: zilla:correlation-idoptions.idempotency
object
HTTP request header used to specify the idempotency key.
idempotency.header*
string| Default:idempotency-key
HTTP request header name for idempotency key.
options.correlation
object
Kafka request message headers injected.
correlation.headers*
object
Kafka request message reply to and correlation id header names injected.
headers.reply-to
string| Default:zilla:reply-to
Kafka header name for reply-to topic.
headers.correlation-id
string| Default:zilla:correlation-id
Kafka header name for request-response correlation identifier.
routes*
arrayofobject
Conditional http-kafka specific routes.
Correlated Request-Response route:
routes:
- when:
- method: PUT
path: /items/{id}
- method: GET
path: /items/{id};cid={correlationId}
exit: kafka_cache_client
with:
capability: produce
topic: items-requests
acks: leader_only
key: ${params.id}
reply-to: items-responses
async:
location: /items/${params.id};cid=${correlationId}Single topic CRUD routes:
routes:
- when:
- method: POST
path: /items
exit: kafka_cache_client
with:
capability: produce
topic: items-crud
key: ${idempotencyKey}
- when:
- method: GET
path: /items
exit: kafka_cache_client
with:
capability: fetch
topic: items-snapshots
merge:
content-type: application/json
- when:
- method: GET
path: /items/{id}
exit: kafka_cache_client
with:
capability: fetch
topic: items-snapshots
filters:
- key: ${params.id}
- when:
- path: /items/{id}
exit: kafka_cache_client
with:
capability: produce
topic: items-crud
key: ${params.id}routes[].guarded
objectas map of namedarrayofstring
Roles required by named guard.
routes:
- guarded:
my_guard:
- read:itemsDynamic guarded routes
Dynamic guarded routes allow roles to be evaluated at runtime based on the incoming HTTP request.
routes:
- guarded:
my_guard:
- ${method}${path}
${method}Replaced with the request method (e.g.,GET,POST)
${path}Replaced with the request path (e.g.,/items,/users/123)
A combination of both static and dynamic guarded routes is supported.
routes[].when
arrayofobject
List of conditions (any match) to match this route when adapting http request-response streams to kafka topic streams. Read more: When a route matches
routes:
- when:
- method: GET
path: /items/{id};cid={correlationId}when[].method
string
HTTP Method, such as GET, HEAD, POST, PUT, DELETE, CONNECT, OPTIONS, TRACE, PATCH.
when[].path
string| Pattern:^/
Path with optional embedded parameter names, such as /{topic}.
routes[].exit
string
Next binding when following this route.
routes:
- when:
...
exit: kafka_cache_clientroutes[].with*
object
Defines the route with the Fetch capability.
with:
capability: fetchDefines the route with the Produce capability.
with:
capability: producewith.capability: fetch*
const
Kafka parameters for matched route when adapting http request-response streams to kafka topic fetch streams.
Routes with fetch capability map http GET requests to a kafka log-compacted topic, supporting filtered retrieval of messages with a specific key, or unfiltered retrieval of all messages with distinct keys in the topic merged into a unified response.
Filtering can be performed by kafka message key, message headers, or a combination of both message key and headers, extracting the parameter values from the inbound http request path.
Status 200 http responses include an etag header that can be used with if-none-match for subsequent conditional GET requests to check for updates. Rather than polling, http requests can also include the prefer: wait=N header to wait a maximum of N seconds before responding with 304 if not modified. When a new message arrives in the topic that would modify the response, then all prefer: wait=N clients receive the response immediately.
with:
capability: fetch
topic: items-snapshots
filters:
- key: ${params.id}
merge:
content-type: application/json
patch:
initial: []
path: /-with.topic*
string
Kafka topic name. Supports dynamic parameter substitution using the following patterns:
${params.topic}– Substitutes a path parameter.${guarded['jwt'].identity}– Substitutes a guarded identity value.
with.filters
arrayofobject
List of criteria (any match) to this filter. Kafka filters for matched route when adapting http request-response streams to kafka topic fetch streams. All specified headers and key must match for the combined criteria to match.
filters[].key
string
Message key. Supports dynamic parameter substitution using the following patterns:
${params.key}– Substitutes a path parameter.${guarded['jwt'].identity}– Substitutes a guarded identity value.
filters[].headers
objectas map of namedstringproperties
Message headers. Supports dynamic parameter substitution using the following patterns:
${params.headerX}– Substitutes a path parameter.${guarded['jwt'].identity}– Substitutes a guarded identity value.
with.merge
object
Merge multiple Kafka messages into a unified HTTP response. Kafka merge configuration for matched route when adapting http request-response streams to kafka topic streams where all messages are fetched and must be merged into a unified http response.
merge.content-type: application/json*
const
Content type of merged HTTP response.
merge.patch
object
Describes how to patch initial HTTP response to include one or more Kafka messages in unified HTTP response.
patch (application/json)
Kafka merge patch configuration for matched route when adapting
httprequest-response streams tokafkatopic streams where all messages are fetched and must be merged into a unifiedhttpresponse.
patch.initial: []*
const
Initial JSON value.
patch.path: /-*
const
JSON Patch path to include each Kafka message in unified HTTP response.
with.capability: produce*
const
Kafka parameters for matched route when adapting http request-response streams to kafka topic produce streams.
Routes with produce capability map any http request-response to a correlated pair of kafka messages. The http request message is sent to a requests topic, with a zilla:correlation-id header. When the request message received and processed by the kafka requests topic consumer, it produces a response message to the responses topic, with the same zilla:correlation-id header to correlate the response.
Requests including an idempotency-key http header can be replayed and safely receive the same response. This requires the kafka consumer to detect and ignore the duplicate request with the same idempotency-key and zilla:correlation-id.
Specifying async allows clients to include a prefer: respond-async header in the http request to receive 202 Accepted response with location response header.
A corresponding routes[].when object with matching GET method and location path is also required for follow up GET requests to return the same response as would have been returned if prefer: respond-async request header had been omitted.
with:
capability: produce
topic: items-requests
acks: leader_only
key: ${params.id}
reply-to: items-responses
overrides:
custom-text: custom-value
async:
location: /items/${params.id};cid=${correlationId}with.topic*
string
Kafka topic name. Supports dynamic parameter substitution using the following patterns:
${params.topic}– Substitutes a path parameter.${guarded['jwt'].identity}– Substitutes a guarded identity value.
with.acks
enum[none,leader_only,in_sync_replicas] | Default:in_sync_replicas
Kafka acknowledgement mode
with.key
string
Kafka message key. Supports dynamic parameter substitution using the following patterns:
${params.id}– Substitutes a path parameter.${guarded['jwt'].identity}– Substitutes a guarded identity value.
with.overrides
objectas map of namedstringproperties
Kafka message headers, with values optionally referencing path parameter.
with.reply-to
string
Kafka reply-to topic name.
with.async
objectas map of namedstringproperties
Allows an HTTP response to be retrieved asynchronously.
A location: <path> property can be used to define the path where an async result can be fetched, with the <path> value optionally referencing route path parameters or the ${correlationId}.
telemetry
object
Defines the desired telemetry for the binding.
telemetry.metrics
array
Telemetry metrics to track
telemetry:
metrics:
- stream.*
- http.*
