秒殺中的分桶策略 —— 提高單臺數據庫的效能

語言: CN / TW / HK

theme: channing-cyan

背景

眾所周知,資料庫的單行併發寫能力極為有限,比如 MySQL 的單行併發寫大概在300~500TPS之間。所以,將資料分桶儲存可以線性提升併發寫入能力。分桶解決的是單個數據庫的併發能力

分桶模型

每一個我們可以看成單個數據庫中的一行記錄,將原來的一行記錄儲存100件庫存,變為用5行記錄分別儲存20件庫存。在對庫存進行操作的時候,就可以通過對使用者ID取模,確定該使用者操縱的是那一行記錄,從而提高單個數據庫的併發能力

秒殺場景中,我們要對庫存的數量進行快取,所以也要對快取進行分桶。每一個桶看成Redis中的一個記錄即可。

快取中的資料相當於庫存的預扣減,預扣減成功那麼就讓該請求去修改資料庫,失敗直接拒絕該請求即可。這裡儘量保持快取資料(弱一致)與資料庫中的資料(強一致)的一致性,但是快取和資料庫分桶之間的關係是一定要保證的。

分桶設計與實現

分桶編排思路

在整個分桶編排的過程,有幾個重要的點:

  1. 在進行分桶編排之前,要先暫停分桶服務,設定為維護狀態,此時使用者無法下單
  2. 暫停分桶服務時,必須使用獨立事務手動提交,確保在繼續執行分桶前,分桶狀態已經提交到資料庫;
  3. 分桶儲存到資料庫後,應同步資料到快取中;
  4. 全量和增量:全量分桶意味著將當前傳入的庫存總量作為最終總量,重新計算分桶資料;而增量分桶則是將傳入的庫存總量累加到已有的庫存中,然後再重新計算分桶資料;
  5. 有無歷史分桶資料:如果此前已有分桶資料,那麼在分桶時則要先進行庫存回收,隨後再統一分配;如果此前無分桶資料,則直接建立新的分桶集;
  6. 分桶中出現任何異常應丟擲以觸發事務回滾
  7. 無論分桶成功或失敗,最後都要重新開啟分桶服務,即取消分桶維護狀態,否則秒殺品將無法售賣;

分桶編排程式碼實現

分桶編排程式碼

``` public void arrangeStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, Integer arrangementMode) { logger.info("arrangeBuckets|準備庫存分桶|{},{},{}", itemId, totalStocksAmount, bucketsQuantity); if (itemId == null || totalStocksAmount == null || totalStocksAmount < 0 || bucketsQuantity == null || bucketsQuantity <= 0) { throw new StockBucketException(ErrorCode.INVALID_PARAMS); } // 保證只有一個執行緒對itemId進行更新 DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(ITEM_STOCK_BUCKETS_SUSPEND_KEY + itemId);

    try {
        boolean tryLock = distributedLock.tryLock(5, 5, TimeUnit.SECONDS);
        if (!tryLock) {
            logger.info("arrangeStockBuckets|獲取鎖失敗|{}", itemId);
            return;
        }

        // 手動新增事務
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            // 設定為禁用狀態
            logger.info("suspendBuckets|禁用庫存分桶|{}", itemId);
            int updateStatusByItemId = seckillBucketMapper.updateStatusByItemId(itemId, SeckillBucketStatus.DISABLED.getCode());
            if (updateStatusByItemId < 0) {
                logger.info("arrangeBuckets|關閉庫存分桶失敗|{}", itemId);
                throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED);
            }
            logger.info("suspendBuckets|庫存分桶已禁用|{}", itemId);
            dataSourceTransactionManager.commit(transactionStatus);
        } catch (Exception e) {
            logger.info("arrangeBuckets|關閉分桶失敗回滾中|{}", itemId, e);
            dataSourceTransactionManager.rollback(transactionStatus);
        }


        List<SeckillBucket> seckillBuckets = seckillBucketMapper.selectByItemId(itemId);
        if (seckillBuckets == null || seckillBuckets.size() == 0) {
            initStockBuckets(itemId, totalStocksAmount, bucketsQuantity);
            return;
        }

        // 根據總量分桶
        if (ArrangementMode.isTotalAmountMode(arrangementMode)) {
            arrangeStockBucketsBasedTotalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
        }

        // 根據增量分桶
        if (ArrangementMode.isIncrementalAmountMode(arrangementMode)) {
            rearrangeStockBucketsBasedIncrementalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

```

構建(初始化)分桶程式碼:

``` private void initStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity) { SeckillBucket primaryBucket = new SeckillBucket() .initPrimary() .setItemId(itemId) .setTotalStocksAmount(totalStocksAmount); List presentBuckets = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primaryBucket); submitBucketsToArrange(itemId, presentBuckets); }

private List<SeckillBucket> buildBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, SeckillBucket primaryBucket) {
    if (itemId == null || totalStocksAmount == null || bucketsQuantity == null || bucketsQuantity <= 0) {
        throw new StockBucketException(ErrorCode.INVALID_PARAMS);
    }

    List<SeckillBucket> seckillBucketList = new ArrayList<>();
    Integer averageStockAmount = totalStocksAmount / bucketsQuantity;
    Integer remainStockAmount = totalStocksAmount % bucketsQuantity;
    for (int i = 0; i < bucketsQuantity; i++) {
        if (i == 0) {
            if (primaryBucket == null) {
                primaryBucket = new SeckillBucket();
            }
            primaryBucket
                    .setAvailableStocksAmount(averageStockAmount)
                    .setSerialNo(i)
                    .setStatus(SeckillBucketStatus.ENABLED.getCode());
            seckillBucketList.add(primaryBucket);
            continue;
        }
        SeckillBucket seckillBucket = new SeckillBucket()
                .setSerialNo(i)
                .setStatus(SeckillBucketStatus.ENABLED.getCode())
                .setItemId(itemId);

        if (i < bucketsQuantity - 1) {
            seckillBucket.setAvailableStocksAmount(averageStockAmount)
                    .setTotalStocksAmount(averageStockAmount);
        }

        if (i == bucketsQuantity - 1) {
            Integer restAvailableStocksAmount = averageStockAmount + remainStockAmount;
            seckillBucket.setAvailableStocksAmount(restAvailableStocksAmount)
                    .setTotalStocksAmount(restAvailableStocksAmount);

        }
        seckillBucketList.add(seckillBucket);
    }
    return seckillBucketList;
}

```

儲存入快取和資料庫程式碼

先入資料庫再入快取。

private void submitBucketsToArrange(Long itemId, List<SeckillBucket> presentBuckets) { logger.info("arrangeBuckets|編排庫存分桶|{},{}", itemId, JSON.toJSONString(presentBuckets)); if (itemId == null || itemId <= 0 || CollectionUtils.isEmpty(presentBuckets)) { logger.info("arrangeBuckets|庫存分桶引數錯誤|{}", itemId); throw new BusinessException(ErrorCode.INVALID_PARAMS); } // 先刪除再加入 seckillBucketMapper.deleteById(itemId); int insertBatch = seckillBucketMapper.insertBatch(presentBuckets); if (insertBatch > 1) { // 存入快取 presentBuckets.forEach((seckillBucket -> { distributedCacheService.put(getBucketAvailableStocksCacheKey(itemId, seckillBucket.getSerialNo()), seckillBucket.getAvailableStocksAmount()); distributedCacheService.put(getItemStockBucketsQuantityCacheKey(itemId), presentBuckets.size()); })); } else { logger.info("submitBucketsToArrange|庫存分桶錯誤|{}, {}", itemId, JSON.toJSONString(presentBuckets)); throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED); } }

根據全量分桶

``` private void arrangeStockBucketsBasedTotalMode(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, List existingBuckets) { // 計運算元桶的剩餘的庫存數 int remainAvailableStocks = existingBuckets.stream() .filter(SeckillBucket::isSubSeckillBucket) .mapToInt(SeckillBucket::getAvailableStocksAmount).sum(); Optional optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst(); if (!optionalSeckillBucket.isPresent()) { throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING); }

    // 回收分桶庫存到主桶
    SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
    primarySeckillBucket.addAvailableStocks(remainAvailableStocks);
    // 已售出的庫存
    int soldStocksAmount = primarySeckillBucket.getTotalStocksAmount() - primarySeckillBucket.getAvailableStocksAmount();

    if (soldStocksAmount > totalStocksAmount) {
        throw new StockBucketException(799, "已售庫存大於當期所設庫存總量!");
    }

    // 設定最新庫存,重新分桶
    primarySeckillBucket.setTotalStocksAmount(totalStocksAmount);
    List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primarySeckillBucket);
    submitBucketsToArrange(itemId, seckillBucketList);
}

```

根據增量分桶

``` private void rearrangeStockBucketsBasedIncrementalMode(Long itemId, Integer incrementalStocksAmount, Integer bucketsQuantity, List existingBuckets) { Optional optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst(); if (!optionalSeckillBucket.isPresent()) { throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING); }

    // 回收分桶庫存 (獲取當前所有桶剩餘的可用庫存數)
    int remainAvailableStocks = existingBuckets.stream().mapToInt(SeckillBucket::getAvailableStocksAmount).sum();

    // 加上要新增的庫存數
    Integer totalAvailableStocks = remainAvailableStocks + incrementalStocksAmount;
    int presentAvailableStocks = remainAvailableStocks + incrementalStocksAmount;

    if (presentAvailableStocks < 0) {
        throw new StockBucketException(ErrorCode.STOCK_NOT_ENOUGH);
    }

    SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
    primarySeckillBucket.increaseTotalStocksAmount(incrementalStocksAmount);

    List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalAvailableStocks, bucketsQuantity, primarySeckillBucket);
    submitBucketsToArrange(itemId, seckillBucketList);
}

```

不同分桶之間的數量差異

存在問題

使用者在訪問可用庫存的時候,會存在一個問題:路由到不同分桶的流量可能存在差異和不均,這會導致不同分桶的餘量不同,展示到不同使用者上的數量就會不同。例如:#1桶中庫存為0,但#2桶中庫存大於0。

解決方法

  1. 設計庫存借用機制,當某個分桶庫存不足時,可以從其他桶借庫存;
  2. 主桶和分桶留有一定冗餘庫存,分桶庫存不足時可以向主桶申請;
  3. 允許不同使用者看到不同的庫存餘量,所路由到的分桶沒有庫存時直接展示無庫存;

在秒殺場景中,我們一般選擇第三種,因為它足夠的簡單高效,重點維護服務端的資料一致性極致的效能。前面兩種方法會大大增加系統的複雜度,在選擇的時候要慎重考慮

扣減庫存實現程式碼

先扣除快取在扣減資料庫,快取充當一個預扣減的作用,這裡不再詳細討論。參考文章https://juejin.cn/post/7185205290278027319

扣減快取庫存的Lua指令碼

``` --- 對應的庫存鍵不存在 if (redis.call('exists', KEYS[1]) == 0) then return -996 end --- 分桶禁用鎖 if (redis.call('exists', KEYS[2]) == 1) then return -998 end --- 庫存排程鎖 if (redis.call('exists', KEYS[3]) == 1) then return -997 end if (redis.call('exists', KEYS[1]) == 1) then local stocksAmount = tonumber(redis.call('get', KEYS[1])) local quantity = tonumber(ARGV[1]) --- 庫存不夠 if (stocksAmount < quantity) then return -1 end

if (stocksAmount >= quantity) then
    redis.call('incrby', KEYS[1], 0 - quantity)
    return 1
end

end

return -10000 ```

使用樂觀鎖釦減資料庫庫存

<update id="decreaseBucketStock"> update seckill_bucket set available_stocks_amount = available_stocks_amount - #{quantity,jdbcType=NUMERIC} where item_id = #{itemId,jdbcType=NUMERIC} AND serial_no = #{serialNo,jdbcType=NUMERIC} AND available_stocks_amount = #{oldAvailableStocksAmount,jdbcType=NUMERIC} AND available_stocks_amount <![CDATA[ >= ]]> #{quantity,jdbcType=NUMERIC} </update>