GitLab EventStore

背景

モノリシックなGitLabプロジェクトは大きくなり、より多くのドメインが定義されるようになってきました。その結果、これらのドメインは時間的な結合によって互いに絡み合うようになってきています。

象徴的な例は、PostReceive ワーカーで、複数のドメインで多くのことが起こっています。新しいコミットがプッシュされたときに新しい動作が反応する場合、PostReceive またはそのサブコンポーネント (Git::ProcessRefChangesService など) のどこかにコードを追加します。

このようなアーキテクチャです:

  • 単一責任原則の違反です。
  • 慣れていないコードベースにコードを追加するリスクが高まります。あなたの知らないニュアンスがあり、それがバグやパフォーマンスの低下を引き起こすかもしれません。
  • ドメインの境界を侵します。特定の名前空間の内部(たとえばGit:: )で、突然他のドメインのクラスが追加されることがあります(Ci::MergeRequests:: など)。

EventStore とは何ですか?

Gitlab:EventStore は、既存のSidekiqワーカーと今日私たちが持っているobservabilityの上に構築された基本的なpub-subシステムです。私たちはこのシステムを使って、結合を最小限に抑えながらドメインをモデリングする際にイベントドリブンアプローチを適用します。

これは基本的に、既存のSidekiqワーカーをそのまま残して非同期作業を実行しますが、依存関係を反転させます。

EventStoreの例

CIパイプラインが作成されると、パイプラインのref にマッチするマージリクエストに対して、ヘッドパイプラインを更新します。マージリクエストは最新のパイプラインのステータスを表示することができます。

EventStoreがなければ

Ci::CreatePipelineService を変更し、パイプラインが作成されたかどうかをチェックするロジック(if ステートメントなど)を追加します。そして、MergeRequests:: ドメインの副作用を実行するワーカーをスケジュールします。

このスタイルはOpen-Closed Principleに違反し、不必要に他のドメインから副作用のロジックを追加し、結合を増やします:

graph LR subgraph ci[CI] cp[CreatePipelineService] end subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end cp -- perform_async --> upw cp -- perform_async --> pow

イベントストア

Ci::CreatePipelineServiceCi::PipelineCreatedEvent イベントを発行し、その責任はここで終わります。

MergeRequests:: ドメインはワーカーMergeRequests::UpdateHeadPipelineWorker でこのイベントを購読できます:

  • 副作用は非同期にスケジューリングされ、ドメインイベントを発するメインビジネストランザクショ ンに影響を与えません。
  • メイン・ビジネス・トランザクションを変更することなく、副作用を追加できます。
  • どのドメインが関与しているのか、またその所有者は何なのかが明確にわかります。
  • 明示的に宣言されているため、システムでどのようなイベントが発生するかを特定できます。

Gitlab::EventStore 、サブスクライバー(Sidekiqワーカー)とドメインイベントのスキーマの間にはまだ結合があります。この結合のレベルは、メイントランザクション(Ci::CreatePipelineService)が結合するよりもはるかに小さい:

  • 複数のサブスクライバ。
  • 複数のサブスクライバ起動方法(条件付き起動を含む)。
  • パラメータを渡す複数の方法。
graph LR subgraph ci[CI] cp[CreatePipelineService] cp -- publish --> e[PipelineCreateEvent] end subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end upw -. subscribe .-> e pow -. subscribe .-> e

各サブスクライバは、それ自身がSidekiqワーカーであるため、担当する作業のタイプに関連する属性を指定することができます。例えば、あるサブスクライバーはurgency: high を定義することができ、別の重要度の低いサブスクライバーはurgency: low を設定することができます。

EventStore は、Dependency Inversion を可能にする抽象化に過ぎません。これは、ビジネストランザクションを(他のドメインで実行されることの多い)副作用から分離するのに役立ちます。

イベントがパブリッシュされると、EventStore は各サブスクライブワーカーでperform_async を呼び出し、引数としてイベント情報を渡します。これは基本的に、各サブスクライバのキューに Sidekiq ジョブをスケジュールします。

つまり、サブスクライバーは単なるSidekiqワーカーであるため、サブスクライバーがどのように動作するかに関して、他には何も変わりません。例えば、ワーカー(サブスクライバー)がジョブの実行に失敗した場合、ジョブはSidekiqに戻され、再試行されます。

EventStoreの利点

  • Subscribers(Sidekiqワーカー)は、副作用が重要な場合、ワーカーのウェイトを変更することで、より速く実行するように設定できます。
  • 副作用が非同期で実行されることを自動的に強制します。これにより、メインのビジネストランザクションのパフォーマンスに影響を与えることなく、他のドメインがイベントを安全にサブスクライブできるようになります。

イベントの定義

Event オブジェクトは、バインドされたコンテキストで発生したドメインイベントを表します。イベントをパブリッシュすることで、他のバウンデッドコンテキストに発生したイベントを通知し、他のバウンデッドコンテキストがそれに反応できるようにします。

app/events/<namespace>/ の下に、過去に起こったことを表す名前を持つ新しいイベントクラスを定義します:

class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
  def schema
    {
      'type' => 'object',
      'required' => ['pipeline_id'],
      'properties' => {
        'pipeline_id' => { 'type' => 'integer' },
        'ref' => { 'type' => 'string' }
      }
    }
  end
end

スキーマはイベントオブジェクトを初期化する際にすぐに検証されるので、パブリッシャーがサブスクライバーとの契約に従っていることを確認することができます。

スキーマ変更のためのロールアウトを少なくするために、できるだけオプションのプロパティを使用することを推奨します。しかし、required プロパティはイベントのサブジェクトの一意な識別子のために使うことができます。例えば

  • pipeline_idCi::PipelineCreatedEvent の必須プロパティにすることができます。
  • project_idProjects::ProjectDeletedEvent の必須プロパティにすることができます。

ペイロードを特定のサブスクライバに合わせることなく、サブスクライバが必要とする プロパティのみをパブリッシュしてください。ペイロードはイベントを完全に表すべきであり、緩く関連したプロパティ を含むべきではない[SHOULD]。例えば

Ci::PipelineCreatedEvent.new(data: {
  pipeline_id: pipeline.id,
  # unless all subscribers need merge request IDs,
  # this is data that can be fetched by the subscriber.
  merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})

より多くのプロパティを持つイベントをパブリッシュすることで、サブスクライバは最初に必要なデータを得ることができます。そうでない場合、サブスクライバはデータベースから追加データを取得する必要があります。しかし、これはスキーマの継続的な変更につながり、単一の真実のソースを表していないプロパティを追加する可能性があります。このテクニックはパフォーマンスの最適化として使用するのがベストです。例えば、イベントに多くのサブスクライバが存在し、すべてのサブスクライバが同じデータをデータベースから再度フェッチするような場合です。

スキーマの更新

スキーマの変更には複数のロールアウトが必要です。新バージョンのデプロイ中に

  • 既存のパブリッシャーは旧バージョンを使用してイベントを発行できます。
  • 既存の購読者は旧バージョンを使ってイベントを消費できます。
  • イベントはジョブの引数としてSidekiqキューに永続化されるので、デプロイ中に2つのバージョンのスキーマを持つことができます。

スキーマの変更は最終的にSidekiqの引数に影響を与えるため、複数のロールアウトに関してはSidekiqスタイルガイドを参照してください。

プロパティの追加

  1. ロールアウト 1:
    • 新しいプロパティをオプションとして追加 (required ではありません)。
    • サブスクライバを更新し、新しいプロパティの有無にかかわらずイベントを消費できるようにします。
  2. ロールアウト 2:
    • 新しいプロパティを提供するためにパブリッシャーを変更します。
  3. ロールアウト 3: (プロパティがrequiredであるべき場合):
    • スキーマとサブスクライバコードを常にそれを期待するように変更します。

プロパティの削除

  1. ロールアウト 1:
    • プロパティがrequired の場合、オプションにします。
    • サブスクライバが常にプロパティを予期しないように更新します。
  2. ロールアウト 2:
    • イベントパブリッシングからプロパティを削除します。
    • プロパティを処理するサブスクライバからコードを削除します。

その他の変更

プロパティ名の変更など、その他の変更も同じ手順で行います:

  1. 古いプロパティを削除
  2. 新しいプロパティを追加

イベントの公開

先ほどのイベントを公開します:

Gitlab::EventStore.publish(
  Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)

イベントは、可能な限り関連するServiceクラスからディスパッチされるべきです。ステートマシンの遷移のように、モデルにイベントのパブリッシュを許可する例外もあります。例えば、副作用のコレクションを実行するCi::BuildFinishedWorker をスケジューリングする代わりに、Ci::BuildFinishedEvent をパブリッシュして、他のドメインが非同期に反応するようにすることができます。

ActiveRecord コールバックはドメインイベントを表すには低レベルすぎます。コールバックはデータベースのレコードの変更を表します。意味がある場合もあるかもしれませんが、そのような例外は考慮すべきです。

サブスクライバーの作成

サブスクライバーは、Gitlab::EventStore::Subscriber モジュールを含む Sidekiq ワーカーです。このモジュールは、perform メソッドを処理し、handle_event メソッドを介して安全にイベントを処理するためのより良い抽象化を提供します。例えば

module MergeRequests
  class UpdateHeadPipelineWorker
    include Gitlab::EventStore::Subscriber

    def handle_event(event)
      Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
        # ...
      end
    end
  end
end

イベントへのサブスクライバーの登録

lib/gitlab/event_store.rb でワーカーを特定のイベントにサブスクライブするには、Gitlab::EventStore.configure! メソッドに次のような行を追加します:

module Gitlab
  module EventStore
    def self.configure!(store)
      # ...

      store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent

      # ...
    end
  end
end

EE コードベースでのみ定義されているワーカーも、ee/lib/ee/gitlab/event_store.rb でサブスクリプションを宣言することで、同じようにイベントをサブスクライブできます。

サブスクリプションは Rails アプリがロードされたときにメモリに保存され、すぐに凍結されます。実行時にサブスクリプションを変更することはできません。

イベントの条件付きディスパッチ

サブスクリプションはイベントを受け入れる条件を指定することができます:

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  if: -> (event) { event.data[:merge_request_id].present? }

これは、条件が満たされた場合、Ci::PipelineCreatedEventをサブスクライバーにディスパッチするようにイベントストアに指示します。

このテクニックは、サブスクライバーが小さなイベントのサブセットにしか興味がない場合、Sidekiqジョブのスケジューリングを回避することができます。

caution
条件付きディスパッチを使用する場合、与えられたイベントが発行されるたびに同期的に実行されるため、安い条件だけを含める必要があります。

複雑な条件の場合は、すべてのイベントをサブスクライブし、サブスクライバ ワーカーのhandle_event メソッドでロジックを処理するのがベストです。

テスト

パブリッシャーのテスト

パブリッシャーの責任はイベントが正しくパブリッシュされることです。

イベントが正しく発行されたかどうかを調べるには、RSpec matcher:publish_event を使います:

it 'publishes a ProjectDeleted event with project id and namespace id' do
  expected_data = { project_id: project.id, namespace_id: project.namespace_id }

  # The matcher verifies that when the block is called, the block publishes the expected event and data.
  expect { destroy_project(project, user, {}) }
    .to publish_event(Projects::ProjectDeletedEvent)
    .with(expected_data)
end

また、:publish_event matcher の内部で matcher を Composer することもできます。これは、イベントがある種類の値で作成されたことを保証したいが、事前にその値がわからない場合に便利です。この例は、新しいレコードを作成した後にイベントをパブリッシュする場合です。

it 'publishes a ProjectCreatedEvent with project id and namespace id' do
  # The project ID will only be generated when the `create_project`
  # is called in the expect block.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }

  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
end

サブスクライバのテスト

サブスクライバは、発行されたイベントが正しく消費されることを確認する必要があります。この目的のために、サブスクライバをテストする方法を標準化するために、ヘルパーと共有サンプルを追加しました:

RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
  let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }

  # This shared example ensures that an event is published and correctly processed by
  # the current subscriber (`described_class`). It also ensures that the worker is idempotent.
  it_behaves_like 'subscribes to event' do
    let(:event) { pipeline_created_event }
  end

  # This shared example ensures that an published event is ignored. This might be useful for
  # conditional dispatch testing.
  it_behaves_like 'ignores the published event' do
    let(:event) { pipeline_created_event }
  end

  it 'does something' do
    # This helper directly executes `perform` ensuring that `handle_event` is called correctly.
    consume_event(subscriber: described_class, event: pipeline_created_event)

    # run expectations
  end
end

ベストプラクティス

  • CEとEEの分離と互換性のメンテナー:
    • イベントクラスを定義し、イベントが常に発生する同じコード(CEまたはEE)でイベントを発行します。
      • CE機能の結果としてイベントが発生する場合、イベント・クラスはCEで定義およびパブリッシュされる必要があります。同様に、イベントがEE機能の結果として発生する場合、イベント・クラスはEEで定義およびパブリッシュされる必要があります。
    • イベントに依存するサブスクライバは、依存する機能(CE または EE)が存在する同じコードで定義します。
      • CE でパブリッシュされたイベント(例えば、Projects::ProjectCreatedEvent )と、EE で定義されたこのイベントに依存するサブスクライバ(例えば、Security::SyncSecurityPolicyWorker)を持つことができます。
  • イベント・クラスの定義とイベントの発行は、同じ境界コンテキスト(最上位の Ruby 名前空間)内で行います。
    • 指定されたバウンデッドコンテキストは、それ自身のコンテキストに関連するイベントのみを公開する必要があります。
  • イベントをサブスクライブするときのシグナル/ノイズ比を評価します。サブスクライバー内で処理するイベントと無視するイベントの数は?小さなイベントのサブセットだけに興味がある場合は、条件付きディスパッチの使用を検討してください。条件付きディスパッチで同期チェックを実行するか、潜在的に冗長なワーカーをスケジュールするかのバランス。