From 5d7f2f0210d89500d651db8585c0157805e90bf6 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 02:28:11 +0900 Subject: [PATCH 01/20] Notify on post chair coordinate --- home/isucon/webapp/go/app_handlers.go | 20 ++++++++++ home/isucon/webapp/go/chair_handlers.go | 52 ++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index dcaeb147..ff34b05d 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "strconv" + "sync" "time" "github.com/jmoiron/sqlx" @@ -654,6 +655,10 @@ type appGetNotificationResponseChairStats struct { TotalEvaluationAvg float64 `json:"total_evaluation_avg"` } +// key: user_id +// value: chan(appGetNotificationResponseData) +var userNotificationQueue sync.Map + func appGetNotification(w http.ResponseWriter, r *http.Request) { ctx := r.Context() user := ctx.Value("user").(*User) @@ -667,8 +672,23 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { } ticker := time.NewTicker(time.Second * 1) + + v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) + queue := v.(chan (*appGetNotificationResponseData)) + for { select { + case response := <-queue: + w.Write([]byte("data: ")) + if err := json.NewEncoder(w).Encode(response); err != nil { + w.WriteHeader(http.StatusInternalServerError) + slog.Error("failed to write response to http writer", "error", err, "response", response) + return + } + w.Write([]byte("\n\n")) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } case <-ticker.C: tx, err := db.Beginx() if err != nil { diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index d8ffd302..c33c147b 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "encoding/json" "errors" @@ -180,13 +181,14 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } ride := &Ride{} + var status string if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE chair_id = ? ORDER BY updated_at DESC LIMIT 1`, chair.ID); err != nil { if !errors.Is(err, sql.ErrNoRows) { writeError(w, http.StatusInternalServerError, err) return } } else { - status, err := getLatestRideStatus(ctx, tx, ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -213,6 +215,54 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + go func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + + ctx := context.Background() + + tx, err := db.Beginx() + defer tx.Rollback() + + fare, err := calculateDiscountedFare(ctx, tx, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + slog.Warn("failed to calculate dscounted fare", "error", err, "user_id", ride.UserID, "ride", ride) + return + } + + stats, err := getChairStats(ctx, tx, chair.ID) + if err != nil { + slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) + return + } + + queue := v.(chan (*appGetNotificationResponseData)) + queue <- &appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + Chair: &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, + }, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + }() + writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ RecordedAt: location.CreatedAt.UnixMilli(), }) From 7aee8ca35b58682f9c84f45d3d111c8ab56c1149 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 02:37:10 +0900 Subject: [PATCH 02/20] Add debug log --- home/isucon/webapp/go/chair_handlers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index c33c147b..a2d54902 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -261,6 +261,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { CreatedAt: ride.CreatedAt.UnixMilli(), UpdateAt: ride.UpdatedAt.UnixMilli(), } + slog.Info("notifified", "ride_id", ride.ID, "user_id", ride.UserID) }() writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ From b739f25f5f522cf3df16f062b92bfd00ef49fed7 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 02:43:54 +0900 Subject: [PATCH 03/20] debug log --- home/isucon/webapp/go/app_handlers.go | 1 + home/isucon/webapp/go/chair_handlers.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index ff34b05d..38981965 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -679,6 +679,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { for { select { case response := <-queue: + slog.Info("pop from userNotificationQueue", "response", response) w.Write([]byte("data: ")) if err := json.NewEncoder(w).Encode(response); err != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index a2d54902..379e8e59 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -261,7 +261,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { CreatedAt: ride.CreatedAt.UnixMilli(), UpdateAt: ride.UpdatedAt.UnixMilli(), } - slog.Info("notifified", "ride_id", ride.ID, "user_id", ride.UserID) + slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) }() writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ From 39cef458ea318d6a907b5d3b9927c1497c8f5b2a Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 02:46:48 +0900 Subject: [PATCH 04/20] Change ticker --- home/isucon/webapp/go/app_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 38981965..4a28a4a5 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -671,7 +671,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 1) + ticker := time.NewTicker(time.Second * 10) v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) queue := v.(chan (*appGetNotificationResponseData)) From 25a0f13b228f1d3ee8f3eaaafbc198a78b622a53 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:00:38 +0900 Subject: [PATCH 05/20] First sent --- home/isucon/webapp/go/app_handlers.go | 116 ++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 4a28a4a5..5cbfeba4 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -673,6 +673,9 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(time.Second * 10) + first := make(chan (struct{}), 1) + first <- struct{}{} + v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) queue := v.(chan (*appGetNotificationResponseData)) @@ -690,6 +693,119 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { if f, ok := w.(http.Flusher); ok { f.Flush() } + case <-first: + tx, err := db.Beginx() + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to begin tx", "error", err) + return + } + defer func() { + if tx != nil { + tx.Rollback() + } + }() + + ride := &Ride{} + if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + tx.Rollback() + tx = nil + slog.Info("no rides", "user_id", user.ID) + continue + } + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "user_id", user.ID) + return + } + + yetSentRideStatus := RideStatus{} + status := "" + if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + slog.Info("no ride_status", "ride_id", ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) + return + } + } else { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) + return + } + } else { + status = yetSentRideStatus.Status + } + + fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) + return + } + + response := appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + + if ride.ChairID.Valid { + chair := GetChair(ride.ChairID.String) + + stats, err := getChairStats(ctx, tx, chair.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) + return + } + + response.Chair = &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, + } + } + + if yetSentRideStatus.ID != "" { + _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) + return + } + } + + if err := tx.Commit(); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to commit", "error", err) + return + } + tx = nil + + w.Write([]byte("data: ")) + if err := json.NewEncoder(w).Encode(response); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to write response to http writer", "error", err, "response", response) + return + } + w.Write([]byte("\n\n")) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } case <-ticker.C: tx, err := db.Beginx() if err != nil { From 50778bdf52edcb8c8b4e547b4bc04024532435e7 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:03:09 +0900 Subject: [PATCH 06/20] init queue after sent first message --- home/isucon/webapp/go/app_handlers.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 5cbfeba4..6e8473b2 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -676,8 +676,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { first := make(chan (struct{}), 1) first <- struct{}{} - v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) - queue := v.(chan (*appGetNotificationResponseData)) + var queue chan (*appGetNotificationResponseData) for { select { @@ -806,6 +805,9 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { if f, ok := w.(http.Flusher); ok { f.Flush() } + + v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) + queue = v.(chan (*appGetNotificationResponseData)) case <-ticker.C: tx, err := db.Beginx() if err != nil { From efb8d34849a46b1f61a8ca7bbf4bd5f7541f908d Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:05:13 +0900 Subject: [PATCH 07/20] Revert tick --- home/isucon/webapp/go/app_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 6e8473b2..be8a561d 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -671,7 +671,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Second * 1) first := make(chan (struct{}), 1) first <- struct{}{} From 20b74a70df32b98ebb10f2b1f993659767630ee5 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:15:28 +0900 Subject: [PATCH 08/20] Notify on ride is created and status is changed --- home/isucon/webapp/go/app_handlers.go | 27 +++++++++++++ home/isucon/webapp/go/chair_handlers.go | 52 +++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index be8a561d..467903f4 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -434,6 +434,33 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + go func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + + queue := v.(chan (*appGetNotificationResponseData)) + queue <- &appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: "MATCHING", + Chair: nil, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + }() + writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ RideID: rideID, Fare: fare, diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index 379e8e59..761ef1b9 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -440,6 +440,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { return } + var status string switch req.Status { // Acknowledge the ride case "ENROUTE": @@ -447,6 +448,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "ENROUTE" // After Picking up user case "CARRYING": status, err := getLatestRideStatus(ctx, tx, ride.ID) @@ -462,6 +464,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "CARRYING" default: writeError(w, http.StatusBadRequest, errors.New("invalid status")) } @@ -471,5 +474,54 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + go func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + + ctx := context.Background() + + tx, err := db.Beginx() + defer tx.Rollback() + + fare, err := calculateDiscountedFare(ctx, tx, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + slog.Warn("failed to calculate dscounted fare", "error", err, "user_id", ride.UserID, "ride", ride) + return + } + + stats, err := getChairStats(ctx, tx, chair.ID) + if err != nil { + slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) + return + } + + queue := v.(chan (*appGetNotificationResponseData)) + queue <- &appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + Chair: &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, + }, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + }() + w.WriteHeader(http.StatusNoContent) } From b635235122d06288f6a0af37f4838b24d0728747 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:19:10 +0900 Subject: [PATCH 09/20] Fix status --- home/isucon/webapp/go/chair_handlers.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index 761ef1b9..2ff60f94 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -199,6 +199,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "PICKUP" } if req.Latitude == ride.DestinationLatitude && req.Longitude == ride.DestinationLongitude && status == "CARRYING" { @@ -206,6 +207,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + status = "ARRIVED" } } } From 83301967e2ad50494a417dbe97c58a135dc168c6 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:30:49 +0900 Subject: [PATCH 10/20] Notify completed --- home/isucon/webapp/go/app_handlers.go | 50 +++++++++++++++++++++++-- home/isucon/webapp/go/chair_handlers.go | 2 +- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 467903f4..b6ed456a 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -441,6 +441,18 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { return } + ctx := context.Background() + + tx, err := db.Beginx() + defer tx.Rollback() + + chair := GetChair(ride.ChairID.String) + stats, err := getChairStats(ctx, tx, chair.ID) + if err != nil { + slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) + return + } + queue := v.(chan (*appGetNotificationResponseData)) queue <- &appGetNotificationResponseData{ RideID: ride.ID, @@ -452,9 +464,14 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { Latitude: ride.DestinationLatitude, Longitude: ride.DestinationLongitude, }, - Fare: fare, - Status: "MATCHING", - Chair: nil, + Fare: fare, + Status: "MATCHING", + Chair: &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, + }, CreatedAt: ride.CreatedAt.UnixMilli(), UpdateAt: ride.UpdatedAt.UnixMilli(), } @@ -654,6 +671,33 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { return } + // ユーザー通知 + go func() { + v, ok := userNotificationQueue.Load(ride.UserID) + if !ok { + return + } + + queue := v.(chan (*appGetNotificationResponseData)) + queue <- &appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: "COMPLETED", + Chair: nil, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + }() + writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{ CompletedAt: ride.UpdatedAt.UnixMilli(), }) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index 2ff60f94..b35f1f43 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -453,7 +453,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { status = "ENROUTE" // After Picking up user case "CARRYING": - status, err := getLatestRideStatus(ctx, tx, ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) return From 29617b81a8f24ba775852d7f93423f079cd395b9 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:40:00 +0900 Subject: [PATCH 11/20] Notify every 10 sec --- home/isucon/webapp/go/app_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index b6ed456a..513ac6a3 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -742,7 +742,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 1) + ticker := time.NewTicker(time.Second * 10) first := make(chan (struct{}), 1) first <- struct{}{} From 4c413fd7679f0a1a5d91391e51305a7551d68469 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:45:40 +0900 Subject: [PATCH 12/20] Notify sync --- home/isucon/webapp/go/app_handlers.go | 8 ++++---- home/isucon/webapp/go/chair_handlers.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 513ac6a3..e5cad0dd 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -435,7 +435,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -476,7 +476,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - }() + } writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ RideID: rideID, @@ -672,7 +672,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -696,7 +696,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - }() + } writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{ CompletedAt: ride.UpdatedAt.UnixMilli(), diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index b35f1f43..fe8151a5 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -218,7 +218,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -264,7 +264,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - }() + } writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ RecordedAt: location.CreatedAt.UnixMilli(), @@ -477,7 +477,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -523,7 +523,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - }() + } w.WriteHeader(http.StatusNoContent) } From 50cb5e65511c99f1a2b6969e97ebd9f8fa19128e Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:47:21 +0900 Subject: [PATCH 13/20] Set chan size --- home/isucon/webapp/go/app_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index e5cad0dd..6e8fdbf4 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -877,7 +877,7 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData))) + v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData), 100)) queue = v.(chan (*appGetNotificationResponseData)) case <-ticker.C: tx, err := db.Beginx() From 805aad6687cb51c9cc223728c2dbbf9161ca51d3 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:49:01 +0900 Subject: [PATCH 14/20] Revert "Notify sync" This reverts commit 4c413fd7679f0a1a5d91391e51305a7551d68469. --- home/isucon/webapp/go/app_handlers.go | 8 ++++---- home/isucon/webapp/go/chair_handlers.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 6e8fdbf4..71e2bafc 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -435,7 +435,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - { + go func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -476,7 +476,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - } + }() writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ RideID: rideID, @@ -672,7 +672,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - { + go func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -696,7 +696,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - } + }() writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{ CompletedAt: ride.UpdatedAt.UnixMilli(), diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index fe8151a5..b35f1f43 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -218,7 +218,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - { + go func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -264,7 +264,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - } + }() writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ RecordedAt: location.CreatedAt.UnixMilli(), @@ -477,7 +477,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - { + go func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -523,7 +523,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { UpdateAt: ride.UpdatedAt.UnixMilli(), } slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) - } + }() w.WriteHeader(http.StatusNoContent) } From 829a72d8fac04410802336f043b90437b4029a23 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 03:49:39 +0900 Subject: [PATCH 15/20] Sync notify --- home/isucon/webapp/go/app_handlers.go | 4 ++-- home/isucon/webapp/go/chair_handlers.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 71e2bafc..09688b98 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -435,7 +435,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -672,7 +672,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index b35f1f43..810a0c79 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -218,7 +218,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return @@ -477,7 +477,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { } // ユーザー通知 - go func() { + func() { v, ok := userNotificationQueue.Load(ride.UserID) if !ok { return From 4fec53549e3e7b679177a3ce9bc840a41e14c304 Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 10:24:09 +0900 Subject: [PATCH 16/20] Use chan only notify --- home/isucon/webapp/go/app_handlers.go | 314 ++++++++------------------ 1 file changed, 95 insertions(+), 219 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 09688b98..5d89f045 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -742,258 +742,134 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 10) + var ( + ticker = time.NewTicker(time.Second * 10) + first = make(chan (struct{}), 1) + queue chan (*appGetNotificationResponseData) + ) - first := make(chan (struct{}), 1) first <- struct{}{} - var queue chan (*appGetNotificationResponseData) - for { select { - case response := <-queue: - slog.Info("pop from userNotificationQueue", "response", response) - w.Write([]byte("data: ")) - if err := json.NewEncoder(w).Encode(response); err != nil { - w.WriteHeader(http.StatusInternalServerError) - slog.Error("failed to write response to http writer", "error", err, "response", response) - return - } - w.Write([]byte("\n\n")) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + case <-queue: case <-first: - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to begin tx", "error", err) - return - } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - ride := &Ride{} - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - tx.Rollback() - tx = nil - slog.Info("no rides", "user_id", user.ID) - continue - } - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "user_id", user.ID) - return - } - - yetSentRideStatus := RideStatus{} - status := "" - if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Info("no ride_status", "ride_id", ride.ID) - status, err = getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) - return - } - } else { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) - return - } - } else { - status = yetSentRideStatus.Status - } - - fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) - return - } + case <-ticker.C: + case <-ctx.Done(): + break + } - response := appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), + tx, err := db.Beginx() + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to begin tx", "error", err) + return + } + defer func() { + if tx != nil { + tx.Rollback() } + }() - if ride.ChairID.Valid { - chair := GetChair(ride.ChairID.String) - - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) - return - } - - response.Chair = &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - } + ride := &Ride{} + if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + tx.Rollback() + tx = nil + slog.Info("no rides", "user_id", user.ID) + continue } + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "user_id", user.ID) + return + } - if yetSentRideStatus.ID != "" { - _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) + yetSentRideStatus := RideStatus{} + status := "" + if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + slog.Info("no ride_status", "ride_id", ride.ID) + status, err = getLatestRideStatus(ctx, tx, ride.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) + slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) return } - } - - if err := tx.Commit(); err != nil { + } else { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to commit", "error", err) + slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) return } - tx = nil + } else { + status = yetSentRideStatus.Status + } - w.Write([]byte("data: ")) - if err := json.NewEncoder(w).Encode(response); err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to write response to http writer", "error", err, "response", response) - return - } - w.Write([]byte("\n\n")) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } + fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) + return + } - v, _ := userNotificationQueue.LoadOrStore(user.ID, make(chan (*appGetNotificationResponseData), 100)) - queue = v.(chan (*appGetNotificationResponseData)) - case <-ticker.C: - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to begin tx", "error", err) - return - } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - ride := &Ride{} - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - tx.Rollback() - tx = nil - slog.Info("no rides", "user_id", user.ID) - continue - } - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "user_id", user.ID) - return - } + response := appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } - yetSentRideStatus := RideStatus{} - status := "" - if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Info("no ride_status", "ride_id", ride.ID) - status, err = getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) - return - } - } else { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) - return - } - } else { - status = yetSentRideStatus.Status - } + if ride.ChairID.Valid { + chair := GetChair(ride.ChairID.String) - fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + stats, err := getChairStats(ctx, tx, chair.ID) if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) + slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) return } - response := appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - - if ride.ChairID.Valid { - chair := GetChair(ride.ChairID.String) - - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) - return - } - - response.Chair = &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - } - } - - if yetSentRideStatus.ID != "" { - _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) - return - } + response.Chair = &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, } + } - if err := tx.Commit(); err != nil { + if yetSentRideStatus.ID != "" { + _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) + if err != nil { writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to commit", "error", err) + slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) return } - tx = nil + } - w.Write([]byte("data: ")) - if err := json.NewEncoder(w).Encode(response); err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to write response to http writer", "error", err, "response", response) - return - } - w.Write([]byte("\n\n")) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - case <-ctx.Done(): - break + if err := tx.Commit(); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to commit", "error", err) + return + } + tx = nil + + w.Write([]byte("data: ")) + if err := json.NewEncoder(w).Encode(response); err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to write response to http writer", "error", err, "response", response) + return + } + w.Write([]byte("\n\n")) + if f, ok := w.(http.Flusher); ok { + f.Flush() } } } From 3337e8c6db688f6ea38c95795a42ba6d3633a0de Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 10:36:32 +0900 Subject: [PATCH 17/20] Improve chan type --- home/isucon/webapp/go/app_handlers.go | 68 +++---------------- home/isucon/webapp/go/chair_handlers.go | 87 ++----------------------- 2 files changed, 14 insertions(+), 141 deletions(-) diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index 5d89f045..86d368f5 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -440,42 +440,8 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { if !ok { return } - - ctx := context.Background() - - tx, err := db.Beginx() - defer tx.Rollback() - - chair := GetChair(ride.ChairID.String) - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) - return - } - - queue := v.(chan (*appGetNotificationResponseData)) - queue <- &appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: "MATCHING", - Chair: &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - }, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + queue := v.(chan (struct{})) + queue <- struct{}{} }() writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ @@ -677,25 +643,8 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { if !ok { return } - - queue := v.(chan (*appGetNotificationResponseData)) - queue <- &appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: "COMPLETED", - Chair: nil, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + queue := v.(chan (struct{})) + queue <- struct{}{} }() writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{ @@ -727,7 +676,7 @@ type appGetNotificationResponseChairStats struct { } // key: user_id -// value: chan(appGetNotificationResponseData) +// value: chan(struct{}) var userNotificationQueue sync.Map func appGetNotification(w http.ResponseWriter, r *http.Request) { @@ -743,9 +692,9 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { } var ( - ticker = time.NewTicker(time.Second * 10) + ticker = time.NewTicker(time.Second * 1) first = make(chan (struct{}), 1) - queue chan (*appGetNotificationResponseData) + queue chan (struct{}) ) first <- struct{}{} @@ -753,8 +702,11 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { for { select { case <-queue: + slog.Info("notify to user", "select", "queue") case <-first: + slog.Info("notify to user", "select", "first") case <-ticker.C: + slog.Info("notify to user", "select", "ticker") case <-ctx.Done(): break } diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index 810a0c79..1152c4b5 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -1,7 +1,6 @@ package main import ( - "context" "database/sql" "encoding/json" "errors" @@ -223,47 +222,8 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { if !ok { return } - - ctx := context.Background() - - tx, err := db.Beginx() - defer tx.Rollback() - - fare, err := calculateDiscountedFare(ctx, tx, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { - slog.Warn("failed to calculate dscounted fare", "error", err, "user_id", ride.UserID, "ride", ride) - return - } - - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) - return - } - - queue := v.(chan (*appGetNotificationResponseData)) - queue <- &appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - Chair: &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - }, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + queue := v.(chan (struct{})) + queue <- struct{}{} }() writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ @@ -482,47 +442,8 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { if !ok { return } - - ctx := context.Background() - - tx, err := db.Beginx() - defer tx.Rollback() - - fare, err := calculateDiscountedFare(ctx, tx, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { - slog.Warn("failed to calculate dscounted fare", "error", err, "user_id", ride.UserID, "ride", ride) - return - } - - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - slog.Warn("failed to calculate dscounted fare", "error", err, "chair_id", chair.ID) - return - } - - queue := v.(chan (*appGetNotificationResponseData)) - queue <- &appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - Chair: &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - }, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - slog.Info("push to userNotificationQueue", "ride_id", ride.ID, "user_id", ride.UserID) + queue := v.(chan (struct{})) + queue <- struct{}{} }() w.WriteHeader(http.StatusNoContent) From 2e2e7680fac279ee8ccc9dd425f8345a7c06717e Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 11:06:33 +0900 Subject: [PATCH 18/20] Do not sleep insert chair location --- home/isucon/webapp/go/chair_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index 1152c4b5..bb0f27cf 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -148,7 +148,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } InsertChairLocation(&cl) go func() { - time.Sleep(90 * time.Second) + //time.Sleep(90 * time.Second) db.ExecContext( ctx, `INSERT INTO chair_locations (id, chair_id, latitude, longitude, created_at) VALUES (?, ?, ?, ?, ?)`, From f573a3f7eba965e4856a7f233c4410937af5c06f Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 11:10:28 +0900 Subject: [PATCH 19/20] =?UTF-8?q?Revert=20"chairID=E3=81=AF=E9=96=A2?= =?UTF-8?q?=E4=BF=82=E3=81=AA=E3=81=97"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 82e3c6c56e3f99e1cb27eb6f9aa24e3bbdf86955. --- home/isucon/webapp/go/internal_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/internal_handlers.go b/home/isucon/webapp/go/internal_handlers.go index 661953cd..ed581ea7 100644 --- a/home/isucon/webapp/go/internal_handlers.go +++ b/home/isucon/webapp/go/internal_handlers.go @@ -32,7 +32,7 @@ func internalGetMatching(w http.ResponseWriter, r *http.Request) { notCompletedChairIDsSet[id] = struct{}{} } notCompletedChairIDs = []string{} - if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where updated_at > NOW(6) - INTERVAL 3.5 SECOND`); err != nil { + if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where updated_at > NOW(6) - INTERVAL 3.5 SECOND AND chair_id IS NOT NULL`); err != nil { writeError(w, http.StatusInternalServerError, err) return } From 7e82f400d3e290e4d3d6c83b561cbb933deaf5aa Mon Sep 17 00:00:00 2001 From: bgpat Date: Wed, 11 Dec 2024 11:13:28 +0900 Subject: [PATCH 20/20] Revert "Do not sleep insert chair location" This reverts commit 2e2e7680fac279ee8ccc9dd425f8345a7c06717e. --- home/isucon/webapp/go/chair_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index bb0f27cf..1152c4b5 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -148,7 +148,7 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { } InsertChairLocation(&cl) go func() { - //time.Sleep(90 * time.Second) + time.Sleep(90 * time.Second) db.ExecContext( ctx, `INSERT INTO chair_locations (id, chair_id, latitude, longitude, created_at) VALUES (?, ?, ?, ?, ?)`,