Kudu C++ client API
kudu::client::KuduSession Class Reference

Representation of a Kudu client session.

#include <client.h>


Public Types

enum  FlushMode { AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH }
 Modes of flush operations.
 
enum  ExternalConsistencyMode { CLIENT_PROPAGATED, COMMIT_WAIT }
 The possible external consistency modes on which Kudu operates.
 

Public Member Functions

Status SetFlushMode (FlushMode m) WARN_UNUSED_RESULT
 
Status SetExternalConsistencyMode (ExternalConsistencyMode m) WARN_UNUSED_RESULT
 
Status SetMutationBufferSpace (size_t size_bytes) WARN_UNUSED_RESULT
 
Status SetMutationBufferFlushWatermark (double watermark_pct) WARN_UNUSED_RESULT
 
Status SetMutationBufferFlushInterval (unsigned int millis) WARN_UNUSED_RESULT
 
Status SetMutationBufferMaxNum (unsigned int max_num) WARN_UNUSED_RESULT
 
void SetTimeoutMillis (int millis)
 
Status Apply (KuduWriteOperation *write_op) WARN_UNUSED_RESULT
 
Status Flush () WARN_UNUSED_RESULT
 
void FlushAsync (KuduStatusCallback *cb)
 
Status Close () WARN_UNUSED_RESULT
 
bool HasPendingOperations () const
 
int CountBufferedOperations () const ATTRIBUTE_DEPRECATED("this method is experimental and will disappear in a future release")
 
Status SetErrorBufferSpace (size_t size_bytes)
 
int CountPendingErrors () const
 
void GetPendingErrors (std::vector< KuduError * > *errors, bool *overflowed)
 
KuduClient * client () const
 

Detailed Description

Representation of a Kudu client session.

A KuduSession belongs to a specific KuduClient, and represents a context in which all read/write data access should take place. Within a session, multiple operations may be accumulated and batched together for better efficiency. Settings like timeouts, priorities, and trace IDs are also set per session.

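A KuduSession's main purpose is to group multiple data-access operations into batches or transactions. It is important to note the distinction between these two:

  • A batch is a set of operations grouped together to amortize fixed costs such as RPC overhead and round-trip time. A batch does not imply any ACID-like guarantees: within a batch, some operations may succeed while others fail, and concurrent readers may see partial results.
  • A transaction is a set of operations treated as an indivisible semantic unit, per the usual definitions of database transactions and isolation levels.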

Note
Kudu does not currently support transactions! They are only mentioned in the above documentation to clarify that batches are not transactional and should only be used for efficiency.

KuduSession is separate from KuduTable because a given batch or transaction may span multiple tables. This will become particularly important once ACID support is added, but even for plain batching, writes to different tables hosted on the same server may be coalesced into the same RPC.

KuduSession is separate from KuduClient because, in a multi-threaded application, different threads may need to concurrently execute transactions. Similar to a JDBC "session", transaction boundaries will be delineated on a per-session basis – in between a "BeginTransaction" and "Commit" call on a given session, all operations will be part of the same transaction. Meanwhile another concurrent Session object can safely run non-transactional work or other transactions without interfering.

Additionally, there is a guarantee that writes from different sessions do not get batched together into the same RPCs – this means that latency-sensitive clients can run through the same KuduClient object as throughput-oriented clients, perhaps by setting the latency-sensitive session's timeouts low and priorities high. Without the separation of batches, a latency-sensitive single-row insert might get batched along with 10MB worth of inserts from the batch writer, thus delaying the response significantly.

Although Kudu does not currently support transactions, users are required to use a KuduSession to perform reads as well as writes. This makes it more straightforward to add read-write transactions in the future without significant modifications to the API.

Users who are familiar with the Hibernate ORM framework should find this concept of a Session familiar.

Note
This class is not thread-safe.

Member Enumeration Documentation

enum kudu::client::KuduSession::ExternalConsistencyMode

The possible external consistency modes on which Kudu operates.

Enumerator
CLIENT_PROPAGATED 

The response to any write will contain a timestamp. Any further calls from the same client to other servers will update those servers with that timestamp. Subsequent write operations from the same client will be assigned timestamps that are strictly higher, enforcing external consistency without having to wait or incur any latency penalties.

In order to maintain external consistency for writes between two different clients in this mode, the user must forward the timestamp from the first client to the second by using KuduClient::GetLatestObservedTimestamp() and KuduClient::SetLatestObservedTimestamp().

This is the default external consistency mode.

Warning
Failure to propagate timestamp information through back-channels between two different clients will negate any external consistency guarantee under this mode.
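The propagation rule above can be illustrated with a toy logical-clock model. It is a sketch under stated assumptions, not Kudu's implementation: Kudu itself uses hybrid timestamps, and ToyServer, ToyClient and their members are hypothetical. The field latest_observed only plays the role of the value that real clients exchange through KuduClient::GetLatestObservedTimestamp() and KuduClient::SetLatestObservedTimestamp().

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Toy logical-clock model of CLIENT_PROPAGATED (hypothetical names; not the
// Kudu client API and not Kudu's hybrid-time implementation).
struct ToyServer {
  uint64_t clock = 0;  // the server's local clock reading

  // Assigns a timestamp strictly higher than both the local clock and the
  // timestamp propagated by the client, then advances the local clock.
  uint64_t AssignTimestamp(uint64_t propagated) {
    clock = std::max(clock, propagated) + 1;
    return clock;
  }
};

struct ToyClient {
  // Stands in for the value exposed by KuduClient::GetLatestObservedTimestamp().
  uint64_t latest_observed = 0;

  // Each write carries the latest observed timestamp to the server; the
  // response carries the assigned timestamp back to the client.
  uint64_t Write(ToyServer& server) {
    const uint64_t ts = server.AssignTimestamp(latest_observed);
    latest_observed = std::max(latest_observed, ts);
    return ts;
  }
};
```

A client that writes to a server with a lagging clock still gets a higher timestamp, because it forwards latest_observed. A second client starting from 0 does not, which is exactly the failure the warning describes, unless the first client's timestamp is handed over out of band.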
COMMIT_WAIT 

The server will guarantee that write operations from the same client or from other clients are externally consistent, without the need to propagate timestamps across clients. This is done by making write operations wait until there is certainty that all follow-up write operations (operations that start after the previous one finishes) will be assigned a timestamp that is strictly higher, enforcing external consistency.

Warning
Depending on the clock synchronization state of the TabletServers, this may imply considerable latency. Moreover, operations in COMMIT_WAIT external consistency mode will outright fail if TabletServer clocks are either unsynchronized or synchronized but with a maximum error that surpasses a pre-configured threshold.
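This page does not describe how Kudu implements COMMIT_WAIT internally. The following is a generic Spanner-style commit-wait calculation, offered only to show why the latency grows with the clock error bound and why a too-large error makes the write fail. PlanCommitWait, CommitWaitPlan and the threshold parameter are hypothetical, and the sketch assumes the error bound stays constant during the wait.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical Spanner-style commit-wait arithmetic (not Kudu's code).
struct CommitWaitPlan {
  uint64_t timestamp_us;  // timestamp assigned to the write
  uint64_t wait_us;       // how long to wait before acknowledging the write
};

// now_us: local clock reading; max_error_us: bound on the clock error.
// Returns std::nullopt (the write fails) if the error bound exceeds the
// configured threshold.
std::optional<CommitWaitPlan> PlanCommitWait(uint64_t now_us,
                                             uint64_t max_error_us,
                                             uint64_t max_error_threshold_us) {
  if (max_error_us > max_error_threshold_us) return std::nullopt;
  // Pick a timestamp no earlier than the latest possible true time...
  const uint64_t ts = now_us + max_error_us;
  // ...then wait until even the earliest possible true time is past it:
  // later_now - max_error > ts  <=>  later_now > now + 2 * max_error.
  return CommitWaitPlan{ts, 2 * max_error_us + 1};
}
```

With a 50 µs error bound the write waits just over 100 µs; doubling the error doubles the wait, and an error above the threshold aborts the write instead of waiting.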

enum kudu::client::KuduSession::FlushMode

Modes of flush operations.

Enumerator
AUTO_FLUSH_SYNC 

Every write will be sent to the server in-band with the Apply() call. No batching will occur. In this mode, the Flush() call never has any effect, since each Apply() call has already flushed the buffer. This is the default flush mode.

AUTO_FLUSH_BACKGROUND 

Apply() calls return immediately, but the writes are sent in the background, potentially batched together with other writes from the same session. If there is not enough buffer space to accommodate the newly added operations, Apply() blocks until buffer space becomes available.

Because writes are applied in the background, any errors will be stored in a session-local buffer. Call CountPendingErrors() or GetPendingErrors() to retrieve them.

In this mode, calling the FlushAsync() or Flush() methods causes a flush that normally would have happened at some point in the near future to happen right now. The Flush() call can be used to block until the current batch is sent and the reclaimed space is available for new operations.

Attention
The AUTO_FLUSH_BACKGROUND mode, when used in conjunction with a KuduSession::SetMutationBufferMaxNum() of greater than 1 (the default is 2), may result in out-of-order writes. This is because the buffers may flush concurrently, so multiple write operations may be sent to the server in parallel. See KUDU-1767 for more information.
Todo:
Provide an API for the user to specify a callback to do their own error reporting.
MANUAL_FLUSH 

Apply() calls will return immediately, and the writes will not be sent until the user calls Flush(). If the buffer runs past the configured space limit, then Apply() will return an error.

Attention
The MANUAL_FLUSH mode, when used in conjunction with a KuduSession::SetMutationBufferMaxNum() of greater than 1 (the default is 2), may result in out-of-order writes if KuduSession::FlushAsync() is used. This is because the buffers may flush concurrently, so multiple write operations may be sent to the server in parallel. See KUDU-1767 for more information.
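The three flush modes differ mainly in what Apply() does with the mutation buffer. The single-threaded toy model below (ToyFlushMode and ToySession are hypothetical, not the Kudu API) restates those rules: "sending" is simulated by recording how many operations each RPC would carry, and the blocking of AUTO_FLUSH_BACKGROUND is approximated by flushing before buffering the new operation.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical toy model of Apply() per flush mode (not the Kudu client API).
enum class ToyFlushMode { kAutoFlushSync, kAutoFlushBackground, kManualFlush };

class ToySession {
 public:
  ToySession(ToyFlushMode mode, size_t buffer_space)
      : mode_(mode), buffer_space_(buffer_space) {}

  // Returns false where the real client would return a non-OK Status.
  bool Apply(size_t op_bytes) {
    if (mode_ == ToyFlushMode::kAutoFlushSync) {
      sent.push_back(1);  // sent in-band with Apply(); nothing is buffered
      return true;
    }
    if (buffered_bytes_ + op_bytes > buffer_space_) {
      if (mode_ == ToyFlushMode::kManualFlush) return false;  // buffer full: error
      Flush();  // background mode: the real client blocks until space frees up
    }
    buffered_bytes_ += op_bytes;
    ++buffered_ops_;
    return true;
  }

  // Sends everything currently buffered as one simulated RPC.
  void Flush() {
    if (buffered_ops_ > 0) sent.push_back(buffered_ops_);
    buffered_ops_ = 0;
    buffered_bytes_ = 0;
  }

  std::vector<size_t> sent;  // number of operations carried by each RPC

 private:
  ToyFlushMode mode_;
  size_t buffer_space_;
  size_t buffered_bytes_ = 0;
  size_t buffered_ops_ = 0;
};
```

With a 100-byte buffer and two 60-byte operations, MANUAL_FLUSH rejects the second one, AUTO_FLUSH_BACKGROUND flushes the first one to make room, and AUTO_FLUSH_SYNC never buffers at all.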

Member Function Documentation

Status kudu::client::KuduSession::Apply ( KuduWriteOperation * write_op)
Todo:
Add "doAs" ability here for proxy servers to be able to act on behalf of other users, assuming access rights.

Apply the write operation.

The behavior of this function depends on the current flush mode. Regardless of flush mode, however, Apply() may begin to perform processing in the background for the call (e.g. looking up the tablet, etc). Given that, an error may be queued into the PendingErrors structure prior to flushing, even in MANUAL_FLUSH mode.

If any error occurs, either during flushing or because the write_op is malformed, the write_op is stored in the session's error collector, from which it can be retrieved at any time (see GetPendingErrors()).

A KuduSession accumulates write operations submitted via the Apply() method in mutation buffers. A KuduSession always has at least one mutation buffer. In any flush mode, this call may block if the maximum number of mutation buffers per session is reached (use KuduSession::SetMutationBufferMaxNum() to set the limit on the maximum number of mutation buffers).

Parameters
[in] write_op: Operation to apply. This method transfers the write_op's ownership to the KuduSession.
Returns
Operation result status.
KuduClient* kudu::client::KuduSession::client ( ) const
Returns
Client for the session: pointer to the associated client object.
Status kudu::client::KuduSession::Close ( )
Returns
Status of the session closure. In particular, an error is returned if there are non-flushed or in-flight operations.
int kudu::client::KuduSession::CountBufferedOperations ( ) const

Get number of buffered operations (not the same as 'pending').

Note that this is different from HasPendingOperations(), which also counts operations that have been sent but not yet responded to.

This method is most relevant in MANUAL_FLUSH mode, where the returned count stays valid until the next explicit flush or Apply() call. The method is of little use in the other flush modes:

  • in AUTO_FLUSH_SYNC mode, the data is immediately put en-route to the destination by Apply() method itself, so this method always returns zero.
  • in AUTO_FLUSH_BACKGROUND mode, the count returned by this method expires unpredictably and there is no guaranteed validity interval for the result: the background flush task can run at any moment, invalidating the result.
Deprecated:
This method is experimental and will disappear in a future release.
Returns
The number of buffered operations. These are operations that have not yet been flushed – i.e. they are not en-route yet.
int kudu::client::KuduSession::CountPendingErrors ( ) const

Get error count for pending operations.

Errors may accumulate during the session's lifetime; use this method to see how many errors have happened since the last call to the GetPendingErrors() method. The error count includes both the accumulated and dropped errors. An error might be dropped due to the limit on the error buffer size; see the SetErrorBufferSpace() method for details.

Returns
Total count of errors accumulated during the session.
Status kudu::client::KuduSession::Flush ( )

Flush any pending writes.

This method initiates flushing of the current batch of buffered write operations, if any, and then waits for completion of all pending operations of the session. In other words, after a successful return from this method, no pending operations should be left in the session.

In AUTO_FLUSH_SYNC mode, calling this method has no effect, since every Apply() call flushes itself inline.

Returns
Operation result status. In particular, returns a non-OK status if there are any pending errors after the rows have been flushed. Callers should then use GetPendingErrors() to determine which specific operations failed.
void kudu::client::KuduSession::FlushAsync ( KuduStatusCallback * cb)

Flush any pending writes asynchronously.

This method schedules a background flush of the latest batch of buffered write operations. The provided callback is invoked upon the flush completion of the latest batch of buffered write operations. If there were errors while flushing the operations, a corresponding non-OK status is passed as a parameter to the callback. Callers should then use GetPendingErrors() to determine which specific operations failed.

When this asynchronous method is used, the callback is called upon completion of the operations that were buffered since the last flush. In other words, in the following sequence:

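KUDU_CHECK_OK(session->Apply(insert_a));  // insert_a: placeholder KuduInsert for row 'a'
session->FlushAsync(callback_1);
KUDU_CHECK_OK(session->Apply(insert_b));  // insert_b: placeholder KuduInsert for row 'b'
session->FlushAsync(callback_2);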

... callback_2 will be triggered once b has been inserted, regardless of whether a has completed or not. That means there might be pending operations left in prior batches even after the callback has been invoked to report on the flush status of the latest batch.

Note
This also means that, if FlushAsync is called twice in succession, with no intervening operations, the second flush will return immediately. For example:
KUDU_CHECK_OK(session->Apply(insert_a));  // insert_a: placeholder KuduInsert for row 'a'
session->FlushAsync(callback_1);  // called when 'a' is inserted
session->FlushAsync(callback_2);  // called immediately!
Note that, as in all other async functions in Kudu, the callback may be called either from an IO thread or the same thread which calls FlushAsync. The callback should not block.
Parameters
[in] cb: Callback to call upon flush completion. The cb must remain valid until it is invoked.
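The per-batch callback contract of FlushAsync(), together with the difference between CountBufferedOperations() and HasPendingOperations(), can be modelled in a few lines. The sketch below is single-threaded and hypothetical (ToyAsyncSession and CompleteBatch are not part of the Kudu API): CompleteBatch() stands in for a server response and lets batches finish out of order.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded model of FlushAsync() (not the Kudu API).
class ToyAsyncSession {
 public:
  using Callback = std::function<void(bool ok)>;

  void Apply() { ++buffered_; }

  // Covers only the operations buffered since the previous flush.
  void FlushAsync(Callback cb) {
    if (buffered_ == 0) {  // nothing new since the last flush,
      cb(true);            // so the callback fires immediately
      return;
    }
    in_flight_.emplace_back(buffered_, std::move(cb));
    buffered_ = 0;
  }

  // Simulates the server answering the in-flight batch at position i.
  void CompleteBatch(size_t i, bool ok) {
    Callback cb = std::move(in_flight_[i].second);
    in_flight_.erase(in_flight_.begin() + static_cast<std::ptrdiff_t>(i));
    cb(ok);
  }

  // Buffered only: operations not yet en route.
  size_t CountBufferedOperations() const { return buffered_; }
  // Buffered or in flight: operations not yet delivered.
  bool HasPendingOperations() const {
    return buffered_ > 0 || !in_flight_.empty();
  }

 private:
  size_t buffered_ = 0;
  std::deque<std::pair<size_t, Callback>> in_flight_;
};
```

In this model, callback_2 from the sequence above can fire while 'a' is still in flight, so HasPendingOperations() remains true after it runs.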
void kudu::client::KuduSession::GetPendingErrors ( std::vector< KuduError * > *  errors,
bool *  overflowed 
)

Get information on errors from previous session activity.

The information on errors is reset upon calling this method.

Parameters
[out] errors: Pointer to the container to fill with error info objects. Caller takes ownership of the returned errors in the container.
[out] overflowed: If there were more errors than could be held in the session's error buffer, then overflowed is set to true.
bool kudu::client::KuduSession::HasPendingOperations ( ) const

Check if there are any pending operations in this session.

Returns
true if there are operations which have not yet been delivered to the cluster. This may include buffered operations (i.e. those that have not yet been flushed) as well as in-flight operations (i.e. those that are in the process of being sent to the servers).
Todo:
Maybe "incomplete" or "undelivered" is clearer?
Status kudu::client::KuduSession::SetErrorBufferSpace ( size_t  size_bytes)

Set a limit on the maximum buffer (memory) size used by this session's errors. By default, when a session is created, there is no limit on the maximum size.

The session's error buffer contains information on failed write operations. In most cases, the error contains the row that would have been applied, as is. If the error buffer space limit is set, the number of errors that fit into the buffer varies depending on error conditions, write operation types (insert/update/delete), and write operation row sizes.

When the limit is set, the session will drop the first error that would overflow the buffer as well as all subsequent errors. To resume the accumulation of session errors, it's necessary to flush the current contents of the error buffer using the GetPendingErrors() method.

Parameters
[in] size_bytes: Limit on the maximum memory size consumed by collected session errors, where 0 means 'unlimited'.
Returns
Operation result status. An error is returned on an attempt to set the limit on the buffer space if:
  • the session has already dropped at least one error since the last call to the GetPendingErrors() method
  • the new limit is less than the amount of space occupied by already accumulated errors.
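The error-buffer rules described for SetErrorBufferSpace(), CountPendingErrors() and GetPendingErrors() can be restated as a small model. ToyErrorBuffer is hypothetical and represents each error only by its size in bytes; it mirrors the documented behaviour, not Kudu's internal data structures.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model of the session error buffer (not the Kudu API).
class ToyErrorBuffer {
 public:
  // limit_bytes == 0 means "unlimited". Returns false where the real
  // method would return a non-OK Status.
  bool SetErrorBufferSpace(size_t limit_bytes) {
    if (dropped_ > 0) return false;  // errors dropped since the last Get
    if (limit_bytes != 0 && limit_bytes < used_bytes_) return false;  // too small
    limit_bytes_ = limit_bytes;
    return true;
  }

  void AddError(size_t bytes) {
    // The first error that would overflow is dropped, and so is every
    // subsequent one until GetPendingErrors() is called.
    if (dropped_ > 0 || (limit_bytes_ != 0 && used_bytes_ + bytes > limit_bytes_)) {
      ++dropped_;
      return;
    }
    used_bytes_ += bytes;
    errors_.push_back(bytes);
  }

  // Counts both accumulated and dropped errors.
  size_t CountPendingErrors() const { return errors_.size() + dropped_; }

  // Hands over the accumulated errors and resets the buffer.
  void GetPendingErrors(std::vector<size_t>* errors, bool* overflowed) {
    *errors = std::move(errors_);
    *overflowed = dropped_ > 0;
    errors_.clear();
    used_bytes_ = 0;
    dropped_ = 0;
  }

 private:
  size_t limit_bytes_ = 0;
  size_t used_bytes_ = 0;
  size_t dropped_ = 0;
  std::vector<size_t> errors_;
};
```

With a 100-byte limit, errors of 40, 40, 40 and 10 bytes leave two stored and two dropped: the 10-byte error would fit, but it arrives after a drop. CountPendingErrors() reports 4 until GetPendingErrors() drains the buffer and sets overflowed.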
Status kudu::client::KuduSession::SetExternalConsistencyMode ( ExternalConsistencyMode  m)

Set external consistency mode for the session.

Parameters
[in] m: External consistency mode to set.
Returns
Operation result status.
Status kudu::client::KuduSession::SetFlushMode ( FlushMode  m)

Set the flush mode.

Precondition
There should be no pending writes – call Flush() first to ensure nothing is pending.
Parameters
[in] m: Flush mode to set.
Returns
Operation status.
Status kudu::client::KuduSession::SetMutationBufferFlushInterval ( unsigned int  millis)

Set the interval for time-based flushing of the mutation buffer.

In some cases, while running in AUTO_FLUSH_BACKGROUND mode, the size of the mutation buffer for pending operations and the flush watermark for fresh operations may be too high for the rate of incoming data: it would take too long to accumulate enough data in the buffer to trigger flushing. In such cases, it makes sense to flush the accumulated operations if the prior flush happened a long time ago. This method sets the wait interval for time-based flushing, which takes place alongside the flushing triggered by the over-the-watermark criterion. By default, the interval is set to 1000 ms (i.e. 1 second).

Note
This setting is applicable only for AUTO_FLUSH_BACKGROUND sessions. I.e., calling this method in other flush modes is safe, but the parameter has no effect until the session is switched into AUTO_FLUSH_BACKGROUND mode.
Parameters
[in] millis: The duration of the interval for the time-based flushing, in milliseconds.
Returns
Operation result status.
Status kudu::client::KuduSession::SetMutationBufferFlushWatermark ( double  watermark_pct)

Set the buffer watermark to trigger flush in AUTO_FLUSH_BACKGROUND mode.

This method sets the watermark for fresh operations in the buffer when running in AUTO_FLUSH_BACKGROUND mode: once the specified threshold is reached, the session starts sending the accumulated write operations to the appropriate tablet servers. By default, the buffer flush watermark is set to 50%.

Note
This setting is applicable only for AUTO_FLUSH_BACKGROUND sessions. I.e., calling this method in other flush modes is safe, but the parameter has no effect until the session is switched into AUTO_FLUSH_BACKGROUND mode.
The buffer contains data for fresh (i.e. newly submitted) operations and also operations which are scheduled for flush or being flushed. The flush watermark determines how much of the buffer space is taken by newly submitted operations. Setting this level to 1.0 (i.e. 100%) results in flushing the buffer only when the newly applied operation would overflow the buffer.
Parameters
[in] watermark_pct: Watermark level as a fraction of the mutation buffer size (e.g. 0.5 for 50%, 1.0 for 100%).
Returns
Operation result status.
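With the defaults stated on this page (7 MiB of buffer space, a 50% watermark and a 1000 ms flush interval), the two AUTO_FLUSH_BACKGROUND triggers work out as follows. ShouldBackgroundFlush is a hypothetical helper that restates the documented criteria; it is not part of the Kudu client API, and applying the time-based trigger only when fresh data exists is an assumption.

```cpp
#include <cassert>
#include <cstddef>

// Defaults as documented for SetMutationBufferSpace(),
// SetMutationBufferFlushWatermark() and SetMutationBufferFlushInterval().
constexpr size_t kDefaultBufferSpace = 7 * 1024 * 1024;  // 7 MiB = 7340032 bytes
constexpr double kDefaultWatermark = 0.5;                // 50%
constexpr unsigned kDefaultIntervalMs = 1000;            // 1 second

// Hypothetical helper: flush when fresh data reaches the watermark, or when
// there is fresh data and the last flush is at least one interval old.
bool ShouldBackgroundFlush(size_t fresh_bytes, unsigned ms_since_last_flush,
                           size_t buffer_space = kDefaultBufferSpace,
                           double watermark = kDefaultWatermark,
                           unsigned interval_ms = kDefaultIntervalMs) {
  const bool over_watermark = static_cast<double>(fresh_bytes) >=
                              watermark * static_cast<double>(buffer_space);
  const bool interval_elapsed =
      fresh_bytes > 0 && ms_since_last_flush >= interval_ms;
  return over_watermark || interval_elapsed;
}
```

With the defaults, the size-based trigger fires at 0.5 × 7340032 = 3670016 bytes of fresh data, while a trickle of small writes is still flushed once a second by the time-based trigger.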
Status kudu::client::KuduSession::SetMutationBufferMaxNum ( unsigned int  max_num)

Set the maximum number of mutation buffers per KuduSession object.

A KuduSession accumulates write operations submitted via the Apply() method in mutation buffers. A KuduSession always has at least one mutation buffer. The mutation buffer which accumulates new incoming operations is called the current mutation buffer. The current mutation buffer is flushed either explicitly, using the KuduSession::Flush() and/or KuduSession::FlushAsync() methods, or automatically by the KuduSession when running in AUTO_FLUSH_BACKGROUND mode. After the current mutation buffer is flushed, a new buffer is created upon calling KuduSession::Apply(), provided the limit is not exceeded. A call to KuduSession::Apply() blocks if the session is at the maximum number of buffers allowed; the call unblocks as soon as one of the pending batchers finishes flushing and a new batcher can be created.

The minimum setting for this parameter is 1 (one). The default setting for this parameter is 2 (two).

Parameters
[in] max_num: The maximum number of mutation buffers per KuduSession object to hold the applied operations. Use 0 to set the maximum number of concurrent mutation buffers to unlimited.
Returns
Operation result status.
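The blocking behaviour described above resembles a counting semaphore: Apply() needs a free mutation buffer and waits while max_num buffers are still being flushed. ToyBufferLimiter below is a hypothetical sketch of that idea using std::mutex and std::condition_variable, not Kudu's batcher code; it treats 0 as "unlimited", as the parameter description says.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical model of the per-session limit on mutation buffers.
class ToyBufferLimiter {
 public:
  explicit ToyBufferLimiter(unsigned max_num) : max_num_(max_num) {}

  // Blocks while the limit is reached, as Apply() does.
  void Acquire() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return HasRoomLocked(); });
    ++active_;
  }

  // Non-blocking variant, handy for inspecting the model.
  bool TryAcquire() {
    std::lock_guard<std::mutex> lock(mu_);
    if (!HasRoomLocked()) return false;
    ++active_;
    return true;
  }

  // Called when a buffer finishes flushing; wakes one blocked Acquire().
  void Release() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      --active_;
    }
    cv_.notify_one();
  }

 private:
  bool HasRoomLocked() const { return max_num_ == 0 || active_ < max_num_; }

  const unsigned max_num_;  // 0 means unlimited
  unsigned active_ = 0;     // buffers currently filling or flushing
  std::mutex mu_;
  std::condition_variable cv_;
};
```

With the default limit of 2, a third buffer cannot be created until one of the first two has been released, which is the point at which a blocked Apply() would resume.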
Status kudu::client::KuduSession::SetMutationBufferSpace ( size_t  size_bytes)

Set the amount of buffer space used by this session for outbound writes.

The effect of the buffer size varies based on the flush mode of the session:

  • AUTO_FLUSH_SYNC: since no buffering is done, this has no effect.
  • AUTO_FLUSH_BACKGROUND: if the buffer space is exhausted, write calls will block until there is space available in the buffer.
  • MANUAL_FLUSH: if the buffer space is exhausted, write calls will return an error.

By default, the buffer space is set to 7 MiB (i.e. 7 * 1024 * 1024 bytes).

Parameters
[in] size_bytes: Size of the buffer space to set (number of bytes).
Returns
Operation result status.
void kudu::client::KuduSession::SetTimeoutMillis ( int  millis)

Set the timeout for writes made in this session.

Parameters
[in] millis: Timeout to set, in milliseconds; it should be greater than or equal to 0. If the value is less than 0, it is implicitly set to 0.
