Cloud Pub/Subの第2世代用のライブラリを第1世代のGAEで使用することは公式では推奨されていません。ではどうしても使いたい場合にはどのように実装するとよいのか。また使用することによってレイテンシが悪化したり使用上の制限がかかったりといった問題点があるのかを調査しました。
バックエンドグループの青松です。タクシー配車アプリGOのサーバーサイドの開発を担当しています。
GOアプリではメインのAPIサーバーとしてGAEを使用しており、分析ログをBigQueryに送信するためにCloud Pub/Subを使用しています。
現在Cloud Pub/Sub用のライブラリとしては下記の2つがあります。
基本的には1つめの新しいライブラリを使うことを推奨されているのですが、GAEの第1世代においては2つめのライブラリを使うことを公式は推奨しています。
私達のチームで使っているGAEは第1世代のもので、第2世代にそろそろ移行したいけれど第1世代特有の依存ライブラリを置き換えて実装し直さなければならない箇所が残っているため、まだ第2世代に移行しきれていないという状況です。
今回私は第2世代への移行を見据えてという気持ちと、本当に第1世代では使えないのかという確認の気持ちを込めて1つめのライブラリを使用してみることにしました。
また、調査段階から下記の記事を参考にしました。
まずはクライアントを作成します。クライアントはパッケージ変数に配置し、GetPubSubClient()関数を使って各ハンドラが取得するようにしました。
クライアントを作成する関数自体は時間がかかりませんが、コネクションプールを使用することによって一度確立したコネクションを使い回すことができて、2度目のパブリッシュから高速に動作するようになります。
リクエストの度にクライアントを作成するようにするとコネクションプーリングがうまく機能しないため、このようにパッケージ変数に置いています。
各ハンドラの初期化時にクライアントを注入することによって、パッケージ変数を参照してはいるけれど、Mockで置き換えることが可能になります。
パッケージ変数を色々な箇所から読んでしまうと、テストを書くときに負債になるので避けたほうがよいです。
package infra
import ... // 省略
var pubsubClient domain.PubSubClient
type PubSubClient struct {
Client *pubsub.Client
ProjectID string
resultMutex sync.RWMutex
resultMap map[string]*pubsub.PublishResult
}
func GetPubSubClient() domain.PubSubClient {
return pubsubClient
}
func InitPubSubClient() error {
ctx := context.Background()
project := os.Getenv("...")
credential := os.Getenv("...")
c, err := newPubSubClient(ctx, project, credential)
pubsubClient = c
return err
}
func newPubSubClient(ctx context.Context, project string, credential string) (domain.PubSubClient, error) {
c, err := pubsub.NewClient(ctx, project,
option.WithCredentialsFile(credential),
option.WithGRPCConnectionPool(10))
if err != nil {
return nil, err
}
client := new(PubSubClient)
client.Client = c
client.ProjectID = project
client.resultMap = map[string]*pubsub.PublishResult{}
return client, nil
}
インターフェイスの定義は以下のようになっています。
package domain
type PubSubClient interface {
Publish(string, []byte) (string, error)
GetResult(string) error
Close()
}
トピックの新しいライブラリにおけるパブリッシュ処理においては、PublishとGetがわかれています。
func (psc *PubSubClient) Publish(topicName string, data []byte) (string, error) {
topic := psc.Client.TopicInProject(topicName, psc.ProjectID)
res := topic.Publish(context.Background(), &pubsub.Message{
Data: data,
})
s := uuid.New().String()
psc.resultMutex.Lock()
defer psc.resultMutex.Unlock()
psc.resultMap[s] = res
return s, nil
}
func (psc *PubSubClient) GetResult(key string) error {
psc.resultMutex.RLock()
r, ok := psc.resultMap[key]
psc.resultMutex.RUnlock()
if !ok {
return errors.New("no value for that key")
}
// この Get に 20-60 milli sec ぐらいの待ちが発生する
_, err := r.Get(context.Background())
// publishの度にmapにidが追加されるため、エラーの有無によらずにdeleteを行う
psc.resultMutex.Lock()
defer psc.resultMutex.Unlock()
delete(psc.resultMap, key)
return err
}
興味のある方は 採用ページ も見ていただけると嬉しいです。
Twitter @mot_techtalk のフォローもよろしくお願いします!