web/ru/finagle.textile (204 lines of code) (raw):
---
prev: java.textile
next: searchbird.textile
title: Введение в Finagle
layout: post
---
«Finagle»:https://github.com/twitter/finagle — это RPC система от компании Twitter. «Здесь»:https://blog.twitter.com/2011/finagle-a-protocol-agnostic-rpc-system рассказывается о мотивах и основных принципа дизайна, «finagle README»:https://github.com/twitter/finagle/blob/master/README.md содержит больше детальной информации. Finagle помогает просто создавать надежные клиентские и серверные приложения.
h2. Futures
Finagle использует <code>com.twitter.util.Future</code>[1], чтобы описать отложенные операции. Futures очень выразительны и компактны, они позволяют кратко описать параллельные и последовательные операции с большой ясностью. Futures управляют значениями, которые еще не доступны, с методами для регистрации обратного вызова, который вызывается, когда значение становится доступным. Они переворачивают с ног на голову «традиционную» модель асинхронных вычислений, которые обычно предоставляют API, похожее на это:
<pre>
Callback<R> cb = new Callback<R>() {
void onComplete(R result) { … }
void onFailure(Throwable error) { … }
}
dispatch(req, cb);
</pre>
Здесь <code>Callback.onComplete</code> вызывается, когда результат операции <code>dispatch</code> становится доступен, и <code>Callback.onFailure</code> если операция провалилась. Во futures, мы переворачиваем поток управления:
<pre>
val future = dispatch(req)
future onSuccess { value => … }
future onFailure { error => … }
</pre>
Futures сами по себе это комбинаторы, с которыми мы столкнулись в различных API коллекциях. Комбинаторы работают, используя единый API, оборачивая некоторые <code>Future</code> новым поведением без изменения <code>Future</code>.
h3. Последовательная композиция
Наиболее важным <code>Future</code> комбинатором является <code>flatMap</code>[2]:
<blockquote>
<code>def Future[A].flatMap[B](f: A => Future[B]): Future[B]</code>
</blockquote>
<code>flatMap</code> объединяет две сущности. Сигнатура метода описывает: данное успешное значение future <code>f</code> должно предоставить следущий <code>Future</code>. Результат этой операции другой <code>Future</code>, который завершится когда оба этих futures будут завершены. Если один из <code>Future</code> завершится с ошибкой, то данный <code>Future</code> также завершится с ошибкой. Это неявное чередование ошибок позволяет нам управлять ошибками только в тех местах, где это необходимо. <code>flatMap</code> это стандартное имя для данного комбинатора с заданной семантикой. В Scala есть короткая запись для этого вызова: конструкция <code>for</code>.
Как пример, давайте предположим, что у нас есть методы <code>authenticate: Request -> User</code>, и <code>rateLimit: User -> Boolean</code>, тогда получим следующий код:
<pre>
val f = authenticate(request) flatMap { u =>
rateLimit(u) map { r => (u, r)
}
</pre>
С помощью конструкции for, мы можем написать:
<pre>
val f = for {
u <- authenticate(request)
r <- rateLimit(u)
} yield (u, r)
</pre>
используем future <code>f: Future[(User, Boolean)]</code>, который предоставляет объект пользователь и логическое выражение, которое сигнализирует о достигнутости пользователем предела. Заметьте, как здесь требуемая последовательная композиция: <code>rateLimit</code> берет аргумент выходного результата <code>authenticate</code>
h3. Параллельная композиция
Есть также несколько параллельных комбинаторов. Обычно они конвертируют последовательность <code>Future</code>-ов в <code>Future</code> последовательность, по-разному:
<pre>
object Future {
…
def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
def join(fs: Seq[Future[_]]): Future[Unit]
def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}
</pre>
<code>collect</code> самый простой вариант: берем набор <code>Future</code> похожего типа, мы получаем <code>Future</code> последовательность значений этого типа. Этот future завершится, когда все остальные futures будут завершены, или когда один из них завершится с ошибкой.
<code>join</code> берет последовательнсоть <code>Future</code> типы которых можно смешать, возвращая <code>Future[Unit]</code>, который завершится, когда все остальные futures будут завершены (или завершится с ошибкой, если в одном из них ошибка). Это полезно для отслеживания завершения набора гетерогенных операций.
<code>select</code> возвращает <code>Future</code>, который завершится, когда первый из данных <code>Future</code>завершится, вместе с остальными незавершенными future.
В совокупности, это дает нам мощное и краткое выражение основных сетевых операций. Этот гипотетический код выполняет установку ограничений (с целью сохранения локального кеша) параллелльно с управлением запросами пользователя на серверной стороне:
<pre>
def serve(request: Request): Future[Response] = {
val userLimit: Future[(User, Boolean)] =
for {
user <- auth(request)
limited <- isLimit(user)
} yield (user, limited)
val done =
dispatch(request) join userLimit
done flatMap { case (rep, (usr, lim)) =>
if (lim) {
updateLocalRateLimitCache(usr)
Future.exception(new Exception("rate limited"))
} else {
Future.value(rep)
}
}
}
</pre>
Этот гипотетический пример объединяет последовательную и параллельную композиции. Также обратите внимание, что нет явного обработчика ошибок, только конвертирование ограничения отвечает за выброс исключения. Если future выдает здесь ошибку, она автоматически распространяется до возвращаемого <code>Future</code>.
h2. Service
<code>Service</code> это фукнция <code>Req => Future[Rep]</code> для запросов и типовых ответов. <code>Service</code> используется как на клиенте, так и на сервере: серверы реализуют <code>Service</code>, а клиенты используют сборщики, для создания запросов.
<blockquote>
<code>abstract class Service[-Req, +Rep] extends (Req => Future[Rep])</code>
</blockquote>
Простой HTTP клиент может делать:
<pre>
service: Service[HttpRequest, HttpResponse]
val f = service(HttpRequest("/", HTTP_1_1))
f onSuccess { res =>
println("got response", res)
} onFailure { exc =>
println("failed :-(", exc)
}
</pre>
Серверы реализуют <code>Service</code>:
<pre>
class MyServer
extends Service[HttpRequest, HttpResponse]
{
def apply(request: HttpRequest) = {
request.path match {
case "/" =>
Future.value(HttpResponse("root"))
case _ =>
Future.value(HttpResponse("default"))
}
}
}
</pre>
Комбинировать их просто. Простейший прокси может выглядеть так:
<pre>
class MyServer(client: Service[..])
extends Service[HttpRequest, HttpResponse]
{
def apply(request: HttpRequest) = {
client(rewriteReq(request)) map { res =>
rewriteRes(res)
}
}
}
</pre>
где <code>rewriteReq</code> и <code>rewriteRes</code> могут предоставить протокол перевода, например.
h2. Фильтры
Фильтры — это service преобразователи. Они полезны как для предоставления функциональности <em>обобщенный service</em>, так и для производства данного service в различных состояниях.
<pre>
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])
</pre>
Его тип лучше рассмотреть схематически:
<pre>
((ReqIn, Service[ReqOut, RepIn])
=> Future[RepOut])
(* Service *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]
</pre>
Здесь показан способ как вы можете написать фильтр, который является механизмом задержки service.
<pre>
class TimeoutFilter[Req, Rep](
timeout: Duration, timer: util.Timer)
extends Filter[Req, Rep, Req, Rep]
{
def apply(
request: Req, service: Service[Req, Rep]
): Future[Rep] = {
service(request).timeout(timer, timeout) {
Throw(new TimedoutRequestException)
}
}
}
</pre>
Этот пример показывает как вы можете использовать аутентификацию (через сервис аутентификации) для того, чтобы конвертировать <code>Service[AuthHttpReq, HttpRep]</code> в <code>Service[HttpReq, HttpRep]</code>.
<pre>
class RequireAuthentication(authService: AuthService)
extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
{
def apply(
req: HttpReq,
service: Service[AuthHttpReq, HttpRep]
) = {
authService.auth(req) flatMap {
case AuthResult(AuthResultCode.OK, Some(passport), _) =>
service(AuthHttpReq(req, passport))
case ar: AuthResult =>
Future.exception(
new RequestUnauthenticated(ar.resultCode))
}
}
}
</pre>
Фильтры объединяются вместе с <code>andThen</code>. Предоставляя <code>Service</code> как аргумент для <code>andThen</code>, создающий (отфильтрованный) <code>Service</code> (типы представлены для иллюстрации).
<pre>
val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]
val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
authFilter andThen timeoutFilter
val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
authenticateAndTimedOut andThen serviceRequiringAuth
</pre>
h2. Компоновщики
В конце концов, компоновщики собирают все вместе. <code>ClientBuilder</code> предоставляет экземпляр <code>Service</code>, дающий набор параметров, и <code>ServerBuilder</code> берущий экземпляр <code>Service</code> и отправляет ему входящие запросы. Для того чтобы определить тип <code>Service</code>, мы должны иметь <code>Codec</code>. Codec предоставляет нижележащий протокол реализации (например, HTTP, thrift, memcached). Оба компоновщика имеют много параметров, но их требуется совсем немного.
Ниже представлен пример вызова <code>ClientBuilder</code> (типы представлены для иллюстрации):
<pre>
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http)
.hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
.hostConnectionLimit(1)
.tcpConnectTimeout(1.second)
.retries(2)
.reportTo(new OstrichStatsReceiver)
.build()
</pre>
Здесь создается клиент, который выравнивает нагрузку 3 данных хостов, устанавливая более чем 1 соединение на хост, и падает только после 2 неудач. Статистика собирается с помощью "ostrich":https://github.com/twitter/ostrich. Требуются следующие опции компоновщика (и они обычно присутствуют постоянно): <code>hosts</code> или <code>cluster</code>, <code>codec</code> и <code>hostConnectionLimit</code>.
<pre>
val myService: Service[HttpRequest, HttpResponse] = // provided by the user
ServerBuilder()
.codec(Http)
.hostConnectionMaxLifeTime(5.minutes)
.readTimeout(2.minutes)
.name("myHttpServer")
.bindTo(new InetSocketAddress(serverPort))
.build(myService)
</pre>
Все будет работать на порту <code>serverPort</code> HTTP сервера, который управляет запросами на <code>myService</code>. Каждое соединение держится открытым до 5 минут, и мы также требуем, чтобы запрос был отправлен в течение 2 минут. Необходимые опции <code>ServerBuilder</code>: <code>name</code>, <code>bindTo</code> and <code>codec</code>.
fn1. отличающийся от <code>java.util.concurrent.Future</code>
fn2. это равносильно монадическому(monadic) связыванию