もう4回目の記事になるんですね。ナナオです。

今回は設定された期日まで15分おきにメッセージを送れるようにしてみたいと思います。

実装をモジュール化

今までの実装でメインモジュールがだいぶごちゃごちゃしてきたので分離してあげます。

LineGroupの構造体はmodel.rsを作ってそちらに移動します。

use sqlx::FromRow;
use chrono::NaiveDate;
use serde::{Serialize, Deserialize};

#[derive(FromRow, Serialize, Deserialize)]
pub struct LineGroup {
    pub id: String,
    pub deadline_date: NaiveDate,
}

DBのline_groupに対する処理はrepository.rsを作ってそちらに移動します。

use crate::model::LineGroup;
use sqlx::PgPool;


pub struct LineGroupRepository<'a> {
    pool: &'a PgPool
}
impl<'a> LineGroupRepository<'a> {
    pub fn new(pool: &'a PgPool) -> Self {
        Self {
            pool
        }
    }
    pub async fn select(&self, id: String) -> Result<Option<LineGroup>, sqlx::Error> {
        let line_group = sqlx::query_as(r"SELECT * FROM line_group WHERE id = $1;")
            .bind(id)
            .fetch_optional(self.pool)
            .await?;

        Ok(line_group)
    }

    pub async fn update(&self, data: &LineGroup) -> Result<(), sqlx::Error> {
        sqlx::query(r"UPDATE line_group SET deadline_date = $2 WHERE id = $1;")
            .bind(&data.id)
            .bind(&data.deadline_date)
            .execute(self.pool)
            .await?;

        Ok(())
    }

    pub async fn insert(&self, data: &LineGroup) -> Result<(), sqlx::Error> {
        sqlx::query(r"INSERT INTO line_group (id, deadline_date) VALUES ($1, $2);")
            .bind(&data.id)
            .bind(&data.deadline_date)
            .execute(self.pool)
            .await?;

        Ok(())
    }
}

署名認証処理はutils.rsを作ってそちらに移動します。

use axum::http::header::HeaderMap;
use base64::{Engine as _, engine::general_purpose};
use hmac::{Hmac, Mac};
use sha2::Sha256;

/// ヘッダーとリクエストボディの内容から署名検証を行います。
/// 
/// まだCHANNEL_SECRETがモジュール依存
pub fn verify_signature(
    channel_secret: String,
    headers: HeaderMap,
    body: String,
) -> bool {
    type HmacSha256 = Hmac<Sha256>;

    let signature: Option<&axum::http::HeaderValue> = headers.get("x-line-signature");

    let signature = signature
        .expect("署名が存在しません。")
        .to_str()
        .expect("署名を文字列として読み込めませんでした。")
        .to_string();

    let mut mac = HmacSha256::new_from_slice(channel_secret.as_bytes())
        .expect("HMACキーの検証に失敗しました。");

    mac.update(body.as_bytes());

    let code_bytes = mac.finalize().into_bytes();
    let encoded: String = general_purpose::STANDARD.encode(code_bytes);

    encoded == signature
}

コマンド関連の構造体もcommand.rsに移動しておきます。

use regex::Regex;
use serde_json::Value;
use sqlx::PgPool;
use chrono::NaiveDate;
use async_trait::async_trait;
use crate::model::LineGroup;
use crate::repository::LineGroupRepository;

#[async_trait]
trait BotCommand {
    async fn execute(&self, value: Value);
}

pub struct BotSetDeadlineCommand {
    pool: PgPool,
    line_group: LineGroup,
}
impl BotSetDeadlineCommand {
    fn new(
        pool: PgPool,
        group_id: &str, 
        deadline_date: &str,
    ) -> Self {
        let line_group = LineGroup {
            id: group_id.to_string(),
            deadline_date: NaiveDate::parse_from_str(deadline_date, "%Y-%m-%d")
                .expect("日付型への変換に失敗しました")
        };

        Self {
            pool,
            line_group
        }
    }
}
#[async_trait]
impl BotCommand for BotSetDeadlineCommand {
    async fn execute(&self, _: Value) {
        let repository = LineGroupRepository::new(&self.pool);
        let line_group = repository.select(self.line_group.id.clone())
            .await
            .expect("line_groupの取得時に問題が発生しました");
        if line_group.is_some() {
            repository.update(&self.line_group).await.expect("line_groupの更新に失敗しました");
        } else {
            repository.insert(&self.line_group).await.expect("line_groupの作成に失敗しました");
        }
    }
}

pub struct BotSetUrlCommand;
#[async_trait]
impl BotCommand for BotSetUrlCommand {
    async fn execute(&self, value: Value) {}
}

pub struct BotHelpCommand;
#[async_trait]
impl BotCommand for BotHelpCommand {
    async fn execute(&self, value: Value) {}
}

/// Commandを実行する
pub async fn execute_bot_command(
    pool: PgPool,
    account_name: String,
    value: Value,
) {
    // ...中略...
}

これでだいぶすっきりしました。

バッチの仕様

期日設定まではできたので、今度はバッチとメッセージ送信を実装しましょう。

本当は「グループ内の回答していないメンバーにDMする」という仕様にしたかったのですが、グループ内のメンバー取得がLINEからの審査に合格したアカウントじゃないといけないようなので、とりあえず最初はグループトークに送信する仕様にします。

Messaging APIリファレンス | LINE Developers

一応DMを送るための代替案はあって、グループトークにメッセージを送ってくれたユーザーのIDは取得可能なので、グループトークにヤバいさんを追加した段階でグループトーク内のすべての人にメッセージを送ってもらうという方法はあります。

まぁとりあえずバッチとメッセージ送信の実装ができたらその辺はすぐ実装できるので、あとにしましょう。

以前Shuttleでバッチを実装したときと同じように、バッチを実装していきます。

まずは依存関係を追加します。

tokio = "1.34.0"
tokio-cron-scheduler = "0.9.4"

では以前の記事で実装したように、スケジューラの実装を追加します。

// src/main.rs
use axum::{extract::State, http::{StatusCode, header::HeaderMap}, routing::post, Router};
use serde_json::Value;
use sqlx::PgPool;
use tokio_cron_scheduler::{Job, JobScheduler};

mod model;
mod repository;
mod utils;
mod command;

use utils::verify_signature;
use command::execute_bot_command;

const CHANNEL_SECRET: &str = "[シークレット]";
const ACCOUNT_NAME: &str = "ヤバいさん";

pub struct MultipleService {
    router: Router,
    scheduler: JobScheduler,
}
impl MultipleService {
    pub async fn new(pool: PgPool) -> Result<MultipleService, shuttle_runtime::Error> {
        let state = AxumState { pool };
        let router = Router::new()
            .route("/webhook", post(webhook))
            .with_state(state);

        let scheduler = JobScheduler::new().await.unwrap();

        scheduler.add(
            // 15分に一回実行する
            Job::new("0 */15 * * * *", |_uuid, _l| {
                // println!("I run every 15 minutes");
            }).unwrap()
        ).await.unwrap();

        Ok(Self {
            router,
            scheduler,
        })
    }
}

#[shuttle_runtime::async_trait]
impl shuttle_runtime::Service for MultipleService {
    async fn bind(mut self, addr: std::net::SocketAddr) -> Result<(), shuttle_runtime::Error> {
        let server = axum::Server::bind(&addr);

        let (_runner_hdl, _axum_hdl) =
            tokio::join!(self.scheduler.start(), server.serve(self.router.into_make_service()));

        Ok(())
    }
}

#[derive(Clone)]
struct AxumState {
    pool: PgPool,
}

async fn webhook(
    State(state): State<AxumState>,
    headers: HeaderMap,
    body: String,
) -> (StatusCode, &'static str) {
    if !verify_signature(CHANNEL_SECRET.to_string(), headers, body.clone()) {
        return (StatusCode::UNAUTHORIZED, "無効なリクエストです。サーバー側と署名が一致しませんでした。")
    }

    let v: Value = serde_json::from_str(body.as_str())
        .expect("JSONのパースに失敗しました");  

    execute_bot_command(state.pool, ACCOUNT_NAME.to_string(), v).await;

    (StatusCode::OK, "OK")
}

#[shuttle_runtime::main]
async fn main(
    #[shuttle_shared_db::Postgres] pool: PgPool,
) -> Result<MultipleService, shuttle_runtime::Error> {
    // マイグレーションを実行
    sqlx::migrate!("../migrations")
        .run(&pool)
        .await.unwrap();

    MultipleService::new(pool).await
}

ただ、このままではジョブスケジューラがDBPoolを持っていないため、DBからデータ取得ができません。

今まではpoolを扱うのがaxumだけだったのでよかったんですが、スケジューラと共有するので少し扱い方を変える必要があります。

ということで、Rust版のスマートポインタであるArcを使用してpoolをラップしてあげることで使用できるようにします。

// src/main.rs
// ...中略...

impl MultipleService {
    // 引数のpoolをArcでラップします
    pub async fn new(pool: Arc<PgPool>) -> Result<MultipleService, shuttle_runtime::Error> {
        let state = AxumState { pool: pool.clone() };
        let router = Router::new()
            .route("/webhook", post(webhook))
            .with_state(state);

        let scheduler = JobScheduler::new().await.unwrap();

        // 15分に一回
        let job = Job::new_async("0 */15 * * * *", move |_uuid, _l| {
            let clone_pool = pool.clone();

            Box::pin(async move {
                let line_group_repository = LineGroupRepository::new(&clone_pool);
                let line_group = line_group_repository.select("test".to_string()).await.unwrap();
                println!("line_group: {:?}", line_group.unwrap());
                println!("I run every 15 minutes");
            })
        }).unwrap();

        scheduler.add(job).await.unwrap();

        Ok(Self {
            router,
            scheduler,
        })
    }
}

// ...中略...

#[derive(Clone)]
struct AxumState {
    pool: Arc<PgPool>, // これもArcでラップします
}

async fn webhook(
    State(state): State<AxumState>,
    headers: HeaderMap,
    body: String,
) -> (StatusCode, &'static str) {
    if !verify_signature(CHANNEL_SECRET.to_string(), headers, body.clone()) {
        return (StatusCode::UNAUTHORIZED, "無効なリクエストです。サーバー側と署名が一致しませんでした。")
    }

    let v: Value = serde_json::from_str(body.as_str())
        .expect("JSONのパースに失敗しました");  

    execute_bot_command(state.pool, ACCOUNT_NAME.to_string(), v).await;

    (StatusCode::OK, "OK")
}

#[shuttle_runtime::main]
async fn main(
    #[shuttle_shared_db::Postgres] pool: PgPool,
) -> Result<MultipleService, shuttle_runtime::Error> {
    // マイグレーションを実行
    sqlx::migrate!("../migrations")
        .run(&pool)
        .await.unwrap();

    MultipleService::new(Arc::new(pool)).await // これもArcで(ry
}

これでリポジトリをスケジューラから扱えるようにできました。

あとは全てのline_groupが取得できる関数をリポジトリに追加しておきましょう。

impl<'a> LineGroupRepository<'a> {
    // ...中略...

    pub async fn get_all(&self) -> Result<Vec<LineGroup>, sqlx::Error> {
        let all_line_group = sqlx::query_as(r"SELECT * FROM line_group;")
            .fetch_all(self.pool)
            .await?;

        Ok(all_line_group)
    }

    // ...中略...
}

ではジョブの処理を修正します。

        let job = Job::new_async("0 */15 * * * *", move |_uuid, _l| {
            let clone_pool = pool.clone();

            Box::pin(async move {
                let line_group_repository = LineGroupRepository::new(&clone_pool);
                // get_allを使用する
                let line_group = line_group_repository.get_all().await.unwrap();
                println!("line_group: {:?}", line_group);
                println!("I run every 15 minutes");
            })
        }).unwrap();

実行してみます。

> cargo shuttle run
# ...中略...

    Starting yabai-app on http://127.0.0.1:8000

2023-11-15T16:38:38.626+09:00 [Runtime] Starting on 127.0.0.1:8000
2023-11-15T16:38:40.157+09:00 [Runtime] line_group: [LineGroup { id: "test", deadline_date: 2023-11-07 }, LineGroup { id: "C480b2f8b56ecaf62c2033867e2ff78b2", deadline_date: 2023-11-12 }]

ログに取得した全てのline_groupが表示されています!

ではここで取得したグループIDを使用してメッセージを送ってみましょう。

LINE Developerの設定画面から、チャネルアクセストークンを発行します。

ここで発行したアクセストークンを使用すると、メッセージ送信が可能になります。

Messaging APIリファレンス | LINE Developers

curlコマンドを発行して、このアクセストークンでメッセージが発行できるか確認してみます。

curl -v -X POST https://api.line.me/v2/bot/message/push \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer {チャネルアクセストークン}' \
-d '{
    "to": "{グループID}",
    "messages":[
        {
            "type":"text",
            "text":"Hello, world1"
        },
        {
            "type":"text",
            "text":"Hello, world2"
        }
    ]
}'

実行後、メッセージが送られていました。

大丈夫そうですね。

ではRustのHTTPクライアントであるreqwestを入れてメッセージを送れるようにしてみましょう。

reqwest = { version = "0.11.22", features = ["json"] }

LINEのテキストメッセージは普通のテキスト以外に絵文字も使えるようにちょっと複雑な仕様になっています。

Messaging APIリファレンス | LINE Developers

ただ今回は絵文字は使用しないので、シンプルな構造体を定義するくらいでクライアント定義はできます。

新しくline_clientモジュールを実装します。

// src/line_client.rs
use serde_json::{json, Value};

use reqwest::Client;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct PushMessageResponse {
    #[serde(rename = "sentMessages")]
    sent_messages: Vec<PushMessageSentMessage>
}

#[derive(Deserialize, Serialize)]
pub struct PushMessageSentMessage {
    id: String,
    #[serde(rename = "quoteToken")]
    quote_token: String,
}

pub struct LineClient {
    client: Client,
}
impl LineClient {
    pub fn new() -> Self {
        Self {
            client: Client::new(),
        }
    }

    pub async fn push_message(&self, id: String, messages: Vec<String>) -> Result<PushMessageResponse, reqwest::Error> {
        let messages: Vec<Value> = messages.iter().map(|message| json!({
            "type": "text",
            "text": message,
        })).collect();

        let req_json = json!({
            "to": id,
            "messages": messages,
        });

        let res: PushMessageResponse = self.client.post("https://api.line.me/v2/bot/message/push")
            .json(&req_json)
            .bearer_auth("{チャネルアクセストークン}")
            .send()
            .await?
            .json()
            .await?;

        Ok(res)
    }
}

うーん、シンプル is 美しい。

これをバッチから実行できるようにしましょう。

        // 15分に一回
        let job = Job::new_async("0 */15 * * * *", move |_uuid, _l| {
            let clone_pool = pool.clone();
            let line_scheduler = LineClient::new();

            Box::pin(async move {
                let line_group_repository = LineGroupRepository::new(&clone_pool);
                let all_line_group = line_group_repository.get_all().await.unwrap();

                for line_group in all_line_group.iter() {
                    line_scheduler.push_message(line_group.id.clone(), vec!["test message".to_string()]).await.unwrap();
                }

                println!("line_group: {:?}", &all_line_group);
                println!("I run every 15 minutes");
            })
        }).unwrap();

shuttleで実行してみましょう。

設定したメッセージが送られてきました!

まとめ

とりあえずバッチの実行ができるようになりました。

次回は調整さんのCSVパース処理を実装しようと思います。

ではまた。