web/ru/finagle.textile (204 lines of code) (raw):

--- prev: java.textile next: searchbird.textile title: Введение в Finagle layout: post --- «Finagle»:https://github.com/twitter/finagle — это RPC система от компании Twitter. «Здесь»:https://blog.twitter.com/2011/finagle-a-protocol-agnostic-rpc-system рассказывается о мотивах и основных принципа дизайна, «finagle README»:https://github.com/twitter/finagle/blob/master/README.md содержит больше детальной информации. Finagle помогает просто создавать надежные клиентские и серверные приложения. h2. Futures Finagle использует <code>com.twitter.util.Future</code>[1], чтобы описать отложенные операции. Futures очень выразительны и компактны, они позволяют кратко описать параллельные и последовательные операции с большой ясностью. Futures управляют значениями, которые еще не доступны, с методами для регистрации обратного вызова, который вызывается, когда значение становится доступным. Они переворачивают с ног на голову «традиционную» модель асинхронных вычислений, которые обычно предоставляют API, похожее на это: <pre> Callback<R> cb = new Callback<R>() { void onComplete(R result) { … } void onFailure(Throwable error) { … } } dispatch(req, cb); </pre> Здесь <code>Callback.onComplete</code> вызывается, когда результат операции <code>dispatch</code> становится доступен, и <code>Callback.onFailure</code> если операция провалилась. Во futures, мы переворачиваем поток управления: <pre> val future = dispatch(req) future onSuccess { value => … } future onFailure { error => … } </pre> Futures сами по себе это комбинаторы, с которыми мы столкнулись в различных API коллекциях. Комбинаторы работают, используя единый API, оборачивая некоторые <code>Future</code> новым поведением без изменения <code>Future</code>. h3. Последовательная композиция Наиболее важным <code>Future</code> комбинатором является <code>flatMap</code>[2]: <blockquote> <code>def Future[A].flatMap[B](f: A => Future[B]): Future[B]</code> </blockquote> <code>flatMap</code> объединяет две сущности. Сигнатура метода описывает: данное успешное значение future <code>f</code> должно предоставить следущий <code>Future</code>. Результат этой операции другой <code>Future</code>, который завершится когда оба этих futures будут завершены. Если один из <code>Future</code> завершится с ошибкой, то данный <code>Future</code> также завершится с ошибкой. Это неявное чередование ошибок позволяет нам управлять ошибками только в тех местах, где это необходимо. <code>flatMap</code> это стандартное имя для данного комбинатора с заданной семантикой. В Scala есть короткая запись для этого вызова: конструкция <code>for</code>. Как пример, давайте предположим, что у нас есть методы <code>authenticate: Request -> User</code>, и <code>rateLimit: User -> Boolean</code>, тогда получим следующий код: <pre> val f = authenticate(request) flatMap { u => rateLimit(u) map { r => (u, r) } </pre> С помощью конструкции for, мы можем написать: <pre> val f = for { u <- authenticate(request) r <- rateLimit(u) } yield (u, r) </pre> используем future <code>f: Future[(User, Boolean)]</code>, который предоставляет объект пользователь и логическое выражение, которое сигнализирует о достигнутости пользователем предела. Заметьте, как здесь требуемая последовательная композиция: <code>rateLimit</code> берет аргумент выходного результата <code>authenticate</code> h3. Параллельная композиция Есть также несколько параллельных комбинаторов. Обычно они конвертируют последовательность <code>Future</code>-ов в <code>Future</code> последовательность, по-разному: <pre> object Future { … def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] def join(fs: Seq[Future[_]]): Future[Unit] def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])] } </pre> <code>collect</code> самый простой вариант: берем набор <code>Future</code> похожего типа, мы получаем <code>Future</code> последовательность значений этого типа. Этот future завершится, когда все остальные futures будут завершены, или когда один из них завершится с ошибкой. <code>join</code> берет последовательнсоть <code>Future</code> типы которых можно смешать, возвращая <code>Future[Unit]</code>, который завершится, когда все остальные futures будут завершены (или завершится с ошибкой, если в одном из них ошибка). Это полезно для отслеживания завершения набора гетерогенных операций. <code>select</code> возвращает <code>Future</code>, который завершится, когда первый из данных <code>Future</code>завершится, вместе с остальными незавершенными future. В совокупности, это дает нам мощное и краткое выражение основных сетевых операций. Этот гипотетический код выполняет установку ограничений (с целью сохранения локального кеша) параллелльно с управлением запросами пользователя на серверной стороне: <pre> def serve(request: Request): Future[Response] = { val userLimit: Future[(User, Boolean)] = for { user <- auth(request) limited <- isLimit(user) } yield (user, limited) val done = dispatch(request) join userLimit done flatMap { case (rep, (usr, lim)) => if (lim) { updateLocalRateLimitCache(usr) Future.exception(new Exception("rate limited")) } else { Future.value(rep) } } } </pre> Этот гипотетический пример объединяет последовательную и параллельную композиции. Также обратите внимание, что нет явного обработчика ошибок, только конвертирование ограничения отвечает за выброс исключения. Если future выдает здесь ошибку, она автоматически распространяется до возвращаемого <code>Future</code>. h2. Service <code>Service</code> это фукнция <code>Req => Future[Rep]</code> для запросов и типовых ответов. <code>Service</code> используется как на клиенте, так и на сервере: серверы реализуют <code>Service</code>, а клиенты используют сборщики, для создания запросов. <blockquote> <code>abstract class Service[-Req, +Rep] extends (Req => Future[Rep])</code> </blockquote> Простой HTTP клиент может делать: <pre> service: Service[HttpRequest, HttpResponse] val f = service(HttpRequest("/", HTTP_1_1)) f onSuccess { res => println("got response", res) } onFailure { exc => println("failed :-(", exc) } </pre> Серверы реализуют <code>Service</code>: <pre> class MyServer extends Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { request.path match { case "/" => Future.value(HttpResponse("root")) case _ => Future.value(HttpResponse("default")) } } } </pre> Комбинировать их просто. Простейший прокси может выглядеть так: <pre> class MyServer(client: Service[..]) extends Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { client(rewriteReq(request)) map { res => rewriteRes(res) } } } </pre> где <code>rewriteReq</code> и <code>rewriteRes</code> могут предоставить протокол перевода, например. h2. Фильтры Фильтры — это service преобразователи. Они полезны как для предоставления функциональности <em>обобщенный service</em>, так и для производства данного service в различных состояниях. <pre> abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn] extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]) </pre> Его тип лучше рассмотреть схематически: <pre> ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]) (* Service *) [ReqIn -> (ReqOut -> RepIn) -> RepOut] </pre> Здесь показан способ как вы можете написать фильтр, который является механизмом задержки service. <pre> class TimeoutFilter[Req, Rep]( timeout: Duration, timer: util.Timer) extends Filter[Req, Rep, Req, Rep] { def apply( request: Req, service: Service[Req, Rep] ): Future[Rep] = { service(request).timeout(timer, timeout) { Throw(new TimedoutRequestException) } } } </pre> Этот пример показывает как вы можете использовать аутентификацию (через сервис аутентификации) для того, чтобы конвертировать <code>Service[AuthHttpReq, HttpRep]</code> в <code>Service[HttpReq, HttpRep]</code>. <pre> class RequireAuthentication(authService: AuthService) extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] { def apply( req: HttpReq, service: Service[AuthHttpReq, HttpRep] ) = { authService.auth(req) flatMap { case AuthResult(AuthResultCode.OK, Some(passport), _) => service(AuthHttpReq(req, passport)) case ar: AuthResult => Future.exception( new RequestUnauthenticated(ar.resultCode)) } } } </pre> Фильтры объединяются вместе с <code>andThen</code>. Предоставляя <code>Service</code> как аргумент для <code>andThen</code>, создающий (отфильтрованный) <code>Service</code> (типы представлены для иллюстрации). <pre> val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep] val serviceRequiringAuth: Service[AuthHttpReq, HttpRep] val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] = authFilter andThen timeoutFilter val authenticatedTimedOutService: Service[HttpReq, HttpRep] = authenticateAndTimedOut andThen serviceRequiringAuth </pre> h2. Компоновщики В конце концов, компоновщики собирают все вместе. <code>ClientBuilder</code> предоставляет экземпляр <code>Service</code>, дающий набор параметров, и <code>ServerBuilder</code> берущий экземпляр <code>Service</code> и отправляет ему входящие запросы. Для того чтобы определить тип <code>Service</code>, мы должны иметь <code>Codec</code>. Codec предоставляет нижележащий протокол реализации (например, HTTP, thrift, memcached). Оба компоновщика имеют много параметров, но их требуется совсем немного. Ниже представлен пример вызова <code>ClientBuilder</code> (типы представлены для иллюстрации): <pre> val client: Service[HttpRequest, HttpResponse] = ClientBuilder() .codec(Http) .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003") .hostConnectionLimit(1) .tcpConnectTimeout(1.second) .retries(2) .reportTo(new OstrichStatsReceiver) .build() </pre> Здесь создается клиент, который выравнивает нагрузку 3 данных хостов, устанавливая более чем 1 соединение на хост, и падает только после 2 неудач. Статистика собирается с помощью "ostrich":https://github.com/twitter/ostrich. Требуются следующие опции компоновщика (и они обычно присутствуют постоянно): <code>hosts</code> или <code>cluster</code>, <code>codec</code> и <code>hostConnectionLimit</code>. <pre> val myService: Service[HttpRequest, HttpResponse] = // provided by the user ServerBuilder() .codec(Http) .hostConnectionMaxLifeTime(5.minutes) .readTimeout(2.minutes) .name("myHttpServer") .bindTo(new InetSocketAddress(serverPort)) .build(myService) </pre> Все будет работать на порту <code>serverPort</code> HTTP сервера, который управляет запросами на <code>myService</code>. Каждое соединение держится открытым до 5 минут, и мы также требуем, чтобы запрос был отправлен в течение 2 минут. Необходимые опции <code>ServerBuilder</code>: <code>name</code>, <code>bindTo</code> and <code>codec</code>. fn1. отличающийся от <code>java.util.concurrent.Future</code> fn2. это равносильно монадическому(monadic) связыванию