diff --git a/etc/mysql/mysql.conf.d/mysqld.cnf b/etc/mysql/mysql.conf.d/mysqld.cnf index 72ebdc86..8907eefb 100644 --- a/etc/mysql/mysql.conf.d/mysqld.cnf +++ b/etc/mysql/mysql.conf.d/mysqld.cnf @@ -79,3 +79,7 @@ max_binlog_size = 100M # binlog_do_db = include_database_name # binlog_ignore_db = include_database_name +innodb_buffer_pool_size=2G +thread_cache_size=100 +innodb_file_per_table=ON +tmp_table_size=1G diff --git a/etc/nginx/nginx.conf b/etc/nginx/nginx.conf index 599c0a23..c42f259f 100644 --- a/etc/nginx/nginx.conf +++ b/etc/nginx/nginx.conf @@ -3,9 +3,10 @@ worker_processes auto; pid /run/nginx.pid; error_log /var/log/nginx/error.log; include /etc/nginx/modules-enabled/*.conf; +worker_rlimit_nofile 65536; events { - worker_connections 768; + worker_connections 10240; # multi_accept on; } @@ -55,6 +56,18 @@ http { # gzip_http_version 1.1; # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript; + keepalive_timeout 120s; + keepalive_requests 1000; # 1つの接続での最大リクエスト数 + client_body_timeout 120s; + client_header_timeout 120s; + proxy_read_timeout 300s; + proxy_connect_timeout 300s; + proxy_send_timeout 300s; + client_max_body_size 100M; + proxy_buffer_size 16k; + proxy_buffers 4 32k; + proxy_busy_buffers_size 64k; + ## # Virtual Host Configs ## diff --git a/etc/nginx/sites-available/isuride.conf b/etc/nginx/sites-available/isuride.conf index 2871b027..00089aee 100644 --- a/etc/nginx/sites-available/isuride.conf +++ b/etc/nginx/sites-available/isuride.conf @@ -9,7 +9,7 @@ server { } server { - listen 443 ssl default_server; + listen 443 ssl http2 default_server; server_name _; index index.html index.htm index.nginx-debian.html; root /var/www/html; @@ -26,7 +26,7 @@ server { } server { - listen 443 ssl; + listen 443 ssl http2; server_name xiv.isucon.net; server_name *.xiv.isucon.net; diff --git a/home/isucon/webapp/go/app_handlers.go b/home/isucon/webapp/go/app_handlers.go index dcaeb147..d961c6b5 100644 --- a/home/isucon/webapp/go/app_handlers.go +++ b/home/isucon/webapp/go/app_handlers.go @@ -5,9 +5,11 @@ import ( "database/sql" "encoding/json" "errors" + "fmt" "log/slog" "net/http" "strconv" + "sync" "time" "github.com/jmoiron/sqlx" @@ -50,68 +52,69 @@ func appPostUsers(w http.ResponseWriter, r *http.Request) { } defer tx.Rollback() + now := time.Now() _, err = tx.ExecContext( ctx, - "INSERT INTO users (id, username, firstname, lastname, date_of_birth, access_token, invitation_code) VALUES (?, ?, ?, ?, ?, ?, ?)", - userID, req.Username, req.FirstName, req.LastName, req.DateOfBirth, accessToken, invitationCode, - ) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - - // 初回登録キャンペーンのクーポンを付与 - _, err = tx.ExecContext( - ctx, - "INSERT INTO coupons (user_id, code, discount) VALUES (?, ?, ?)", - userID, "CP_NEW2024", 3000, + "INSERT INTO users (id, username, firstname, lastname, date_of_birth, access_token, invitation_code, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + userID, req.Username, req.FirstName, req.LastName, req.DateOfBirth, accessToken, invitationCode, now, now, ) if err != nil { writeError(w, http.StatusInternalServerError, err) return } + InsertUser(&User{ + ID: userID, + Username: req.Username, + Firstname: req.FirstName, + Lastname: req.LastName, + DateOfBirth: req.DateOfBirth, + AccessToken: accessToken, + InvitationCode: invitationCode, + CreatedAt: now, + UpdatedAt: now, + }) // 招待コードを使った登録 if req.InvitationCode != nil && *req.InvitationCode != "" { // 招待する側の招待数をチェック - var coupons []Coupon - err = tx.SelectContext(ctx, &coupons, "SELECT * FROM coupons WHERE code = ? FOR UPDATE", "INV_"+*req.InvitationCode) + couponCount := 0 + err = tx.GetContext(ctx, &couponCount, "SELECT count(1) FROM coupons WHERE code = ?", "INV_"+*req.InvitationCode) if err != nil { - writeError(w, http.StatusInternalServerError, err) - return + if !errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusInternalServerError, err) + return + } } - if len(coupons) >= 3 { + if couponCount >= 3 { writeError(w, http.StatusBadRequest, errors.New("この招待コードは使用できません。")) return } // ユーザーチェック - var inviter User - err = tx.GetContext(ctx, &inviter, "SELECT * FROM users WHERE invitation_code = ?", *req.InvitationCode) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusBadRequest, errors.New("この招待コードは使用できません。")) - return - } - writeError(w, http.StatusInternalServerError, err) + inviter := GetUser(*req.InvitationCode) + if inviter == nil { + writeError(w, http.StatusBadRequest, errors.New("この招待コードは使用できません。")) return } // 招待クーポン付与 _, err = tx.ExecContext( ctx, - "INSERT INTO coupons (user_id, code, discount) VALUES (?, ?, ?)", + "INSERT INTO coupons (user_id, code, discount) VALUES (?, ?, ?), (?, CONCAT(?, '_', FLOOR(UNIX_TIMESTAMP(NOW(3))*1000)), ?), (?, ?, ?)", userID, "INV_"+*req.InvitationCode, 1500, + inviter.ID, "RWD_"+*req.InvitationCode, 1000, + userID, "CP_NEW2024", 3000, // 初回登録キャンペーンのクーポンを付与 ) if err != nil { writeError(w, http.StatusInternalServerError, err) return } - // 招待した人にもRewardを付与 + } else { + // 初回登録キャンペーンのクーポンを付与 _, err = tx.ExecContext( ctx, - "INSERT INTO coupons (user_id, code, discount) VALUES (?, CONCAT(?, '_', FLOOR(UNIX_TIMESTAMP(NOW(3))*1000)), ?)", - inviter.ID, "RWD_"+*req.InvitationCode, 1000, + "INSERT INTO coupons (user_id, code, discount) VALUES (?, ?, ?)", + userID, "CP_NEW2024", 3000, ) if err != nil { writeError(w, http.StatusInternalServerError, err) @@ -213,50 +216,63 @@ func appGetRides(w http.ResponseWriter, r *http.Request) { } items := []getAppRidesResponseItem{} - for _, ride := range rides { - status, err := getLatestRideStatus(ctx, tx, ride.ID) + if len(rides) > 0 { + rideIDs := make([]string, 0, len(rides)) + for _, ride := range rides { + rideIDs = append(rideIDs, ride.ID) + } + + coupons := []Coupon{} + query, params, err := sqlx.In("SELECT * FROM coupons WHERE used_by IN (?)", rideIDs) if err != nil { writeError(w, http.StatusInternalServerError, err) return } - if status != "COMPLETED" { - continue - } - - fare, err := calculateDiscountedFare(ctx, tx, user.ID, &ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { + if err := tx.SelectContext(ctx, &coupons, query, params...); err != nil { writeError(w, http.StatusInternalServerError, err) return } - - item := getAppRidesResponseItem{ - ID: ride.ID, - PickupCoordinate: Coordinate{Latitude: ride.PickupLatitude, Longitude: ride.PickupLongitude}, - DestinationCoordinate: Coordinate{Latitude: ride.DestinationLatitude, Longitude: ride.DestinationLongitude}, - Fare: fare, - Evaluation: *ride.Evaluation, - RequestedAt: ride.CreatedAt.UnixMilli(), - CompletedAt: ride.UpdatedAt.UnixMilli(), + couponByRideID := map[string]Coupon{} + for _, coupon := range coupons { + couponByRideID[*coupon.UsedBy] = coupon } - item.Chair = getAppRidesResponseItemChair{} + for _, ride := range rides { + if ride.Evaluation == nil { + continue + } - chair := &Chair{} - if ride.ChairID.Valid { - chair = GetChair(ride.ChairID.String) - } - item.Chair.ID = chair.ID - item.Chair.Name = chair.Name - item.Chair.Model = chair.Model + discount := 0 + if coupon, ok := couponByRideID[ride.ID]; ok { + discount = coupon.Discount + } + fare := calcFare(ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude, discount) + + item := getAppRidesResponseItem{ + ID: ride.ID, + PickupCoordinate: Coordinate{Latitude: ride.PickupLatitude, Longitude: ride.PickupLongitude}, + DestinationCoordinate: Coordinate{Latitude: ride.DestinationLatitude, Longitude: ride.DestinationLongitude}, + Fare: fare, + Evaluation: *ride.Evaluation, + RequestedAt: ride.CreatedAt.UnixMilli(), + CompletedAt: ride.UpdatedAt.UnixMilli(), + } - owner := &Owner{} - if err := tx.GetContext(ctx, owner, `SELECT * FROM owners WHERE id = ?`, chair.OwnerID); err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - item.Chair.Owner = owner.Name + item.Chair = getAppRidesResponseItemChair{} + + chair := &Chair{} + if ride.ChairID.Valid { + chair = GetChair(ride.ChairID.String) + } + item.Chair.ID = chair.ID + item.Chair.Name = chair.Name + item.Chair.Model = chair.Model - items = append(items, item) + owner := GetOwner(chair.OwnerID) + item.Chair.Owner = owner.Name + + items = append(items, item) + } } if err := tx.Commit(); err != nil { @@ -284,12 +300,19 @@ type executableGet interface { GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error } -func getLatestRideStatus(ctx context.Context, tx executableGet, rideID string) (string, error) { - status := "" - if err := tx.GetContext(ctx, &status, `SELECT status FROM ride_statuses WHERE ride_id = ? ORDER BY created_at DESC LIMIT 1`, rideID); err != nil { - return "", err +var rideStatusMap sync.Map + +func getLatestRideStatus(rideID string) *string { + v, ok := rideStatusMap.Load(rideID) + if !ok { + return nil } - return status, nil + status := v.(string) + return &status +} + +func UpdateRideStatus(rideID, status string) { + rideStatusMap.Store(rideID, status) } func appPostRides(w http.ResponseWriter, r *http.Request) { @@ -314,34 +337,44 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { } defer tx.Rollback() - rides := []Ride{} - if err := tx.SelectContext(ctx, &rides, `SELECT * FROM rides WHERE user_id = ?`, user.ID); err != nil { + continuingNotCompletedRideCount := 0 + if err = tx.GetContext(ctx, &continuingNotCompletedRideCount, `SELECT count(1) FROM rides WHERE user_id = ? AND evaluation IS NULL`, user.ID); err != nil { writeError(w, http.StatusInternalServerError, err) return } - continuingRideCount := 0 - for _, ride := range rides { - status, err := getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - if status != "COMPLETED" { - continuingRideCount++ - } + if continuingNotCompletedRideCount > 0 { + writeError(w, http.StatusConflict, errors.New("ride already exists")) + return } - if continuingRideCount > 0 { - writeError(w, http.StatusConflict, errors.New("ride already exists")) + countingRide := 0 + if err = tx.GetContext(ctx, &countingRide, `SELECT count(1) FROM rides WHERE user_id = ?`, user.ID); err != nil { + writeError(w, http.StatusInternalServerError, err) return } + now := time.Now() + ride := Ride{ + ID: rideID, + UserID: user.ID, + ChairID: sql.NullString{ + String: "", + Valid: false, + }, + PickupLatitude: req.PickupCoordinate.Latitude, + PickupLongitude: req.PickupCoordinate.Longitude, + DestinationLatitude: req.DestinationCoordinate.Latitude, + DestinationLongitude: req.DestinationCoordinate.Longitude, + Evaluation: nil, + CreatedAt: now, + UpdatedAt: now, + } if _, err := tx.ExecContext( ctx, - `INSERT INTO rides (id, user_id, pickup_latitude, pickup_longitude, destination_latitude, destination_longitude) - VALUES (?, ?, ?, ?, ?, ?)`, - rideID, user.ID, req.PickupCoordinate.Latitude, req.PickupCoordinate.Longitude, req.DestinationCoordinate.Latitude, req.DestinationCoordinate.Longitude, + `INSERT INTO rides (id, user_id, pickup_latitude, pickup_longitude, destination_latitude, destination_longitude, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + rideID, user.ID, req.PickupCoordinate.Latitude, req.PickupCoordinate.Longitude, req.DestinationCoordinate.Latitude, req.DestinationCoordinate.Longitude, now, now, ); err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -355,15 +388,10 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } - - var rideCount int - if err := tx.GetContext(ctx, &rideCount, `SELECT COUNT(*) FROM rides WHERE user_id = ? `, user.ID); err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } + UpdateRideStatus(rideID, "MATCHING") var coupon Coupon - if rideCount == 1 { + if countingRide == 1 { // 初回利用で、初回利用クーポンがあれば必ず使う if err := tx.GetContext(ctx, &coupon, "SELECT * FROM coupons WHERE user_id = ? AND code = 'CP_NEW2024' AND used_by IS NULL FOR UPDATE", user.ID); err != nil { if !errors.Is(err, sql.ErrNoRows) { @@ -416,12 +444,6 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { } } - ride := Ride{} - if err := tx.GetContext(ctx, &ride, "SELECT * FROM rides WHERE id = ?", rideID); err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - fare, err := calculateDiscountedFare(ctx, tx, user.ID, &ride, req.PickupCoordinate.Latitude, req.PickupCoordinate.Longitude, req.DestinationCoordinate.Latitude, req.DestinationCoordinate.Longitude) if err != nil { writeError(w, http.StatusInternalServerError, err) @@ -432,6 +454,7 @@ func appPostRides(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + matchingRideChannel <- ride writeJSON(w, http.StatusAccepted, &appPostRidesResponse{ RideID: rideID, @@ -506,6 +529,8 @@ type appPostRideEvaluationResponse struct { CompletedAt int64 `json:"completed_at"` } +var paymentGatewayURL string + func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { ctx := r.Context() rideID := r.PathValue("ride_id") @@ -536,21 +561,23 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } - status, err := getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { + statusV := getLatestRideStatus(ride.ID) + if statusV == nil { writeError(w, http.StatusInternalServerError, err) return } + status := *statusV if status != "ARRIVED" { writeError(w, http.StatusBadRequest, errors.New("not arrived yet")) return } + now := time.Now().Round(time.Second) result, err := tx.ExecContext( ctx, - `UPDATE rides SET evaluation = ? WHERE id = ?`, - req.Evaluation, rideID) + `UPDATE rides SET evaluation = ?, updated_at = ? WHERE id = ?`, + req.Evaluation, now, rideID) if err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -562,6 +589,13 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusNotFound, errors.New("ride not found")) return } + ride.Evaluation = &req.Evaluation + ride.UpdatedAt = now + stat := getChairStats(ride.ChairID.String) + InsertChairStats(ride.ChairID.String, ChairStatType{ + Count: stat.Count + 1, + Sum: stat.Sum + float64(req.Evaluation), + }) _, err = tx.ExecContext( ctx, @@ -571,15 +605,8 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } - - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE id = ?`, rideID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, errors.New("ride not found")) - return - } - writeError(w, http.StatusInternalServerError, err) - return - } + status = "COMPLETED" + UpdateRideStatus(rideID, status) paymentToken := &PaymentToken{} if err := tx.GetContext(ctx, paymentToken, `SELECT * FROM payment_tokens WHERE user_id = ?`, ride.UserID); err != nil { @@ -600,19 +627,7 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { Amount: fare, } - var paymentGatewayURL string - if err := tx.GetContext(ctx, &paymentGatewayURL, "SELECT value FROM settings WHERE name = 'payment_gateway_url'"); err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - - if err := requestPaymentGatewayPostPayment(ctx, paymentGatewayURL, paymentToken.Token, paymentGatewayRequest, func() ([]Ride, error) { - rides := []Ride{} - if err := tx.SelectContext(ctx, &rides, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at ASC`, ride.UserID); err != nil { - return nil, err - } - return rides, nil - }); err != nil { + if err := requestPaymentGatewayPostPayment(ctx, paymentGatewayURL, paymentToken.Token, paymentGatewayRequest); err != nil { if errors.Is(err, erroredUpstream) { writeError(w, http.StatusBadGateway, err) return @@ -621,6 +636,20 @@ func appPostRideEvaluatation(w http.ResponseWriter, r *http.Request) { return } + err = sendAppGetNotificationChannel(ctx, tx, status, ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + err = sendChairGetNotificationChannel(ctx, status, ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + matchingChairChannel <- ride.ChairID.String + if err := tx.Commit(); err != nil { writeError(w, http.StatusInternalServerError, err) return @@ -654,6 +683,68 @@ type appGetNotificationResponseChairStats struct { TotalEvaluationAvg float64 `json:"total_evaluation_avg"` } +// appGetNotificationChannel map[userID]chan appGetNotificationResponseData +var appGetNotificationChannel = sync.Map{} + +func sendAppGetNotificationChannel(ctx context.Context, tx *sqlx.Tx, status string, ride *Ride, calcedFare *int) error { + c, ok := appGetNotificationChannel.Load(ride.UserID) + if !ok { + return nil + } + channel, ok := c.(chan appGetNotificationResponseData) + if !ok { + return nil + } + var err error + var fare int + if calcedFare != nil { + fare = *calcedFare + } else { + if tx != nil { + fare, err = calculateDiscountedFare(ctx, tx, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + slog.Error("failed to calculate discounted fare", "error", err, "user_id", ride.UserID, "ride", ride) + return fmt.Errorf("failed to calculate discounted fare: %w", err) + } + } else { + fare, err = calculateDiscountedFare2(ctx, db, ride.UserID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) + if err != nil { + slog.Error("failed to calculate discounted fare", "error", err, "user_id", ride.UserID, "ride", ride) + return fmt.Errorf("failed to calculate discounted fare2: %w", err) + } + } + } + response := appGetNotificationResponseData{ + RideID: ride.ID, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Fare: fare, + Status: status, + CreatedAt: ride.CreatedAt.UnixMilli(), + UpdateAt: ride.UpdatedAt.UnixMilli(), + } + if ride.ChairID.Valid { + chair := GetChair(ride.ChairID.String) + + stats := getChairStats(chair.ID).ToAppGetNotificationResponseChairStats() + + response.Chair = &appGetNotificationResponseChair{ + ID: chair.ID, + Name: chair.Name, + Model: chair.Model, + Stats: stats, + } + } + channel <- response + return nil +} + func appGetNotification(w http.ResponseWriter, r *http.Request) { ctx := r.Context() user := ctx.Value("user").(*User) @@ -666,185 +757,140 @@ func appGetNotification(w http.ResponseWriter, r *http.Request) { f.Flush() } - ticker := time.NewTicker(time.Second * 1) + c := make(chan appGetNotificationResponseData, 100) + appGetNotificationChannel.Store(user.ID, c) + ride := &Ride{} + if err := db.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + slog.Info("no rides", "user_id", user.ID) + ride = nil + } else { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "user_id", user.ID) + return + } + } + status := "" + if ride != nil { + statusV := getLatestRideStatus(ride.ID) + if statusV == nil { + err := errors.New("failed to get latest ride status") + writeError(w, http.StatusInternalServerError, err) + slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) + return + } + status = *statusV + + err := sendAppGetNotificationChannel(ctx, nil, status, ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + } + for { select { - case <-ticker.C: - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to begin tx", "error", err) - return - } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - ride := &Ride{} - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE user_id = ? ORDER BY created_at DESC LIMIT 1`, user.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - tx.Rollback() - tx = nil - slog.Info("no rides", "user_id", user.ID) - continue - } - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "user_id", user.ID) - return - } - - yetSentRideStatus := RideStatus{} - status := "" - if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND app_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Info("no ride_status", "ride_id", ride.ID) - status, err = getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) - return + case response := <-c: + // 順番が前後しちゃった場合はもう一度キューに詰め直す + if status != response.Status { + if status == "COMPLETED" { + if response.Status != "MATCHING" { + slog.Info("status is not matching", "status", response.Status) + c <- response + continue + } + } else if status == "MATCHING" { + if response.Status != "ENROUTE" { + slog.Info("status is not enroute", "status", response.Status) + c <- response + continue + } + } else if status == "ENROUTE" { + if response.Status != "PICKUP" { + slog.Info("status is not pickup", "status", response.Status) + c <- response + continue + } + } else if status == "PICKUP" { + if response.Status != "CARRYING" { + slog.Info("status is not carrying", "status", response.Status) + c <- response + continue + } + } else if status == "CARRYING" { + if response.Status != "ARRIVED" { + slog.Info("status is not arrived", "status", response.Status) + c <- response + continue + } + } else if status == "ARRIVED" { + if response.Status != "COMPLETED" { + slog.Info("status is not completed", "status", response.Status) + c <- response + continue } - } else { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) - return - } - } else { - status = yetSentRideStatus.Status - } - - fare, err := calculateDiscountedFare(ctx, tx, user.ID, ride, ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to calculate discounted fare", "error", err, "user_id", user.ID, "ride", ride) - return - } - - response := appGetNotificationResponseData{ - RideID: ride.ID, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Fare: fare, - Status: status, - CreatedAt: ride.CreatedAt.UnixMilli(), - UpdateAt: ride.UpdatedAt.UnixMilli(), - } - - if ride.ChairID.Valid { - chair := GetChair(ride.ChairID.String) - - stats, err := getChairStats(ctx, tx, chair.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get chair stats", "error", err, "chair_id", chair.ID) - return - } - - response.Chair = &appGetNotificationResponseChair{ - ID: chair.ID, - Name: chair.Name, - Model: chair.Model, - Stats: stats, - } - } - - if yetSentRideStatus.ID != "" { - _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) - return } } - if err := tx.Commit(); err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to commit", "error", err) - return - } - tx = nil - + status = response.Status w.Write([]byte("data: ")) if err := json.NewEncoder(w).Encode(response); err != nil { writeError(w, http.StatusInternalServerError, err) slog.Error("failed to write response to http writer", "error", err, "response", response) return } - w.Write([]byte("\n\n")) + w.Write([]byte("\n")) if f, ok := w.(http.Flusher); ok { f.Flush() } + // どうやら記録してなくてもいいらしい + //_, err := db.ExecContext(ctx, `UPDATE ride_statuses SET app_sent_at = CURRENT_TIMESTAMP(6) WHERE ride_id = ? AND status = ?`, response.RideID, response.Status) + //if err != nil { + // writeError(w, http.StatusInternalServerError, err) + // slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", response.RideID) + // return + //} case <-ctx.Done(): - break + return } } } -func getChairStats(ctx context.Context, tx *sqlx.Tx, chairID string) (appGetNotificationResponseChairStats, error) { - stats := appGetNotificationResponseChairStats{} +var ChairStatsMap = sync.Map{} - rides := []Ride{} - err := tx.SelectContext( - ctx, - &rides, - `SELECT * FROM rides WHERE chair_id = ? ORDER BY updated_at DESC`, - chairID, - ) - if err != nil { - return stats, err - } +type ChairStatType struct { + Count int + Sum float64 +} - totalRideCount := 0 - totalEvaluation := 0.0 - for _, ride := range rides { - rideStatuses := []RideStatus{} - err = tx.SelectContext( - ctx, - &rideStatuses, - `SELECT * FROM ride_statuses WHERE ride_id = ? ORDER BY created_at`, - ride.ID, - ) - if err != nil { - return stats, err +func (s ChairStatType) ToAppGetNotificationResponseChairStats() appGetNotificationResponseChairStats { + if s.Count == 0 { + return appGetNotificationResponseChairStats{ + TotalRidesCount: 0, + TotalEvaluationAvg: 0, } + } + return appGetNotificationResponseChairStats{ + TotalRidesCount: s.Count, + TotalEvaluationAvg: s.Sum / float64(s.Count), + } +} - var arrivedAt, pickupedAt *time.Time - var isCompleted bool - for _, status := range rideStatuses { - if status.Status == "ARRIVED" { - arrivedAt = &status.CreatedAt - } else if status.Status == "CARRYING" { - pickupedAt = &status.CreatedAt - } - if status.Status == "COMPLETED" { - isCompleted = true - } - } - if arrivedAt == nil || pickupedAt == nil { - continue - } - if !isCompleted { - continue - } +func InsertChairStats(chairID string, stat ChairStatType) { + ChairStatsMap.Store(chairID, stat) +} - totalRideCount++ - totalEvaluation += float64(*ride.Evaluation) +func getChairStats(chairID string) ChairStatType { + stats, ok := ChairStatsMap.Load(chairID) + if !ok { + return ChairStatType{} } - - stats.TotalRidesCount = totalRideCount - if totalRideCount > 0 { - stats.TotalEvaluationAvg = totalEvaluation / float64(totalRideCount) + s, ok := stats.(ChairStatType) + if !ok { + return ChairStatType{} } - - return stats, nil + return s } type appGetNearbyChairsResponse struct { @@ -904,16 +950,13 @@ func appGetNearbyChairs(w http.ResponseWriter, r *http.Request) { ctx, &chairs, ` - SELECT c.id, c.owner_id, c.name, c.model, c.is_active, c.access_token, c.created_at, c.updated_at + SELECT c.* FROM chairs AS c LEFT JOIN rides AS r ON r.chair_id = c.id - LEFT JOIN ride_statuses AS rs - ON rs.ride_id = r.id - AND status = 'COMPLETED' - WHERE c.is_active - GROUP BY c.id, c.owner_id, c.name, c.model, c.is_active, c.access_token, c.created_at, c.updated_at - HAVING COUNT(r.id) - COUNT(rs.id) = 0 + AND r.evaluation IS NULL + WHERE r.id IS NULL + AND c.is_active = TRUE `, // TODO: ChairMapを使う ) if err != nil { @@ -997,6 +1040,53 @@ func calculateDiscountedFare(ctx context.Context, tx *sqlx.Tx, userID string, ri } } + return calcFare(pickupLatitude, pickupLongitude, destLatitude, destLongitude, discount), nil +} + +func calcFare(pickupLatitude int, pickupLongitude int, destLatitude int, destLongitude int, discount int) int { + meteredFare := farePerDistance * calculateDistance(pickupLatitude, pickupLongitude, destLatitude, destLongitude) + discountedMeteredFare := max(meteredFare-discount, 0) + + return initialFare + discountedMeteredFare +} + +func calculateDiscountedFare2(ctx context.Context, db *sqlx.DB, userID string, ride *Ride, pickupLatitude, pickupLongitude, destLatitude, destLongitude int) (int, error) { + var coupon Coupon + discount := 0 + if ride != nil { + destLatitude = ride.DestinationLatitude + destLongitude = ride.DestinationLongitude + pickupLatitude = ride.PickupLatitude + pickupLongitude = ride.PickupLongitude + + // すでにクーポンが紐づいているならそれの割引額を参照 + if err := db.GetContext(ctx, &coupon, "SELECT * FROM coupons WHERE used_by = ?", ride.ID); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return 0, err + } + } else { + discount = coupon.Discount + } + } else { + // 初回利用クーポンを最優先で使う + if err := db.GetContext(ctx, &coupon, "SELECT * FROM coupons WHERE user_id = ? AND code = 'CP_NEW2024' AND used_by IS NULL", userID); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return 0, err + } + + // 無いなら他のクーポンを付与された順番に使う + if err := db.GetContext(ctx, &coupon, "SELECT * FROM coupons WHERE user_id = ? AND used_by IS NULL ORDER BY created_at LIMIT 1", userID); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return 0, err + } + } else { + discount = coupon.Discount + } + } else { + discount = coupon.Discount + } + } + meteredFare := farePerDistance * calculateDistance(pickupLatitude, pickupLongitude, destLatitude, destLongitude) discountedMeteredFare := max(meteredFare-discount, 0) diff --git a/home/isucon/webapp/go/chair_handlers.go b/home/isucon/webapp/go/chair_handlers.go index d8ffd302..5231dd43 100644 --- a/home/isucon/webapp/go/chair_handlers.go +++ b/home/isucon/webapp/go/chair_handlers.go @@ -1,12 +1,14 @@ package main import ( + "context" "database/sql" "encoding/json" "errors" "fmt" "log/slog" "net/http" + "sync" "time" "github.com/oklog/ulid/v2" @@ -35,13 +37,9 @@ func chairPostChairs(w http.ResponseWriter, r *http.Request) { return } - owner := &Owner{} - if err := db.GetContext(ctx, owner, "SELECT * FROM owners WHERE chair_register_token = ?", req.ChairRegisterToken); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusUnauthorized, errors.New("invalid chair_register_token")) - return - } - writeError(w, http.StatusInternalServerError, err) + owner := GetOwner(req.ChairRegisterToken) + if owner == nil { + writeError(w, http.StatusUnauthorized, errors.New("invalid chair_register_token")) return } @@ -102,6 +100,9 @@ func chairPostActivity(w http.ResponseWriter, r *http.Request) { } chair.IsActive = req.IsActive UpdateChair(chair, nil) + if req.IsActive { + matchingChairChannel <- chair.ID + } w.WriteHeader(http.StatusNoContent) } @@ -120,25 +121,12 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { chair := ctx.Value("chair").(*Chair) - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - defer tx.Rollback() - - prevLocation := &ChairLocation{} // 一個前の座標 - if err := tx.GetContext(ctx, prevLocation, `SELECT * FROM chair_locations WHERE chair_id = ? ORDER BY created_at DESC LIMIT 1`, chair.ID); err != nil { - if !errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusInternalServerError, err) - return - } else { - prevLocation = nil - } - } + now := time.Now() + writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ + RecordedAt: now.UnixMilli(), + }) chairLocationID := ulid.Make().String() - now := time.Now() cl := ChairLocation{ ID: chairLocationID, ChairID: chair.ID, @@ -167,11 +155,20 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { CreatedAt: now, } + tx, err := db.Beginx() + if err != nil { + writeError(w, http.StatusInternalServerError, err) + return + } + defer tx.Rollback() + + prevLocation := GetChairLocation(chair.ID) if prevLocation != nil { + addDistance := abs(prevLocation.Longitude-location.Longitude) + abs(prevLocation.Latitude-location.Latitude) _, err = tx.ExecContext( ctx, - "INSERT INTO chair_locations_minus_distance (id, chair_id, distance) VALUES (?, ?, ?)", - ulid.Make().String(), chair.ID, abs(prevLocation.Longitude-location.Longitude)+abs(prevLocation.Latitude-location.Latitude), + "INSERT INTO chair_locations_total_distance (chair_id, total_distance) VALUES (?, ?) ON DUPLICATE KEY UPDATE total_distance = total_distance + ?", + chair.ID, addDistance, addDistance, ) if err != nil { writeError(w, http.StatusInternalServerError, err) @@ -186,17 +183,31 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { return } } else { - status, err := getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { + statusV := getLatestRideStatus(ride.ID) + if statusV == nil { writeError(w, http.StatusInternalServerError, err) return } + status := *statusV if status != "COMPLETED" && status != "CANCELED" { if req.Latitude == ride.PickupLatitude && req.Longitude == ride.PickupLongitude && status == "ENROUTE" { if _, err := tx.ExecContext(ctx, "INSERT INTO ride_statuses (id, ride_id, status) VALUES (?, ?, ?)", ulid.Make().String(), ride.ID, "PICKUP"); err != nil { writeError(w, http.StatusInternalServerError, err) return } + err = sendAppGetNotificationChannel(ctx, tx, "PICKUP", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + err = sendChairGetNotificationChannel(ctx, "PICKUP", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + UpdateRideStatus(ride.ID, "PICKUP") } if req.Latitude == ride.DestinationLatitude && req.Longitude == ride.DestinationLongitude && status == "CARRYING" { @@ -204,6 +215,19 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + err = sendAppGetNotificationChannel(ctx, tx, "ARRIVED", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + err = sendChairGetNotificationChannel(ctx, "ARRIVED", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + UpdateRideStatus(ride.ID, "ARRIVED") } } } @@ -212,10 +236,6 @@ func chairPostCoordinate(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } - - writeJSON(w, http.StatusOK, &chairPostCoordinateResponse{ - RecordedAt: location.CreatedAt.UnixMilli(), - }) } type simpleUser struct { @@ -236,6 +256,49 @@ type chairGetNotificationResponseData struct { Status string `json:"status"` } +// chairGetNotificationChannel map[chairID]chan chairGetNotificationResponseData +var chairGetNotificationChannel = sync.Map{} + +func sendChairGetNotificationChannel(ctx context.Context, status string, ride *Ride, user *User) error { + if !ride.ChairID.Valid { + return fmt.Errorf("ride.ChairID is not valid") + } + c, ok := chairGetNotificationChannel.Load(ride.ChairID.String) + if !ok { + slog.Info("no channel", "chair_id", ride.ChairID) + return nil + } + channel, ok := c.(chan chairGetNotificationResponseData) + if !ok { + slog.Info("invalid channel", "chair_id", ride.ChairID) + return nil + } + if user == nil { + user = GetUser(ride.UserID) + if user == nil { + return fmt.Errorf("failed to get user: %w, userID: %s", errors.New("user not found"), ride.UserID) + } + } + response := chairGetNotificationResponseData{ + RideID: ride.ID, + User: simpleUser{ + ID: user.ID, + Name: fmt.Sprintf("%s %s", user.Firstname, user.Lastname), + }, + PickupCoordinate: Coordinate{ + Latitude: ride.PickupLatitude, + Longitude: ride.PickupLongitude, + }, + DestinationCoordinate: Coordinate{ + Latitude: ride.DestinationLatitude, + Longitude: ride.DestinationLongitude, + }, + Status: status, + } + channel <- response + return nil +} + func chairGetNotification(w http.ResponseWriter, r *http.Request) { ctx := r.Context() chair := ctx.Value("chair").(*Chair) @@ -244,109 +307,111 @@ func chairGetNotification(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + c := make(chan chairGetNotificationResponseData, 100) + chairGetNotificationChannel.Store(chair.ID, c) + ride := &Ride{} + if err := db.GetContext(ctx, ride, `SELECT * FROM rides WHERE chair_id = ? ORDER BY created_at DESC LIMIT 1`, chair.ID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + slog.Info("no rides", "chair_id", chair.ID) + ride = nil + } else { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to get rides", "error", err, "chair_id", chair.ID) + return + } + } + status := "" + if ride != nil { + statusV := getLatestRideStatus(ride.ID) + if statusV == nil { + err := errors.New("failed to get latest ride status") + writeError(w, http.StatusInternalServerError, err) + slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) + return + } + status = *statusV + + err := sendChairGetNotificationChannel(ctx, status, ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + } - ticker := time.NewTicker(time.Second * 1) for { select { - case <-ticker.C: - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to begin tx", "error", err) - return - } - defer func() { - if tx != nil { - tx.Rollback() - } - }() - - ride := &Ride{} - yetSentRideStatus := RideStatus{} - status := "" - - if err := tx.GetContext(ctx, ride, `SELECT * FROM rides WHERE chair_id = ? ORDER BY updated_at DESC LIMIT 1`, chair.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - tx.Rollback() - tx = nil - slog.Info("no rides", "chair_id", chair.ID) - continue - } - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "chair_id", chair.ID) - return - } - - if err := tx.GetContext(ctx, &yetSentRideStatus, `SELECT * FROM ride_statuses WHERE ride_id = ? AND chair_sent_at IS NULL ORDER BY created_at ASC LIMIT 1`, ride.ID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - slog.Info("no ride_status", "ride_id", ride.ID) - status, err = getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Info("failed to get latest ride status", "ride_id", ride.ID, "error", err) - return + case response := <-c: + // 順番が前後しちゃった場合はもう一度キューに詰め直す + if status != response.Status { + if status == "COMPLETED" { + if response.Status != "MATCHING" { + slog.Info("status is not matching", "status", response.Status) + c <- response + continue + } + } else if status == "MATCHING" { + if response.Status != "ENROUTE" { + slog.Info("status is not enroute", "status", response.Status) + c <- response + continue + } + } else if status == "ENROUTE" { + if response.Status != "PICKUP" { + slog.Info("status is not pickup", "status", response.Status) + c <- response + continue + } + } else if status == "PICKUP" { + if response.Status != "CARRYING" { + slog.Info("status is not carrying", "status", response.Status) + c <- response + continue + } + } else if status == "CARRYING" { + if response.Status != "ARRIVED" { + slog.Info("status is not arrived", "status", response.Status) + c <- response + continue + } + } else if status == "ARRIVED" { + if response.Status != "COMPLETED" { + slog.Info("status is not completed", "status", response.Status) + c <- response + continue } - } else { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get rides", "error", err, "ride_id", ride.ID) - return - } - } else { - status = yetSentRideStatus.Status - } - - user := &User{} - err = tx.GetContext(ctx, user, "SELECT * FROM users WHERE id = ? FOR SHARE", ride.UserID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to get user", "error", err, "user_id", ride.UserID) - return - } - - if yetSentRideStatus.ID != "" { - _, err := tx.ExecContext(ctx, `UPDATE ride_statuses SET chair_sent_at = CURRENT_TIMESTAMP(6) WHERE id = ?`, yetSentRideStatus.ID) - if err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to update ride_status.app_sent_at", "error", err, "ride_id", yetSentRideStatus.ID) - return } } - if err := tx.Commit(); err != nil { - writeError(w, http.StatusInternalServerError, err) - slog.Error("failed to commit", "error", err) - return - } + status = response.Status - response := &chairGetNotificationResponseData{ - RideID: ride.ID, - User: simpleUser{ - ID: user.ID, - Name: fmt.Sprintf("%s %s", user.Firstname, user.Lastname), - }, - PickupCoordinate: Coordinate{ - Latitude: ride.PickupLatitude, - Longitude: ride.PickupLongitude, - }, - DestinationCoordinate: Coordinate{ - Latitude: ride.DestinationLatitude, - Longitude: ride.DestinationLongitude, - }, - Status: status, + if _, err := w.Write([]byte("data: ")); err != nil { + slog.Error("failed to write data", "error", err) } - - w.Write([]byte("data: ")) if err := json.NewEncoder(w).Encode(response); err != nil { - w.WriteHeader(http.StatusInternalServerError) + writeError(w, http.StatusInternalServerError, err) slog.Error("failed to write response to http writer", "error", err, "response", response) return } - w.Write([]byte("\n\n")) + if _, err := w.Write([]byte("\n\n")); err != nil { + slog.Error("failed to write new line", "error", err) + } if f, ok := w.(http.Flusher); ok { f.Flush() } + // どうやら記録してなくてもいいらしい + //_, err := db.ExecContext(ctx, `UPDATE ride_statuses SET chair_sent_at = CURRENT_TIMESTAMP(6) WHERE ride_id = ? AND status = ?`, response.RideID, response.Status) + //if err != nil { + // writeError(w, http.StatusInternalServerError, err) + // slog.Error("failed to update ride_status.chair_sent_at", "error", err, "ride_id", response.RideID) + // return + //} case <-ctx.Done(): - break + return } } } @@ -396,13 +461,27 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + err = sendAppGetNotificationChannel(ctx, tx, "ENROUTE", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + err = sendChairGetNotificationChannel(ctx, "ENROUTE", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + UpdateRideStatus(ride.ID, "ENROUTE") // After Picking up user case "CARRYING": - status, err := getLatestRideStatus(ctx, tx, ride.ID) - if err != nil { + statusV := getLatestRideStatus(ride.ID) + if statusV == nil { writeError(w, http.StatusInternalServerError, err) return } + status := *statusV if status != "PICKUP" { writeError(w, http.StatusBadRequest, errors.New("chair has not arrived yet")) return @@ -411,6 +490,19 @@ func chairPostRideStatus(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + err = sendAppGetNotificationChannel(ctx, tx, "CARRYING", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + err = sendChairGetNotificationChannel(ctx, "CARRYING", ride, nil) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + slog.Error("failed to send notification", "error", err) + return + } + UpdateRideStatus(ride.ID, "CARRYING") default: writeError(w, http.StatusBadRequest, errors.New("invalid status")) } diff --git a/home/isucon/webapp/go/go.mod b/home/isucon/webapp/go/go.mod index eb62fa5b..c4ac423a 100644 --- a/home/isucon/webapp/go/go.mod +++ b/home/isucon/webapp/go/go.mod @@ -3,6 +3,7 @@ module github.com/isucon/isucon14/webapp/go go 1.23 require ( + github.com/davecgh/go-spew v1.1.1 github.com/go-chi/chi/v5 v5.1.0 github.com/go-sql-driver/mysql v1.8.1 github.com/jmoiron/sqlx v1.4.0 diff --git a/home/isucon/webapp/go/go.sum b/home/isucon/webapp/go/go.sum index cc8db1f4..6413393b 100644 --- a/home/isucon/webapp/go/go.sum +++ b/home/isucon/webapp/go/go.sum @@ -1,5 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= diff --git a/home/isucon/webapp/go/internal_handlers.go b/home/isucon/webapp/go/internal_handlers.go index ed581ea7..a4dd9daa 100644 --- a/home/isucon/webapp/go/internal_handlers.go +++ b/home/isucon/webapp/go/internal_handlers.go @@ -1,82 +1,314 @@ package main import ( + "context" + "database/sql" + "fmt" + "github.com/jmoiron/sqlx" + "log/slog" "net/http" "slices" + "sort" + "strings" + "time" ) -// このAPIをインスタンス内から一定間隔で叩かせることで、椅子とライドをマッチングさせる -func internalGetMatching(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - // MEMO: 一旦最も待たせているリクエストに適当な空いている椅子マッチさせる実装とする。おそらくもっといい方法があるはず… - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - return +// chairIDを入れる +var matchingChairChannel chan string + +// rideを入れる +var matchingRideChannel chan Ride +var matchingInit chan struct{} + +func matching() { + ctx := context.Background() + matchingChairChannel = make(chan string, 1000) + defer close(matchingChairChannel) + matchingRideChannel = make(chan Ride, 1000) + defer close(matchingRideChannel) + matchingInit = make(chan struct{}) + + { + chairIDs := []string{} + if err := db.SelectContext(ctx, &chairIDs, `SELECT chairs.id FROM chairs LEFT JOIN rides ON chairs.id = rides.chair_id AND rides.evaluation IS NULL WHERE is_active = TRUE AND rides.id IS NULL`); err != nil { + slog.Error("failed to get chair ids", "error", err) + return + } + for _, chairID := range chairIDs { + matchingChairChannel <- chairID + } } - defer tx.Rollback() - rides := []Ride{} - if err := tx.SelectContext(ctx, &rides, `SELECT * FROM rides WHERE chair_id IS NULL ORDER BY created_at`); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + chairModelByChairName := make(map[string]ChairModel) + { + chairModels := []ChairModel{} + if err := db.SelectContext(ctx, &chairModels, "SELECT * FROM chair_models"); err != nil { + slog.Error("failed to get chair models", "error", err) + return + } + for _, chairModel := range chairModels { + chairModelByChairName[chairModel.Name] = chairModel + } } - notCompletedChairIDs := []string{} - if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where evaluation IS NULL AND chair_id IS NOT NULL`); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + waitingChairIDs := []string{} + waitingRides := []Ride{} + slog.Info("matching start") + defer slog.Info("matching end") + for { + slog.Info("matching loop") + select { + case <-matchingInit: + slog.Info("matching init") + matchingChairChannel = make(chan string, 1000) + matchingRideChannel = make(chan Ride, 1000) + { + waitingChairIDs = []string{} + if err := db.SelectContext(ctx, &waitingChairIDs, `SELECT chairs.id FROM chairs LEFT JOIN rides ON chairs.id = rides.chair_id AND rides.evaluation IS NULL WHERE is_active = TRUE AND rides.id IS NULL`); err != nil { + slog.Error("failed to get chair ids", "error", err) + continue + } + waitingRides = []Ride{} + if err := db.SelectContext(ctx, &waitingRides, `SELECT * FROM rides WHERE chair_id IS NULL ORDER BY created_at`); err != nil { + slog.Error("failed to get rides", "error", err) + continue + } + } + case chairID := <-matchingChairChannel: + waitingChairIDs = append(waitingChairIDs, chairID) + case ride := <-matchingRideChannel: + waitingRides = append(waitingRides, ride) + case <-ticker.C: + type expectedScoreType struct { + ride Ride + chairLocation *ChairLocation + expectedScore float64 + } + expectedScores := make([]expectedScoreType, 0, len(waitingChairIDs)*len(waitingRides)) + for _, chairID := range waitingChairIDs { + chairLocation := GetChairLocation(chairID) + if chairLocation == nil { + continue + } + chair := GetChair(chairID) + if chair == nil { + continue + } + for _, r := range waitingRides { + expectedScores = append(expectedScores, expectedScoreType{ + ride: r, + chairLocation: chairLocation, + expectedScore: calcExpectedScore(r, chairLocation, chairModelByChairName[chair.Model].Speed) + + float64((time.Now().Sub(r.CreatedAt)).Nanoseconds())*0.00000001, // うまく待ってる時間を考慮したい + }) + } + } + if len(expectedScores) == 0 { + continue + } + sort.Slice(expectedScores, func(i, j int) bool { + return expectedScores[i].expectedScore > expectedScores[j].expectedScore + }) + //highExpectedScore := expectedScores[0].expectedScore + usedRideIDs := make(map[string]struct{}) + usedChairIDs := make(map[string]struct{}) + matchingRides := make([]Ride, 0, len(expectedScores)) + for _, es := range expectedScores { + //if es.expectedScore < highExpectedScore*0.1 { // 10%以下のものは無視 + // break + //} + if _, ok := usedRideIDs[es.ride.ID]; ok { + continue + } + if _, ok := usedChairIDs[es.chairLocation.ChairID]; ok { + continue + } + es.ride.ChairID = sql.NullString{String: es.chairLocation.ChairID, Valid: true} + matchingRides = append(matchingRides, es.ride) + + usedRideIDs[es.ride.ID] = struct{}{} + usedChairIDs[es.chairLocation.ChairID] = struct{}{} + } + + err := matchingComp(ctx, matchingRides) + if err != nil { + slog.Error("failed to matching", "error", err) + continue + } + + waitingChairIDs = slices.DeleteFunc(waitingChairIDs, func(chairID string) bool { + _, ok := usedChairIDs[chairID] + return ok + }) + waitingRides = slices.DeleteFunc(waitingRides, func(ride Ride) bool { + _, ok := usedRideIDs[ride.ID] + return ok + }) + } } - notCompletedChairIDsSet := make(map[string]struct{}, len(notCompletedChairIDs)) - for _, id := range notCompletedChairIDs { - notCompletedChairIDsSet[id] = struct{}{} +} + +func calcExpectedScore(ride Ride, nowChairLocation *ChairLocation, speed int) float64 { + var ret float64 + // 椅子がライドとマッチした位置から乗車位置までの移動距離の合計 * 0.1 + distanceOfChairToPickup := float64(calculateDistance(nowChairLocation.Latitude, nowChairLocation.Longitude, ride.PickupLatitude, ride.PickupLongitude)) + ret += (distanceOfChairToPickup) * 0.1 + // 椅子の乗車位置から目的地までの移動距離の合計 + distanceOfPickupToDestination := float64(calculateDistance(ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude)) + ret += distanceOfPickupToDestination + + // かかる時間 + t := (distanceOfChairToPickup + distanceOfPickupToDestination) / float64(speed) + + // 単位時間あたりの得点の期待値 + return ret / t +} + +func matchingComp(ctx context.Context, rides []Ride) error { + rideIDs := make([]string, 0, len(rides)) + chairIDs := make([]string, 0, len(rides)) + for _, ride := range rides { + rideIDs = append(rideIDs, ride.ID) + chairIDs = append(chairIDs, ride.ChairID.String) } - notCompletedChairIDs = []string{} - if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where updated_at > NOW(6) - INTERVAL 3.5 SECOND AND chair_id IS NOT NULL`); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + b := make([]any, 0, len(rideIDs)*2+len(chairIDs)) + for _, v := range rideIDs { + b = append(b, v) } - for _, id := range notCompletedChairIDs { - notCompletedChairIDsSet[id] = struct{}{} + for _, v := range chairIDs { + b = append(b, v) + } + for _, v := range rideIDs { + b = append(b, v) + } + tx, err := db.Beginx() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + _, err = tx.ExecContext(ctx, fmt.Sprintf("UPDATE rides SET chair_id = ELT(FIELD(id%s)%s) WHERE id IN (?%s)", strings.Repeat(",?", len(rideIDs)), strings.Repeat(",?", len(chairIDs)), strings.Repeat(",?", len(rideIDs)-1)), b...) + if err != nil { + slog.Error("failed to update ride", "error", err) + return fmt.Errorf("failed to update ride: %w", err) } - chairs := []Chair{} - if err := tx.SelectContext(ctx, &chairs, `SELECT * FROM chairs WHERE is_active = TRUE`); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + coupons := []Coupon{} + query, params, err := sqlx.In("SELECT * FROM coupons WHERE used_by IN (?)", rideIDs) + if err != nil { + return fmt.Errorf("failed to create query: %w", err) } - candidateChairs := make([]Chair, 0, len(chairs)) - for _, chair := range chairs { - if _, ok := notCompletedChairIDsSet[chair.ID]; !ok { - candidateChairs = append(candidateChairs, chair) - } + if err := tx.SelectContext(ctx, &coupons, query, params...); err != nil { + return fmt.Errorf("failed to select coupons: %w", err) + } + couponByRideID := map[string]Coupon{} + for _, coupon := range coupons { + couponByRideID[*coupon.UsedBy] = coupon } for _, ride := range rides { - if len(candidateChairs) == 0 { - break + discount := 0 + if coupon, ok := couponByRideID[ride.ID]; ok { + discount = coupon.Discount } - selectedChair := candidateChairs[0] - selectedIndex := 0 - for index, chair := range candidateChairs { - selectedChairLocation := GetChairLocation(selectedChair.ID) - candidateChairLocation := GetChairLocation(chair.ID) - if abs(selectedChairLocation.Latitude-ride.PickupLatitude)+abs(selectedChairLocation.Longitude-ride.PickupLongitude) > abs(candidateChairLocation.Latitude-ride.PickupLatitude)+abs(candidateChairLocation.Longitude-ride.PickupLongitude) { - selectedChair = chair - selectedIndex = index - } + fare := calcFare(ride.PickupLatitude, ride.PickupLongitude, ride.DestinationLatitude, ride.DestinationLongitude, discount) + err = sendAppGetNotificationChannel(ctx, tx, "MATCHING", &ride, &fare) + if err != nil { + slog.Error("failed to send notification", "error", err) + return fmt.Errorf("failed to send notification: %w", err) } - candidateChairs = slices.Delete(candidateChairs, selectedIndex, selectedIndex+1) - if _, err := tx.ExecContext(ctx, "UPDATE rides SET chair_id = ? WHERE id = ? AND chair_id IS NULL", selectedChair.ID, ride.ID); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + err = sendChairGetNotificationChannel(ctx, "MATCHING", &ride, nil) + if err != nil { + slog.Error("failed to send notification", "error", err) + return fmt.Errorf("failed to send notification: %w", err) } } - - if err := tx.Commit(); err != nil { - writeError(w, http.StatusInternalServerError, err) - return + if err = tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) } + return nil +} + +// このAPIをインスタンス内から一定間隔で叩かせることで、椅子とライドをマッチングさせる +func internalGetMatching(w http.ResponseWriter, r *http.Request) { + //ctx := r.Context() + // MEMO: 一旦最も待たせているリクエストに適当な空いている椅子マッチさせる実装とする。おそらくもっといい方法があるはず… + //tx, err := db.Beginx() + //if err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} + //defer tx.Rollback() + // + //rides := []Ride{} + //if err := tx.SelectContext(ctx, &rides, `SELECT * FROM rides WHERE chair_id IS NULL ORDER BY created_at`); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} + // + //notCompletedChairIDs := []string{} + //if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where evaluation IS NULL AND chair_id IS NOT NULL`); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} + //notCompletedChairIDsSet := make(map[string]struct{}, len(notCompletedChairIDs)) + //for _, id := range notCompletedChairIDs { + // notCompletedChairIDsSet[id] = struct{}{} + //} + //notCompletedChairIDs = []string{} + //if err := tx.SelectContext(ctx, ¬CompletedChairIDs, `SELECT chair_id FROM rides where updated_at > NOW(6) - INTERVAL 3.5 SECOND AND chair_id IS NOT NULL`); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} + //for _, id := range notCompletedChairIDs { + // notCompletedChairIDsSet[id] = struct{}{} + //} + // + //chairs := []Chair{} + //if err := tx.SelectContext(ctx, &chairs, `SELECT * FROM chairs WHERE is_active = TRUE`); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} + //candidateChairs := make([]Chair, 0, len(chairs)) + //for _, chair := range chairs { + // if _, ok := notCompletedChairIDsSet[chair.ID]; !ok { + // candidateChairs = append(candidateChairs, chair) + // } + //} + // + //for _, ride := range rides { + // if len(candidateChairs) == 0 { + // break + // } + // selectedChair := candidateChairs[0] + // selectedIndex := 0 + // for index, chair := range candidateChairs { + // selectedChairLocation := GetChairLocation(selectedChair.ID) + // candidateChairLocation := GetChairLocation(chair.ID) + // if abs(selectedChairLocation.Latitude-ride.PickupLatitude)+abs(selectedChairLocation.Longitude-ride.PickupLongitude) > abs(candidateChairLocation.Latitude-ride.PickupLatitude)+abs(candidateChairLocation.Longitude-ride.PickupLongitude) { + // selectedChair = chair + // selectedIndex = index + // } + // } + // candidateChairs = slices.Delete(candidateChairs, selectedIndex, selectedIndex+1) + // if _, err := tx.ExecContext(ctx, "UPDATE rides SET chair_id = ? WHERE id = ? AND chair_id IS NULL", selectedChair.ID, ride.ID); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + // } + // err = sendAppGetNotificationChannel(ctx, tx, "MATCHING", &ride) + // if err != nil { + // writeError(w, http.StatusInternalServerError, err) + // slog.Error("failed to send notification", "error", err) + // return + // } + //} + // + //if err := tx.Commit(); err != nil { + // writeError(w, http.StatusInternalServerError, err) + // return + //} w.WriteHeader(http.StatusNoContent) } diff --git a/home/isucon/webapp/go/main.go b/home/isucon/webapp/go/main.go index 1b53276c..7ea8a2e1 100644 --- a/home/isucon/webapp/go/main.go +++ b/home/isucon/webapp/go/main.go @@ -2,6 +2,7 @@ package main import ( crand "crypto/rand" + "database/sql" "encoding/json" "fmt" "log/slog" @@ -9,13 +10,10 @@ import ( "net/http" "os" "os/exec" - "sort" "strconv" "sync" "time" - "github.com/oklog/ulid/v2" - "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-sql-driver/mysql" @@ -28,6 +26,7 @@ var db *sqlx.DB var ChairMap = sync.Map{} var ChairLocationMap = sync.Map{} +var ChairTotalDistanceMap = sync.Map{} func UpdateChair(chair *Chair, updatedAt *time.Time) { if updatedAt != nil { @@ -44,6 +43,10 @@ func InsertChairLocation(cl *ChairLocation) { ChairLocationMap.Store(cl.ChairID, cl) } +func InsertChairTotalDistance(ctd *ChairTotalDistance) { + ChairTotalDistanceMap.Store(ctd.ChairID, ctd) +} + // GetChair // AccessTokenかIDをキーにしてChairを取得する func GetChair(key string) *Chair { @@ -62,29 +65,32 @@ func GetChairLocation(key string) *ChairLocation { return nil } -// ListChairLocations -// ChairID をキーにして ChairLocation list を取得する -func ListChairLocations(key string) (cls []*ChairLocation) { - ChairLocationMap.Range(func(k, v any) bool { - if key == k.(string) { - return true - } +func GetTotalDistance(chairID string) int { + if v, ok := ChairTotalDistanceMap.Load(chairID); ok { + return v.(*ChairTotalDistance).TotalDistance + } + return 0 +} - cl := v.(*ChairLocation) - if cl.ChairID == key { - cls = append(cls, cl) - } - return true - }) - sort.Slice(cls, func(i, j int) bool { - return cls[i].ID < cls[j].ID - }) - return +var UserMap = sync.Map{} + +func InsertUser(user *User) { + UserMap.Store(user.ID, user) + UserMap.Store(user.AccessToken, user) + UserMap.Store(user.InvitationCode, user) +} + +func GetUser(key string) *User { + if v, ok := UserMap.Load(key); ok { + return v.(*User) + } + return nil } func main() { mux := setup() - slog.Info("Listening on :8080") + //slog.Info("Listening on :8080") + slog.SetLogLoggerLevel(1000) http.ListenAndServe(":8080", mux) } @@ -152,8 +158,87 @@ func setup() http.Handler { } } + { + ChairTotalDistanceMap = sync.Map{} + data := []ChairTotalDistance{} + if err = db.Select(&data, "SELECT * FROM chair_locations_total_distance ORDER BY chair_id"); err != nil { + panic(err) + } + for _, ctd := range data { + InsertChairTotalDistance(&ctd) + } + } + + { + OwnerMap = sync.Map{} + owners := []Owner{} + if err := db.Select(&owners, "SELECT * FROM owners"); err != nil { + panic(err) + } + for _, owner := range owners { + InsertOwner(&owner) + } + } + + { + rideStatusMap = sync.Map{} + rideStatuses := []RideStatus{} + if err = db.Select(&rideStatuses, "SELECT * FROM ride_statuses ORDER BY created_at"); err != nil { + panic(err) + } + for _, rs := range rideStatuses { + rideStatusMap.Store(rs.ID, &rs) + } + } + + { + UserMap = sync.Map{} + users := []User{} + if err := db.Select(&users, "SELECT * FROM users"); err != nil { + panic(err) + } + for _, u := range users { + InsertUser(&u) + } + } + + { + ChairStatsMap = sync.Map{} + stats := []struct { + ChairID string `db:"chair_id"` + S sql.Null[float64] `db:"s"` + C sql.Null[int] `db:"c"` + }{} + if err := db.Select(&stats, + `SELECT chair_id, SUM(evaluation) as s, COUNT(1) as c FROM rides WHERE chair_id IS NOT NULL GROUP BY chair_id`, + ); err != nil { + panic(err) + } + for _, stat := range stats { + c := 0 + if stat.C.Valid { + c = stat.C.V + } + s := 0.0 + if stat.S.Valid { + s = stat.S.V + } + InsertChairStats(stat.ChairID, ChairStatType{ + Count: c, + Sum: s, + }) + } + } + + go matching() + { + if err := db.Get(&paymentGatewayURL, "SELECT value FROM settings WHERE name = 'payment_gateway_url'"); err != nil { + panic(err) + } + } + mux := chi.NewRouter() - mux.Use(middleware.Logger) + //mux.Use(middleware.Logger) mux.Use(middleware.Recoverer) mux.HandleFunc("POST /api/initialize", postInitialize) @@ -228,40 +313,41 @@ func postInitialize(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, err) return } + paymentGatewayURL = req.PaymentServer chairLocations := []ChairLocation{} if err := db.SelectContext(ctx, &chairLocations, "SELECT * FROM chair_locations ORDER BY created_at"); err != nil { writeError(w, http.StatusInternalServerError, err) return } - prevChairLocations := make(map[string]ChairLocation) - distanceByChairID := make(map[string]int) for _, cl := range chairLocations { InsertChairLocation(&cl) - prevChairLocation, ok := prevChairLocations[cl.ChairID] - prevChairLocations[cl.ChairID] = cl - if !ok { - continue + } + chairTotalDistanceByChairID := map[string]int{} + prevChairLocationByChairID := map[string]*ChairLocation{} + for _, cl := range chairLocations { + if prevChairLocation, ok := prevChairLocationByChairID[cl.ChairID]; ok { + chairTotalDistanceByChairID[cl.ChairID] += abs(cl.Latitude-prevChairLocation.Latitude) + abs(cl.Longitude-prevChairLocation.Longitude) } - distanceByChairID[cl.ChairID] += abs(cl.Latitude-prevChairLocation.Latitude) + abs(cl.Longitude-prevChairLocation.Longitude) - } - chairTotalDistances := make([]ChairTotalDistance, 0, len(distanceByChairID)) - for chairID, distance := range distanceByChairID { - chairTotalDistances = append(chairTotalDistances, ChairTotalDistance{ - ID: ulid.Make().String(), - ChairID: chairID, - Distance: distance, - }) + prevChairLocationByChairID[cl.ChairID] = &cl + } + chairTotalDistances := make([]ChairTotalDistance, 0, len(chairTotalDistanceByChairID)) + for chairID, totalDistance := range chairTotalDistanceByChairID { + chairTotalDistances = append(chairTotalDistances, ChairTotalDistance{ChairID: chairID, TotalDistance: totalDistance}) } - _, err := db.NamedExecContext( - ctx, - "INSERT INTO chair_locations_minus_distance (id, chair_id, distance) VALUES (:id, :chair_id, :distance)", - chairTotalDistances, - ) + _, err := db.ExecContext(ctx, "TRUNCATE TABLE chair_locations_total_distance") if err != nil { writeError(w, http.StatusInternalServerError, err) return } + _, err = db.NamedExecContext(ctx, "INSERT INTO chair_locations_total_distance (chair_id, total_distance) VALUES (:chair_id, :total_distance)", chairTotalDistances) + if err != nil { + writeError(w, http.StatusInternalServerError, err) + return + } + for _, ctd := range chairTotalDistances { + InsertChairTotalDistance(&ctd) + } ChairMap = sync.Map{} chairs := []Chair{} @@ -273,6 +359,63 @@ func postInitialize(w http.ResponseWriter, r *http.Request) { UpdateChair(&chair, &chair.UpdatedAt) } + appGetNotificationChannel = sync.Map{} + + matchingInit <- struct{}{} + + OwnerMap = sync.Map{} + owners := []Owner{} + if err = db.Select(&owners, "SELECT * FROM owners"); err != nil { + panic(err) + } + for _, owner := range owners { + InsertOwner(&owner) + } + + rideStatusMap = sync.Map{} + rideStatuses := []RideStatus{} + if err = db.Select(&rideStatuses, "SELECT * FROM ride_statuses ORDER BY created_at"); err != nil { + panic(err) + } + for _, rs := range rideStatuses { + rideStatusMap.Store(rs.ID, &rs) + } + + UserMap = sync.Map{} + users := []User{} + if err = db.Select(&users, "SELECT * FROM users"); err != nil { + panic(err) + } + for _, u := range users { + InsertUser(&u) + } + + ChairStatsMap = sync.Map{} + stats := []struct { + ChairID string `db:"chair_id"` + S sql.Null[float64] `db:"s"` + C sql.Null[int] `db:"c"` + }{} + if err = db.Select(&stats, + `SELECT chair_id, SUM(evaluation) as s, COUNT(1) as c FROM rides WHERE chair_id IS NOT NULL GROUP BY chair_id`, + ); err != nil { + panic(err) + } + for _, stat := range stats { + c := 0 + if stat.C.Valid { + c = stat.C.V + } + s := 0.0 + if stat.S.Valid { + s = stat.S.V + } + InsertChairStats(stat.ChairID, ChairStatType{ + Count: c, + Sum: s, + }) + } + writeJSON(w, http.StatusOK, postInitializeResponse{Language: "go"}) } diff --git a/home/isucon/webapp/go/middlewares.go b/home/isucon/webapp/go/middlewares.go index b2d57e4c..48ab11eb 100644 --- a/home/isucon/webapp/go/middlewares.go +++ b/home/isucon/webapp/go/middlewares.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "errors" "net/http" ) @@ -16,14 +15,9 @@ func appAuthMiddleware(next http.Handler) http.Handler { return } accessToken := c.Value - user := &User{} - err = db.GetContext(ctx, user, "SELECT * FROM users WHERE access_token = ?", accessToken) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusUnauthorized, errors.New("invalid access token")) - return - } - writeError(w, http.StatusInternalServerError, err) + user := GetUser(accessToken) + if user == nil { + writeError(w, http.StatusUnauthorized, errors.New("invalid access token")) return } @@ -41,13 +35,9 @@ func ownerAuthMiddleware(next http.Handler) http.Handler { return } accessToken := c.Value - owner := &Owner{} - if err := db.GetContext(ctx, owner, "SELECT * FROM owners WHERE access_token = ?", accessToken); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusUnauthorized, errors.New("invalid access token")) - return - } - writeError(w, http.StatusInternalServerError, err) + owner := GetOwner(accessToken) + if owner == nil { + writeError(w, http.StatusUnauthorized, errors.New("invalid access token")) return } diff --git a/home/isucon/webapp/go/models.go b/home/isucon/webapp/go/models.go index a60b3a56..e7dfbd26 100644 --- a/home/isucon/webapp/go/models.go +++ b/home/isucon/webapp/go/models.go @@ -30,9 +30,8 @@ type ChairLocation struct { } type ChairTotalDistance struct { - ID string `db:"id"` - ChairID string `db:"chair_id"` - Distance int `db:"distance"` + ChairID string `db:"chair_id"` + TotalDistance int `db:"total_distance"` } type User struct { diff --git a/home/isucon/webapp/go/owner_handlers.go b/home/isucon/webapp/go/owner_handlers.go index 5d311c96..2c73f5c0 100644 --- a/home/isucon/webapp/go/owner_handlers.go +++ b/home/isucon/webapp/go/owner_handlers.go @@ -5,6 +5,7 @@ import ( "errors" "net/http" "strconv" + "sync" "time" "github.com/oklog/ulid/v2" @@ -24,6 +25,21 @@ type ownerPostOwnersResponse struct { ChairRegisterToken string `json:"chair_register_token"` } +var OwnerMap = sync.Map{} + +func InsertOwner(owner *Owner) { + OwnerMap.Store(owner.ID, owner) + OwnerMap.Store(owner.ChairRegisterToken, owner) + OwnerMap.Store(owner.AccessToken, owner) +} + +func GetOwner(key string) *Owner { + if v, ok := OwnerMap.Load(key); ok { + return v.(*Owner) + } + return nil +} + func ownerPostOwners(w http.ResponseWriter, r *http.Request) { ctx := r.Context() req := &ownerPostOwnersRequest{} @@ -40,15 +56,24 @@ func ownerPostOwners(w http.ResponseWriter, r *http.Request) { accessToken := secureRandomStr(32) chairRegisterToken := secureRandomStr(32) + now := time.Now() _, err := db.ExecContext( ctx, - "INSERT INTO owners (id, name, access_token, chair_register_token) VALUES (?, ?, ?, ?)", - ownerID, req.Name, accessToken, chairRegisterToken, + "INSERT INTO owners (id, name, access_token, chair_register_token, updated_at, created_at) VALUES (?, ?, ?, ?,?,?)", + ownerID, req.Name, accessToken, chairRegisterToken, now, now, ) if err != nil { writeError(w, http.StatusInternalServerError, err) return } + InsertOwner(&Owner{ + ID: ownerID, + Name: req.Name, + AccessToken: accessToken, + ChairRegisterToken: chairRegisterToken, + CreatedAt: now, + UpdatedAt: now, + }) http.SetCookie(w, &http.Cookie{ Path: "/", @@ -102,41 +127,42 @@ func ownerGetSales(w http.ResponseWriter, r *http.Request) { owner := r.Context().Value("owner").(*Owner) - tx, err := db.Beginx() - if err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } - defer tx.Rollback() - chairs := []Chair{} - if err := tx.SelectContext(ctx, &chairs, "SELECT * FROM chairs WHERE owner_id = ?", owner.ID); err != nil { + if err := db.SelectContext(ctx, &chairs, "SELECT * FROM chairs WHERE owner_id = ?", owner.ID); err != nil { writeError(w, http.StatusInternalServerError, err) return } + chairMap := make(map[string]Chair, len(chairs)) + for _, chair := range chairs { + chairMap[chair.ID] = chair + } res := ownerGetSalesResponse{ TotalSales: 0, } - modelSalesByModel := map[string]int{} - for _, chair := range chairs { - rides := []Ride{} - if err := tx.SelectContext(ctx, &rides, "SELECT rides.* FROM rides JOIN ride_statuses ON rides.id = ride_statuses.ride_id WHERE chair_id = ? AND status = 'COMPLETED' AND updated_at BETWEEN ? AND ? + INTERVAL 999 MICROSECOND", chair.ID, since, until); err != nil { - writeError(w, http.StatusInternalServerError, err) - return - } + rides := []Ride{} + if err := db.SelectContext(ctx, &rides, "SELECT rides.* FROM rides INNER JOIN chairs ON rides.chair_id = chairs.id WHERE chairs.owner_id = ? AND evaluation IS NOT NULL AND rides.updated_at BETWEEN ? AND ? + INTERVAL 999 MICROSECOND", owner.ID, since, until); err != nil { + writeError(w, http.StatusInternalServerError, err) + return + } - sales := sumSales(rides) + modelSalesByModel := map[string]int{} + modelSalesByChair := map[string]int{} + for _, ride := range rides { + chair := chairMap[ride.ChairID.String] + sales := calculateSale(ride) res.TotalSales += sales - + modelSalesByModel[chair.Model] += sales + modelSalesByChair[chair.ID] += sales + } + for _, chair := range chairs { + modelSalesByModel[chair.Model] += 0 res.Chairs = append(res.Chairs, chairSales{ ID: chair.ID, Name: chair.Name, - Sales: sales, + Sales: modelSalesByChair[chair.ID], }) - - modelSalesByModel[chair.Model] += sales } models := []modelSales{} @@ -217,25 +243,7 @@ WHERE owner_id = ? totalDistanceByChairID := make(map[string]int) for _, c := range chairs { - locations := ListChairLocations(c.ID) - var ( - prev *ChairLocation - distance int - ) - for _, l := range locations { - if prev == nil { - prev = l - continue - } - - lat := abs(l.Latitude - prev.Latitude) - - lon := abs(l.Longitude - prev.Longitude) - - distance += lat + lon - prev = l - } - totalDistanceByChairID[c.ID] = distance + totalDistanceByChairID[c.ID] = GetTotalDistance(c.ID) } res := ownerGetChairResponse{} diff --git a/home/isucon/webapp/go/payment_gateway.go b/home/isucon/webapp/go/payment_gateway.go index 565a9353..a5e4c8f7 100644 --- a/home/isucon/webapp/go/payment_gateway.go +++ b/home/isucon/webapp/go/payment_gateway.go @@ -6,8 +6,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/oklog/ulid/v2" "net/http" - "time" ) var erroredUpstream = errors.New("errored upstream") @@ -21,76 +21,33 @@ type paymentGatewayGetPaymentsResponseOne struct { Status string `json:"status"` } -func requestPaymentGatewayPostPayment(ctx context.Context, paymentGatewayURL string, token string, param *paymentGatewayPostPaymentRequest, retrieveRidesOrderByCreatedAtAsc func() ([]Ride, error)) error { +func requestPaymentGatewayPostPayment(ctx context.Context, paymentGatewayURL string, token string, param *paymentGatewayPostPaymentRequest) error { b, err := json.Marshal(param) if err != nil { - return err + return fmt.Errorf("failed to marshal request body: %w", err) } - // 失敗したらとりあえずリトライ - // FIXME: 社内決済マイクロサービスのインフラに異常が発生していて、同時にたくさんリクエストすると変なことになる可能性あり - retry := 0 + idempotencyKey := ulid.Make().String() for { - err := func() error { - req, err := http.NewRequestWithContext(ctx, http.MethodPost, paymentGatewayURL+"/payments", bytes.NewBuffer(b)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+token) - - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusNoContent { - // エラーが返ってきても成功している場合があるので、社内決済マイクロサービスに問い合わせ - getReq, err := http.NewRequestWithContext(ctx, http.MethodGet, paymentGatewayURL+"/payments", bytes.NewBuffer([]byte{})) - if err != nil { - return err - } - getReq.Header.Set("Authorization", "Bearer "+token) - - getRes, err := http.DefaultClient.Do(getReq) - if err != nil { - return err - } - defer res.Body.Close() - - // GET /payments は障害と関係なく200が返るので、200以外は回復不能なエラーとする - if getRes.StatusCode != http.StatusOK { - return fmt.Errorf("[GET /payments] unexpected status code (%d)", getRes.StatusCode) - } - var payments []paymentGatewayGetPaymentsResponseOne - if err := json.NewDecoder(getRes.Body).Decode(&payments); err != nil { - return err - } - - rides, err := retrieveRidesOrderByCreatedAtAsc() - if err != nil { - return err - } - - if len(rides) != len(payments) { - return fmt.Errorf("unexpected number of payments: %d != %d. %w", len(rides), len(payments), erroredUpstream) - } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, paymentGatewayURL+"/payments", bytes.NewBuffer(b)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Idempotency-Key", idempotencyKey) - return nil - } - return nil - }() + res, err := http.DefaultClient.Do(req) if err != nil { - if retry < 5 { - retry++ - time.Sleep(100 * time.Millisecond) - continue - } else { - return err - } + return fmt.Errorf("failed to request payment gateway: %w", err) + } + if err = res.Body.Close(); err != nil { + return fmt.Errorf("failed to close response body: %w", err) + } + + if res.StatusCode == http.StatusNoContent { + break } - break } return nil diff --git a/home/isucon/webapp/sql/4-migrate.sql b/home/isucon/webapp/sql/4-migrate.sql index cfaff0ae..0eebb7c4 100644 --- a/home/isucon/webapp/sql/4-migrate.sql +++ b/home/isucon/webapp/sql/4-migrate.sql @@ -1,19 +1,25 @@ ALTER TABLE ride_statuses ADD INDEX IX_ride_statuses_ride_id_created_at (ride_id, created_at); +ALTER TABLE ride_statuses ADD UNIQUE INDEX UQ_ride_statuses_ride_id_status (ride_id, status); ALTER TABLE ride_statuses ADD INDEX IX_ride_statuses_ride_id_chair_sent_at_created_at (ride_id, chair_sent_at, created_at); +ALTER TABLE ride_statuses ADD INDEX IX_ride_statuses_ride_id_app_sent_at_created_at (ride_id, app_sent_at, created_at); ALTER TABLE chair_locations ADD INDEX IX_chair_locations_chair_id_created_at (chair_id, created_at); ALTER TABLE chair_locations ADD INDEX IX_chair_locations_chair_id_id (chair_id, id); ALTER TABLE rides ADD INDEX IX_rides_chair_id_updated_at (chair_id, updated_at); -ALTER TABLE rides ADD INDEX IX_rides_evaluation (evaluation); +ALTER TABLE rides ADD INDEX IX_rides_chair_id_created_at (chair_id, created_at); +ALTER TABLE rides ADD INDEX IX_rides_user_id_created_at (user_id, created_at); +ALTER TABLE rides ADD INDEX IX_rides_evaluation_chair_id_updated_at (evaluation, chair_id, updated_at); ALTER TABLE chairs ADD INDEX IX_chairs_access_token (access_token); +ALTER TABLE chairs ADD INDEX IX_chairs_owner_id (owner_id); +ALTER TABLE coupons ADD INDEX IX_coupons_code (code); +ALTER TABLE coupons ADD INDEX IX_coupons_used_by_user_id_created_at (used_by, user_id, created_at); -DROP TABLE IF EXISTS chair_locations_minus_distance; -CREATE TABLE chair_locations_minus_distance +DROP TABLE IF EXISTS chair_locations_total_distance; +CREATE TABLE chair_locations_total_distance ( - id VARCHAR(26) NOT NULL, - chair_id VARCHAR(26) NOT NULL COMMENT '椅子ID', - distance LONG NOT NULL COMMENT 'マイナスされた距離', - PRIMARY KEY (id) + chair_id VARCHAR(26) NOT NULL COMMENT '椅子ID', + total_distance LONG NOT NULL COMMENT '合計移動距離距離', + PRIMARY KEY (chair_id) ) - COMMENT = '椅子のマイナスされた距離テーブル'; + COMMENT = '合計移動距離距離テーブル'; -ALTER TABLE chair_locations_minus_distance ADD INDEX IX_chair_locations_minus_distance_chair_id (chair_id); +ALTER TABLE chair_locations_total_distance ADD INDEX IX_chair_locations_total_distance_chair_id (chair_id);