BlockingQueue
的功能以及常見使用場景是很是普遍的,讀者能夠自行百度去了解 BlockingQueue
的核心方法以及BlockingQueue
家庭大體有哪些成員,這裏就再也不班門弄斧。推薦資料:不怕難之BlockingQueue及其實現html
這裏引用一個案例基於BlockingQueue
實現Web中的長鏈接聊天功能前端
BlockingQueueTest
java
package com.baba.bracelet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * @Author wulongbo * @Date 2020/12/1 9:16 * @Version 1.0 */ public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException { // 聲明一個容量爲10的緩存隊列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); //new了三個生產者和一個消費者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); // 藉助Executors ExecutorService service = Executors.newCachedThreadPool(); // 啓動線程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 執行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } }
Producer
生產者mysql
package com.spring.security.demo; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Author wulongbo * @Date 2020/12/1 9:17 * @Version 1.0 */ public class Producer implements Runnable { private volatile boolean isRunning = true;//是否在運行標誌 private BlockingQueue queue;//阻塞隊列 private static AtomicInteger count = new AtomicInteger();//自動更新的值 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //構造函數 public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { String data = null; Random r = new Random(); System.out.println("啓動生產者線程!"); try { while (isRunning) { System.out.println("正在生產數據..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數 data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1 System.out.println("將數據:" + data + "放入隊列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間爲2s,若是超過2s還沒加進去返回true System.out.println("放入數據失敗:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出生產者線程!"); } } public void stop() { isRunning = false; } }
Consumer
消費者git
package com.spring.security.demo; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * @Author wulongbo * @Date 2020/12/1 9:21 * @Version 1.0 */ public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; //構造函數 public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { System.out.println("啓動消費者線程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println("正從隊列獲取數據..."); String data = queue.poll(2, TimeUnit.SECONDS);//有數據時直接從隊列的隊首取走,無數據時阻塞,在2s內有數據,取走,超過2s還沒數據,返回失敗 if (null != data) { System.out.println("拿到數據:" + data); System.out.println("正在消費數據:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超過2s還沒數據,認爲全部生產線程都已經退出,自動退出消費線程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println("退出消費者線程!"); } } }
以上的簡單代碼演示瞭如何使用 BlockingQueue
以及它的部分核心方法的使用,讀者可以很好觸類旁通。github
該案例基於springboot
集成Mybatis-Plus
的項目中來講明:讀者能夠參考搭建Springboot項目並集成Mybatis-Plus javaspringboot
如今來作前期的環境準備。
這裏爲了方便測試咱們加入swagger依賴:web
<!-- 引入swagger2依賴--> <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency>
在config
目錄中添加swagger配置類SwaggerConfig
spring
package com.mybatis.plus.config; import freemarker.ext.util.ModelCache; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.ParameterBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.schema.ModelRef; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Parameter; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; import java.util.ArrayList; import java.util.List; /** * @Author wulongbo * @Date 2020/11/21 11:50 * @Version 1.0 */ @Configuration @EnableSwagger2 public class SwaggerConfig { /** * 經過 createRestApi函數來構建一個DocketBean * 函數名,能夠隨意命名,喜歡什麼命名就什麼命名 */ @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo())//調用apiInfo方法,建立一個ApiInfo實例,裏面是展現在文檔頁面信息內容 .select() //控制暴露出去的路徑下的實例 //若是某個接口不想暴露,可使用如下註解 //@ApiIgnore 這樣,該接口就不會暴露在 swagger2 的頁面下 .apis(RequestHandlerSelectors.basePackage("com.mybatis.plus")) .paths(PathSelectors.any()) .build(); } //構建 api文檔的詳細信息函數 private ApiInfo apiInfo() { return new ApiInfoBuilder() //頁面標題 .title("Spring Boot Swagger2 構建把把智能") //條款地址 .termsOfServiceUrl("http://despairyoke.github.io/") .contact("zwd") .version("1.0") //描述 .description("API 描述") .build(); } }
注意:swagger的掃包路徑,才能正確訪問swagger
目錄結構以下:sql
在IUserService
中添加兩個接口數據庫
void recordJob(User job); @PostConstruct void init();
實現類 UserServiceImpl
中的代碼以下:
package com.mybatis.plus.service.impl; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.mybatis.plus.entity.User; import com.mybatis.plus.mapper.UserMapper; import com.mybatis.plus.service.IUserService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * <p> * 服務實現類 * </p> * * @author wulongbo * @since 2020-11-09 */ @Service("iUserService") public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService { private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class); //定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000); private List<User> list = new ArrayList<User>(); @Override public void recordJob(User job) { try { //put任務的方法,供生產者調用 dataQueue.put(job); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } @PostConstruct @Override public void init() { Thread thread = new Thread(() -> { logger.info("啓動批量守護線程,啓動時間{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) { User poll = null; boolean pollTimeOut = false; long startTime; long endTime; try { // poll時設置超時時間爲2秒 poll = dataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.info("批量更新Job異常"); Thread.currentThread().interrupt(); } if (null != poll) { // poll到任務添加到List中 list.add(poll); } else { // poll超時,設置超時標誌位 pollTimeOut = true; } // 若是任務List等於300或poll超時且List中還有任務就批量更新 if (list.size() == 300|| (pollTimeOut && !CollectionUtils.isEmpty(list))){ startTime = System.currentTimeMillis(); saveOrUpdateBatch(list); logger.info("Job任務批量更新{}條任務,耗時{}毫秒", list.size(), System.currentTimeMillis()-startTime); list.clear(); } } }); thread.setName("job-batchUpdate-deamon"); // 設置啓動的線程爲守護線程 直到jvm停了才中止 thread.setDaemon(true); thread.start(); } }
控制頁面模擬一下插入操做 UserController
代碼以下:
package com.mybatis.plus.controller; import com.mybatis.plus.entity.User; import com.mybatis.plus.service.IUserService; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * <p> * 前端控制器 * </p> * * @author wulongbo * @since 2020-11-09 */ @RestController @RequestMapping("/user") @CrossOrigin public class UserController { @Autowired private IUserService iUserService; @ApiOperation(value="添加用戶信息", notes="添加用戶信息") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"), @ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年齡"), @ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "郵箱") }) @PostMapping("/addUser") public boolean addUser(String name,Integer age,String email) { try { User user=new User(); user.setName(name); user.setAge(age); user.setEmail(email); iUserService.recordJob(user); }catch (Exception e){ return false; } return true; } }
啓動項目並訪問http://localhost:8080/swagger-ui.html#/
注意檢查一下:
User
實體類中的主鍵id是否設置成自動遞增/** * 主鍵ID */@TableId(value = "id", type = IdType.AUTO) private Long id;
2.數據庫User表主鍵是否設置主鍵自動遞增
填寫用戶信息,並調用添加用戶信息接口
連按多下(做者按了3次),測試 BlockingQueue
是否把User對象都緩存到了阻塞隊列中,再一次性消費掉。
能夠看到控制檯打印日誌:
覈對數據庫表,查看是否成功插入數據
OK,證實咱們的隊列生效!這裏便演示了使用 BlockingQueue
在併發狀況下對mysql作的批量操做。
這裏讀者已經能夠體會 BlockingQueue
給咱們帶來的便利了,批量刪除差別點就是在 對阻塞隊列的定義上面
咱們使用
//定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
來給生產者調用,【BlockingQueue<Long>,或者BlockingQueue<Integer>依據實際的主鍵類型來建立就好】。
同理咱們在 IUserService
中添加刪除的接口
void delJob(User job); @PostConstruct void delInit();
再其實現類 UserServiceImpl
中添加實現方法
//定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000); private List<Long> delList = new ArrayList<Long>(); @Override public void delJob(User job) { try { //put任務的方法,供生產者調用 delDataQueue.put(job.getId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } @PostConstruct @Override public void delInit() { Thread thread = new Thread(() -> { logger.info("啓動批量刪除守護線程,啓動時間{}", new Date(System.currentTimeMillis())); while (Boolean.TRUE) { Long poll = null; boolean pollTimeOut = false; long startTime; long endTime; try { // poll時設置超時時間爲2秒 poll = delDataQueue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if (null != poll) { // poll到任務添加到List中 delList.add(poll); } else { // poll超時,設置超時標誌位 pollTimeOut = true; } // 若是任務List等於5000或poll超時且List中還有任務就批量更新 if (delList.size() == 300|| (pollTimeOut && !CollectionUtils.isEmpty(delList))){ startTime = System.currentTimeMillis(); removeByIds(delList); logger.info("Job任務批量刪除{}條任務,耗時{}毫秒", delList.size(), System.currentTimeMillis()-startTime); delList.clear(); } } }); thread.setName("job-batchUpdate-deamon"); // 設置啓動的線程爲守護線程 直到jvm停了才中止 thread.setDaemon(true); thread.start(); }
控制頁面模擬刪除操做 UserController
代碼以下:
@ApiOperation(value="刪除用戶信息", notes="刪除用戶信息") @ApiImplicitParams({ @ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用戶id")}) @PostMapping("/delUser") public boolean delUser(Long id) { try { User user=new User(); user.setId(id); iUserService.delJob(user); }catch (Exception e){ return false; } return true; }
咱們在swagger參數中連續輸入6,7,8並快速執行刪除接口
能夠看到控制檯輸出結果
覈對數據庫表中記錄是否已經刪除,發現確實已經刪除
至此便用 BlockingQueue
模擬了在併發狀況下對數據庫的批量刪除操做。