Springからの要求に応じた付加応答を追加する(2) – reactor適用

はじめに

前回開発したソースには、2つの問題がありました。

  1. Network I / Oを順次実行
    • O(n)時間がかかる:timeout * attachment改修
    • AsyncでO(1)で終わるようにチューニングが必要
  2. Failover
    • attachmentは単純付加情報にもかかわらずattachmentServiceでexceptionが発生した場合、何の情報も取得できない
    • attachは失敗してもBoard情報と残りの成功したattachmentは表示する必要がある

今回は、このような課題をreactorを使って解決してみよう。

Reactor

まず、なぜreactorを使用するのか、簡単に整理してみよう。

  • Rx(Reactive Extension)を実装し、簡単に非同期プログラミングが可能
  • 別のRx実装チェーンRxJavaと比較したとき、次のような利点がある
    • Spring5に統合しやすい
    • Java8対応
      • rxJavaは1.6バージョンから使え、独自のFunctionを実装して使用
      • ReactorはJava8から使え、Java8 ApiとOptionalなどをサポートしている

ここではReactor APIの基本的な内容は言及しません。ちなみにreactorはJavaのバージョンに影響を受けます。本文のソースはSpring 5で作成しているが、Java8を使用する場合は、以下のソースをSpring4に適用しても、正常に動作することを確認する必要があります。

AttachmentWrapperItem

問題を解決する前にまずリファクタリングが必要です。以前の内容を振り返ってみよう。BoardDtoは以下のようにAttachmentWrapperを持っています。

@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class BoardDto implements Attachable {
    private Long id;
    private String title;
    private String content;

    @Setter(AccessLevel.PRIVATE)
    @JsonIgnore
    private AttachmentWrapper attachmentWrapper = new AttachmentWrapper();
}

AttachmentWrapperAttachmentTypeAttachmentを別々に受け取ります。

@ToString
@EqualsAndHashCode
public class AttachmentWrapper {
    //...
    void put(AttachmentType type, Attachment attachment);
}

reactorを使うと、Mono<T>Flux<T>のようにGenericで表現できるように、AttachmentTypeAttachmentを1つにまとめるAttachmentWrapperItemクラスを作成し、これをAttachmentWrapperに反映しなければなりません。

AttachmentWrapperItem

@Value
public class AttachmentWrapperItem {
    // 例外発生時に返却するインスタンス
    public static final AttachmentWrapperItem ON_ERROR = new AttachmentWrapperItem(null, null);
    private AttachmentType type;
    private Attachment attachment;
}

AttachmentWrapper適用

@ToString
@EqualsAndHashCode
public class AttachmentWrapper {
    interface AttachmentMap {
        boolean isEmpty();

        Set<Map.Entry<AttachmentType, Attachment>> entrySet();
    }

    @Delegate(types = AttachmentMap.class)
    private Map<AttachmentType, Attachment> value = new EnumMap<>(AttachmentType.class);

    public void putAll(Collection<AttachmentWrapperItem> items) {
        this.value.putAll(items.stream().collect(Collectors.toMap(AttachmentWrapperItem::getType, AttachmentWrapperItem::getAttachment)));
    }
}

Attachable interface変更
既存の2つのパラメータで受け取っていたものを、1つのパラメータで受け取るように変更します。

// 変更前
interface Attachable {
    //...
    default void attach(Map<? extends AttachmentType, ? extends Attachment> attachment) {
        getAttachmentWrapper().putAll(attachment);
    }
}

// 変更後
interface Attachable {
    //...
    default void attach(Collection<AttachmentWrapperItem> items) {
        getAttachmentWrapper().putAll(items);
    }
}

AttachService変更
getAttachmentの戻り値をAttachmentWrapperItemに変えます。

AttachmentWrapperItem getAttachment(Attachable attachable);

AttachWriterToBoardService変更
AttachServiceの変更されたロジックを反映します。

// 変更前
@Override
public Attachment getAttachment(Attachable attachment) {
    BoardDto boardDto = supportType.cast(attachment);
    return writerClient.getWriter(boardDto.getWriterId());
}
// 変更後
@Override
public AttachmentWrapperItem getAttachment(Attachable attachable) {
    BoardDto boardDto = supportType.cast(attachable);
    Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
    return new AttachmentWrapperItem(supportAttachmentType, attachment);
}

AttachmentAspect変更

// 変更前
private void executeAttach(Attachable attachable) {

    Set<AttachmentType> types = attachmentTypeHolder.getTypes();

    Map<AttachmentType, Attachment> attachmentMap =
            types.stream()
                 .flatMap(type -> typeToServiceMap.get(type).stream())
                 .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
                 .collect(Collectors.toMap(AttachService::getSupportAttachmentType, service -> service.getAttachment(attachable)));

    attachable.attach(attachmentMap);
}

// 変更後
private void executeAttach(Attachable attachable) {

    Set<AttachmentType> types = attachmentTypeHolder.getTypes();

    List<AttachmentWrapperItem> items =
            types.stream()
                 .flatMap(type -> typeToServiceMap.get(type).stream())
                 .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
                 .map(service -> service.getAttachment(attachable))
                 .collect(Collectors.toList());

    attachable.attach(items);
}

reactorで非同期プログラミングを適用

attachService.getAttachment()を呼び出すときにNetwork I/Oが発生しています。問題は、このメソッドがattachmentの件数分、実行されるという点です。非同期プログラミングを適用してこれを解決してみよう。

依存性の設定

compile('io.projectreactor:reactor-core:3.1.5.RELEASE')

AttachService修正

getAttachmentの戻り値の型をMono<AttachmentWrapperInfo>に変更します。

public interface AttachService<T extends Attachable> {
    AttachmentType getSupportAttachmentType();

    Class<T> getSupportType();

    Mono<AttachmentWrapperItem> getAttachment(Attachable attachable);
}

AttachWriterToBoardService修正

修正したAttachServiceの実装体であるAttachWriterToBoardServiceに変更内容を反映してみよう。

@Override
public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) {
    return Mono.defer(() -> executeGetAttachment(attachable))
                // Network I/Oを使用、elastic()で生成されたthreadで実行されるように宣言
               .subscribeOn(Schedulers.elastic()); 
}

// getAttachmentが実行していた部分をもってくる
// 戻り値にMono.just()をかぶせる
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
    BoardDto boardDto = supportType.cast(attachable);
    Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
    return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}

AttachmetAspect修正

Attachableの実装体のタイプに合わせてserviceを実行します。取得したMonoのListをそれぞれ非同期で実行させ、block()を呼び出して同期します。

private void executeAttach(Attachable attachable) {

    List<Mono<AttachmentWrapperItem>> monoItems = createMonoList(attachable);

    List<AttachmentWrapperItem> items = executeMonoAndCollectList(monoItems);

    attachable.attach(items);
}

// Attachableのタイプに合わせてサービス実行後、List<Mono>を生成する
private List<Mono<AttachmentWrapperItem>> createMonoList(Attachable attachable) {
    Set<AttachmentType> types = attachmentTypeHolder.getTypes();
    return types.stream()
                .flatMap(type -> typeToServiceMap.get(type).stream())
                .filter(service -> service.getSupportType().isAssignableFrom(attachable.getClass()))
                .map(service -> service.getAttachment(attachable))
                .collect(Collectors.toList());
}

// List<Mono>をzip()でそれぞれ実行しながら、List<attachmentWrapperItem>を作成して返却
// それぞれのMonoは内部でelastic()により非同期で実行され
// block()を介して最終的に同期する
private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) {
    return Mono.zip(monoItems, this::filterItems)
               .block();
}

private List<AttachmentWrapperItem> filterItems(Object[] itemArray) {
    return Stream.of(itemArray)
                 .map(AttachmentWrapperItem.class::cast)
                 .collect(Collectors.toList());
}

実行

非同期に戻ることを確認しよう。
テストコードを組んで確認するのが最良ですが、ここでは簡単にThread.sleep(3000)で確認してみましょう。

// 書き込みサービスに3秒スリープ
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
    try { Thread.sleep(3000); } catch (InterruptedException e) { }
    BoardDto boardDto = supportType.cast(attachable);
    Attachment attachment = new SimpleAttachmentCollection<>(commentClient.getComments(boardDto.getId()));
    return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}

// 作成者情報サービスに3秒スリープ
private Mono<AttachmentWrapperItem> executeGetAttachment(Attachable attachable) {
    try { Thread.sleep(3000); } catch (InterruptedException e) { }
    BoardDto boardDto = supportType.cast(attachable);
    Attachment attachment = writerClient.getWriter(boardDto.getWriterId());
    return Mono.just(new AttachmentWrapperItem(supportAttachmentType, attachment));
}

3秒以上、6秒以内にリクエストがきたら成功です。

reactorでエラー克服

reactorでエラーを克服する方法は簡単です。エラーが発生した場合、以前作成したAttachmentWrapperItem.ON_ERRORを返すようにすればよいでしょう。Rxはこのような状況のためのAPIが、すべて定義しています。

AttachServiceで例外発生時の処理

AttachWriterToBoardServiceWriterをインポートしている間に、Exceptionが発生した場合、AttachmentWrapperItem.ON_ERRORを送信するように変更します。

@Slf4j
@Component
public class AttachWriterToBoardService implements AttachService<BoardDto> {
    //...
    private final WriterClient writerClient;
    private final Duration timeout;

    @Autowired
    public AttachWriterToBoardService(@NonNull WriterClient writerClient,
                                      @Value("${attach.writer.timeoutMillis:5000}") long timeout) {
        this.writerClient = writerClient;
        this.timeout = Duration.ofMillis(timeout);
    }

    //...

    @Override
    public Mono<AttachmentWrapperItem> getAttachment(Attachable attachable) {
        return Mono.defer(() -> executeGetAttachment(attachable))
                   .subscribeOn(Schedulers.elastic())
                   .timeout(timeout) // reactorにtimeoutをかける。
                   .doOnError(e -> log.warn(e.getMessage(), e)) // エラーが発生したらlogを残す。代替値を返すのでwarnに指定。
                   .onErrorReturn(AttachmentWrapperItem.ON_ERROR); // エラー発生でON_ERRORを返す。
    }
}

AttachmentAspectでON_ERRORをろ過するようにロジックを変更

前述のAttachmentAspectからList<Mono>を非同期で実行し、結果値をList<AttachmentWrapperItem>にまとめてAttachableに入れました。簡単に非同時実行結果がON_ERRORある場合をフィルタリングすれば、成功結果だけを集めてList<AttachmentWrapperItem>を作ることができます。

@Slf4j
@Component
@Aspect
public class AttachmentAspect {

    //...

    private List<AttachmentWrapperItem> executeMonoAndCollectList(List<Mono<AttachmentWrapperItem>> monoItems) {
        // timeoutでMonoが実行される最大時間を指定することもできる
        return Mono.zip(monoItems, this::filterItems)
                   .doOnError(e -> log.warn(e.getMessage(), e))
                   .onErrorReturn(Collections.emptyList()) // すべてのMonoを実行して集合する過程でエラーが発生した場合、emptyList()を返す
                   .block();
    }

    private List<AttachmentWrapperItem> filterItems(Object[] itemArray) {
        return Stream.of(itemArray)
                     .map(AttachmentWrapperItem.class::cast)
                     .filter(item -> item != AttachmentWrapperItem.ON_ERROR) // 例外処理によって失敗した要請は除く
                     .collect(Collectors.toList());
    }
}

実行

以前、100番の掲示板を呼び出すとき、作成者情報を取得しようとするとFeignClientで404を投げ、下記のようにAPI自体が失敗しました。
GET /boards/100?attachment=comments,writer

{
  "timestamp": "2018-03-08T07:55:22.127+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "status 404 reading WriterClient#getWriter(long); content: {}",
  "path": "/boards/100"
}
feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {}
    at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na]
        ...
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]

failover適用後、例外はwarnでログを残し、成功した部分までは応答できるようになりました。

{
  {
    "id": 100,
    "title": "title100",
    "content": "content100",
    "comments": [
      {
        "id": 496,
        "email": "Zola@lizzie.com",
        "body": "neque unde voluptatem iure\nodio excepturi ipsam ad id\nipsa sed expedita error quam\nvoluptatem tempora necessitatibus suscipit culpa veniam porro iste vel"
      }
    ]
  }
}

 

2018-03-08 19:59:12.056  WARN 64890 --- [      elastic-5] c.p.s.s.a.s.w.AttachWriterToBoardService : status 404 reading WriterClient#getWriter(long); content:
{}

feign.FeignException: status 404 reading WriterClient#getWriter(long); content: {}
    at feign.FeignException.errorStatus(FeignException.java:62) ~[feign-core-9.5.1.jar:na]
        ...
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_152]

まとめ

Reactorを使って非同期プログラミングを行い、障害に対処して克服できるように試みました。途中、reactorにtimeout()を使用したので、この部分はclientからFeignClientを使用することで、application.ymlに引き出して別途管理することができます。以前共有したHystrixとも連携してfallbackを実装することもでき、強力な障害対応ができます。

依然として現在のコードは大きな欠点があります。AttachmentAspectでreactorのblock()を呼び出すという点です。これについては、reactor learnページから参照した画像で説明できそうです。

| 出典_:https://projectreactor.io/learn_ |

つまりNon-Blockingでリソースを効率的に使用しなかったということです。そのためSpringFramework 5ではwebfluxを使ってnetty基盤(基本設定)でNon-Blocking + Asyncを使用できるようにしました。

TOAST Meetup 編集部

TOASTの技術ナレッジやお得なイベント情報を発信していきます
pagetop