diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index dcaeb147..86d368f5 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "strconv" + "sync" "time" "github.com/jmoiron/sqlx" @@ -433,6 +434,16 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + queue := v.(chan (struct{})) + queue <- struct{}{} + }() + writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ RideID: rideID, Fare: fare, @@ -626,6 +637,16 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + queue := v.(chan (struct{})) + queue <- struct{}{} + }() + writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{ CompletedAt: ride.UpdatedAt.UnixMilli(), }) @@ -654,6 +675,10 @@ type appGetNotificationResponseChairStats struct { TotalEvaluationAvg float64 `json:"total_evaluation_avg"` } +// key: user_id +// value: chan(struct{}) +var userNotificationQueue sync.Map + func appGetNotification(w http.ResponseWriter, r *http.Request) { ctx := r.Context() user := ctx.Value("user").(*User) @@ -666,124 +691,137 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 1) + var ( + ticker = time.NewTicker(time.Second * 1) + first = make(chan (struct{}), 1) + queue chan (struct{}) + ) + + first <- struct{}{} + for { select { + case <-queue: + slog.Info("notify to user", "select", "queue") + case <-first: + slog.Info("notify to user", "select", "first") case <-ticker.C: - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to begin tx", "error", err) - return + slog.Info("notify to user", "select", "ticker") + case <-ctx.Done(): + break + } + + tx, err := db.Beginx() + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to begin tx", "error", err) + return + } + defer func() { + if tx != nil { + tx.Rollback() } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - ride := &Ride{} - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - tx.Rollback() - tx = nil - slog.Info("no rides", "user_id", user.ID) - continue - } - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "user_id", user.ID) - return + }() + + ride := &Ride{} + if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + tx.Rollback() + tx = nil + slog.Info("no rides", "user_id", user.ID) + continue } + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "user_id", user.ID) + return + } - yetSentRideStatus := RideStatus{} - status := "" - if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Info("no ride_status", "ride_id", ride.ID) - status, err = getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) - return - } - } else { + yetSentRideStatus := RideStatus{} + status := "" + if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + slog.Info("no ride_status", "ride_id", ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) + if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) + slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) return } } else { - status = yetSentRideStatus.Status - } - - fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) + slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) return } + } else { + status = yetSentRideStatus.Status + } - response := appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } + fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) + return + } - if ride.ChairID.Valid { - chair := GetChair(ride.ChairID.String) + response := appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) - return - } + if ride.ChairID.Valid { + chair := GetChair(ride.ChairID.String) - response.Chair = &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - } + stats, err := getChairStats(ctx, tx, chair.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) + return } - if yetSentRideStatus.ID != "" { - _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) - return - } + response.Chair = &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, } + } - if err := tx.Commit(); err != nil { + if yetSentRideStatus.ID != "" { + _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) + if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to commit", "error", err) + slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) return } - tx = nil + } - w.Write([]byte("data: ")) - if err := json.NewEncoder(w).Encode(response); err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to write response to http writer", "error", err, "response", response) - return - } - w.Write([]byte("\n\n")) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - case <-ctx.Done(): - break + if err := tx.Commit(); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to commit", "error", err) + return + } + tx = nil + + w.Write([]byte("data: ")) + if err := json.NewEncoder(w).Encode(response); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to write response to http writer", "error", err, "response", response) + return + } + w.Write([]byte("\n\n")) + if f, ok := w.(http.Flusher); ok { + f.Flush() } } } diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index d8ffd302..1152c4b5 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -180,13 +180,14 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } ride := &Ride{} + var status string if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE chair_id = ? ORDER BY updated_at DESC LIMIT 1`, chair.ID); err != nil { if !errors.Is(err, sql.ErrNoRows) { writeError(w, http.StatusInternalServerError, err) return } } else { - status, err := getLatestRideStatus(ctx, tx, ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -197,6 +198,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "PICKUP" } if req.Latitude == ride.DestinationLatitude && req.Longitude == ride.DestinationLongitude && status == "CARRYING" { @@ -204,6 +206,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "ARRIVED" } } } @@ -213,6 +216,16 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + queue := v.(chan (struct{})) + queue <- struct{}{} + }() + writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ RecordedAt: location.CreatedAt.UnixMilli(), }) @@ -389,6 +402,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { return } + var status string switch req.Status { // Acknowledge the ride case "ENROUTE": @@ -396,9 +410,10 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "ENROUTE" // After Picking up user case "CARRYING": - status, err := getLatestRideStatus(ctx, tx, ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -411,6 +426,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "CARRYING" default: writeError(w, http.StatusBadRequest, errors.New("invalid status")) } @@ -420,5 +436,15 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + queue := v.(chan (struct{})) + queue <- struct{}{} + }() + w.WriteHeader(http.StatusNoContent) }