Skip to content
This repository was archived by the owner on Mar 28, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 131 additions & 93 deletions home/isucon/webapp/go/app_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"net/http"
"strconv"
"sync"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -433,6 +434,16 @@ func appPostRides(w http.ResponseWriter, r *http.Request) {
return
}

// ユーザー通知
func() {
v, ok := userNotificationQueue.Load(ride.UserID)
if !ok {
return
}
queue := v.(chan (struct{}))
queue <- struct{}{}
}()

writeJSON(w, http.StatusAccepted, &appPostRidesResponse{
RideID: rideID,
Fare: fare,
Expand Down Expand Up @@ -626,6 +637,16 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) {
return
}

// ユーザー通知
func() {
v, ok := userNotificationQueue.Load(ride.UserID)
if !ok {
return
}
queue := v.(chan (struct{}))
queue <- struct{}{}
}()

writeJSON(w, http.StatusOK, &appPostRideEvaluationResponse{
CompletedAt: ride.UpdatedAt.UnixMilli(),
})
Expand Down Expand Up @@ -654,6 +675,10 @@ type appGetNotificationResponseChairStats struct {
TotalEvaluationAvg float64 `json:"total_evaluation_avg"`
}

// key: user_id
// value: chan(struct{})
var userNotificationQueue sync.Map

func appGetNotification(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
user := ctx.Value("user").(*User)
Expand All @@ -666,124 +691,137 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) {
f.Flush()
}

ticker := time.NewTicker(time.Second * 1)
var (
ticker = time.NewTicker(time.Second * 1)
first = make(chan (struct{}), 1)
queue chan (struct{})
)

first <- struct{}{}

for {
select {
case <-queue:
slog.Info("notify to user", "select", "queue")
case <-first:
slog.Info("notify to user", "select", "first")
case <-ticker.C:
tx, err := db.Beginx()
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to begin tx", "error", err)
return
slog.Info("notify to user", "select", "ticker")
case <-ctx.Done():
break
}

tx, err := db.Beginx()
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to begin tx", "error", err)
return
}
defer func() {
if tx != nil {
tx.Rollback()
}
defer func() {
if tx != nil {
tx.Rollback()
}
}()

ride := &Ride{}
if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
tx.Rollback()
tx = nil
slog.Info("no rides", "user_id", user.ID)
continue
}
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to get rides", "error", err, "user_id", user.ID)
return
}()

ride := &Ride{}
if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
tx.Rollback()
tx = nil
slog.Info("no rides", "user_id", user.ID)
continue
}
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to get rides", "error", err, "user_id", user.ID)
return
}

yetSentRideStatus := RideStatus{}
status := ""
if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
slog.Info("no ride_status", "ride_id", ride.ID)
status, err = getLatestRideStatus(ctx, tx, ride.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err)
return
}
} else {
yetSentRideStatus := RideStatus{}
status := ""
if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
slog.Info("no ride_status", "ride_id", ride.ID)
status, err = getLatestRideStatus(ctx, tx, ride.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to get rides", "error", err, "ride_id", ride.ID)
slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err)
return
}
} else {
status = yetSentRideStatus.Status
}

fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride)
slog.Error("failed to get rides", "error", err, "ride_id", ride.ID)
return
}
} else {
status = yetSentRideStatus.Status
}

response := appGetNotificationResponseData{
RideID: ride.ID,
PickupCoordinate: Coordinate{
Latitude: ride.PickupLatitude,
Longitude: ride.PickupLongitude,
},
DestinationCoordinate: Coordinate{
Latitude: ride.DestinationLatitude,
Longitude: ride.DestinationLongitude,
},
Fare: fare,
Status: status,
CreatedAt: ride.CreatedAt.UnixMilli(),
UpdateAt: ride.UpdatedAt.UnixMilli(),
}
fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride)
return
}

if ride.ChairID.Valid {
chair := GetChair(ride.ChairID.String)
response := appGetNotificationResponseData{
RideID: ride.ID,
PickupCoordinate: Coordinate{
Latitude: ride.PickupLatitude,
Longitude: ride.PickupLongitude,
},
DestinationCoordinate: Coordinate{
Latitude: ride.DestinationLatitude,
Longitude: ride.DestinationLongitude,
},
Fare: fare,
Status: status,
CreatedAt: ride.CreatedAt.UnixMilli(),
UpdateAt: ride.UpdatedAt.UnixMilli(),
}

stats, err := getChairStats(ctx, tx, chair.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID)
return
}
if ride.ChairID.Valid {
chair := GetChair(ride.ChairID.String)

response.Chair = &appGetNotificationResponseChair{
ID: chair.ID,
Name: chair.Name,
Model: chair.Model,
Stats: stats,
}
stats, err := getChairStats(ctx, tx, chair.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID)
return
}

if yetSentRideStatus.ID != "" {
_, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID)
return
}
response.Chair = &appGetNotificationResponseChair{
ID: chair.ID,
Name: chair.Name,
Model: chair.Model,
Stats: stats,
}
}

if err := tx.Commit(); err != nil {
if yetSentRideStatus.ID != "" {
_, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to commit", "error", err)
slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID)
return
}
tx = nil
}

w.Write([]byte("data: "))
if err := json.NewEncoder(w).Encode(response); err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to write response to http writer", "error", err, "response", response)
return
}
w.Write([]byte("\n\n"))
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
case <-ctx.Done():
break
if err := tx.Commit(); err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to commit", "error", err)
return
}
tx = nil

w.Write([]byte("data: "))
if err := json.NewEncoder(w).Encode(response); err != nil {
writeError(w, http.StatusInternalServerError, err)
slog.Error("failed to write response to http writer", "error", err, "response", response)
return
}
w.Write([]byte("\n\n"))
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
}
Expand Down
30 changes: 28 additions & 2 deletions home/isucon/webapp/go/chair_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,14 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) {
}

ride := &Ride{}
var status string
if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE chair_id = ? ORDER BY updated_at DESC LIMIT 1`, chair.ID); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
writeError(w, http.StatusInternalServerError, err)
return
}
} else {
status, err := getLatestRideStatus(ctx, tx, ride.ID)
status, err = getLatestRideStatus(ctx, tx, ride.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
return
Expand All @@ -197,13 +198,15 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err)
return
}
status = "PICKUP"
}

if req.Latitude == ride.DestinationLatitude && req.Longitude == ride.DestinationLongitude && status == "CARRYING" {
if _, err := tx.ExecContext(ctx, "INSERT INTO ride_statuses (id, ride_id, status) VALUES (?, ?, ?)", ulid.Make().String(), ride.ID, "ARRIVED"); err != nil {
writeError(w, http.StatusInternalServerError, err)
return
}
status = "ARRIVED"
}
}
}
Expand All @@ -213,6 +216,16 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) {
return
}

// ユーザー通知
func() {
v, ok := userNotificationQueue.Load(ride.UserID)
if !ok {
return
}
queue := v.(chan (struct{}))
queue <- struct{}{}
}()

writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{
RecordedAt: location.CreatedAt.UnixMilli(),
})
Expand Down Expand Up @@ -389,16 +402,18 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) {
return
}

var status string
switch req.Status {
// Acknowledge the ride
case "ENROUTE":
if _, err := tx.ExecContext(ctx, "INSERT INTO ride_statuses (id, ride_id, status) VALUES (?, ?, ?)", ulid.Make().String(), ride.ID, "ENROUTE"); err != nil {
writeError(w, http.StatusInternalServerError, err)
return
}
status = "ENROUTE"
// After Picking up user
case "CARRYING":
status, err := getLatestRideStatus(ctx, tx, ride.ID)
status, err = getLatestRideStatus(ctx, tx, ride.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, err)
return
Expand All @@ -411,6 +426,7 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err)
return
}
status = "CARRYING"
default:
writeError(w, http.StatusBadRequest, errors.New("invalid status"))
}
Expand All @@ -420,5 +436,15 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) {
return
}

// ユーザー通知
func() {
v, ok := userNotificationQueue.Load(ride.UserID)
if !ok {
return
}
queue := v.(chan (struct{}))
queue <- struct{}{}
}()

w.WriteHeader(http.StatusNoContent)
}