Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dialyzer_ignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
lib/mint/tunnel_proxy.ex:49
lib/mint/http1.ex:915
lib/mint/http1.ex:927
lib/mint/unsafe_proxy.ex:173
lib/mint/unsafe_proxy.ex:198
test/support
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### New features

* Add `Mint.HTTP.request_body_window/2` to inspect the request body flow-control window for a streaming request. Returns `min(connection_window, stream_window)` for HTTP/2 and `:infinity` for HTTP/1, which has no application-level flow control.

## v1.7.1

### Bug Fixes and Improvements
Expand Down
2 changes: 2 additions & 0 deletions lib/mint/core/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do
@callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn()

@callback put_log(conn(), boolean()) :: conn()

@callback request_body_window(conn(), Types.request_ref()) :: non_neg_integer() | :infinity
end
73 changes: 73 additions & 0 deletions lib/mint/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,10 @@ defmodule Mint.HTTP do

This function always returns an updated connection to be stored over the old connection.

When streaming a body of arbitrary size, use `request_body_window/2` to learn
how many bytes you can send right now without violating HTTP/2 flow control,
then split your body accordingly before passing each chunk to this function.

For information about transfer encoding and content length in HTTP/1, see
`Mint.HTTP1.stream_request_body/3`.

Expand Down Expand Up @@ -1065,6 +1069,75 @@ defmodule Mint.HTTP do
@impl true
def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers])

@doc """
Returns the request body flow-control window for the streaming request
identified by `request_ref`.

The semantics differ by protocol:

* In HTTP/2, returns `min(connection_window, stream_window)` — the maximum
number of body bytes that can be sent right now without violating flow
control. Exceeding this value in a single `DATA` frame would close the
connection with a `FLOW_CONTROL_ERROR`. See `Mint.HTTP2.get_window_size/2`
for the underlying primitives.

* In HTTP/1, returns `:infinity`. HTTP/1 has no application-level
flow-control mechanism: any amount of body data is protocol-valid.

The value returned reflects only the protocol-level flow-control
constraint. It does not account for the operating-system socket send
buffer: under either protocol, `stream_request_body/3` can still block
when that buffer fills up. To bound this behavior, configure
`send_timeout` on the socket via `:transport_opts` when establishing the
connection (see `Mint.HTTP.connect/4`).

Raises `ArgumentError` if `request_ref` is not associated with an active
streaming request.

## Examples

Streaming a binary body in chunks that respect the protocol window:

defp stream_body(conn, ref, "") do
Mint.HTTP.stream_request_body(conn, ref, :eof)
end

defp stream_body(conn, ref, body) do
conn
|> Mint.HTTP.request_body_window(ref)
|> send_body_chunk(conn, ref, body)
end

defp send_body_chunk(0, conn, ref, body) do
with {:ok, conn} <- wait(conn, ref) do
stream_body(conn, ref, body)
end
end

defp send_body_chunk(window, conn, ref, body) do
chunk_size = min(window, byte_size(body))
<<chunk::binary-size(chunk_size), rest::binary>> = body

with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do
stream_body(conn, ref, rest)
end
end
Comment thread
tank-bohr marked this conversation as resolved.

defp wait(conn, ref) do
# Wait for the server to refill the request body window with a
# WINDOW_UPDATE frame. The concrete implementation depends on the
# socket mode and other context.
end

Note that `min(:infinity, n) == n` thanks to Erlang term ordering, so the
same loop works on HTTP/1 (each iteration sends the entire remaining body in
a single chunk) and on HTTP/2 (each iteration sends at most the current
flow-control window).
"""
@doc since: "1.8.0"
@impl true
def request_body_window(conn, ref), do: conn_apply(conn, :request_body_window, [conn, ref])

## Helpers

defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args)
Expand Down
12 changes: 12 additions & 0 deletions lib/mint/http1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,18 @@ defmodule Mint.HTTP1 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.request_body_window/2`.
"""
@doc since: "1.8.0"
@impl true
def request_body_window(%__MODULE__{streaming_request: %{ref: ref}}, ref), do: :infinity

def request_body_window(%__MODULE__{}, ref) do
raise ArgumentError,
"request with request reference #{inspect(ref)} was not found or is not streaming a body"
end

## Helpers

defp decode(:status, %{request: request} = conn, data, responses) do
Expand Down
9 changes: 9 additions & 0 deletions lib/mint/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.request_body_window/2`.
"""
@doc since: "1.8.0"
@impl true
def request_body_window(%__MODULE__{} = conn, ref) do
min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref}))
end

## Helpers

defp handle_closed(conn) do
Expand Down
5 changes: 5 additions & 0 deletions lib/mint/unsafe_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do
def put_proxy_headers(%__MODULE__{}, _headers) do
raise "invalid function for proxy unsafe proxy connections"
end

@impl true
def request_body_window(%__MODULE__{module: module, state: state}, ref) do
module.request_body_window(state, ref)
end
end
23 changes: 23 additions & 0 deletions test/http_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
defmodule Mint.HTTPTest do
use ExUnit.Case, async: true
doctest Mint.HTTP

alias Mint.{HTTP, HTTP1.TestServer}

setup do
{:ok, port, server_ref} = TestServer.start()
assert {:ok, conn} = HTTP.connect(:http, "localhost", port)
assert_receive {^server_ref, server_socket}

[conn: conn, server_socket: server_socket]
end

describe "request_body_window/2" do
test "returns :infinity for an HTTP/1 streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
assert HTTP.request_body_window(conn, ref) == :infinity
end

test "raises ArgumentError for an unknown request ref", %{conn: conn} do
assert_raise ArgumentError, fn ->
HTTP.request_body_window(conn, make_ref())
end
end
end
end
13 changes: 13 additions & 0 deletions test/mint/http1/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,19 @@ defmodule Mint.HTTP1Test do
{:ok, conn, responses}
end

describe "request_body_window/2" do
test "returns :infinity for an active streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
assert HTTP1.request_body_window(conn, ref) == :infinity
end

test "raises if no request is currently streaming a body", %{conn: conn} do
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
HTTP1.request_body_window(conn, make_ref())
end
end
end

@mint_user_agent "mint/#{Mix.Project.config()[:version]}"
defp mint_user_agent, do: @mint_user_agent
end
76 changes: 76 additions & 0 deletions test/mint/http2/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,82 @@ defmodule Mint.HTTP2Test do
HTTP2.get_window_size(conn, {:request, make_ref()})
end
end

test "request_body_window/2 returns the minimum of connection and request window sizes",
%{conn: conn} do
{conn, ref} = open_request(conn, :stream)

send_window = HTTP2.request_body_window(conn, ref)
conn_window = HTTP2.get_window_size(conn, :connection)
request_window = HTTP2.get_window_size(conn, {:request, ref})

assert send_window == min(conn_window, request_window)
end

test "request_body_window/2 decreases after streaming body data", %{conn: conn} do
{conn, ref} = open_request(conn, :stream)

initial_send_window = HTTP2.request_body_window(conn, ref)
assert initial_send_window > 0

body_chunk = "hello"
{:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk)

assert HTTP2.request_body_window(conn, ref) == initial_send_window - byte_size(body_chunk)
end

test "request_body_window/2 raises if the request is not found", %{conn: conn} do
assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn ->
HTTP2.request_body_window(conn, make_ref())
end
end

@tag server_settings: [initial_window_size: 5]
test "streaming a body larger than the window using request_body_window/2 in a loop",
%{conn: conn} do
{conn, ref} = open_request(conn, :stream)

assert_recv_frames [headers(stream_id: stream_id)]

body = "0123456789ABCDE"

# First chunk: window is 5, so we send 5 bytes.
assert HTTP2.request_body_window(conn, ref) == 5
<<chunk1::binary-size(5), rest1::binary>> = body
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk1)

assert HTTP2.request_body_window(conn, ref) == 0

assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk1, flags: flags1)]
assert flags1 == set_flags(:data, [])

# Server replenishes the stream window so we can send more.
{:ok, conn, []} =
stream_frames(conn, [window_update(stream_id: stream_id, window_size_increment: 5)])

assert HTTP2.request_body_window(conn, ref) == 5
<<chunk2::binary-size(5), rest2::binary>> = rest1
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk2)

assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk2)]

# Final replenishment for the remaining bytes plus :eof.
{:ok, conn, []} =
stream_frames(conn, [
window_update(stream_id: stream_id, window_size_increment: byte_size(rest2))
])

assert HTTP2.request_body_window(conn, ref) == byte_size(rest2)
{:ok, conn} = HTTP2.stream_request_body(conn, ref, rest2)
{:ok, _conn} = HTTP2.stream_request_body(conn, ref, :eof)

assert_recv_frames [
data(stream_id: ^stream_id, data: ^rest2),
data(stream_id: ^stream_id, data: "", flags: end_flags)
]

assert end_flags == set_flags(:data, [:end_stream])
end
end

describe "settings" do
Expand Down
Loading