BrokerConnection

class kafka.BrokerConnection(host, port, afi, **configs)[source]

Initialize a Kafka broker connection

Keyword Arguments:
client_id (str): a name for this client. This string is passed in
each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. Default: 50.
reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname. default: True.
ssl_cafile (str): optional filename of ca file to use in certificate
verification. default: None.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. default: None.
ssl_keyfile (str): optional filename containing the client private key.
default: None.
ssl_password (callable, str, bytes, bytearray): optional password or
callable function that returns a password, for decrypting the client private key. Default: None.
ssl_crlfile (str): optional filename containing the CRL to check for
certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: None.
ssl_ciphers (str): optionally set the available ciphers for ssl
connections. It should be a string in the OpenSSL cipher list format. If no cipher can be selected (because compile-time options or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use.
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker api version. Only applies if api_version is None
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
state_change_callback (callable): function to be called when the
connection state changes from CONNECTING to CONNECTED etc.
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.

metric_group_prefix (str): Prefix for metric names. Default: ‘’ sasl_mechanism (str): Authentication mechanism when security_protocol

is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: ‘kafka’
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
blacked_out()[source]

Return true if we are disconnected from the given node and can’t re-establish a connection yet

can_send_more()[source]

Return True unless there are max_in_flight_requests_per_connection.

check_version(timeout=2, strict=False, topics=[])[source]

Attempt to guess the broker version.

Note: This is a blocking call.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), …

close(error=None)[source]

Close socket and fail all in-flight-requests.

Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception. Default: kafka.errors.KafkaConnectionError.
connect()[source]

Attempt to connect and return ConnectionState

connected()[source]

Return True iff socket is connected.

connecting()[source]

Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).

connection_delay()[source]

Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting, returns 0 to allow non-blocking connect to finish. When connected, returns a very large number to handle slow/stalled connections.

disconnected()[source]

Return True iff socket is closed

recv()[source]

Non-blocking network receive.

Return list of (response, future) tuples

send(request, blocking=True)[source]

Queue request for async network send, return Future()

send_pending_requests()[source]

Can block on network if request is larger than send_buffer_bytes