Transparent REST Client Proxies

AppKernel provides a lightweight async HTTP client layer for inter-service calls. A single httpx.AsyncClient connection pool is created at startup and reused for the lifetime of the process.

Overview

Three classes form the client layer:

  • HttpClientFactory — entry point. Call get(root_url) to obtain a proxy bound to an upstream service.

  • HttpClientServiceProxy — per-upstream proxy. Builds RequestWrapper instances via attribute access or wrap().

  • RequestWrapper — per-resource wrapper. Exposes get, post, put, patch, and delete as async methods.

All HTTP I/O goes through one shared httpx.AsyncClient instance, so connections are pooled and reused across calls.

Basic usage

from appkernel import HttpClientFactory

payments = HttpClientFactory.get('http://payments.svc')

# POST to http://payments.svc/authorisations/
code, result = await payments.authorisations.post(payload)

# GET http://payments.svc/refunds/12345
code, result = await payments.wrap('/refunds/12345').get()

URL construction

There are two ways to get a RequestWrapper from a proxy.

Attribute access appends the attribute name with a trailing slash:

payments.authorisations
# → RequestWrapper('http://payments.svc/authorisations/')

payments.charges
# → RequestWrapper('http://payments.svc/charges/')

wrap() gives explicit control over the path. No trailing slash is added:

payments.wrap('/refunds/12345')
# → RequestWrapper('http://payments.svc/refunds/12345')

payments.wrap('charges/')
# → RequestWrapper('http://payments.svc/charges/')
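The two routes to a wrapper can be pictured with a small stand-in. This is a minimal sketch, not AppKernel's implementation: ProxySketch and WrapperSketch are hypothetical names, and the exact joining rule inside wrap() is an assumption inferred from the examples above.

```python
class WrapperSketch:
    """Hypothetical stand-in for RequestWrapper: it only remembers its URL."""

    def __init__(self, url):
        self.url = url


class ProxySketch:
    """Hypothetical stand-in for HttpClientServiceProxy."""

    def __init__(self, root_url):
        # Store the root without a trailing slash so joins stay predictable.
        self._root = root_url.rstrip('/')

    def wrap(self, path):
        # Explicit path: joined as given, no trailing slash is added.
        return WrapperSketch(f"{self._root}/{path.lstrip('/')}")

    def __getattr__(self, name):
        # Python calls __getattr__ only when normal lookup fails.
        if name.startswith('_'):
            raise AttributeError(name)
        # Attribute access: append the name plus a trailing slash.
        return WrapperSketch(f"{self._root}/{name}/")


payments = ProxySketch('http://payments.svc')
print(payments.authorisations.url)
print(payments.wrap('/refunds/12345').url)
print(payments.wrap('charges/').url)
```

The underscore guard in __getattr__ keeps private or dunder lookups (for example during copying) from being turned into URLs.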

path_extension on a method call appends a sub-path to the wrapper’s URL at call time:

wrapper = payments.wrap('/refunds')
code, result = await wrapper.get(path_extension='12345/items')
# GET http://payments.svc/refunds/12345/items

Leading and trailing slashes are normalised — rstrip('/') on the base and lstrip('/') on the extension — so double slashes are never produced.
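The normalisation described above amounts to a single join. The helper below is an illustrative sketch; join_url is a hypothetical name, and returning the base unchanged when no extension is given is an assumption.

```python
def join_url(base, path_extension=None):
    """Join a wrapper URL and an optional sub-path with exactly one slash."""
    if not path_extension:
        # Assumption: with no extension the wrapper URL is used unchanged.
        return base
    return f"{base.rstrip('/')}/{path_extension.lstrip('/')}"


print(join_url('http://payments.svc/refunds', '12345/items'))
print(join_url('http://payments.svc/refunds/', '/12345/items'))
```

Both calls print the same URL, because the slash at the join point is stripped from each side before a single one is inserted.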

Request methods

All five HTTP methods are available on RequestWrapper:

code, result = await wrapper.get(path_extension=None, timeout=3)
code, result = await wrapper.post(payload, path_extension=None, timeout=3)
code, result = await wrapper.put(payload, path_extension=None, timeout=3)
code, result = await wrapper.patch(payload, path_extension=None, timeout=3)
code, result = await wrapper.delete(payload, path_extension=None, timeout=3)

Parameter       Default  Description
payload         None     Request body. A Model instance is serialised with dumps(). A dict is serialised with json.dumps(). Any other value is sent as-is (string / bytes).
path_extension  None     Sub-path appended to the wrapper URL. See URL construction.
timeout         3        Per-request timeout in seconds, overriding the pool-level read timeout.

All methods follow redirects automatically (follow_redirects=True).
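The payload rules described above can be sketched as a three-way dispatch. ModelSketch is a hypothetical stand-in for an AppKernel Model (only its dumps() method matters here), and serialise_payload is an illustrative name rather than a library function.

```python
import json


class ModelSketch:
    """Hypothetical stand-in for an AppKernel Model."""

    def __init__(self, **fields):
        self._fields = fields

    def dumps(self):
        return json.dumps(self._fields)


def serialise_payload(payload):
    """Apply the three payload rules: Model, dict, anything else."""
    if isinstance(payload, ModelSketch):
        return payload.dumps()        # Model instance: its own dumps()
    if isinstance(payload, dict):
        return json.dumps(payload)    # dict: json.dumps()
    return payload                    # str / bytes / None: sent as-is


print(serialise_payload({'amount': 10}))
print(serialise_payload(ModelSketch(id='U1')))
```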

File upload and download

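Five dedicated methods handle binary file transfer: upload(), download(), and stream_download(), plus the convenience wrappers download_to() and upload_from(). They share the same URL-construction rules and circuit-breaker integration as the standard HTTP methods.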

upload()

Sends a file as multipart/form-data (field name file) with a POST request. The response is deserialised using the same JSON logic as post() — returning a Model instance if _type is present, or a plain dict otherwise:

from appkernel import HttpClientFactory

files_svc = HttpClientFactory.get('http://files.svc')

with open('photo.jpg', 'rb') as fh:
    code, ref = await files_svc.wrap('/files/').upload(
        fh,
        filename='photo.jpg',
        content_type='image/jpeg',
    )
# code = 201, ref = FileRef(id='F…', original_filename='photo.jpg', …)

# Bytes are also accepted directly
code, ref = await files_svc.wrap('/files/').upload(
    b'\xff\xd8\xff\xe0…',
    filename='photo.jpg',
    content_type='image/jpeg',
)

Parameter       Default                     Description
file            (required)                  File content. Accepts bytes, a binary file-like object, or a path string.
filename        'upload'                    Original filename sent in the multipart part header.
content_type    'application/octet-stream'  MIME type of the file part.
path_extension  None                        Sub-path appended to the wrapper URL. See URL construction.
timeout         60                          Per-request timeout in seconds. Higher default than regular requests to accommodate large uploads.

download()

Fetches a file and returns its raw bytes:

code, data = await files_svc.wrap('/files/F123/content').download()
# code = 200, data = b'…raw bytes…'

with open('photo.jpg', 'wb') as fh:
    fh.write(data)

The entire body is buffered in memory. For large files use stream_download() to avoid high memory usage.

Parameter       Default  Description
path_extension  None     Sub-path appended to the wrapper URL.
timeout         30       Per-request timeout in seconds.

Returns (status_code: int, body: bytes).

stream_download()

An async generator that yields the response body in chunks. No full-file buffer is held in memory, making it suitable for large files or when bytes are piped directly to a StreamingResponse:

async for chunk in files_svc.wrap('/files/F123/content').stream_download(
    chunk_size=65536,
):
    await writer.write(chunk)

# Pipe into a FastAPI StreamingResponse
from starlette.responses import StreamingResponse

return StreamingResponse(
    files_svc.wrap('/files/F123/content').stream_download(),
    media_type='application/octet-stream',
)

Parameter       Default  Description
path_extension  None     Sub-path appended to the wrapper URL.
chunk_size      65536    Read buffer size in bytes (64 KiB).
timeout         30       Per-request timeout in seconds.

Raises CircuitOpenError immediately (before any network call) when the circuit breaker is OPEN. Raises RequestHandlingException on non-2xx responses or network errors.

download_to()

Convenience wrapper around stream_download() that saves the file directly to the local filesystem without any intermediate buffer:

# Save to an explicit file path
path = await files_svc.wrap('/files/F123/content').download_to('/tmp/report.pdf')

# Save to a directory — filename resolved from Content-Disposition
path = await files_svc.wrap('/files/F123/content').download_to('/tmp/downloads/')
# → '/tmp/downloads/report.pdf'  (or last URL segment as fallback)

When dest is a directory (or ends with os.sep), the filename is resolved in this order:

  1. Use the filename= parameter from the Content-Disposition response header (handles both filename="foo" and filename*=UTF-8''foo).

  2. Fall back to the last non-empty path segment of the upstream URL.
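The two resolution steps can be approximated with the standard library. This is a sketch under assumptions, not AppKernel's code: resolve_filename is a hypothetical helper, the basename() guard against directory components is an added precaution, and returning None when nothing usable is found is an assumption.

```python
import os
from email.message import Message
from urllib.parse import urlsplit


def resolve_filename(content_disposition, url):
    """Pick a filename from Content-Disposition, else from the URL path."""
    if content_disposition:
        msg = Message()
        msg['Content-Disposition'] = content_disposition
        # get_filename() understands filename="foo" and RFC 2231 filename*=...
        # basename() drops any directory part a header might carry.
        name = os.path.basename(msg.get_filename() or '')
        if name:
            return name
    # Fallback: last non-empty segment of the upstream URL path.
    segments = [s for s in urlsplit(url).path.split('/') if s]
    return segments[-1] if segments else None  # assumption: None if empty


url = 'http://files.svc/files/F123/content'
print(resolve_filename('attachment; filename="report.pdf"', url))
print(resolve_filename(None, url))
```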

Parameter       Default     Description
dest            (required)  Destination file path, or an existing directory.
path_extension  None        Sub-path appended to the wrapper URL.
chunk_size      65536       Read buffer size in bytes (64 KiB).
timeout         30          Per-request timeout in seconds.

Returns the absolute path of the saved file.
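The idea of writing a chunked stream straight to disk can be illustrated without any network access. In this sketch fake_chunks() stands in for stream_download(), and save_stream is a hypothetical helper. The plain file writes block the event loop briefly, so a production implementation might hand them to a thread instead.

```python
import asyncio
import os
import tempfile


async def save_stream(chunks, dest):
    """Write an async iterator of bytes to dest, one chunk at a time."""
    with open(dest, 'wb') as fh:
        async for chunk in chunks:
            # Only the current chunk is held in memory.
            fh.write(chunk)
    return os.path.abspath(dest)


async def fake_chunks():
    # Stand-in for stream_download(): yields the body in pieces.
    yield b'%PDF-'
    yield b'1.7'


target = os.path.join(tempfile.mkdtemp(), 'report.pdf')
saved_path = asyncio.run(save_stream(fake_chunks(), target))
print(saved_path)
```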

upload_from()

Convenience wrapper around upload() that reads a file from the local filesystem. The filename and MIME type are inferred from the path when not provided explicitly:

# Infer filename ('report.pdf') and content type ('application/pdf')
code, ref = await files_svc.wrap('/files/').upload_from('/tmp/report.pdf')

# Override both
code, ref = await files_svc.wrap('/files/').upload_from(
    '/tmp/data.bin',
    filename='firmware-v2.bin',
    content_type='application/octet-stream',
)

Inference rules:

  • filename: os.path.basename(local_path)

  • content_type: mimetypes.guess_type(); falls back to 'application/octet-stream' for unknown extensions
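The inference rules map directly onto two standard-library calls. The function below is an illustrative sketch; infer_upload_metadata is a hypothetical name, not part of AppKernel.

```python
import mimetypes
import os


def infer_upload_metadata(local_path, filename=None, content_type=None):
    """Fill in filename and content type when the caller did not pass them."""
    if filename is None:
        filename = os.path.basename(local_path)
    if content_type is None:
        # guess_type() returns (type, encoding); type is None when unknown.
        guessed, _encoding = mimetypes.guess_type(local_path)
        content_type = guessed or 'application/octet-stream'
    return filename, content_type


print(infer_upload_metadata('/tmp/report.pdf'))
print(infer_upload_metadata('/tmp/blob.zzunknown'))
```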

Parameter       Default                 Description
local_path      (required)              Path to the local file to upload.
filename        basename of local_path  Override the filename sent in the multipart header.
content_type    guessed from extension  Override the MIME type sent in the multipart header.
path_extension  None                    Sub-path appended to the wrapper URL.
timeout         60                      Per-request timeout in seconds.

Returns (status_code, body) — same semantics as upload(). Raises OSError if local_path cannot be opened.

Authentication

RequestWrapper.get_headers() is a @staticmethod that builds a headers dict with an optional Authorization value and an Accept-Language header:

from appkernel.http_client import RequestWrapper

headers = RequestWrapper.get_headers(
    auth_header='Bearer <token>',
    accept_language='en',
)
# {'Authorization': 'Bearer <token>', 'Accept-Language': 'en'}

Note

The built-in get, post, put, patch, and delete methods call get_headers() internally without an auth_header, so they send no Authorization header by default. To inject per-request auth, subclass RequestWrapper and override get_headers:

class AuthenticatedWrapper(RequestWrapper):
    def __init__(self, url, token, circuit=None):
        super().__init__(url, circuit)
        self._token = token

    def get_headers(self, auth_header=None, accept_language='en'):
        return {
            'Authorization': f'Bearer {self._token}',
            'Accept-Language': accept_language or 'en',
        }

Alternatively, configure the shared httpx.AsyncClient with default auth headers by calling configure_http_client() with a pre-configured client before the engine starts.

Response deserialization

On a 2xx response the body is parsed as follows:

  1. If the body is valid JSON and contains a _type key that is not 'OperationResult' or 'ErrorMessage', AppKernel looks up the corresponding Model subclass and reconstructs the object via from_dict(). The _type key is removed from the dict before reconstruction.

  2. If _type is 'OperationResult' or 'ErrorMessage', or if there is no _type key, the response is returned as a plain dict.

  3. If the body is not valid JSON, it is returned as {'result': response_text}.

All three cases return a (status_code: int, body: Model | dict) tuple.
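The parsing rules above can be sketched as follows. Everything here is illustrative: UserSketch and REGISTRY are hypothetical stand-ins for a Model subclass and for however AppKernel looks classes up by name, and treating an unregistered _type as a plain dict is an assumption.

```python
import json

PLAIN_TYPES = {'OperationResult', 'ErrorMessage'}


class UserSketch:
    """Hypothetical Model stand-in with a from_dict() constructor."""

    def __init__(self, id, name):
        self.id, self.name = id, name

    @classmethod
    def from_dict(cls, data):
        return cls(**data)


# Hypothetical registry mapping _type values to classes.
REGISTRY = {'User': UserSketch}


def deserialise(body_text, registry=REGISTRY):
    try:
        parsed = json.loads(body_text)
    except ValueError:
        return {'result': body_text}  # rule 3: body is not valid JSON
    if isinstance(parsed, dict):
        type_name = parsed.get('_type')
        if type_name and type_name not in PLAIN_TYPES and type_name in registry:
            # rule 1: drop _type, then rebuild the object via from_dict()
            data = {k: v for k, v in parsed.items() if k != '_type'}
            return registry[type_name].from_dict(data)
    return parsed  # rule 2: returned without reconstruction


user = deserialise('{"_type": "User", "id": "U123", "name": "Alice"}')
print(type(user).__name__, user.name)
```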

Example — reconstructed Model:

code, user = await users.wrap('/users/U123').get()
# code = 200, user = User(id='U123', name='Alice', ...)
assert isinstance(user, User)

Example — plain dict (list response or OperationResult):

code, data = await users.users.get()
# code = 200, data = {'_items': [...], '_links': {...}}

Error handling

Any non-2xx response or network exception raises RequestHandlingException.

Attribute         Description
status_code       HTTP status code from the upstream response (or 500 for network errors).
message           Error message. If the upstream body is an AppKernel ErrorMessage JSON object, the message field is extracted; otherwise a generic string is used.
upstream_service  Set to the upstream_service field from the upstream body, or falls back to the last path segment of the upstream URL.

from appkernel.http_client import RequestHandlingException, CircuitOpenError

try:
    code, result = await payments.authorisations.post(payload)
except CircuitOpenError as exc:
    # Circuit is open — upstream unavailable
    logger.warning('Payments unavailable: %s', exc)
    return cached_result
except RequestHandlingException as exc:
    logger.error('Upstream error %s: %s', exc.status_code, exc.message)
    raise

Connection pool configuration

Pass an HttpClientConfig to AppKernelEngine to control pool and timeout behaviour:

from appkernel import AppKernelEngine, HttpClientConfig

kernel = AppKernelEngine(
    'my-app',
    http_client_config=HttpClientConfig(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30.0,
        connect_timeout=2.0,
        read_timeout=10.0,
        write_timeout=5.0,
        pool_timeout=5.0,
    )
)

The client is initialised at startup and closed gracefully on shutdown via FastAPI’s lifespan.

Parameter                  Default  Description
max_connections            100      Total simultaneous connections (active + queued). Requests queue once this limit is reached.
max_keepalive_connections  20       Idle connections held open for reuse. Set to steady-state concurrency.
keepalive_expiry           30.0     Seconds before an idle connection is proactively closed. Keep below the upstream server's keep-alive timeout (typically 30–75 s).
connect_timeout            2.0      Max seconds to establish a TCP + TLS connection.
read_timeout               10.0     Max seconds to wait for the first response byte after the request is sent.
write_timeout              5.0      Max seconds to finish sending the request body.
pool_timeout               5.0      Max seconds to wait for a connection slot when max_connections is exhausted. Acts as backpressure: raise to queue, lower to fail fast.
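The backpressure effect of pool_timeout can be illustrated with a bounded semaphore. This sketch only mimics the semantics and is not how httpx implements its pool; SlotPool and PoolTimeoutSketch are hypothetical names.

```python
import asyncio


class PoolTimeoutSketch(Exception):
    """Hypothetical error: no slot became free within pool_timeout."""


class SlotPool:
    """Illustrative only: a fixed number of slots with a bounded wait."""

    def __init__(self, max_connections, pool_timeout):
        self._slots = asyncio.Semaphore(max_connections)
        self._pool_timeout = pool_timeout

    async def run(self, request):
        try:
            # Wait at most pool_timeout seconds for a free slot.
            await asyncio.wait_for(self._slots.acquire(), self._pool_timeout)
        except asyncio.TimeoutError:
            raise PoolTimeoutSketch('no connection slot available') from None
        try:
            return await request()
        finally:
            self._slots.release()


async def demo():
    pool = SlotPool(max_connections=1, pool_timeout=0.05)

    async def slow():
        await asyncio.sleep(0.2)
        return 'slow done'

    async def fast():
        return 'fast done'

    first = asyncio.create_task(pool.run(slow))
    await asyncio.sleep(0.01)  # give the slow call time to take the only slot
    try:
        second = await pool.run(fast)
    except PoolTimeoutSketch:
        second = 'rejected'
    return second, await first, await pool.run(fast)


results = asyncio.run(demo())
print(results)
```

With one slot and a short pool_timeout, the second call fails fast while the slow call holds the slot. A longer pool_timeout would let it queue until the slot is released.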

Recommended pool profiles:

Circuit breaker

Without a circuit breaker, a slow downstream service holds connections open until they time out, exhausting the pool and causing cascading failures. The circuit breaker detects repeated failures and short-circuits calls immediately, raising CircuitOpenError (status_code = 503) instead of queuing work against an unavailable service.

Each proxy maintains an independent circuit breaker with three states:

  • CLOSED (normal) — requests flow through. Consecutive 5xx and network errors are counted toward failure_threshold.

  • OPEN (tripped) — once failure_threshold consecutive failures are recorded, the circuit opens. All further calls raise CircuitOpenError (status_code = 503) immediately; no network connection is attempted.

  • HALF_OPEN (probing) — after recovery_timeout seconds, up to half_open_max_calls probe requests are allowed through. A successful probe closes the circuit; a failed probe re-opens it.

Only 5xx and network errors count. 4xx client errors do not trip the circuit — they indicate a bad request, not an unhealthy upstream.

The circuit breaker state is in-process — each app instance maintains its own state independently.
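The three states and their transitions can be sketched as a small in-process state machine. This is a simplified illustration under assumptions, not AppKernel's implementation: BreakerSketch and CircuitOpen are hypothetical names, and the clock is injectable only so the example runs without waiting.

```python
import time


class CircuitOpen(Exception):
    """Hypothetical stand-in for CircuitOpenError."""
    status_code = 503


class BreakerSketch:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 half_open_max_calls=1, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self._clock = clock
        self.state = 'CLOSED'
        self._failures = 0
        self._opened_at = 0.0
        self._probes = 0

    def before_call(self):
        """Raise CircuitOpen unless a request may go out right now."""
        if self.state == 'OPEN':
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpen('circuit is open')
            # Recovery window elapsed: start probing.
            self.state, self._probes = 'HALF_OPEN', 0
        if self.state == 'HALF_OPEN':
            if self._probes >= self.half_open_max_calls:
                raise CircuitOpen('probe limit reached')
            self._probes += 1

    def record_success(self):
        # Any success closes the circuit and resets the consecutive count.
        self.state, self._failures = 'CLOSED', 0

    def record_failure(self):
        # Call this only for 5xx responses and network errors, never for 4xx.
        self._failures += 1
        if self.state == 'HALF_OPEN' or self._failures >= self.failure_threshold:
            self.state, self._opened_at = 'OPEN', self._clock()


now = [0.0]  # fake clock so the demo does not sleep
breaker = BreakerSketch(failure_threshold=2, recovery_timeout=30.0,
                        clock=lambda: now[0])
for _ in range(2):
    breaker.before_call()
    breaker.record_failure()
print(breaker.state)  # OPEN
```

A caller would invoke before_call() ahead of each request and then record the outcome. Advancing the fake clock past recovery_timeout lets a single probe through.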

Set circuit_breaker on HttpClientConfig to apply the same breaker configuration to every proxy created by HttpClientFactory (each proxy still keeps its own independent state):

from appkernel import AppKernelEngine, HttpClientConfig, CircuitBreakerConfig

kernel = AppKernelEngine(
    'my-app',
    http_client_config=HttpClientConfig(
        circuit_breaker=CircuitBreakerConfig(
            failure_threshold=5,    # trip after 5 consecutive failures
            recovery_timeout=30.0,  # probe after 30 s
            half_open_max_calls=1,  # one probe request at a time
        )
    )
)

Pass circuit_breaker to HttpClientFactory.get() to override (or disable) the global default for a specific upstream:

from appkernel import HttpClientFactory, CircuitBreakerConfig

# Stricter threshold for a critical payment service
payments = HttpClientFactory.get(
    'http://payments.svc',
    circuit_breaker=CircuitBreakerConfig(failure_threshold=3, recovery_timeout=60.0),
)

# Disable the breaker entirely for an internal metrics endpoint
metrics = HttpClientFactory.get('http://metrics.svc', circuit_breaker=None)

Passing circuit_breaker=None explicitly disables the breaker for that proxy even when a global default is configured.

Parameter            Default  Description
failure_threshold    5        Consecutive 5xx / network errors before the circuit opens.
recovery_timeout     30.0     Seconds in OPEN state before moving to HALF_OPEN and allowing a probe.
half_open_max_calls  1        Probe requests allowed concurrently in HALF_OPEN before the outcome is decided.

Sizing guidance:

  • Critical path (auth, payments): lower failure_threshold (2–3) and longer recovery_timeout (60 s) — fail fast, give the upstream time to recover.

  • Background / non-critical calls: higher failure_threshold (10+) and shorter recovery_timeout (10–15 s) — tolerate transient blips without opening the circuit.