BlockingQueue

引言

BlockingQueue 的功能以及常見使用場景是很是普遍的,讀者能夠自行百度去了解 BlockingQueue的核心方法以及BlockingQueue家庭大體有哪些成員,這裏就再也不班門弄斧。推薦資料:不怕難之BlockingQueue及其實現html

案例一:實現web聊天功能

這裏引用一個案例基於BlockingQueue實現Web中的長鏈接聊天功能前端

BlockingQueueTestjava

package com.baba.bracelet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:16
 * @Version 1.0
 */
 public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        // 聲明一個容量爲10的緩存隊列
 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 //new了三個生產者和一個消費者
 Producer producer1 = new Producer(queue);
 Producer producer2 = new Producer(queue);
 Producer producer3 = new Producer(queue);
 Consumer consumer = new Consumer(queue);
 // 藉助Executors
 ExecutorService service = Executors.newCachedThreadPool();
 // 啓動線程
 service.execute(producer1);
 service.execute(producer2);
 service.execute(producer3);
 service.execute(consumer);
 // 執行10s
 Thread.sleep(10 * 1000);
 producer1.stop();
 producer2.stop();
 producer3.stop();
 Thread.sleep(2000);
 // 退出Executor
 service.shutdown();
 }
}

Producer生產者mysql

package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:17
 * @Version 1.0
 */
 public class Producer implements Runnable {
    private volatile boolean isRunning = true;//是否在運行標誌
 private BlockingQueue queue;//阻塞隊列
 private static AtomicInteger count = new AtomicInteger();//自動更新的值
 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 //構造函數
 public Producer(BlockingQueue queue) {
        this.queue = queue;
 }
    public void run() {
        String data = null;
 Random r = new Random();
 System.out.println("啓動生產者線程!");
 try {
            while (isRunning) {
                System.out.println("正在生產數據...");
 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數
 data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1
 System.out.println("將數據:" + data + "放入隊列...");
 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間爲2s,若是超過2s還沒加進去返回true
 System.out.println("放入數據失敗:" + data);
 }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
            System.out.println("退出生產者線程!");
 }
    }
    public void stop() {
        isRunning = false;
 }
}

Consumer消費者git

package com.spring.security.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * @Author wulongbo
 * @Date 2020/12/1 9:21
 * @Version 1.0
 */
 public class Consumer implements Runnable {
    private BlockingQueue<String> queue;
 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
 //構造函數
 public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
 }
    public void run() {
        System.out.println("啓動消費者線程!");
 Random r = new Random();
 boolean isRunning = true;
 try {
            while (isRunning) {
                System.out.println("正從隊列獲取數據...");
 String data = queue.poll(2, TimeUnit.SECONDS);//有數據時直接從隊列的隊首取走,無數據時阻塞,在2s內有數據,取走,超過2s還沒數據,返回失敗
 if (null != data) {
                    System.out.println("拿到數據:" + data);
 System.out.println("正在消費數據:" + data);
 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
 } else {
                    // 超過2s還沒數據,認爲全部生產線程都已經退出,自動退出消費線程。
 isRunning = false;
 }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
 Thread.currentThread().interrupt();
 } finally {
            System.out.println("退出消費者線程!");
 }
    }
}

以上的簡單代碼演示瞭如何使用 BlockingQueue以及它的部分核心方法的使用,讀者可以很好觸類旁通。github

案例二:批量插入mysql

該案例基於springboot集成Mybatis-Plus的項目中來講明:讀者能夠參考搭建Springboot項目並集成Mybatis-Plus javaspringboot
如今來作前期的環境準備。
這裏爲了方便測試咱們加入swagger依賴:web

<!-- 引入swagger2依賴-->
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
</dependency>
<dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
</dependency>

config目錄中添加swagger配置類
SwaggerConfigspring

package com.mybatis.plus.config;
import freemarker.ext.util.ModelCache;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.ParameterBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author wulongbo
 * @Date 2020/11/21 11:50
 * @Version 1.0
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {
    /**
 * 經過 createRestApi函數來構建一個DocketBean
 * 函數名,能夠隨意命名,喜歡什麼命名就什麼命名
 */
 @Bean
 public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())//調用apiInfo方法,建立一個ApiInfo實例,裏面是展現在文檔頁面信息內容
 .select()
                //控制暴露出去的路徑下的實例
 //若是某個接口不想暴露,可使用如下註解
 //@ApiIgnore 這樣,該接口就不會暴露在 swagger2 的頁面下
 .apis(RequestHandlerSelectors.basePackage("com.mybatis.plus"))
                .paths(PathSelectors.any())
                .build();
 }
    //構建 api文檔的詳細信息函數
 private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                //頁面標題
 .title("Spring Boot Swagger2 構建把把智能")
                //條款地址
 .termsOfServiceUrl("http://despairyoke.github.io/")
                .contact("zwd")
                .version("1.0")
                //描述
 .description("API 描述")
                .build();
 }
}

注意:swagger的掃包路徑,才能正確訪問swagger
目錄結構以下:
目錄結構sql

IUserService中添加兩個接口數據庫

void recordJob(User job);
@PostConstruct
void init();

實現類 UserServiceImpl中的代碼以下:

package com.mybatis.plus.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.mapper.UserMapper;
import com.mybatis.plus.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * <p>
 * 服務實現類
 * </p>
 *
 * @author wulongbo
 * @since 2020-11-09
 */
@Service("iUserService")
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
    private static Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
 //定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put
 private BlockingQueue<User> dataQueue = new LinkedBlockingQueue<User>(10000);
 private List<User> list = new ArrayList<User>();
 @Override
 public void recordJob(User job) {
        try {
            //put任務的方法,供生產者調用
 dataQueue.put(job);
 } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
 }
    }
    @PostConstruct
 @Override public void init() {
        Thread thread = new Thread(() -> {
            logger.info("啓動批量守護線程,啓動時間{}", new Date(System.currentTimeMillis()));
 while (Boolean.TRUE) {
                User poll = null;
 boolean pollTimeOut = false;
 long startTime;
 long endTime;
 try {
                    // poll時設置超時時間爲2秒
 poll = dataQueue.poll(2, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
                    logger.info("批量更新Job異常");
 Thread.currentThread().interrupt();
 }
                if (null != poll) {
                    // poll到任務添加到List中
 list.add(poll);
 } else {
                    // poll超時,設置超時標誌位
 pollTimeOut = true;
 }
                // 若是任務List等於300或poll超時且List中還有任務就批量更新
 if (list.size() == 300||
                        (pollTimeOut && !CollectionUtils.isEmpty(list))){
                    startTime = System.currentTimeMillis();
 saveOrUpdateBatch(list);
 logger.info("Job任務批量更新{}條任務,耗時{}毫秒", list.size(),
 System.currentTimeMillis()-startTime);
 list.clear();
 }
            }
        });
 thread.setName("job-batchUpdate-deamon");
 // 設置啓動的線程爲守護線程 直到jvm停了才中止
 thread.setDaemon(true);
 thread.start();
 }
}

控制頁面模擬一下插入操做 UserController代碼以下:

package com.mybatis.plus.controller;
import com.mybatis.plus.entity.User;
import com.mybatis.plus.service.IUserService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * <p>
 * 前端控制器
 * </p>
 *
 * @author wulongbo
 * @since 2020-11-09
 */
@RestController
@RequestMapping("/user")
@CrossOrigin
public class UserController {
    @Autowired
 private IUserService iUserService;
 @ApiOperation(value="添加用戶信息", notes="添加用戶信息")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "query", name = "name", dataType = "String", required = true, value = "姓名"),
 @ApiImplicitParam(paramType = "query", name = "age", dataType = "int", required = true, value = "年齡"),
 @ApiImplicitParam(paramType = "query", name = "email", dataType = "String", required = true, value = "郵箱") })
    @PostMapping("/addUser")
    public boolean addUser(String name,Integer age,String email) {
        try {
            User user=new User();
 user.setName(name);
 user.setAge(age);
 user.setEmail(email);
 iUserService.recordJob(user);
 }catch (Exception e){
            return false;
 }
        return true;
 }
}

啓動項目並訪問http://localhost:8080/swagger-ui.html#/
訪問swagger

注意檢查一下:

  1. User實體類中的主鍵id是否設置成自動遞增
/**
 * 主鍵ID
 */@TableId(value = "id", type = IdType.AUTO)
private Long id;

2.數據庫User表主鍵是否設置主鍵自動遞增

填寫用戶信息,並調用添加用戶信息接口
調用swagger
連按多下(做者按了3次),測試 BlockingQueue是否把User對象都緩存到了阻塞隊列中,再一次性消費掉。
能夠看到控制檯打印日誌:
image.png
覈對數據庫表,查看是否成功插入數據
image.png
OK,證實咱們的隊列生效!這裏便演示了使用 BlockingQueue 在併發狀況下對mysql作的批量操做。

案例三:mysql之批量刪除

這裏讀者已經能夠體會 BlockingQueue 給咱們帶來的便利了,批量刪除差別點就是在 對阻塞隊列的定義上面 咱們使用

//定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);

來給生產者調用,【BlockingQueue<Long>,或者BlockingQueue<Integer>依據實際的主鍵類型來建立就好】。

同理咱們在 IUserService 中添加刪除的接口

void delJob(User job);
@PostConstruct
void delInit();

再其實現類 UserServiceImpl 中添加實現方法

//定義一個容量爲10000的阻塞隊列,BlockingQueue線程安全能夠多個生產者同時put
private BlockingQueue<Long> delDataQueue = new LinkedBlockingQueue<Long>(10000);
private List<Long> delList = new ArrayList<Long>();

@Override
public void delJob(User job) {
    try {
        //put任務的方法,供生產者調用
 delDataQueue.put(job.getId());
 } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
 }
}
@PostConstruct
@Override
public void delInit() {
    Thread thread = new Thread(() -> {
        logger.info("啓動批量刪除守護線程,啓動時間{}", new Date(System.currentTimeMillis()));
 while (Boolean.TRUE) {
            Long poll = null;
 boolean pollTimeOut = false;
 long startTime;
 long endTime;
 try {
                // poll時設置超時時間爲2秒
 poll = delDataQueue.poll(2, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
 }
            if (null != poll) {
                // poll到任務添加到List中
 delList.add(poll);
 } else {
                // poll超時,設置超時標誌位
 pollTimeOut = true;
 }
            // 若是任務List等於5000或poll超時且List中還有任務就批量更新
 if (delList.size() == 300||
                    (pollTimeOut && !CollectionUtils.isEmpty(delList))){
                startTime = System.currentTimeMillis();
 removeByIds(delList);
 logger.info("Job任務批量刪除{}條任務,耗時{}毫秒", delList.size(),
 System.currentTimeMillis()-startTime);
 delList.clear();
 }
        }
    });
 thread.setName("job-batchUpdate-deamon");
 // 設置啓動的線程爲守護線程 直到jvm停了才中止
 thread.setDaemon(true);
 thread.start();
}

控制頁面模擬刪除操做 UserController代碼以下:

@ApiOperation(value="刪除用戶信息", notes="刪除用戶信息")
@ApiImplicitParams({
        @ApiImplicitParam(paramType = "query", name = "id", dataType = "long", required = true, value = "用戶id")})
@PostMapping("/delUser")
public boolean delUser(Long id) {
    try {
        User user=new User();
 user.setId(id);
 iUserService.delJob(user);
 }catch (Exception e){
        return false;
 }
    return true;
}

咱們在swagger參數中連續輸入6,7,8並快速執行刪除接口
image.png

能夠看到控制檯輸出結果
image.png
覈對數據庫表中記錄是否已經刪除,發現確實已經刪除
image.png
至此便用 BlockingQueue 模擬了在併發狀況下對數據庫的批量刪除操做。

相關文章
相關標籤/搜索