きみはねこみたいなにゃんにゃんなまほう

ねこもスクリプトをかくなり

graphql-subscriptions を使ってみる - WebSocket に乗せてみる

(=˘ ꒳ ˘=) GraphQL の Subscriptions の道ってなんでこんなに険しいのか... import 地獄すぎる...

の続きです。Adding Subscriptions To Schema | GraphQL Subscriptions Docs をベースに Subscription のハンズオンを行なっていきます。

今回は本当に最低限の Subscription を WebSocket に乗せるところまでを行います。最低限とはいえ GraphQL + Webサーバのコードになるので怒涛の import を覚悟しておいてください。

最低限のコードを書いて動作確認

本当に単純に WebSocket 経由での最低限の動作を確認したいので、まずは毎秒自動で publish される PubSub インスタンスを用意して実験してみます。

graphql-toolsmekaExecutableSchema を使ってスキーマschema.ts に定義します。

// schema.ts
import { makeExecutableSchema } from 'graphql-tools'
import { PubSub } from 'graphql-subscriptions'
import { v4 as uuid } from 'uuid' // わかりやすい変化が欲しいのでUUIDをID値に利用します

export const pubsub = new PubSub()

const typeDefs = `
type Comment {
    id: String!
    content: String!
}

type Subscription {
  commentAdded: Comment!
}

# 最低一つ Query 定義が必要らしいのでダミーで定義
type Query {
  comments: [Comment!]!
}

schema {
  query: Query
  subscription: Subscription
}`

const resolvers = {
  Query: {},
  Subscription: {
    commentAdded: {
      subscribe: () => pubsub.asyncIterator('commentAdded'),
    },
  },
}

// 一秒ごとに publish する
setInterval(() => {
  pubsub.publish('commentAdded', { commentAdded: { id: uuid(), content: 'Hello!' } })
}, 1000)

export default makeExecutableSchema({ typeDefs, resolvers })

次にサーバを立てます。ここはほぼ Apollo 公式のデモのままです。

import express from 'express'
import { graphqlExpress, graphiqlExpress } from 'apollo-server-express'
import bodyParser from 'body-parser'
import { execute, subscribe } from 'graphql'
import { createServer } from 'http'
import { SubscriptionServer } from 'subscriptions-transport-ws'
import schema from './schema'

const PORT = 3000
const server = express()

server.use('/graphql', bodyParser.json(), graphqlExpress({ schema }))

server.use('/graphiql', graphiqlExpress({
  endpointURL: '/graphql',
  subscriptionsEndpoint: `ws://localhost:${PORT}/subscriptions`,
}))

const ws = createServer(server)
ws.listen(PORT, () => {
  console.log(`Apollo Server is now running on http://localhost:${PORT}`)
  new SubscriptionServer(
    { execute, subscribe, schema },
    { server: ws, path: '/subscriptions' },
  )
})

これで http://localhost:3000/graphiql にアクセスすると GraphiQL が起動します。GraphiQL では Subscription が利用できるので

subscription {
  commentAdded { id }
}

とクエリを入力し実行すると以下のような出力が一秒単位に更新されます。

f:id:lightbulbcat:20180206022145p:plain

GraphQL って何よりめげるのがライブラリ数の多さなんですよね。最低限に絞ってもこれだけの import が必要になるという... 初めの頃は「え、嘘でしょ...」と何度思ったかわかりません。もう少しパッケージングがされてると入りやすいんですけどね。

resolver 定義

一番のハマりどころはフィールドの定義が Query の場合と異なるということでした。Query だと

Query: {
  comments: () => ...
},

となるところが、Subscription だと

Subscription: {
  commentAdded: {
    subscribe: () => ...,
  },
}

となるという、フィールド定義をオブジェクトで行なってその中のフィールドとして subscribe を定義しないといけないという所に気づけず数時間を無駄にしました。

ペイロードの形式

上の件に負けず劣らずハマりどころだったのが PubSubpublishペイロードの形式です。

setInterval(() => {
  pubsub.publish('commentAdded', { commentAdded: { id: uuid(), content: 'Hello!' } })
}, 1000)

なんか commentAdded という同じような引数が続いていて見逃してしまっていたんですよね...

初めの commentAdded は型情報上は triggerName と呼ばれる、EventEmitter でいうイベント名のようなもので、情報通信チャンネル名というと分かりやすいでしょうか。これをキーに

pubsub.asyncIterator('commentAdded')

とすることで Async Iterator をインタフェースにして、非同期で情報を受け取り続けることができます。

そして次のペイロードに含まれる commentAdded については現状はただのお作法という認識しかできない状態です... おそらく Query のように複数クエリを混ぜた時にそれぞれのフィールド名になるような使い方をされるのだと予想していますが、理解はまたいずれ、後ほどということにしておきます。

subscribe で動作確認

さて、GraphQL に WebSocket に Async Iterator と、正直この技術スタックは全然使い慣れておりません。 実際上のコードも四苦八苦のトラブルシュートをしながら行き着きました。 確認の方法がHTTP経由で GraphiQL のみだと問題の切り分けが難しいので直接 GrpahQL を叩く方法を覚えておくといいかもしれません。

graphql に含まれる parse メソッドを利用することで以下のように Async Iterator を取得することができます。

subscribe(schema, parse('subscription { commentAdded { id }}')).then(console.log)

これで正しく makeExecutableSchema ができているかどうかまでは判断できます。ここで失敗するということは WebSocket 以前の問題であると判断がつきますね。

ちなみに Async Iterator は Async Iterable ではないので直接 for..await..of で回すことはできません。

Async Iterator については以下の投稿で掘り下げておりますのでよろしければお読みください。

それから graphql の subscribe については以下の投稿で軽く触ったものになります。

だんだん GraphQL の経験が積み上がってきた感触が確かなものになってまいりました。

Subscription を Mutation に連動させてみる

さて、今回の締めに Mutation に連動させた Subscription を実装してみたいと思います。 やっぱり任意のタイミングで情報を送れてこその非同期通信ですよね。

先ほどの schema.ts を以下のように書き換えました。

import { makeExecutableSchema } from 'graphql-tools'
import { PubSub } from 'graphql-subscriptions'
import { v4 as uuid } from 'uuid'

export const pubsub = new PubSub()

const typeDefs = `
type Comment {
    id: String!
    content: String!
}

type Query {
  comments: [Comment!]!
}

type Mutation {
  addComment(content: String!): Comment!
}

type Subscription {
  commentAdded: Comment!
}

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}`

// 最強のオンメモリデータストア (*˘꒳˘*) はいれつ
const comments = []

// Query と Mutation を足した resolvers
// せっかくなので Subscription のフィールド定義との違いもご覧ください
const resolvers = {
  Query: {
    comments() { return comments }
  },
  Mutation: {
    addComment(_, { content }) {
      const comment = { id: uuid(), content }
      comments.push(comment)
      
      // きみのコメントはぶじ追加されたよ (*˘꒳˘*) どやぁ...
      pubsub.publish('commentAdded', { commentAdded: comment })
      return comment
    }
  },
  Subscription: {
    commentAdded: {
      subscribe: () => pubsub.asyncIterator('commentAdded'),
    },
  },
}

export default makeExecutableSchema({ typeDefs, resolvers })

サーバの方のスクリプトは据え置きです。今度は GraphiQL を2枚開いて、あらかじめ片方で

subscription {
  commentAdded { id, content }
}

クエリを投げて subscription を設定した状態で、もう片方から

mutation addComment {
  addComment(content: "hey!") { id }
}

のようにデータの追加を行うと、subscription した方の表示が更新される様子が見られると思います。

といったところで今回も長い道のりでしたが以上です。Subscription には他にもフィルタなどの機能が用意されているようなので見ていきたいところですね。