後端思維篇:手把手教你寫一個並行調用模板
1. 一個串行調用的例子
如果讓你設計一個APP首頁查詢的接口,它需要查用户信息、需要查banner信息、需要查標籤信息等等。一般情況,小夥伴會實現如下:
public AppHeadInfoResponse queryAppHeadInfo(AppInfoReq req) { //查用户信息 UserInfoParam userInfoParam = buildUserParam(req); UserInfoDTO userInfoDTO = userService.queryUserInfo(userInfoParam); //查banner信息 BannerParam bannerParam = buildBannerParam(req); BannerDTO bannerDTO = bannerService.queryBannerInfo(bannerParam); //查標籤信息 LabelParam labelParam = buildLabelParam(req); LabelDTO labelDTO = labelService.queryLabelInfo(labelParam); //組裝結果 return buildResponse(userInfoDTO,bannerDTO,labelDTO); }
這段代碼會有什麼問題嘛?其實這是一段挺正常的代碼,但是這個方法實現中,查詢用户、banner、標籤信息,是串行的。如果查詢用户信息耗時200ms,查詢banner信息100ms,查詢標籤信息200ms的話,耗時就是500ms啦。
其實為了優化性能,我們可以修改為並行調用的方式,耗時可以降為200ms,如下圖所示:
2. CompletionService實現並行調用
對於上面的例子,如何實現並行調用呢?
有小夥伴説,可以使用Future+Callable實現多個任務的並行調用。但是線程池執行批量任務時,返回值用Future的get()獲取是阻塞的,如果前一個任務執行比較耗時的話,get()方法會阻塞,形成排隊等待的情況。
而CompletionService是對定義ExecutorService進行了包裝,可以一邊生成任務,一邊獲取任務的返回值。讓這兩件事分開執行,任務之間不會互相阻塞,可以獲取最先完成的任務結果。
CompletionService的實現原理比較簡單,底層通過FutureTask+阻塞隊列,實現了任務先完成的話,可優先獲取到。也就是説任務執行結果按照完成的先後順序來排序,先完成可以優先獲取到。內部有一個先進先出的阻塞隊列,用於保存已經執行完成的Future,你調用CompletionService的poll或take方法即可獲取到一個已經執行完成的Future,進而通過調用Future接口實現類的get方法獲取最終的結果。
接下來,我們來看下,如何用CompletionService,實現並行查詢APP首頁信息哈。思考步驟如下:
我們先把查詢用户信息的任務,放到線程池,如下:
ExecutorService executor = Executors.newFixedThreadPool(10); //查詢用户信息 CompletionService<UserInfoDTO> userDTOCompletionService = new ExecutorCompletionService<UserInfoDTO>(executor); Callable<UserInfoDTO> userInfoDTOCallableTask = () -> { UserInfoParam userInfoParam = buildUserParam(req); return userService.queryUserInfo(userInfoParam); }; userDTOCompletionService.submit(userInfoDTOCallableTask);
如果想把查詢banner信息的任務,也放到這個線程池的話,發現不好放了,因為返回類型不一樣,一個是UserInfoDTO,另外一個是BannerDTO。那這時候,我們把泛型聲明為Object即可,因為所有對象都是繼承於Object的。如下:
ExecutorService executor = Executors.newFixedThreadPool(10); //查詢用户信息 CompletionService<Object> baseDTOCompletionService = new ExecutorCompletionService<Object>(executor); Callable<Object> userInfoDTOCallableTask = () -> { UserInfoParam userInfoParam = buildUserParam(req); return userService.queryUserInfo(userInfoParam); }; //banner信息任務 Callable<Object> bannerDTOCallableTask = () -> { BannerParam bannerParam = buildBannerParam(req); return bannerService.queryBannerInfo(bannerParam); }; //提交用户信息任務 baseDTOCompletionService.submit(userInfoDTOCallableTask); //提交banner信息任務 baseDTOCompletionService.submit(bannerDTOCallableTask);
這裏會有個問題,就是獲取返回值的時候,我們不知道哪個Object是用户信息的DTO,哪個是BannerDTO?怎麼辦呢?這時候,我們可以在參數裏面做個擴展嘛,即參數聲明為一個基礎對象BaseRspDTO,再搞個泛型放Object數據的,然後基礎對象BaseRspDTO有個區分是UserDTO還是BannerDTO的唯一標記屬性key。代碼如下:
public class BaseRspDTO<T extends Object> { //區分是DTO返回的唯一標記,比如是UserInfoDTO還是BannerDTO private String key; //返回的data private T data; public String getKey() { return key; } public void setKey(String key) { this.key = key; } public T getData() { return data; } public void setData(T data) { this.data = data; } } //並行查詢App首頁信息 public AppHeadInfoResponse parallelQueryAppHeadPageInfo(AppInfoReq req) { long beginTime = System.currentTimeMillis(); System.out.println("開始並行查詢app首頁信息,開始時間:" + beginTime); ExecutorService executor = Executors.newFixedThreadPool(10); CompletionService<BaseRspDTO<Object>> baseDTOCompletionService = new ExecutorCompletionService<BaseRspDTO<Object>>(executor); //查詢用户信息任務 Callable<BaseRspDTO<Object>> userInfoDTOCallableTask = () -> { UserInfoParam userInfoParam = buildUserParam(req); UserInfoDTO userInfoDTO = userService.queryUserInfo(userInfoParam); BaseRspDTO<Object> userBaseRspDTO = new BaseRspDTO<Object>(); userBaseRspDTO.setKey("userInfoDTO"); userBaseRspDTO.setData(userInfoDTO); return userBaseRspDTO; }; //banner信息查詢任務 Callable<BaseRspDTO<Object>> bannerDTOCallableTask = () -> { BannerParam bannerParam = buildBannerParam(req); BannerDTO bannerDTO = bannerService.queryBannerInfo(bannerParam); BaseRspDTO<Object> bannerBaseRspDTO = new BaseRspDTO<Object>(); bannerBaseRspDTO.setKey("bannerDTO"); bannerBaseRspDTO.setData(bannerDTO); return bannerBaseRspDTO; }; //label信息查詢任務 Callable<BaseRspDTO<Object>> labelDTODTOCallableTask = () -> { LabelParam labelParam = buildLabelParam(req); LabelDTO labelDTO = labelService.queryLabelInfo(labelParam); BaseRspDTO<Object> labelBaseRspDTO = new BaseRspDTO<Object>(); labelBaseRspDTO.setKey("labelDTO"); labelBaseRspDTO.setData(labelDTO); return labelBaseRspDTO; }; //提交用户信息任務 baseDTOCompletionService.submit(userInfoDTOCallableTask); //提交banner信息任務 baseDTOCompletionService.submit(bannerDTOCallableTask); //提交label信息任務 baseDTOCompletionService.submit(labelDTODTOCallableTask); UserInfoDTO userInfoDTO = null; BannerDTO bannerDTO = null; LabelDTO labelDTO = null; try { //因為提交了3個任務,所以獲取結果次數是3 for (int i = 0; i < 3; i++) { Future<BaseRspDTO<Object>> baseRspDTOFuture = baseDTOCompletionService.poll(1, TimeUnit.SECONDS); BaseRspDTO baseRspDTO = baseRspDTOFuture.get(); if ("userInfoDTO".equals(baseRspDTO.getKey())) { userInfoDTO = (UserInfoDTO) baseRspDTO.getData(); } else if ("bannerDTO".equals(baseRspDTO.getKey())) { bannerDTO = (BannerDTO) baseRspDTO.getData(); } else if ("labelDTO".equals(baseRspDTO.getKey())) { labelDTO = (LabelDTO) baseRspDTO.getData(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("結束並行查詢app首頁信息,總耗時:" + (System.currentTimeMillis() - beginTime)); return buildResponse(userInfoDTO, bannerDTO, labelDTO); }
到這裏為止,一個基於CompletionService實現並行調用的例子已經實現啦。是不是很開心,哈哈。
3. 抽取通用的並行調用方法
我們回過來觀察下第2小節,查詢app首頁信息的demo:CompletionService實現了並行調用。不過大家有沒有什麼其他優化想法呢?比如,假設別的業務場景,也想通過並行調用優化,那是不是也得搞一套類似第2小節的代碼。所以,我們是不是可以抽取一個通用的並行方法,讓別的場景也可以用,對吧?這就是後端思維啦!
基於第2小節的代碼,我們如何抽取通用的並行調用方法呢。
首先,這個通用的並行調用方法,不能跟業務相關的屬性掛鈎,所以方法的入參應該有哪些呢?
方法的入參,可以有Callable。因為並行,肯定是多個Callable任務的。所以,入參應該是一個Callable的數組。再然後,基於上面的APP首頁查詢的例子,Callable裏面得帶BaseRspDTO泛型,對吧?因此入參就是List>> list。
那並行調用的出參呢?你有多個Callable的任務,是不是得有多個對應的返回,因此,你的出參可以是List>。我們抽取的通用並行調用模板,就可以寫成醬紫:
public List<BaseRspDTO<Object>> executeTask(List<Callable<BaseRspDTO<Object>>> taskList) { List<BaseRspDTO<Object>> resultList = new ArrayList<>(); //校驗參數 if (taskList == null || taskList.size() == 0) { return resultList; } ExecutorService executor = Executors.newFixedThreadPool(10); CompletionService<BaseRspDTO<Object>> baseDTOCompletionService = new ExecutorCompletionService<BaseRspDTO<Object>>(executor); //提交任務 for (Callable<BaseRspDTO<Object>> task : taskList) { baseDTOCompletionService.submit(task); } try { //遍歷獲取結果 for (int i = 0; i < taskList.size(); i++) { Future<BaseRspDTO<Object>> baseRspDTOFuture = baseDTOCompletionService.poll(2, TimeUnit.SECONDS); resultList.add(baseRspDTOFuture.get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return resultList; }
既然我們是抽取通用的並行調用方法,那以上的方法是否還有哪些地方需要改進的呢?
- 第一個可以優化的地方,就是executor線程池,比如有些業務場景想用A線程池,有些業務想用B線程池,那麼,這個方法,就不通用啦,對吧。我們可以把線程池以參數的形式提供出來,給調用方自己控制。
- 第二個可以優化的地方,就是CompletionService的poll方法獲取時,超時時間是寫死的。因為不同業務場景,超時時間要求可能不一樣。所以,超時時間也是可以以參數形式放出來,給調用方自己控制。
我們再次優化一下這個通用的並行調用模板,代碼如下:
public List<BaseRspDTO<Object>> executeTask(List<Callable<BaseRspDTO<Object>>> taskList, long timeOut, ExecutorService executor) { List<BaseRspDTO<Object>> resultList = new ArrayList<>(); //校驗參數 if (taskList == null || taskList.size() == 0) { return resultList; } if (executor == null) { return resultList; } if (timeOut <= 0) { return resultList; } //提交任務 CompletionService<BaseRspDTO<Object>> baseDTOCompletionService = new ExecutorCompletionService<BaseRspDTO<Object>>(executor); for (Callable<BaseRspDTO<Object>> task : taskList) { baseDTOCompletionService.submit(task); } try { //遍歷獲取結果 for (int i = 0; i < taskList.size(); i++) { Future<BaseRspDTO<Object>> baseRspDTOFuture = baseDTOCompletionService.poll(timeOut, TimeUnit.SECONDS); resultList.add(baseRspDTOFuture.get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return resultList; }
以後別的場景也需要用到並行調用的話,直接調用你的這個方法即可,是不是有點小小的成就感啦,哈哈。
4. 代碼思考以及設計模式應用
我們把抽取的那個公用的並行調用方法,應用到App首頁信息查詢的例子,代碼如下:
public AppHeadInfoResponse parallelQueryAppHeadPageInfo1(AppInfoReq req) { long beginTime = System.currentTimeMillis(); System.out.println("開始並行查詢app首頁信息,開始時間:" + beginTime); //用户信息查詢任務 Callable<BaseRspDTO<Object>> userInfoDTOCallableTask = () -> { UserInfoParam userInfoParam = buildUserParam(req); UserInfoDTO userInfoDTO = userService.queryUserInfo(userInfoParam); BaseRspDTO<Object> userBaseRspDTO = new BaseRspDTO<Object>(); userBaseRspDTO.setKey("userInfoDTO"); userBaseRspDTO.setData(userInfoDTO); return userBaseRspDTO; }; //banner信息查詢任務 Callable<BaseRspDTO<Object>> bannerDTOCallableTask = () -> { BannerParam bannerParam = buildBannerParam(req); BannerDTO bannerDTO = bannerService.queryBannerInfo(bannerParam); BaseRspDTO<Object> bannerBaseRspDTO = new BaseRspDTO<Object>(); bannerBaseRspDTO.setKey("bannerDTO"); bannerBaseRspDTO.setData(bannerDTO); return bannerBaseRspDTO; }; //label信息查詢任務 Callable<BaseRspDTO<Object>> labelDTODTOCallableTask = () -> { LabelParam labelParam = buildLabelParam(req); LabelDTO labelDTO = labelService.queryLabelInfo(labelParam); BaseRspDTO<Object> labelBaseRspDTO = new BaseRspDTO<Object>(); labelBaseRspDTO.setKey("labelDTO"); labelBaseRspDTO.setData(labelDTO); return labelBaseRspDTO; }; List<Callable<BaseRspDTO<Object>>> taskList = new ArrayList<>(); taskList.add(userInfoDTOCallableTask); taskList.add(bannerDTOCallableTask); taskList.add(labelDTODTOCallableTask); ExecutorService executor = Executors.newFixedThreadPool(10); List<BaseRspDTO<Object>> resultList = parallelInvokeCommonService.executeTask(taskList, 3, executor); if (resultList == null || resultList.size() == 0) { return new AppHeadInfoResponse(); } UserInfoDTO userInfoDTO = null; BannerDTO bannerDTO = null; LabelDTO labelDTO = null; //遍歷結果 for (int i = 0; i < resultList.size(); i++) { BaseRspDTO baseRspDTO = resultList.get(i); if ("userInfoDTO".equals(baseRspDTO.getKey())) { userInfoDTO = (UserInfoDTO) baseRspDTO.getData(); } else if ("bannerDTO".equals(baseRspDTO.getKey())) { bannerDTO = (BannerDTO) baseRspDTO.getData(); } else if ("labelDTO".equals(baseRspDTO.getKey())) { labelDTO = (LabelDTO) baseRspDTO.getData(); } } System.out.println("結束並行查詢app首頁信息,總耗時:" + (System.currentTimeMillis() - beginTime)); return buildResponse(userInfoDTO, bannerDTO, labelDTO); }
基於以上代碼,小夥伴們,是否還有其他方面的優化想法呢?比如這幾個Callable查詢任務,我們是不是也可以抽取一下?讓代碼更加簡潔。
二話不説,現在我們直接建一個BaseTaskCommand類,實現Callable接口,把查詢用户信息、查詢banner信息、label標籤信息的查詢任務放進去。
代碼如下:
public class BaseTaskCommand implements Callable<BaseRspDTO<Object>> { private String key; private AppInfoReq req; private IUserService userService; private IBannerService bannerService; private ILabelService labelService; public BaseTaskCommand(String key, AppInfoReq req, IUserService userService, IBannerService bannerService, ILabelService labelService) { this.key = key; this.req = req; this.userService = userService; this.bannerService = bannerService; this.labelService = labelService; } @Override public BaseRspDTO<Object> call() throws Exception { if ("userInfoDTO".equals(key)) { UserInfoParam userInfoParam = buildUserParam(req); UserInfoDTO userInfoDTO = userService.queryUserInfo(userInfoParam); BaseRspDTO<Object> userBaseRspDTO = new BaseRspDTO<Object>(); userBaseRspDTO.setKey("userInfoDTO"); userBaseRspDTO.setData(userInfoDTO); return userBaseRspDTO; } else if ("bannerDTO".equals(key)) { BannerParam bannerParam = buildBannerParam(req); BannerDTO bannerDTO = bannerService.queryBannerInfo(bannerParam); BaseRspDTO<Object> bannerBaseRspDTO = new BaseRspDTO<Object>(); bannerBaseRspDTO.setKey("bannerDTO"); bannerBaseRspDTO.setData(bannerDTO); return bannerBaseRspDTO; } else if ("labelDTO".equals(key)) { LabelParam labelParam = buildLabelParam(req); LabelDTO labelDTO = labelService.queryLabelInfo(labelParam); BaseRspDTO<Object> labelBaseRspDTO = new BaseRspDTO<Object>(); labelBaseRspDTO.setKey("labelDTO"); labelBaseRspDTO.setData(labelDTO); return labelBaseRspDTO; } return null; } private UserInfoParam buildUserParam(AppInfoReq req) { return new UserInfoParam(); } private BannerParam buildBannerParam(AppInfoReq req) { return new BannerParam(); } private LabelParam buildLabelParam(AppInfoReq req) { return new LabelParam(); } }
以上這塊代碼,構造函數還是有比較多的參數,並且call()方法中,有多個if...else...,如果新增一個條件分支(比如查詢浮層信息),那又得在call方法裏修改了,並且BaseTaskCommand的構造器也要修改了。
大家是否有印象,當程序中出現多個if...else...時,我們就可以考慮使用策略模式+工廠模式優化。
我們聲明多個策略實現類,把條件分支裏的實現,搬到策略類,如下:
public interface IBaseTask { //返回每個策略類的key,如是usetInfoDTO還是bannerDTO,還是labelDTO String getTaskType(); BaseRspDTO<Object> execute(AppInfoReq req); } //用户信息策略類 @Service public class UserInfoStrategyTask implements IBaseTask { @Autowired private IUserService userService; @Override public String getTaskType() { return "userInfoDTO"; } @Override public BaseRspDTO<Object> execute(AppInfoReq req) { UserInfoParam userInfoParam = userService.buildUserParam(req); UserInfoDTO userInfoDTO = userService.queryUserInfo(userInfoParam); BaseRspDTO<Object> userBaseRspDTO = new BaseRspDTO<Object>(); userBaseRspDTO.setKey(getTaskType()); userBaseRspDTO.setData(userBaseRspDTO); return userBaseRspDTO; } } /** * banner信息策略實現類 **/ @Service public class BannerStrategyTask implements IBaseTask { @Autowired private IBannerService bannerService; @Override public String getTaskType() { return "bannerDTO"; } @Override public BaseRspDTO<Object> execute(AppInfoReq req) { BannerParam bannerParam = bannerService.buildBannerParam(req); BannerDTO bannerDTO = bannerService.queryBannerInfo(bannerParam); BaseRspDTO<Object> bannerBaseRspDTO = new BaseRspDTO<Object>(); bannerBaseRspDTO.setKey(getTaskType()); bannerBaseRspDTO.setData(bannerDTO); return bannerBaseRspDTO; } } ...
然後這幾個策略實現類,怎麼交給spring管理呢?我們可以實現ApplicationContextAware接口,把策略的實現類注入到一個map,然後根據請求方不同的策略請求類型(即userInfoDTO還是bannerDTO等),去實現不同的策略類調用。其實這類似於工廠模式的思想。代碼如下:
/** * 策略工廠類 **/ @Component public class TaskStrategyFactory implements ApplicationContextAware { private Map<String, IBaseTask> map = new ConcurrentHashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, IBaseTask> tempMap = applicationContext.getBeansOfType(IBaseTask.class); tempMap.values().forEach(iBaseTask -> { map.put(iBaseTask.getTaskType(), iBaseTask); }); } public BaseRspDTO<Object> executeTask(String key, AppInfoReq req) { IBaseTask baseTask = map.get(key); if (baseTask != null) { System.out.println("工廠策略實現類執行"); return baseTask.execute(req); } return null; } }
有了策略工廠類TaskStrategyFactory,我們再回來優化下BaseTaskCommand類的代碼。它的構造器已經不需要多個IUserService userService, IBannerService bannerService, ILabelService labelService啦,只需要傳入策略工廠類TaskStrategyFactory即可。同時策略也不需要多個if...else...判斷了,用策略工廠類TaskStrategyFactory代替即可。優化後的代碼如下:
public class BaseTaskCommand implements Callable<BaseRspDTO<Object>> { private String key; private AppInfoReq req; private TaskStrategyFactory taskStrategyFactory; public BaseTaskCommand(String key, AppInfoReq req, TaskStrategyFactory taskStrategyFactory) { this.key = key; this.req = req; this.taskStrategyFactory = taskStrategyFactory; } @Override public BaseRspDTO<Object> call() throws Exception { return taskStrategyFactory.executeTask(key, req); } }
因此整個app首頁信息並行查詢,就可以優化成這樣啦,如下:
public AppHeadInfoResponse parallelQueryAppHeadPageInfo2(AppInfoReq req) { long beginTime = System.currentTimeMillis(); System.out.println("開始並行查詢app首頁信息(最終版本),開始時間:" + beginTime); List<Callable<BaseRspDTO<Object>>> taskList = new ArrayList<>(); //用户信息查詢任務 taskList.add(new BaseTaskCommand("userInfoDTO", req, taskStrategyFactory)); //banner查詢任務 taskList.add(new BaseTaskCommand("bannerDTO", req, taskStrategyFactory)); //標籤查詢任務 taskList.add(new BaseTaskCommand("labelDTO", req, taskStrategyFactory)); ExecutorService executor = Executors.newFixedThreadPool(10); List<BaseRspDTO<Object>> resultList = parallelInvokeCommonService.executeTask(taskList, 3, executor); if (resultList == null || resultList.size() == 0) { return new AppHeadInfoResponse(); } UserInfoDTO userInfoDTO = null; BannerDTO bannerDTO = null; LabelDTO labelDTO = null; for (BaseRspDTO<Object> baseRspDTO : resultList) { if ("userInfoDTO".equals(baseRspDTO.getKey())) { userInfoDTO = (UserInfoDTO) baseRspDTO.getData(); } else if ("bannerDTO".equals(baseRspDTO.getKey())) { bannerDTO = (BannerDTO) baseRspDTO.getData(); } else if ("labelDTO".equals(baseRspDTO.getKey())) { labelDTO = (LabelDTO) baseRspDTO.getData(); } } System.out.println("結束並行查詢app首頁信息(最終版本),總耗時:" + (System.currentTimeMillis() - beginTime)); return buildResponse(userInfoDTO, bannerDTO, labelDTO); }
5. 思考
總結以上代碼整體優化下來,已經很簡潔啦。那還有沒有別的優化思路呢。
其實還是有的,比如,把唯一標記的key定義為枚舉,而不是寫死的字符串"userInfoDTO"、"bannerDTO","labelDTO"。還有,除了CompletionService,有些小夥伴喜歡用CompletableFuture實行並行調用,大家可以自己動手操戈寫一寫。
本文大家學到了哪些知識呢?
如何優化接口性能?某些場景下,可以使用並行調用代替串行。
如何實現並行調用呢?可以使用CompletionService。
學到的後端思維是?日常開發中,要學會抽取通用的方法、或者工具。
策略模式和工廠模式的應用
本文的話,設計模式這塊還不是很詳細,下一篇,給大家講講,我是如何在現有代碼基礎上,抽取設計模式的哈。
- Web1.0到Web3.0,互聯網是如何演進的?
- DDD概念複雜難懂,實際落地如何設計代碼實現模型?
- 學會一招!如何利用 pandas 批量合併 Excel?
- 超硬核!11個非常實用的 Python 和 Shell 拿來就用腳本實例!
- 還在用requests寫爬蟲嗎?這個庫效率提高一倍!
- 分佈式編程工具Akka Streams、Kafka Streams和Spark Streaming大PK
- 工具類如何獲取到 Spring 容器中的 Bean?
- code-review之前端代碼優化彙總
- 七個讓我們成為更好 Vue 開發者的技巧
- 效率寶典:10個超實用的React Hooks庫
- 如何將HTML與Htmx一起使用並減少JavaScript代碼量
- 一個依賴搞定Spring Boot 配置文件脱敏
- 如何在 Python 中使用 DateTime
- 如何利用OpenTelemetry識別數據庫依賴關係?
- Node.js 子線程Crash 問題的排查
- 如何在Python中操作MySQL?
- 異步非阻塞框架是如何實現的?
- 從 CPU 説起,深入理解 Java 內存模型!
- JavaScript逆向案例:破解登錄密碼
- 如何從FreeBSD 12升級到FreeBSD13