Asynchronous Callback API Tutorial
This tutorial shows you how to write a simple server and client in C++ using gRPC’s asynchronous callback APIs. The example used in this tutorial follows the RouteGuide example.
Overview
gRPC C++ offers two kinds of APIs: sync APIs and async APIs. More specifically, we have two kinds of async APIs: the old one is completion-queue based; the new one is callback-based, which is easier to use. In this tutorial, we will focus on the callback-based async APIs (callback APIs for short). You will learn how to use the callback APIs to implement the server and the client for the following kinds of RPCs:
- Unary RPC
- Server-side streaming RPC
- Client-side streaming RPC
- Bidirectional streaming RPC
Example Code
In this tutorial, we are going to create a route guiding application. The clients can get information about features on their route, create a summary of their route, and exchange route information such as traffic updates with the server and other clients.
Below is the service interface defined in Protocol Buffers.
// Interface exported by the server.
service RouteGuide {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// A feature with an empty name is returned if there's no feature at the given
// position.
rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
The same example is also implemented using the sync APIs. If you are interested, you can compare the two implementations.
Service to Implement
Since we want to implement the service using the callback APIs, the service interface we should implement is RouteGuide::CallbackService.
class RouteGuideImpl final : public RouteGuide::CallbackService {
...
};
We will implement all four RPCs of this service in the Server subsections of the following sections.
Unary RPC
Let’s begin with the simplest RPC: GetFeature, which is a unary RPC. With GetFeature, the client sends a Point to the server, and then the server returns the Feature of that Point to the client.
Server
The implementation of this RPC is quite simple and straightforward.
grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context,
const Point* point,
Feature* feature) override {
feature->set_name(GetFeatureName(*point, feature_list_));
feature->mutable_location()->CopyFrom(*point);
auto* reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
}
After setting the output fields of Feature, we return the final status via the ServerUnaryReactor.
Custom Unary Reactor
The above example uses the default reactor. We could also use a custom reactor here if we want to handle specific events such as RPC cancellation, or run an action asynchronously when the RPC is done. In the example below, we add logs for both events.
grpc::ServerUnaryReactor* GetFeature(grpc::CallbackServerContext* context,
const Point* point,
Feature* feature) override {
class Reactor : public grpc::ServerUnaryReactor {
public:
Reactor(const Point& point, const std::vector<Feature>& feature_list,
Feature* feature) {
feature->set_name(GetFeatureName(point, feature_list));
*feature->mutable_location() = point;
Finish(grpc::Status::OK);
}
private:
void OnDone() override {
LOG(INFO) << "RPC Completed";
delete this;
}
void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
};
return new Reactor(*point, feature_list_, feature);
}
For ServerUnaryReactor, we need to override OnDone(), and optionally OnCancel().
NOTE: The callback methods (e.g., OnDone()) are supposed to return quickly. Never perform blocking work (e.g., waiting for an event) in such callbacks.
The ServerUnaryReactor’s constructor is called when GetFeature() constructs and provides the reactor in response to the started RPC. It takes the request Point, the response Feature, and the feature_list. It then looks up the name of the feature at that Point in the feature_list and fills in the response Feature. To finish the RPC, we call Finish(Status::OK).
OnDone() reacts to the RPC completion. We do the final cleanup in OnDone() and log the end of the RPC.
OnCancel() reacts to the cancellation of the RPC. Here, we log the occurrence of a cancellation.
Client
NOTE: For simplicity, we will not discuss how to create a channel and a stub in this tutorial. Please refer to the Basics tutorial for that.
To start a GetFeature RPC, besides a ClientContext, a request (i.e., Point), and a response (i.e., Feature), the client also needs to pass a callback (i.e., std::function<void(::grpc::Status)>) to stub_->async()->GetFeature(). The callback will be invoked after the server has fulfilled the request and the RPC is finished.
bool GetOneFeature(const Point& point, Feature* feature) {
ClientContext context;
bool result;
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->async()->GetFeature(
&context, &point, feature,
[&result, &mu, &cv, &done, feature, this](Status status) {
bool ret;
if (!status.ok()) {
std::cout << "GetFeature rpc failed." << std::endl;
ret = false;
} else if (!feature->has_location()) {
std::cout << "Server returns incomplete feature." << std::endl;
ret = false;
} else if (feature->name().empty()) {
std::cout << "Found no feature at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_
<< std::endl;
ret = true;
} else {
std::cout << "Found feature called " << feature->name() << " at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_
<< std::endl;
ret = true;
}
std::lock_guard<std::mutex> lock(mu);
result = ret;
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&done] { return done; });
return result;
}
A callback can do various follow-up work for a unary RPC. For example, the callback in the above snippet checks the status and the returned feature, prints the result, and finally notifies the waiting caller that the RPC is done.
For simplicity, the example shows the same function waiting on the notification for the RPC completion, but that’s not necessary.
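The wait-for-callback pattern in GetOneFeature() does not depend on gRPC. The sketch below reproduces it with the standard library only. FakeAsyncDouble() is a hypothetical stand-in for an asynchronous call such as stub_->async()->GetFeature(), and it returns its worker thread only so that the sketch can join it; a real asynchronous API owns its threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for an asynchronous API (not part of gRPC): computes
// the result on another thread and then invokes `on_done` on that thread.
std::thread FakeAsyncDouble(int input, int* output,
                            std::function<void(bool)> on_done) {
  assert(output != nullptr);
  return std::thread([input, output, on_done = std::move(on_done)] {
    *output = input * 2;
    on_done(true);
  });
}

// Starts the fake call and blocks until its callback has run.
bool DoubleAndWait(int input, int* output) {
  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  bool result = false;
  std::thread worker = FakeAsyncDouble(input, output, [&](bool ok) {
    std::lock_guard<std::mutex> lock(mu);
    result = ok;
    done = true;
    cv.notify_one();
  });
  {
    // The predicate guards against spurious wakeups.
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&done] { return done; });
  }
  worker.join();
  return result;
}
```

Calling DoubleAndWait(21, &out) blocks the caller until the callback has run. A caller that does not need the result immediately could return right after starting the call and let the callback do the follow-up work.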
Server-side streaming RPC
Now let’s look at a more complex RPC: ListFeatures. ListFeatures is a server-side streaming RPC. The client sends a Rectangle to the server, and the server returns a sequence of Features to the client, each of which is sent in a separate message.
Server
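For any streaming RPC, including the server-side streaming RPC, the RPC handler’s interface is similar. The handler receives the CallbackServerContext and, depending on the kind of RPC, a pointer to the single request or response message; it does not receive the stream itself. The return type is some kind of server reactor, which handles all the business logic for one RPC.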
Below is the handler interface of ListFeatures.
grpc::ServerWriteReactor<Feature>* ListFeatures(
CallbackServerContext* context,
const routeguide::Rectangle* rectangle);
Because ListFeatures is a server-streaming RPC, the return type should be ServerWriteReactor. ServerWriteReactor has one template parameter: Feature is the type of each response message from the server. The request type, Rectangle, appears only in the handler’s signature.
The complexity of handling an RPC is delegated to the ServerWriteReactor. Below is how we implement the ServerWriteReactor to handle a ListFeatures RPC.
grpc::ServerWriteReactor<Feature>* ListFeatures(
CallbackServerContext* context,
const routeguide::Rectangle* rectangle) override {
class Lister : public grpc::ServerWriteReactor<Feature> {
public:
Lister(const routeguide::Rectangle* rectangle,
const std::vector<Feature>* feature_list)
: left_((std::min)(rectangle->lo().longitude(),
rectangle->hi().longitude())),
right_((std::max)(rectangle->lo().longitude(),
rectangle->hi().longitude())),
top_((std::max)(rectangle->lo().latitude(),
rectangle->hi().latitude())),
bottom_((std::min)(rectangle->lo().latitude(),
rectangle->hi().latitude())),
feature_list_(feature_list),
next_feature_(feature_list_->begin()) {
NextWrite();
}
void OnWriteDone(bool ok) override {
if (!ok) {
Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
// Finish() may be called only once, so do not start another write.
return;
}
NextWrite();
}
void OnDone() override {
LOG(INFO) << "RPC Completed";
delete this;
}
void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
private:
void NextWrite() {
while (next_feature_ != feature_list_->end()) {
const Feature& f = *next_feature_;
next_feature_++;
if (f.location().longitude() >= left_ &&
f.location().longitude() <= right_ &&
f.location().latitude() >= bottom_ &&
f.location().latitude() <= top_) {
StartWrite(&f);
return;
}
}
// Didn't write anything, all is done.
Finish(Status::OK);
}
const long left_;
const long right_;
const long top_;
const long bottom_;
const std::vector<Feature>* feature_list_;
std::vector<Feature>::const_iterator next_feature_;
};
return new Lister(rectangle, &feature_list_);
}
Different reactors have different callback methods. We need to override the methods we are interested in to implement our RPC. For ListFeatures, we need to override OnWriteDone(), OnDone(), and optionally OnCancel().
The ServerWriteReactor’s constructor is called when ListFeatures() constructs and provides the reactor in response to the started RPC. It normalizes the bounds of rectangle into left_, right_, top_, and bottom_, points next_feature_ at the start of the feature list, and calls NextWrite() to send the first Feature inside the rectangle, if any.
OnWriteDone() reacts to a write completion. If the write failed, we finish the RPC with an UNKNOWN status. If the write was done successfully, we call NextWrite() to send the next Feature inside the rectangle; once the feature list is exhausted, NextWrite() calls Finish(Status::OK) to finish the call.
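The filtering done by NextWrite() can be tried without gRPC. This is a minimal sketch that assumes a hypothetical SimplePoint struct in place of the generated Point message, and it returns the index of the next match instead of calling StartWrite().

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the generated Point message.
struct SimplePoint {
  std::int32_t latitude;
  std::int32_t longitude;
};

// Rectangle bounds normalized so that left <= right and bottom <= top.
struct Bounds {
  std::int32_t left;
  std::int32_t right;
  std::int32_t top;
  std::int32_t bottom;
};

// Accepts the two corners in any order, as the Lister constructor does.
Bounds MakeBounds(const SimplePoint& lo, const SimplePoint& hi) {
  return Bounds{std::min(lo.longitude, hi.longitude),
                std::max(lo.longitude, hi.longitude),
                std::max(lo.latitude, hi.latitude),
                std::min(lo.latitude, hi.latitude)};
}

bool Contains(const Bounds& b, const SimplePoint& p) {
  return p.longitude >= b.left && p.longitude <= b.right &&
         p.latitude >= b.bottom && p.latitude <= b.top;
}

// Returns the index of the first point at or after `start` that lies inside
// `b`, or points.size() when none is left (where the reactor would Finish).
std::size_t NextMatch(const std::vector<SimplePoint>& points, const Bounds& b,
                      std::size_t start) {
  assert(start <= points.size());
  for (std::size_t i = start; i < points.size(); ++i) {
    if (Contains(b, points[i])) return i;
  }
  return points.size();
}
```

Each call resumes from the position after the previous match, which is the role next_feature_ plays in Lister.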
OnDone() reacts to the RPC completion. We do the final cleanup in OnDone().
OnCancel() reacts to the cancellation of the RPC. We log the occurrence of a cancellation in this method.
Client
Similar to the server side, the client side needs to implement some kind of client reactor to interact with the server. A client reactor encapsulates all the operations needed to process an RPC.
Since ListFeatures is server-streaming, we should implement a ClientReadReactor, whose name is symmetric to ServerWriteReactor.
class Reader : public grpc::ClientReadReactor<Feature> {
public:
Reader(RouteGuide::Stub* stub, float coord_factor,
const routeguide::Rectangle& rect)
: coord_factor_(coord_factor) {
stub->async()->ListFeatures(&context_, &rect, this);
StartRead(&feature_);
StartCall();
}
void OnReadDone(bool ok) override {
if (ok) {
std::cout << "Found feature called " << feature_.name() << " at "
<< feature_.location().latitude() / coord_factor_ << ", "
<< feature_.location().longitude() / coord_factor_
<< std::endl;
StartRead(&feature_);
}
}
void OnDone(const Status& s) override {
std::unique_lock<std::mutex> l(mu_);
status_ = s;
done_ = true;
cv_.notify_one();
}
Status Await() {
std::unique_lock<std::mutex> l(mu_);
cv_.wait(l, [this] { return done_; });
return std::move(status_);
}
private:
ClientContext context_;
float coord_factor_;
Feature feature_;
std::mutex mu_;
std::condition_variable cv_;
Status status_;
bool done_ = false;
};
The ClientReadReactor is templatized with one parameter, Feature, which is the type of response message from the server.
In the constructor of Reader, we pass the ClientContext, &rect (the request object), and the Reader to the RPC method stub->async()->ListFeatures(). Then we pass &feature_ to StartRead() to specify where to store the received response. Finally, we call StartCall() to activate the RPC.
OnReadDone() reacts to read completion. If the read was done successfully, we continue to read the next Feature until we fail to do so, indicated by ok being false.
OnDone() reacts to the RPC completion. It stores the RPC status and notifies the condition variable that Await() is waiting on.
Await() is not a method of ClientReadReactor. It is added only for simplicity, so that the example knows when the RPC is done. Alternatively, if there were no need for a notification on completion, OnDone() could simply return after cleanup, for example, after freeing heap-allocated objects.
To initiate an RPC, the client simply instantiates a Reader and waits for the RPC completion.
routeguide::Rectangle rect;
Feature feature;
rect.mutable_lo()->set_latitude(400000000);
rect.mutable_lo()->set_longitude(-750000000);
rect.mutable_hi()->set_latitude(420000000);
rect.mutable_hi()->set_longitude(-730000000);
std::cout << "Looking for features between 40, -75 and 42, -73"
<< std::endl;
Reader reader(stub_.get(), kCoordFactor_, rect);
Status status = reader.Await();
if (status.ok()) {
std::cout << "ListFeatures rpc succeeded." << std::endl;
} else {
std::cout << "ListFeatures rpc failed." << std::endl;
}
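The literals in this snippet suggest that coordinates are stored as integers equal to degrees multiplied by 10^7: 400000000 is printed as 40. Assuming that this is the value kCoordFactor_ holds (the tutorial does not show it), the conversion can be sketched as follows. kAssumedCoordFactor, ToDegrees() and FromDegrees() are hypothetical names, not part of the example code.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Assumed scale: one degree is stored as 10,000,000 integer units.
constexpr double kAssumedCoordFactor = 10000000.0;

// Converts a stored coordinate to degrees for printing.
double ToDegrees(std::int32_t scaled) { return scaled / kAssumedCoordFactor; }

// Converts degrees to the stored integer form, rounding to the nearest unit.
std::int32_t FromDegrees(double degrees) {
  // Latitude and longitude never exceed 180 degrees in magnitude, so the
  // scaled value fits in 32 bits.
  assert(degrees >= -180.0 && degrees <= 180.0);
  return static_cast<std::int32_t>(std::llround(degrees * kAssumedCoordFactor));
}
```

Under that assumption, FromDegrees(42.0) gives the 420000000 that the snippet passes to set_latitude().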
Client-side streaming RPC
Once you understand the idea of server-side streaming RPC in the previous section, you should find the client-side streaming RPC easy to learn.
RecordRoute is the client-side streaming RPC we will discuss. The client sends a sequence of Points to the server, and the server returns a RouteSummary after the client has finished sending the Points.
Server
The RPC handler’s interface for a client-side streaming RPC takes the context and a pointer to the response, but no request argument, and its return type is a server reactor, namely, a ServerReadReactor.
The ServerReadReactor has one template parameter: Point is the type of each request message from the client. The response, a RouteSummary, is passed to the handler as a pointer.
Similar to ServerWriteReactor, ServerReadReactor is the class that handles an RPC.
grpc::ServerReadReactor<Point>* RecordRoute(CallbackServerContext* context,
RouteSummary* summary) override {
class Recorder : public grpc::ServerReadReactor<Point> {
public:
Recorder(RouteSummary* summary, const std::vector<Feature>* feature_list)
: start_time_(system_clock::now()),
summary_(summary),
feature_list_(feature_list) {
StartRead(&point_);
}
void OnReadDone(bool ok) override {
if (ok) {
point_count_++;
if (!GetFeatureName(point_, *feature_list_).empty()) {
feature_count_++;
}
if (point_count_ != 1) {
distance_ += GetDistance(previous_, point_);
}
previous_ = point_;
StartRead(&point_);
} else {
summary_->set_point_count(point_count_);
summary_->set_feature_count(feature_count_);
summary_->set_distance(static_cast<long>(distance_));
auto secs = std::chrono::duration_cast<std::chrono::seconds>(
system_clock::now() - start_time_);
summary_->set_elapsed_time(secs.count());
Finish(Status::OK);
}
}
void OnDone() override {
LOG(INFO) << "RPC Completed";
delete this;
}
void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
private:
system_clock::time_point start_time_;
RouteSummary* summary_;
const std::vector<Feature>* feature_list_;
Point point_;
int point_count_ = 0;
int feature_count_ = 0;
float distance_ = 0.0;
Point previous_;
};
return new Recorder(summary, &feature_list_);
}
The ServerReadReactor’s constructor is called when RecordRoute() constructs and provides the reactor in response to the started RPC. The constructor stores the RouteSummary* to return the response later, and initiates a read operation by calling StartRead(&point_).
OnReadDone() reacts to a read completion. If the read was done successfully, we update the stats with the newly received Point, and continue to read the next Point until we fail to do so, indicated by ok being false. This typically means that the client has finished sending. At that point, the server sets the response in summary_ and calls Finish(Status::OK) to finish the RPC.
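The bookkeeping in OnReadDone() can be sketched without gRPC. RouteAccumulator and SimplePoint are hypothetical names, and PlaceholderDistance() is a plain Euclidean distance on the raw coordinates, used only because this tutorial does not show GetDistance().

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical stand-in for the generated Point message.
struct SimplePoint {
  std::int32_t latitude;
  std::int32_t longitude;
};

// Placeholder metric; NOT the tutorial's GetDistance().
double PlaceholderDistance(const SimplePoint& a, const SimplePoint& b) {
  const double dlat = static_cast<double>(a.latitude) - b.latitude;
  const double dlon = static_cast<double>(a.longitude) - b.longitude;
  return std::sqrt(dlat * dlat + dlon * dlon);
}

class RouteAccumulator {
 public:
  // Mirrors the `ok == true` branch: count the point and extend the route.
  void OnPoint(const SimplePoint& p) {
    assert(!finished_);
    ++point_count_;
    // The first point has no predecessor, so it adds no distance.
    if (point_count_ != 1) distance_ += PlaceholderDistance(previous_, p);
    previous_ = p;
  }

  // Mirrors the `ok == false` branch: no more points will arrive.
  void Finish() { finished_ = true; }

  int point_count() const { return point_count_; }
  double distance() const { return distance_; }
  bool finished() const { return finished_; }

 private:
  int point_count_ = 0;
  double distance_ = 0.0;
  bool finished_ = false;
  SimplePoint previous_{0, 0};
};
```

As in Recorder, the summary becomes meaningful only after the final call, because the server cannot know the number of points in advance.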
OnDone() reacts to the RPC completion. We do the final cleanup in OnDone().
OnCancel() reacts to the cancellation of the RPC. We log the occurrence of a cancellation in this method.
Client
Unsurprisingly, we need to implement a client reactor for the client side, and that client reactor is called ClientWriteReactor.
class Recorder : public grpc::ClientWriteReactor<Point> {
public:
Recorder(RouteGuide::Stub* stub, float coord_factor,
const std::vector<Feature>* feature_list)
: coord_factor_(coord_factor),
feature_list_(feature_list),
generator_(
std::chrono::system_clock::now().time_since_epoch().count()),
feature_distribution_(0, feature_list->size() - 1),
delay_distribution_(500, 1500) {
stub->async()->RecordRoute(&context_, &stats_, this);
// Use a hold since some StartWrites are invoked indirectly from a
// delayed lambda in OnWriteDone rather than directly from the reaction
// itself
AddHold();
NextWrite();
StartCall();
}
void OnWriteDone(bool ok) override {
// Delay and then do the next write or WritesDone
alarm_.Set(
std::chrono::system_clock::now() +
std::chrono::milliseconds(delay_distribution_(generator_)),
[this](bool /*ok*/) { NextWrite(); });
}
void OnDone(const Status& s) override {
std::unique_lock<std::mutex> l(mu_);
status_ = s;
done_ = true;
cv_.notify_one();
}
Status Await(RouteSummary* stats) {
std::unique_lock<std::mutex> l(mu_);
cv_.wait(l, [this] { return done_; });
*stats = stats_;
return std::move(status_);
}
private:
void NextWrite() {
if (points_remaining_ != 0) {
const Feature& f =
(*feature_list_)[feature_distribution_(generator_)];
std::cout << "Visiting point "
<< f.location().latitude() / coord_factor_ << ", "
<< f.location().longitude() / coord_factor_ << std::endl;
StartWrite(&f.location());
points_remaining_--;
} else {
StartWritesDone();
RemoveHold();
}
}
ClientContext context_;
float coord_factor_;
int points_remaining_ = 10;
Point point_;
RouteSummary stats_;
const std::vector<Feature>* feature_list_;
std::default_random_engine generator_;
std::uniform_int_distribution<int> feature_distribution_;
std::uniform_int_distribution<int> delay_distribution_;
grpc::Alarm alarm_;
std::mutex mu_;
std::condition_variable cv_;
Status status_;
bool done_ = false;
};
The ClientWriteReactor is templatized with one parameter, Point, which is the type of request message from the client.
In the constructor of Recorder, we pass the ClientContext, &stats_ (the response object), and the Recorder to the RPC method stub->async()->RecordRoute(). We then call AddHold(), because later writes are started from a delayed alarm callback rather than directly from a reaction, and call NextWrite() to add an operation that sends the first Point. Note that if there is nothing to send at the startup of the RPC, we need to call StartWritesDone() to inform the server that we are done with writes. Finally, calling StartCall() activates the RPC.
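OnWriteDone() reacts to write completion. It sets an alarm with a random delay of 500 to 1500 milliseconds, and the alarm callback calls NextWrite() to write the next Point until points_remaining_ reaches zero. At that point, NextWrite() calls StartWritesDone() to signal that writes are done and RemoveHold() to release the hold taken in the constructor. When the last message is known in advance, StartWriteLast() can be used instead to piggyback that signal; it is effectively the same as StartWrite() combined with StartWritesDone(), but is more efficient.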
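The chain of writes, in which each completed write starts the next one, can be sketched without gRPC. FakeWriteStream is a hypothetical stand-in that only records what would be sent; it has no network, no alarm delay and no holds, so it illustrates only the ordering of StartWrite(), OnWriteDone() and StartWritesDone().

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a write reactor (not a gRPC class).
class FakeWriteStream {
 public:
  virtual ~FakeWriteStream() = default;

  // Simulates the library completing the outstanding write. Returns false
  // when no write is outstanding.
  bool CompleteOneWrite() {
    if (!write_in_flight_) return false;
    write_in_flight_ = false;
    OnWriteDone(true);
    return true;
  }

  const std::vector<std::string>& sent() const { return sent_; }
  bool writes_done() const { return writes_done_; }

 protected:
  void StartWrite(const std::string* msg) {
    assert(!write_in_flight_);  // at most one write may be outstanding
    write_in_flight_ = true;
    sent_.push_back(*msg);
  }
  void StartWritesDone() { writes_done_ = true; }
  virtual void OnWriteDone(bool ok) = 0;

 private:
  bool write_in_flight_ = false;
  bool writes_done_ = false;
  std::vector<std::string> sent_;
};

class ChainedWriter : public FakeWriteStream {
 public:
  explicit ChainedWriter(std::vector<std::string> messages)
      : messages_(std::move(messages)) {
    NextWrite();  // queue the first write, or signal writes-done if empty
  }

 protected:
  void OnWriteDone(bool ok) override {
    if (ok) NextWrite();
  }

 private:
  void NextWrite() {
    if (next_ < messages_.size()) {
      StartWrite(&messages_[next_]);
      ++next_;
    } else {
      StartWritesDone();
    }
  }

  std::vector<std::string> messages_;
  std::size_t next_ = 0;
};
```

Driving the sketch with repeated calls to CompleteOneWrite() sends the messages one at a time and marks writes as done only after the last completion.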
OnDone() reacts to the RPC completion. It stores the RPC status and notifies the condition variable that Await() is waiting on. The response is then available in stats_.
Await() is not a method of ClientWriteReactor. We add Await() so that the caller can wait until the RPC is done.
To initiate an RPC, the client simply instantiates a Recorder and waits for the RPC completion.
Recorder recorder(stub_.get(), kCoordFactor_, &feature_list_);
RouteSummary stats;
Status status = recorder.Await(&stats);
if (status.ok()) {
std::cout << "Finished trip with " << stats.point_count() << " points\n"
<< "Passed " << stats.feature_count() << " features\n"
<< "Travelled " << stats.distance() << " meters\n"
<< "It took " << stats.elapsed_time() << " seconds"
<< std::endl;
} else {
std::cout << "RecordRoute rpc failed." << std::endl;
}
Bidirectional streaming RPC
Finally, we will look at the bidirectional streaming RPC RouteChat. In this case, the client sends a sequence of RouteNotes to the server. Each time a RouteNote at a Point is sent, the server returns the sequence of RouteNotes at the same Point that have been sent by all the clients before.
Server
Again, the RPC handler’s interface for a bidirectional streaming RPC takes only the context, with no request or response argument, and its return type is a server reactor, namely, a ServerBidiReactor.
The ServerBidiReactor has two template parameters, both of which are RouteNote, because RouteNote is the message type of both request and response. After all, RouteChat is meant to let clients chat and share information with each other!
Since we have already discussed ServerWriteReactor and ServerReadReactor, ServerBidiReactor should be quite straightforward.
grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
CallbackServerContext* context) override {
class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
public:
Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
: mu_(mu), received_notes_(received_notes) {
StartRead(&note_);
}
void OnReadDone(bool ok) override {
if (ok) {
// Unlike the other example in this directory that's not using
// the reactor pattern, we can't grab a local lock to secure the
// access to the notes vector, because the reactor will most likely
// make us jump threads, so we'll have to use a different locking
// strategy. We'll grab the lock locally to build a copy of the
// list of notes we're going to send, then we'll grab the lock
// again to append the received note to the existing vector.
// Drop the notes that were queued for the previous read.
to_send_notes_.clear();
mu_->Lock();
std::copy_if(received_notes_->begin(), received_notes_->end(),
std::back_inserter(to_send_notes_),
[this](const RouteNote& note) {
return note.location().latitude() ==
note_.location().latitude() &&
note.location().longitude() ==
note_.location().longitude();
});
mu_->Unlock();
notes_iterator_ = to_send_notes_.begin();
NextWrite();
} else {
Finish(Status::OK);
}
}
void OnWriteDone(bool /*ok*/) override { NextWrite(); }
void OnDone() override {
LOG(INFO) << "RPC Completed";
delete this;
}
void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
private:
void NextWrite() {
if (notes_iterator_ != to_send_notes_.end()) {
StartWrite(&*notes_iterator_);
notes_iterator_++;
} else {
mu_->Lock();
received_notes_->push_back(note_);
mu_->Unlock();
StartRead(&note_);
}
}
RouteNote note_;
absl::Mutex* mu_;
std::vector<RouteNote>* received_notes_ ABSL_GUARDED_BY(mu_);
std::vector<RouteNote> to_send_notes_;
std::vector<RouteNote>::iterator notes_iterator_;
};
return new Chatter(&mu_, &received_notes_);
}
The ServerBidiReactor’s constructor is called when RouteChat() constructs and provides the reactor in response to the started RPC. The constructor initiates a read operation by calling StartRead(&note_).
OnReadDone() reacts to a read completion. If the read was done successfully (i.e., ok is true), we copy the previously received notes at the same Point into to_send_notes_ while holding the lock, and then start sending them by calling NextWrite(); otherwise, we finish the RPC with Finish(Status::OK). The newly received RouteNote is added to received_notes_ only after all of these writes have completed, just before the next read is started.
OnWriteDone() reacts to a write completion. We continue to send the next RouteNote until every note in to_send_notes_ has been sent, at which point we add the newly received note to received_notes_ and continue reading the next RouteNote.
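The locking strategy used by Chatter, which copies the matching notes while holding the lock and sends them after releasing it, can be sketched with the standard library. NoteBoard, SimpleNote and Post() are hypothetical names, std::mutex stands in for the absl::Mutex used by the tutorial, and the non-empty message check is a rule of this sketch only.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the generated RouteNote message.
struct SimpleNote {
  int latitude;
  int longitude;
  std::string message;
};

class NoteBoard {
 public:
  // Copies the notes recorded at the given location while holding the lock.
  // The returned copy can be sent later without holding the lock.
  std::vector<SimpleNote> SnapshotAt(int latitude, int longitude) const {
    std::vector<SimpleNote> out;
    std::lock_guard<std::mutex> lock(mu_);
    std::copy_if(notes_.begin(), notes_.end(), std::back_inserter(out),
                 [&](const SimpleNote& n) {
                   return n.latitude == latitude && n.longitude == longitude;
                 });
    return out;
  }

  // Takes the lock a second time to record the new note.
  void Append(const SimpleNote& note) {
    std::lock_guard<std::mutex> lock(mu_);
    notes_.push_back(note);
  }

 private:
  mutable std::mutex mu_;
  std::vector<SimpleNote> notes_;
};

// Mirrors one read in the tutorial's flow: snapshot the earlier notes first,
// then record the new one, so a note is never echoed back to its sender.
std::vector<SimpleNote> Post(NoteBoard& board, const SimpleNote& note) {
  assert(!note.message.empty());  // sketch-only rule: a note must carry text
  std::vector<SimpleNote> earlier =
      board.SnapshotAt(note.latitude, note.longitude);
  board.Append(note);
  return earlier;
}
```

Because the lock is released between the snapshot and the append, notes posted by other clients in that window are not part of the reply; the tutorial's Chatter makes the same trade-off.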
OnDone() reacts to the RPC completion. We do the final cleanup in OnDone().
OnCancel() reacts to the cancellation of the RPC. We log the occurrence of a cancellation in this method.
Client
Yes, the client reactor for a bidirectional streaming RPC is ClientBidiReactor.
class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> {
public:
explicit Chatter(RouteGuide::Stub* stub)
: notes_{MakeRouteNote("First message", 0, 0),
MakeRouteNote("Second message", 0, 1),
MakeRouteNote("Third message", 1, 0),
MakeRouteNote("Fourth message", 0, 0)},
notes_iterator_(notes_.begin()) {
stub->async()->RouteChat(&context_, this);
NextWrite();
StartRead(&server_note_);
StartCall();
}
void OnWriteDone(bool ok) override {
if (ok) {
NextWrite();
}
}
void OnReadDone(bool ok) override {
if (ok) {
std::cout << "Got message " << server_note_.message() << " at "
<< server_note_.location().latitude() << ", "
<< server_note_.location().longitude() << std::endl;
StartRead(&server_note_);
}
}
void OnDone(const Status& s) override {
std::unique_lock<std::mutex> l(mu_);
status_ = s;
done_ = true;
cv_.notify_one();
}
Status Await() {
std::unique_lock<std::mutex> l(mu_);
cv_.wait(l, [this] { return done_; });
return std::move(status_);
}
private:
void NextWrite() {
if (notes_iterator_ != notes_.end()) {
const auto& note = *notes_iterator_;
std::cout << "Sending message " << note.message() << " at "
<< note.location().latitude() << ", "
<< note.location().longitude() << std::endl;
StartWrite(&note);
notes_iterator_++;
} else {
StartWritesDone();
}
}
ClientContext context_;
const std::vector<RouteNote> notes_;
std::vector<RouteNote>::const_iterator notes_iterator_;
RouteNote server_note_;
std::mutex mu_;
std::condition_variable cv_;
Status status_;
bool done_ = false;
};
The ClientBidiReactor is templatized with two parameters, the message types of the request and the response, which are both RouteNote in the case of the RouteChat RPC.
In the constructor of Chatter, we pass the ClientContext and the Chatter to the RPC method stub->async()->RouteChat(). Then we add an operation to send the first RouteNote in notes_, if any. Note that if there is nothing to send at the startup of the RPC, we need to call StartWritesDone() to inform the server that we are done with writes. We also call StartRead() to add a read operation. Finally, calling StartCall() activates the RPC.
OnReadDone() reacts to read completion. If the read was done successfully, we continue to read the next RouteNote until we fail to do so, indicated by ok being false.
OnWriteDone() reacts to write completion. If the write was done successfully, we continue to write the next RouteNote until every note in notes_ has been sent, and then call StartWritesDone() to signal that writes are done. Alternatively, the last RouteNote can be sent with StartWriteLast() to piggyback that signal; StartWriteLast() is effectively the same as StartWrite() combined with StartWritesDone(), but is more efficient.
OnDone() reacts to the RPC completion. It stores the RPC status and notifies the condition variable that Await() is waiting on.
Await() is not a method of ClientBidiReactor. We add Await() so that the caller can wait until the RPC is done.
To initiate an RPC, the client simply instantiates a Chatter and waits for the RPC completion.
Chatter chatter(stub_.get());
Status status = chatter.Await();
if (!status.ok()) {
std::cout << "RouteChat rpc failed." << std::endl;
}