どうも、ナナオです。

結構長いこと書いていますが、今回で完成かな…?(前回も同じこと言ったような気がするけど

いや、完成させます!

期日設定によって送られるメッセージ頻度を変化させる

最初の記事で想定していたように、このBotは全員が回答するまでメッセージを送り続けるようにしたいです。

その頻度は以下のように期日からの経過時間によって変わります。

  • 期日まで
    • 未回答者に1日に一回催促のDMが送られる
  • 期日経過~1日
    • 未回答者に1時間に一回催促のDMが送られる
  • 期日経過1日~2日
    • 未回答者に30分に一回催促のDMが送られる
  • 期日経過3日~
    • 未回答者に15分に一回催促のDMが送られる

これを実装していきます。

pub struct MultipleService {
    router: Router,
    scheduler: JobScheduler,
}
impl MultipleService {
    pub async fn new(pool: PgPool) -> Result<MultipleService, shuttle_runtime::Error> {
        let state = AxumState { pool: pool.clone() };
        let router = Router::new()
            .route("/webhook", post(webhook))
            .route("/healthcheck", get(healthcheck))
            .with_state(state);

        let scheduler = JobScheduler::new().await.unwrap();

        // 時間ごとにバッチを追加
        // 1日に一回
        let pool_clone = pool.clone();
        let daily_job = Job::new_async("0 0 0 * * *", move |_, _| {
            let inner_pool = pool_clone.clone();
            Self::send_message_with_deadline(inner_pool, DailyCondition)
        }).expect("ジョブの作成に失敗しました");
        // 1時間に一回
        let pool_clone = pool.clone();
        let hourly_job = Job::new_async("0 0 * * * *", move |_, _| {
            let inner_pool = pool_clone.clone();
            Self::send_message_with_deadline(inner_pool, HourlyCondition)
        }).expect("ジョブの作成に失敗しました");
        // 30分に一回
        let pool_clone = pool.clone();
        let every_thirty_minutes_job = Job::new_async("0 */30 * * * *", move |_, _| {
            let inner_pool = pool_clone.clone();
            Self::send_message_with_deadline(inner_pool, EveryThirtyMinutesCondition)
        }).expect("ジョブの作成に失敗しました");
        // 15分に一回
        let pool_clone = pool.clone();
        let every_fifteen_minutes_job = Job::new_async("0 */15 * * * *", move |_, _| {
            let inner_pool = pool_clone.clone();
            Self::send_message_with_deadline(inner_pool, EveryFifteenMinutesCondition)
        }).expect("ジョブの作成に失敗しました");

        scheduler.add(daily_job).await.expect("スケジューラへジョブを追加するのに失敗しました。");
        scheduler.add(hourly_job).await.expect("スケジューラへジョブを追加するのに失敗しました。");
        scheduler.add(every_thirty_minutes_job).await.expect("スケジューラへジョブを追加するのに失敗しました。");
        scheduler.add(every_fifteen_minutes_job).await.expect("スケジューラへジョブを追加するのに失敗しました。");
        
        Ok(Self {
            router,
            scheduler,
        })
    }

    // バッチ内の処理はほぼ同じことをやっているので共通関数として定義
    fn send_message_with_deadline(pool: PgPool, cond: impl JobCondition + Send + 'static) -> Pin<Box<(dyn Future<Output = ()> + Send)>> {
        let inner_pool = pool.clone();
        let http_client = HttpClient::new();

        Box::pin(async move {
            let line_group_repository = LineGroupRepository::new(inner_pool);
            let all_line_group = line_group_repository.get_all()
                .await
                .expect("全てのline_group取得に失敗しました");

            for line_group in all_line_group.iter() {
                if let Some(chousei_id) = line_group.chousei_id.clone() {
                    let group_count = http_client.count_group_members(line_group.id.clone())
                        .await
                        .expect(format!("line_groupの人数取得に失敗しました。id: {}", line_group.id).as_str())
                        .try_into()
                        .expect("グループカウントをusizeに変換できませんでした");
                    
                    let res = http_client.get_chousei_csv(chousei_id)
                        .await
                        .expect("調整さんのCSV取得に失敗しました");
                    // グループ内の人数より調整さんの回答者数が少ない場合はメッセージをプッシュします
                    if res.member_info_map.len() < group_count {
                        if let Some(deadline_date) = line_group.deadline_date {
                            if cond.check(deadline_date) { // 実行時間ごとに条件が異なるため、JobConditionトレイトを使用して条件を入れ替え可能にしています。
                                let message = "調整さんに回答してください!".to_string();
                                http_client.push_message(line_group.id.clone(), vec![message])
                                    .await
                                    .expect("lineのメッセージ送信に失敗しました");
                            }
                        }
                    }
                }
            }
        })
    }
}

少しだけテクいのは、send_message_with_deadline関数でJobConditionトレイトを使用しているあたりです。

ストラテジーパターンを利用することで条件をトレイトごとに分離させることができるんでですね。

JobConditionの実装は以下のようになっています。


trait JobCondition {
    fn check(&self, deadline_date: NaiveDate) -> bool;
}

struct DailyCondition;
impl JobCondition for DailyCondition {
    fn check(&self, deadline_date: NaiveDate) -> bool {
        let now = Local::now();
        let today = now.date_naive();

        deadline_date < today
    }
}

struct HourlyCondition;
impl JobCondition for HourlyCondition {
    fn check(&self, deadline_date: NaiveDate) -> bool {
        let now = Local::now();
        let today = now.date_naive();

        deadline_date + Duration::days(1) == today
    }
}

struct EveryThirtyMinutesCondition;
impl JobCondition for EveryThirtyMinutesCondition {
    fn check(&self, deadline_date: NaiveDate) -> bool {
        let now = Local::now();
        let today = now.date_naive();

        deadline_date + Duration::days(2) == today
    }
}

struct EveryFifteenMinutesCondition;
impl JobCondition for EveryFifteenMinutesCondition {
    fn check(&self, deadline_date: NaiveDate) -> bool {
        let now = Local::now();
        let today = now.date_naive();

        deadline_date + Duration::days(3) == today
    }
}

アイドルモードへの移行を回避するため、healthcheckエンドポイントを実装する

これで時間経過とともにメッセージが送られる頻度も変わるはず…なのですが、Shuttleの仕様でリクエストが30分ないと自動的にアイドルモードになってしまうそうです。

Idle Projects - Shuttle

これではバッチを動かすことができなくなってしまう!ので、GASで15分に一回healthcheckエンドポイントを叩けるようにします。

とりあえずhealthcheckエンドポイントを実装します。


impl MultipleService {
    pub async fn new(pool: PgPool) -> Result<MultipleService, shuttle_runtime::Error> {
        let state = AxumState { pool: pool.clone() };
        let router = Router::new()
            .route("/webhook", post(webhook))
            .route("/healthcheck", get(healthcheck)) // healthcheckエンドポイントを追加
            .with_state(state);
        
        // ...中略...
    }
}

async fn healthcheck() -> &'static str {
    "OK"
}

追加したら、このエンドポイントを15分に一回コールするGASを実装します。

Apps Script  |  Google for Developers

HTTPリクエストの実装はこちらを参考にしました。

// 15分ごとにヘルスチェックエンドポイントにリクエストします。
// shuttleがアイドル状態になることを防ぐためです
function main() {
  let res = UrlFetchApp.fetch("https://[ドメイン名]/healthcheck");
}

こんな感じのコードを書いたら、15分に一回コードを実行するようにトリガーを追加します。

トリガーの設定はこんな感じにしておきます。

これでプロジェクトがアイドル状態にはならなくなりました。

まとめ

とりあえず完成しました!!

Rustでデザパタを実装するの楽しい。

あとはバッチがちゃんと実行されるか観察して、ちゃんと動いていたら追加実装していきます。

結構長いスパンで実装しましたが、Shuttleの扱いがちょっと分かってきたので他のサービスもShuttleを活用して実装していきたいです。

(というかなんかこっちから送ったメッセージに対してはデフォルトメッセージのままになっているので、カスタムメッセージを返すような実装を追加するのもいいかも??)

それではまた。