Services

The Spark Client SDK uses Google Protocol Buffers to define the interface for each service in a language-independent manner. A service can expose both functions, which return a single result, and streams, which deliver values continuously while you are subscribed. This page describes how each Protocol Buffer definition maps to the concrete definition in each supported language.

ServiceContext

To connect your client to the Spark Server you need to get or create a ServiceContext instance. A ServiceContext represents a connection to the Spark Server and a background thread for handling messages on that connection. Generally there should be a single ServiceContext per process. All of the methods on the ServiceContext are thread safe.

Construct a ServiceContext by calling the default constructor. The ServiceContext will find the Spark Server by loading the server configuration over an HTTP connection. To determine the URL it will search the following locations and use the first one that is present.

  • A command line argument -configserver <url>
  • An environment variable BTSCONFIG
  • %APPDATA%/bts/conf/spark.conf (Windows)
  • /sparkdata/conf/spark.conf (Linux)
  • $HOME/sparkdata/conf/spark.conf (Linux)
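The lookup order above can be sketched as a small first-match search. This is only an illustration, not the SDK's implementation: the ConfigSource type, the find_config_source function, and the injected get_env and file_exists helpers are all hypothetical names introduced here, and treating an empty BTSCONFIG value as "not present" is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical record of where the configuration was found.
struct ConfigSource {
    std::string kind;   // "argument", "environment", or "file"
    std::string value;  // the URL, or the path of the file that was found
};

// Returns the first location that is present, in the documented order.
// get_env and file_exists are injected so the search can be exercised
// without touching the real environment or file system.
std::optional<ConfigSource> find_config_source(
    const std::vector<std::string>& args,
    const std::function<std::optional<std::string>(const std::string&)>& get_env,
    const std::function<bool(const std::string&)>& file_exists,
    const std::vector<std::string>& candidate_files)
{
    // 1. Command line argument: -configserver <url>
    for (std::size_t i = 0; i + 1 < args.size(); ++i) {
        if (args[i] == "-configserver") {
            return ConfigSource{"argument", args[i + 1]};
        }
    }
    // 2. Environment variable BTSCONFIG (assumption: empty counts as unset)
    if (auto env = get_env("BTSCONFIG"); env && !env->empty()) {
        return ConfigSource{"environment", *env};
    }
    // 3. Configuration files, in the order supplied by the caller
    for (const auto& path : candidate_files) {
        if (file_exists(path)) {
            return ConfigSource{"file", path};
        }
    }
    return std::nullopt;  // nothing present; the caller decides how to report it
}
```

A caller would pass the process arguments, a wrapper around std::getenv, and the platform-specific file paths from the list above. An empty result means none of the locations is present.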

If your application is running on a system with the BTS Client installed, this will probably work without any further configuration. If you plan to run your client application on a separate machine, we recommend setting the BTSCONFIG environment variable.

If you are using the Spark Server SDK to run a local copy of the Spark Server, set:

BTSCONFIG = http://localhost:9000/config/

To get a proxy for a given service, call ServiceContext.locate<T>("qualifier"), where T is the type of the service and "qualifier" is an optional qualifier used to disambiguate between two instances of the same service. The locate call cannot fail, but it may return a disconnected proxy. Whenever you make a call on the proxy, check the call result to see whether the method call succeeded.

RPC

Here is the OrderService reduced to a single method definition. An RPC method always takes exactly one argument and returns one value. In this example the create_order call takes a Create instance and returns a Response.

service OrderService { 
  rpc create_order(Create) returns (Response);
  ...
}

The Create and Response types are also defined as protocol buffer messages. Each field can be a primitive type, another message, a list, or a map.

message Create {
  OrderDefinition new_order = 1;
  double price = 2;
  uint32 quantity = 3;
}

message Response {
  uint32 order_key = 1;
  OrderError error_code = 2;
  string msg = 3;
}

Each RPC call generates both synchronous (blocking) and asynchronous variants. In C# the asynchronous calls have the _async suffix and return a Task. In C++ the call is overloaded to either return a value synchronously or accept a callback function to be called on the ServiceContext thread when the call completes.

To call the service, locate a proxy using the ServiceContext and then call create_order or, in C#, create_order_async. If you are calling the Spark API from a GUI application you should always prefer the asynchronous version of a call.

C# example

var ctx = new ServiceContext();
var orders = ctx.locate<OrderService>();
var response = 
  await orders.create_order_async(
    new Create { price = 2.25, quantity = 42 });
if (response.success) {
  Console.WriteLine($"success: {response.value}");
}
else {
  Console.WriteLine($"Error: {response.error_message}");
}

C++ example

auto ctx = ServiceContext{};
auto orders = ctx.locate<OrderService>();
Create c;
c.set_price(2.25);
c.set_quantity(42);
// Instead of providing an _async method, C++ uses overloading.
// To make a synchronous call, simply omit the callback.
orders.create_order(c, [=](const Result<Response>& response) {
  if(response.success()) {
    std::cout << "success.";
  }
  else {
    std::cout << "error: " << response.error_message();
  }
});
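The C++ example above uses the callback overload. To make the overloading pattern concrete without the SDK, here is a self-contained mock. Everything in it is a stand-in: Create and Response are simplified structs rather than the generated protobuf classes, Result<T> is a hypothetical wrapper modelled on the success() and error_message() calls used above (value() is an assumption), and MockOrderService invents its own validation rule. Unlike the real proxy, the mock invokes the callback immediately on the calling thread rather than on the ServiceContext thread.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>

// Simplified stand-ins for the generated message types.
struct Create { double price = 0.0; std::uint32_t quantity = 0; };
struct Response { std::uint32_t order_key = 0; std::string msg; };

// Hypothetical result wrapper: either a value or an error message.
template <typename T>
class Result {
public:
    static Result ok(T v) { Result r; r.ok_ = true; r.value_ = std::move(v); return r; }
    static Result fail(std::string m) { Result r; r.error_ = std::move(m); return r; }
    bool success() const { return ok_; }
    const T& value() const { return value_; }
    const std::string& error_message() const { return error_; }
private:
    bool ok_ = false;
    T value_{};
    std::string error_;
};

class MockOrderService {
public:
    // Synchronous overload: no callback, the result is returned directly.
    Result<Response> create_order(const Create& c) {
        if (c.quantity == 0) {  // validation rule invented for the mock
            return Result<Response>::fail("quantity must be positive");
        }
        Response r;
        r.order_key = next_key_++;
        r.msg = "accepted";
        return Result<Response>::ok(r);
    }
    // Callback overload: same name, extra callback argument.
    void create_order(const Create& c,
                      const std::function<void(const Result<Response>&)>& done) {
        done(create_order(c));  // the mock completes immediately
    }
private:
    std::uint32_t next_key_ = 1;
};
```

With this shape, svc.create_order(c) selects the blocking overload and svc.create_order(c, callback) selects the asynchronous one. In both cases the caller inspects success() before using the value.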

Streams

Streams allow you to subscribe to live updating data. Each stream represents a changing collection of protocol buffer objects. Like a database row, each item has an id which uniquely identifies it within the collection. Updates from the Spark Server can add, remove, or replace entries in the collection. Because only the changed items are sent over the wire, the stream can remain responsive even when it represents a large collection.
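To illustrate the idea of a keyed collection maintained by incremental updates, here is a minimal sketch of applying one update to a local copy. The Item and Update structs and the apply_update function are hypothetical and are not the SDK's generated types. The sketch also assumes that removals are applied before additions and replacements, which the SDK may or may not do.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical item: an id that is unique within the collection plus a payload.
struct Item {
    std::uint32_t id;
    std::string payload;
};

// Hypothetical delta: only the entries that changed since the last update.
struct Update {
    std::vector<Item> added;
    std::vector<std::uint32_t> removed;  // ids of the items to drop
    std::vector<Item> updated;           // full replacements, matched by id
};

// Applies one delta to the local copy of the collection.
void apply_update(std::map<std::uint32_t, Item>& collection, const Update& u) {
    for (auto id : u.removed) {
        collection.erase(id);
    }
    for (const auto& item : u.added) {
        collection.insert_or_assign(item.id, item);
    }
    for (const auto& item : u.updated) {
        collection.insert_or_assign(item.id, item);
    }
}
```

The cost of each update is proportional to the number of changed items, not to the size of the collection, which is why a large collection can stay responsive.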

Streams are implemented using the language bindings for ReactiveX, which provide utility functions to drill down to the specific data your application needs. In C# we provide both mutable and immutable versions of the generated data types (C++ users can rely on const).

When defined in a proto file, a stream is declared with the stream keyword on the result type.

service OrderService { 
  ...
  rpc fills (FillsSub) returns (stream Fill);
}

C# example

  var sc = new ServiceContext();
  var orders = sc.locate<OrderService>();
  var fills_observable = orders.subscribe_fills();
  var unsub = fills_observable.Subscribe(update => {
    Console.WriteLine(
        "Added {0} items, Removed {1} items, Updated {2} items.  Total items {3}", 
        update.added.Count(), 
        update.removed.Count(), 
        update.changes.Count(), 
        update.all.Count());
  });

  Thread.Sleep(TimeSpan.FromMinutes(5));
  unsub.Dispose();

C++ example

#include <chrono>
#include <iostream>
#include <thread>
#include "lib/api/public/ServiceContext.h"
#include "lib/api/services/OrderService.client.pb.h"
using namespace bts::api;
ServiceContext ctx;
auto orders = ctx.locate<orderman::OrderService>();
auto unsub = orders.subscribe_fills().subscribe([](const auto& update) {
  std::cout 
    << "Added " << update.added().size() << " items, "
    << "Removed " << update.removed().size() << " items, "
    << "Updated " << update.updated().size() << " items, "
    << "Total items " << update.all().size() << std::endl;
});
std::this_thread::sleep_for(std::chrono::minutes(5));
