原文地址:http://skaka.me/blog/2016/05/01/finagle2/
在上篇文章中我介紹了Finagle中的Future/Service/Filter. 這篇文章里, 我們將構建一個(gè)基于Http協(xié)議的echo服務(wù)端和客戶(hù)端, 下篇文章將構建一個(gè)基于thrift協(xié)議的客戶(hù)端和服務(wù)端. 這兩篇文章對應的源代碼地址在
Github. 代碼中有
Java和
Scala版本兩套版本的實(shí)現, 但是這里我只會(huì )介紹Java版本.
首先來(lái)看echo應用的Server端代碼, 打開(kāi)java-finagle-example/src/main/java/com/akkafun/finagle/Server.java:
12345678910111213141516171819202122public class Server extends Service<Request, Response> { //1 @Override public Future<Response> apply(Request request) { //2 System.out.println("request: " + request.getContentString()); Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok()); response.setContentString(request.getContentString()); return Future.value(response); } public static void main(String[] args) throws Exception { Server service = new Server(); ListeningServer server = Http.server(). //3 withLabel("echo-server"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). serve(new InetSocketAddress(8081), service); Await.result(server); }}
在Finagle中, 實(shí)現一個(gè)RPC服務(wù)非常簡(jiǎn)單. 只需要繼承Service抽象類(lèi), 實(shí)現它的apply方法. Service抽象類(lèi)有兩個(gè)類(lèi)型參數, 第一個(gè)類(lèi)型參數代表的是請求對象, 第二個(gè)類(lèi)型參數代表的是返回對象. 這兩個(gè)對象的具體類(lèi)型與Service實(shí)現類(lèi)使用的具體協(xié)議有關(guān). 例如我們在echo服務(wù)中使用Http協(xié)議, 對應的Request類(lèi)就是com.twitter.finagle.http.Request, 對應的Response類(lèi)是com.twitter.finagle.http.Response. 如果是thrift協(xié)議, 則這兩個(gè)類(lèi)型參數在Service實(shí)現類(lèi)中都是
scala.Array<scala.Byte>(Array和Byte都是scala中的類(lèi), 對應Java中的數組與byte).
apply方法中, 我們首先使用Response的工廠(chǎng)方法構造一個(gè)Response對象. 然后將Request中的請求內容原封不動(dòng)的設置到Response中, 再將Response設置到Future中返回. 需要最后一步的原因是apply方法的返回值類(lèi)型是Future<Response>, 但是我們在這個(gè)方法中不需要進(jìn)行異步操作, 所以可以直接使用Future.value(response)將對象包裝成Future返回. 另外, 細心的你應該發(fā)現了一行比較礙眼的代碼: Response.apply(Version.Http11$.MODULE$, Status.Ok()), 其中Version的用法很古怪. 這是Java調用Scala伴生對象的副作用, Scala有一些語(yǔ)法和特性在Java中沒(méi)有對應的概念, 這種情況下Java調用Scala的代碼就會(huì )比較晦澀.
為了啟動(dòng)Service實(shí)例, 我們需要構造一個(gè)com.twitter.finagle.ListeningServer. withLabel設置服務(wù)名稱(chēng), withTracer設置監控信息, 這個(gè)等后面介紹zipkin的時(shí)候在解釋. 最后指定端口啟動(dòng)服務(wù).
現在來(lái)看echo應用的Client端代碼, 打開(kāi)java-finagle-example/src/main/java/com/akkafun/finagle/Client.java:
1234567891011121314151617181920212223242526272829303132333435import static scala.compat.java8.JFunction.*;public class Client { public static void main(String[] args) throws TimeoutException, InterruptedException { Service<Request, Response> service = Http.client(). //1 withLabel("echo-client"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). newService("127.0.0.1:8081"); //create a "Greetings!" request. Reader data = Reader$.MODULE$.fromStream( //2 new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8))); Request request = Request.apply(Version.Http11$.MODULE$, Method.Post$.MODULE$, "/", data); Future<Response> responseFuture = Await.ready(service.apply(request)); //3 responseFuture.onSuccess(func(response -> { //4 System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; })); responseFuture.onFailure(func(e -> { System.out.println("error: " + e.toString()); return BoxedUnit.UNIT; })); responseFuture.ensure(func(() -> { service.close(); //IDE may complain here, just ignore return BoxedUnit.UNIT; })); }}
這部分代碼和我們之前的Server類(lèi)代碼很像. 在Server類(lèi)中, 我們創(chuàng )建了一個(gè)Service實(shí)例并監聽(tīng)了8081端口, 現在客戶(hù)端通過(guò)newService創(chuàng )建了一個(gè)Service的stub.
這部分代碼用來(lái)構造一個(gè)消息內容為Greetings的Http請求.
service.apply(request)就是一次客戶(hù)端到服務(wù)端的RPC調用. 這個(gè)調用的返回值是Future<Response>.
而service.apply(request)是一個(gè)異步操作, 主線(xiàn)程調用這個(gè)方法并不會(huì )阻塞, 有可能主線(xiàn)程退出了實(shí)際調用還沒(méi)有完成. 所以這里就要用到Await.ready了. Await.ready的作用是等待一個(gè)Future執行完成再返回, 是一個(gè)同步操作. 通過(guò)調用Await.ready我們就能將一個(gè)異步操作轉化成一個(gè)同步操作.
接下來(lái)我們在Future上注冊請求成功與失敗的回調函數. 請求成功的回調函數中只是簡(jiǎn)單的打印出響應的消息內容.
這里有個(gè)細節需要說(shuō)明一下. Future的onSuccess方法需要傳入一個(gè)Scala的函數特質(zhì): scala.Function1[Response, BoxedUnit]. 如果是Java6或7, 我們可以這樣實(shí)現這個(gè)特質(zhì):
12345678responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){ @Override public BoxedUnit apply(Response response) { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; }});
在Java8中, 這種匿名類(lèi)我們一般會(huì )使用Lambda代替, 理想情況下寫(xiě)法是這樣:
12345responseFuture.onSuccess(response -> { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT;});
可惜的是這種寫(xiě)法編譯不會(huì )通過(guò), 因為只有符合FunctionalInterface定義的接口才能使用Lambda表達式(什么是FunctionalInterface, 請參考
Javadoc), 而在Scala2.11中, scala.Function1不是一個(gè)FunctionalInterface(Scala2.12會(huì )兼容Java8). 為了在這里使用Lambda, 我們使用了
scala-java8-compat這個(gè)庫, 調用scala.compat.java8.JFunction.func方法將一個(gè)FunctionalInterface轉化成scala.Function1.
可以看出, 在Java中調用Finagle的API不是很方便. 所以Finagle適合以Scala為主, Java為輔的項目. 如果項目全是Java, 則值得為Finagle主要的API寫(xiě)一層Java的適配層, 來(lái)屏蔽Java調用Scala代碼會(huì )出現的一些晦澀代碼.
現在我們啟動(dòng)服務(wù)端和客戶(hù)端來(lái)看看運行結果. 首先啟動(dòng)Server類(lèi), 然后啟動(dòng)Client. Client運行完畢自動(dòng)結束, 你應該能在Client的控制臺看到如下輸出:
1response status: Status(200), response string: Greetings!
Server控制臺的輸出:
1request: Greetings!
Http協(xié)議比較適合用于對外提供服務(wù), 并且一般會(huì )使用REST. 在Finagle中使用REST可以使用
Finch庫. 這個(gè)庫輕量小巧, API簡(jiǎn)單, 提供了一套很方便的對Http消息進(jìn)行操作的DSL. 如果是內網(wǎng)服務(wù)調用, 一般推薦使用結構緊湊, 傳輸效率高的協(xié)議. 比如protocol buffer, thrift或Avro. Finagle對thrift有很好的支持, 下篇文章我將介紹在Finagle中如何開(kāi)發(fā)thrift應用.