大數據技術之_16_Scala學習_11_客戶信息管理系統+併發編程模型 Akka+Akka 網絡編程-小黃雞客服案例+Akka 網絡編程-Spark Master Worker 進程通信項目

第十五章 客戶信息管理系統15.1 項目的開發流程15.2 項目的需求分析15.3 項目的界面15.4 項目的設計-程序框架圖15.5 項目的功能實現15.5.1 完成 Customer 類15.5.2 完成顯示主菜單和退出軟件功能15.5.3 完成顯示客戶列表的功能15.5.4 完成添加客戶的功能15.5.5 完成刪除客戶的功能15.5.6 完善退出確認功能15.5.7 完善刪除確認功能15.5.8 完成修改客戶的功能第十六章 併發編程模型 Akka16.1 Akka 的介紹16.2 Actor 模型用於解決什麼問題16.3 Akka 中 Actor 模型詳解16.4 Actor 模型工做機制說明16.5 Actor 模型應用實例16.5.1 Actor 自我通信16.5.2 Actor 之間通信16.7 Akka 網絡編程16.7.1 Akka 網絡編程基本介紹16.7.2 協議(tcp/ip)16.7.3 OSI 與 Tcp/ip 參考模型16.7.4 ip 地址16.7.5 端口(port)16.8 Akka 網絡編程-小黃雞客服案例16.8.1 需求分析 + 界面設計16.8.2 程序框架圖16.8.3 功能實現16.9 Akka 網絡編程-Spark Master Worker 進程通信項目16.9.1 項目意義16.9.2 項目需求分析16.9.3 項目界面設計16.9.4 實現功能 1-Worker 完成註冊16.9.5 實現功能 2-Worker 定時發送心跳16.9.6 實現功能 3-Master 啓動定時任務,定時檢測註冊的 Worker16.9.7 實現功能 4-Master,Worker 的啓動參數運行時指定16.9.8 Master Worker 進行分佈式部署css


第十五章 客戶信息管理系統

15.1 項目的開發流程

15.2 項目的需求分析

  模擬實現基於文本界面的《客戶信息管理軟件》。
  該軟件 scala 可以實現對客戶對象的插入、修改、刪除、顯示、查詢(用 ArrayBuffer 或者 ListBuffer 實現),並可以打印客戶明細表。前端

15.3 項目的界面

主界面java


添加客戶

修改客戶

刪除客戶

客戶列表

15.4 項目的設計-程序框架圖

程序框架圖:設計系統有多少個文件,以及文件之間的調用關係,能夠幫助程序員實現模塊的設計(清晰),便於程序員之間對項目交流分析。【業務優化,設計方案】程序員

15.5 項目的功能實現

15.5.1 完成 Customer 類

根據需求文檔或者頁面,寫出 Customer 類
Customer.scalaapache

package com.atguigu.chapter15.customercrm.bean

class Customer {
  // 屬性
  var id: Int = _
  var name: String = _
  var gender: Char = _
  var age: Short = _
  var tel: String = _
  var email: String = _

  // 輔助構造器
  def this(id: Int, name: String, gender: Char, age: Short, tel: String, email: String) {
    this
    this.id = id
    this.name = name
    this.gender = gender
    this.age = age
    this.tel = tel
    this.email = email
  }
}

15.5.2 完成顯示主菜單和退出軟件功能

CustomerView.scala 功能分析:
  1. 將主菜單的顯示放入到 while
  2. 用戶能夠根據輸入,選擇本身的操做
  3. 若是輸入5退出
CustomerView.scala編程

package com.atguigu.chapter15.customercrm.view

import scala.io.StdIn

class CustomerView {
  // 定義一個循環變量,控制是否退出
  var loop = true
  // 定義一個 key 用於接收用戶輸入的選項
  var key = ' '

  def mainMenu(): Unit = {
    do {
      println("-----------------客戶信息管理軟件-----------------")
      println("                  1 添 加 客 戶                  ")
      println("                  2 修 改 客 戶                  ")
      println("                  3 刪 除 客 戶                  ")
      println("                  4 客 戶 列 表                  ")
      println("                  5 退       出                  ")
      println("                  請選擇(1-5):                   ")
      key = StdIn.readChar()
      key match {
        case '1' => println("添 加 客 戶")
        case '2' => println("修 改 客 戶")
        case '3' => println("刪 除 客 戶")
        case '4' => println("客 戶 列 表")
        case '5' => this.loop = false
      }
    } while (loop)
    println("你退出了系統...")
  }
}

示例代碼以下:服務器

package com.atguigu.chapter15.customercrm.app

import com.atguigu.chapter15.customercrm.view.CustomerView

object CustomerCrm {
  def main(args: Array[String]): Unit = {
    new CustomerView().mainMenu()
  }
}

15.5.3 完成顯示客戶列表的功能

CustomerView.scala 功能分析:
  1. 接收4,顯示客戶列表
  2. 調用 CustomerService 的方法 list
  3. 須要一個 CustomerService 對象(屬性)
CustomerService.sacla 功能分析:
  1. 編寫一個方法 list,返回當前系統有哪些客戶
  2. 客戶放在哪?--> 內存 --> 可變集合 --> ArrayBuffer
一、在 Customer.sacla 中重寫 toString 方法網絡

  override def toString: String = {
    this.id + "\t\t" + this.name + "\t\t" + this.gender + "\t\t" + this.age + "\t\t" + this.tel + "\t\t" + this.email
  }

二、在 CustomerService.scala 中編寫一個方法 list,返回當前系統有哪些客戶併發

class CustomerService {
  // customers 是存放客戶用的,爲了方便測試,咱們先進行初始化
  val customers = ArrayBuffer(new Customer(1"tom"'男'20"110""tom@sohu.com"))

  // 查詢客戶列表的方法
  def list(): ArrayBuffer[Customer] = {
    this.customers
  }
}

三、在 CustomerView.scala 中 調用 CustomerService 的方法 listapp

  val customerService = new CustomerService()

  /*
---------------------------客戶列表---------------------------
編號  姓名       性別    年齡   電話            郵箱
 1    張三       男      30     010-56253825   abc@email.com
 2    李四       女      23     010-56253825    lisi@ibm.com
 3    王芳       女      26     010-56253825   wang@163.com
-------------------------客戶列表完成-------------------------
   */

  def list(): Unit = {
    println()
    println("---------------------------客戶列表---------------------------")
    println("編號\t\t姓名\t\t性別\t\t年齡\t\t電話\t\t郵箱")
    // 遍歷
    // 調用 CustomerService 的方法 list
    val customers = customerService.list()
    for (customer <- customers) {
      // 方式一:輸出
      // println(customer.id + "\t\t" + ...)
      // 方式二:重寫 Customer 的 toString 方法,返回信息,而且格式化
      println(customer)
    }
    println("-------------------------客戶列表完成-------------------------")
  }

15.5.4 完成添加客戶的功能

CustomerView.scala 功能分析:
  1. 接收客戶的信息,並封裝成對應的 Customer 對象
  2. 調用 CustomerService 的方法 add
CustomerService.sacla 功能分析:
  1. 編寫一個方法 add,接收一個 Customer 對象
  2. 加入到 ArrayBuffer 中
  3. 規定:以添加客戶是第幾個做爲它的 id
一、在 Customer.sacla 中添加一個新的 輔助構造器(沒有id屬性)

  // 輔助構造器(沒有id屬性)
  def this(name: String, gender: Char, age: Short, tel: String, email: String) {
    this
    this.name = name
    this.gender = gender
    this.age = age
    this.tel = tel
    this.email = email
  }

二、在 CustomerService.scala 中編寫一個方法 add,接收一個 Customer 對象,並設置 id 後再加入到 ArrayBuffer 中

  // 用於設置用戶 id
  var customerNum = 1

  // 添加客戶的方法
  def add(customer: Customer): Boolean = {
    // 設置 id
    customerNum += 1
    customer.id = customerNum
    // 加入到 ArrayBuffer 中
    customers.append(customer)
    true
  }

三、在 CustomerView.scala 中 調用 CustomerService 的方法 add

  /*
---------------------添加客戶---------------------
姓名:張三
性別:男
年齡:30
電話:010-56253825
郵箱:zhang@abc.com
---------------------添加完成---------------------
   */

  def add(): Unit = {
    println()
    println("---------------------添加客戶---------------------")
    println("姓名:")
    val name = StdIn.readLine()
    println("性別:")
    val gender = StdIn.readChar()
    println("年齡:")
    val age = StdIn.readShort()
    println("電話:")
    val tel = StdIn.readLine()
    println("郵箱:")
    val email = StdIn.readLine()
    // 封裝對象
    val customer = new Customer(name, gender, age, tel, email)
    // 調用 CustomerService 的方法 add
    customerService.add(customer)
    println("---------------------添加完成---------------------")
  }

15.5.5 完成刪除客戶的功能

CustomerView.scala 功能分析:
  1. 接收客戶 id,準備刪除
  2. 調用 CustomerService 的 del(id)
CustomerService.sacla 功能分析:
  1. 編寫一個方法 del,接收一個 id,先去調用另外一個方法 findIndexById,判斷
  2. 編寫一個方法 findIndexById(由於咱們的 ArrayBuffer 索引和 id 並非對應的)
  3. 若是發現有,則刪除,若是沒有就返回 false
一、在 CustomerService.scala 中編寫一個方法 del,接收一個 id,先去調用另外一個方法 findIndexById,判斷

  // 先根據 id 查找 用戶的 index
  def findIndexById(id: Int): Int = {
    // 先假定一個索引,默認 -1,若是找到就改爲對應的,若是沒有找到就返回 -1
    var index = -1
    // 遍歷 ArrayBuffer
    breakable {
      for (i <- 0 until customers.length) {
        if (customers(i).id == id) {
          index = i
          break()
        }
      }
    }
    index
  }

  // 再根據 id 刪除用戶
  def del(id: Int): Boolean 
= {
    val index = findIndexById(id)
    if (index != -1) {
      customers.remove(index)
      true
    } else {
      false
    }
  }

二、在 CustomerView.scala 中接收客戶 id,調用 CustomerService 的 del(id)

  /*
---------------------刪除客戶---------------------
請選擇待刪除客戶編號(-1退出):1
確認是否刪除(Y/N):y
---------------------刪除完成---------------------
   */

  def del(): Unit = {
    println()
    println("---------------------刪除客戶---------------------")
    println("請選擇待刪除客戶編號(-1退出):")
    val id = StdIn.readInt()
    if (id == -1) {
      println("---------------------刪除沒有完成---------------------")
      return
    }
    println("確認是否刪除(Y/N):")
    val choice = StdIn.readChar().toLower
    if (choice == 'y') 
{
      if (customerService.del(id)) {
        println("---------------------刪除完成---------------------")
        return
      }
    }
    println("---------------------刪除沒有完成---------------------")
  }

15.5.6 完善退出確認功能

功能說明:
  要求用戶在退出時提示 "確認是否退出(Y/N):",用戶必須輸入y/n,不然循環提示。且輸入爲y時,退出系統;輸入爲n時,不退出系統。
一、在 CustomerView.scala 中定義一個方法 isOut,並修改 key 所對應的函數。

  // 要求用戶在退出時提示"確認是否退出(Y/N):",用戶必須輸入y/n,不然循環提示。且輸入爲y時,退出系統;輸入爲n時,不退出系統。
  def isOut(): Unit = {
    println()
    println("確認是否退出(Y/N):")
    key = StdIn.readChar().toLower
    key match {
      case 'y' => this.loop = false
      case 'n' => this.loop = true
      case _ => isOut()
    }
  }

15.5.7 完善刪除確認功能

功能說明:
  要求用戶在刪除確認時提示 "確認是否刪除(Y/N):",用戶必須輸入y/n,不然循環提示。
一、在 CustomerView.scala 中,修改 del() 方法便可

  /*
---------------------刪除客戶---------------------
請選擇待刪除客戶編號(-1退出):1
確認是否刪除(Y/N):y
---------------------刪除完成---------------------
   */

  def del(): Unit = {
    println()
    println("---------------------刪除客戶---------------------")
    println("請選擇待刪除客戶編號(-1退出):")
    val id = StdIn.readInt()
    if (id == -1) {
      println("---------------------刪除沒有完成---------------------")
      return
    }
    println("確認是否刪除(Y/N):")
    var choice = ' '

    // 要求用戶在刪除確認時提示 "確認是否刪除(Y/N):",用戶必須輸入y/n,不然循環提示。
    breakable {
      do {
        choice = StdIn.readChar().toLower
        if (choice == 'y' || choice == 'n') 
{
          break()
        }
        println("確認是否刪除(Y/N):")
      } while (true)
    }

    if (choice == 'y') {
      if (customerService.del(id)) {
        println("---------------------刪除完成---------------------")
        return
      }
    }
    println("---------------------刪除沒有完成---------------------")
  }

15.5.8 完成修改客戶的功能

一、在 CustomerService.scala 中定義一個方法根據 id 修改用戶(更新用戶)的方法 和 // 根據 id 查找用戶信息 的方法

  // 根據 id 查找用戶信息
  def findCustomerById(id: Int): Customer = {
    val index = findIndexById(id)
    if (index != -1) {
      customers(index)
    } else {
      null
    }
  }

  // 根據 id 修改用戶(更新用戶)
  def update(id: Int, customer: Customer): Boolean = {
    val index = findIndexById(id)
    customers.update(index, customer)
    true
  }

二、在 CustomerView.scala 中定義一個方法 update

  /*
---------------------修改客戶---------------------
請選擇待修改客戶編號(-1退出):1
姓名(張三):<直接回車表示不修改>
性別(男):
年齡(30):
電話(010-56253825):
郵箱(zhang@abc.com):zsan@abc.com
---------------------修改完成---------------------
   */

  def update(): Unit = {
    println()
    println("---------------------修改客戶---------------------")
    println("請選擇待修改客戶編號(-1退出):")
    var id = StdIn.readInt()
    if (id == -1) {
      println("---------------------修改沒有完成---------------------")
      return
    }
    val customer = customerService.findCustomerById(id)
    if (customer == null) {
      println("---------------------修改沒有完成---------------------")
      return
    }

    var name = customer.name
    print(s"姓名(${name}):")
    name 
= StdIn.readLine()
    if (name.length == 0) name = customer.name

    var gender = customer.gender
    print(s"性別(${gender}):")
    gender 
= StdIn.readChar()

    var age = customer.age
    print(s"年齡(${age}):")
    age 
= StdIn.readShort()

    var tel = customer.tel
    print(s"電話(${tel}):")
    tel 
= StdIn.readLine()
    if (tel.length == 0) tel = customer.tel

    var email = customer.email
    print(s"郵箱(${email}):")
    email 
= StdIn.readLine()
    if (email.length == 0) email = customer.email

    // 封裝對象
    val newCustomer = new Customer(id, name, gender, age, tel, email)
    // 調用 CustomerService 的方法 update
    customerService.update(id, newCustomer)
    println("---------------------修改完成---------------------")
  }

第十六章 併發編程模型 Akka

16.1 Akka 的介紹

16.2 Actor 模型用於解決什麼問題

16.3 Akka 中 Actor 模型詳解

Actor 模型及其說明


對上圖的詳解以下:

16.4 Actor 模型工做機制說明


Actor模型工做機制說明(對照工做機制示意圖理解):

Actor 間傳遞消息機制(對照工做機制示意圖理解)

16.5 Actor 模型應用實例

16.5.1 Actor 自我通信

應用實例需求

代碼實現
SayHelloActor 項目步驟:
1) 建立項目 Mew -> New Project -> 選擇 Maven
2) 給項目命名


3) 下一步 -> Finish
4) 會生成 pom.xml 文件(maven 文件, 項目包的依賴)

5) 將下面的 maven 配置模板拷貝到 pom.xml 文件中,新的 pom.xml 文件文件內容以下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.akka</groupId>
    <artifactId>SayHelloActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- 定義一下常量 -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- 添加scala的依賴 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- 添加akka的actor依賴 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- 多進程之間的Actor通訊 -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- 指定插件-->
    <build>
        <!-- 指定源碼包和測試包的位置 -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- 指定編譯scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- maven打包的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- 指定main方法 -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

                                    <mainClass>xxx</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

6) 由於按照配置模板的內容 "指定源碼包和測試包的位置" 的部分:

    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>

咱們須要建立對應的 scala 目錄,並 mark 爲 Sources Root
7) 當修改後,第一次速度比較慢,由於 maven 須要 resolve 包的依賴,要下載相關的包。
注意:須要如圖勾選,update snapshots,並且不須要聯網,若是使用 maven 解決依賴後,仍然 pom.xml 有誤,則只須要重啓下 idea, 或者動一下 pom.xml 文件(不用改),從新保存便可。


8) 代碼實現:
package com.atguigu.akka.actor

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// 1. 當咱們繼承 Actor 後,就是一個 Actor,須要重寫該 Actor 的核心方法 receive
class SayHelloActor extends Actor {
  // 循環的接收消息
  // 1. receive方法,會被該 Actor 的 MailBox(實現了 Runnable 接口)調用
  // 2. 當該 Actor 的 MailBox 接收到消息,就會調用 receive 方法
  // 3. Receive 的底層:type Receive = PartialFunction[Any, Unit]
  override def receive: Receive = {
    // 接受消息並處理,若是接收到 exit,就退出
    case "hello" => println("發送:hello\t\t迴應:hello too:)")
    case "ok" => println("發送:ok\t\t\t迴應:ok too:)")
    case "exit" => {
      println("接收到exit~指令,退出系統...")
      context.stop(self) // 中止本身的 ActorRef
      context.system.terminate() // 關閉 ActorSystem
    }
  }
}

object SayHelloActor {
  // 1. 先建立一個 ActorSystem,專門用於建立 Actor
  private val actoryFactory = ActorSystem("actoryFactory")

  // 2. 建立一個 Actor 的同時,返回 Actor 的 ActorRef
  private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor")
  // (1) Props[SayHelloActor] 建立了一個 SayHelloActor 實例,這裏使用到了反射
  // (2) "sayHelloActor" 是 Actor 的名字
  // (3) sayHelloActorRef: ActorRef  =>是 Props[SayHelloActor] 的引用
  // (4) 建立的 SayHelloActor 實例被 ActorSystme 接管

  def main(args: Array[String]): Unit = {
    // 給 SayHelloActor 發消息(郵箱)
    sayHelloActorRef ! "hello"
    sayHelloActorRef ! "ok"
    sayHelloActorRef ! "ok~"
    // 研究異步如何退出 ActorSystem
    sayHelloActorRef ! "exit"
  }

}

輸出結果以下:

發送:hello        迴應:hello too:)
發送:ok            迴應:ok too:)
接收到exit~指令,退出系統...

9) 運行的效果

代碼的示意圖和小結


小結:
當程序執行 private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor") 會完成以下任務: [這是很是重要的方法]
  • 一、actorFactory 是 ActorSystem("actorFactory") 建立的。
  • 二、這裏的 Props[SayHelloActor] 會使用反射機制,建立一個 SayHelloActor 對象,若是是 actorFactory.actorOf(Props(new SayHelloActor(其餘代理對象的引用)), "sayHelloActor") 形式,就是使用 new 的方式建立一個 SayHelloActor 對象。注意:Props() 是小括號。
  • 三、會建立一個 SayHelloActor 對象的代理對象 sayHelloActorRef,使用 sayHelloActorRef 才能發送消息。
  • 四、會在底層建立 Dispather Message,是一個線程池,用於分發消息,消息是發送到對應的 Actor 的 MailBox。
  • 五、會在底層建立 SayHelloActor 的 MailBox 對象,該對象是一個隊列,可接收 Dispatcher Message 發送的消息。
  • 六、MailBox 實現了 Runnable 接口,是一個線程,一直運行並調用 Actor 的 receive 方法,所以當Dispather 發送消息到 MailBox 時,Actor 在r eceive 方法就能夠獲得信息。
  • 七、SayHelloActorRef ! "hello" ,表示把 hello 消息發送到 SayHello Actor 的 Mailbox (經過Dispatcher Message 轉發)。

16.5.2 Actor 之間通信

應用實例需求

兩個 Actor 的通信機制原理圖

代碼實現
AActor.scala

package com.atguigu.akka.actors

import akka.actor.{Actor, ActorRef}

class AActor(bActorRefActorRefextends Actor {
  var count = 0
  override def receive: Receive = {
    case "start" => {
      println("AActor 出招了,start ok")
      bActorRef ! "我打"
    }
    case "我打" => {
      count += 1
      // 給 BActor 發出消息
      // 這裏須要持有 BActor 的引用(BActorRef)才能夠
      println(s"AActor(黃飛鴻) 厲害!看我佛山無影腳 第${count}腳")
      Thread.sleep(1000)
      bActorRef ! "我打" // 給 BActor 發出消息
    }
  }
}

BActor.scala

package com.atguigu.akka.actors

import akka.actor.Actor

class BActor extends Actor {
  var count = 0
  override def receive: Receive = {
    case "我打" => {
      count += 1
      println(s"BActor(喬峯) 挺猛 看我降龍十八掌 第${count}掌")
      Thread.sleep(1000)
      // 經過 sender() 方法,能夠獲取到發送消息的 Actor 的 ActorRef
      sender() ! "我打"
    }
  }
}

ActorApp.scala

package com.atguigu.akka.actors

import akka.actor.{ActorRef, ActorSystem, Props}

// 100招後,就退出
object ActorApp extends App {
  // 建立 ActorSystem
  val actorfactory = ActorSystem("actorfactory")
  // 先建立 BActor 的引用/代理
  val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor], "bActor")
  // 建立 AActor 的引用時須要持有 BActor 的引用
  val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "aActor")

  // aActor 先出招
  aActorRef ! "start"
}

輸出結果以下:

AActor 出招了,start ok
BActor(喬峯) 挺猛 看我降龍十八掌 第1掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第1腳
BActor(喬峯) 挺猛 看我降龍十八掌 第2掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第2腳
BActor(喬峯) 挺猛 看我降龍十八掌 第3掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第3腳
BActor(喬峯) 挺猛 看我降龍十八掌 第4掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第4腳
BActor(喬峯) 挺猛 看我降龍十八掌 第5掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第5腳
BActor(喬峯) 挺猛 看我降龍十八掌 第6掌
AActor(黃飛鴻) 厲害!看我佛山無影腳 第6腳
......

代碼的小結

  • 一、兩個 Actor 通信機制和 Actor 自身發消息機制基本同樣,只是要注意以下:
  • 二、若是 A Actor 在須要給 B Actor 發消息,則須要持有 B Actor 的 ActorRef,能夠經過建立 A Actor 時,傳入 B Actor 的代理對象(ActorRef)。
  • 三、當 B Actor 在 receive 方法中接收到消息,須要回覆時,能夠經過 sender() 獲取到發送 Actor 的代理對象。

如何理解 Actor 的 receive 方法被調用?

  • 一、每一個 Actor 對應 MailBox。
  • 二、MailBox 實現了 Runnable 接口,處於運行的狀態。
  • 三、當有消息到達 MailBox,就會去調用 Actor 的 receive 方法,即將消息推送給 receive 方法。

16.7 Akka 網絡編程

看兩個實際應用(socket/tcp/ip)
  QQ、迅雷、百度網盤客戶端、新浪網站、京東商城、淘寶

16.7.1 Akka 網絡編程基本介紹

16.7.2 協議(tcp/ip)

  TCP/IP(Transmission Control Protocol/Internet Protocol)的簡寫,中文譯名爲傳輸控制協議/因特網互聯協議,又叫網絡通信協議,這個協議是Internet 最基本的協議、是 Internet 國際互聯網絡的基礎,簡單地說,就是由網絡層的IP協議和傳輸層的TCP協議組成的。
  TCP/IP 3本聖經級別書籍:xxx
  

16.7.3 OSI 與 Tcp/ip 參考模型

16.7.4 ip 地址

  概述:每一個 internet 上的主機和路由器都有一個 ip 地址,它包括網絡號和主機號,ip 地址有 ipv4(32位) 或者 ipv6(128位),能夠經過 ipconfig(ifconfig) 來查看。
  一個小技巧:網絡不通時,如何肯定是哪個路由(ip地址)出現問題?答:使用 tracert 指令。演示以下:

16.7.5 端口(port)

  咱們這裏所指的端口不是指物理意義上的端口,而是特指TCP/IP協議中的端口,是邏輯意義上的端口。若是把 IP 地址比做一間房子,端口就是出入這間房子的門。真正的房子只有幾個門,可是一個 IP 地址的端口 能夠有65535(即:256×256-1)個之多!端口是經過端口號來標記的。
端口(port)-分類


端口(port)-使用注意

socket 編程中客戶端和服務器的網絡分佈

16.8 Akka 網絡編程-小黃雞客服案例

16.8.1 需求分析 + 界面設計

需求分析
  一、服務端進行監聽(9999)
  二、客戶端能夠經過鍵盤輸入,發送諮詢問題給小黃雞客服(服務端)
  三、小黃雞(服務端)回答客戶的問題

界面設計
服務端:


客戶端:

16.8.2 程序框架圖

16.8.3 功能實現

代碼結構:


示例代碼以下:
YellowChickenServer.scala
package com.atguigu.akka.yellowchicken.server

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}
import com.typesafe.config.ConfigFactory

class YellowChickenServer extends Actor {
  override def receive: Receive = {
    case "start" => println("start 小黃雞客服開始工做了...")
    // 若是接收到了服務端的發來的消息,即 ClientMessage
    case ClientMessage(mes) => {
      println("客戶諮詢的問題是:" + mes)
      mes match {
        // 使用 match case 匹配(模糊匹配)
        case "大數據學費" => sender() ! ServerMessage("20000 RMB")
        case "學校地址" => sender() ! ServerMessage("北京市朝陽區青年路大悅城")
        case "學習什麼技術" => sender() ! ServerMessage("大數據 前端 Python")
        case _ => sender() ! ServerMessage("你說的啥子:)")
      }
    }
  }
}

// 主程序入口
object YellowChickenServerApp extends App {
  val host = "127.0.0.1" // 服務端ip地址
  val port = 9999 // 端口
  // 建立 config 對象,指定協議類型、監聽的ip和端口
  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="
akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$host
       |akka.remote.netty.tcp.port=$port
        "
"".stripMargin)
  // 建立 ActorSystem
  val serverActorSystem = ActorSystem("Server", config)
  // 建立 YellowChickenServer 的 Actor 和 ActorRef
  val yellowChickenServerActorRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "YellowChickenServer-01")

  // 啓動服務端
  yellowChickenServerActorRef ! "start"
}

CustomerActor.scala

package com.atguigu.akka.yellowchicken.client

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

class CustomerActor(serverHost: String, serverPort: Int) extends Actor 
{

  // 定義一個 YellowChickenServerRef
  var serverActorRef: ActorSelection = _

  // 在 Actor 中有一個方法 preStart 方法,它會在 Actor 運行前執行
  // 在 Akka 開發中,一般將初始化的工做,放在 preStart 方法中
  override def preStart(): Unit = {
    this.serverActorRef = context.actorSelection(s"akka.tcp://Server@${serverHost}:${serverPort}/user/YellowChickenServer-01")
    println("this.serverActorRefer=" + this.serverActorRef)
  }

  override def receive: Receive = {
    case "start" => println("start 客戶端運行,能夠諮詢問題")
    case mes: String => {
      // 發給服務端
      // serverActorRef ! mes // 不該該發送字符串,應該包裝一把,應該發送一個(樣例)對象(即協議)
      serverActorRef ! ClientMessage(mes) // 此時發送的是一個對象,該樣例類默認實現了序列化 和 apply 方法
    }
    // 若是接受到了服務器端的消息
    case ServerMessage(mes) => {
      println(s"收到小黃雞客服(Server)消息:$mes")
    }
  }
}

// 主程序入口
object CustomerActorApp extends App {
  val (host, port, serverHost, serverPort) = ("127.0.0.1"9990"127.0.0.1"9999)
  val config = ConfigFactory.parseString(
    s"""
       |akka.actor.provider="
akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname=$host
       |akka.remote.netty.tcp.port=$port
        "
"".stripMargin)

  // 建立 ActorSystem
  val clientActorSystem = ActorSystem("Client", config)
  // 建立 CustomerActor 的 Actor 和 ActorRef
  val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new CustomerActor(serverHost, serverPort)), "CustomerActor-01")

  // 啓動客戶端
  clientActorRef ! "start"

  // 客戶端發送消息
  while (true) {
    val mes = StdIn.readLine()
    clientActorRef ! mes
  }
}

MessageProtocol.scala

package com.atguigu.akka.yellowchicken.common

// 使用樣例類來構建協議

// 一、客戶端發送服務端的協議(序列化對象)
case class ClientMessage(mes: String)  // 回顧:樣例類的構造器中的每個參數都默認爲 val ,即只可讀。

// 二、服務器端發送給客戶端的協議
case class ServerMessage(mes: String)

16.9 Akka 網絡編程-Spark Master Worker 進程通信項目

16.9.1 項目意義

  一、深刻理解 Spark 的 Master 和 Worker 的通信機制。
  二、爲了方便同窗們看 Spark 的底層源碼,命名的方式和源碼保持一致(如:通信消息類命名就是同樣的)。
  三、加深對主從服務心跳檢測機制(HeartBeat)的理解,方便之後 spark 源碼二次開發。

16.9.2 項目需求分析

16.9.3 項目界面設計

  咱們主要是經過應用實例,來剖析 Spark 的 Master 和 Worker 的通信機制,所以功能比較簡潔,設計的界面以下。看後面演示便可。

16.9.4 實現功能 1-Worker 完成註冊

功能要求: Worker 註冊到 Master,Master 完成註冊,並回復 Worker 註冊成功。


代碼結構:

示例代碼以下:
MasterActor.scala
package com.atguigu.akka.sparkmasterworker.master

import akka.actor.{Actor, ActorSystem, Props}
import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

class MasterActor extends Actor {
  // 定義一個 mutable.HashMap 屬性,用於管理 Worker
  val workers = mutable.HashMap[String, WorkerInfo]()

  override def receive: Receive = {
    case "start" => println("Master服務器啓動了...")
    // 接收到 Worker 客戶端註冊的信息,保存進 HashMap
    case RegisterWorkerInfo(id, cpu, ram) => {
      if (!workers.contains(id)) {
        // 建立 WorkerInfo
        val workerInfo = new WorkerInfo(id, cpu, ram)
        // 加入到 HashMap
        workers += (id -> workerInfo)
        println("服務器的Workers= " + workers)
        // 回覆客戶端註冊成功
        sender() ! RegisteredWorkerInfo
      }
    }
  }
}

object MasterActorApp {
  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1" // 服務端ip地址
    val port = 10005 // 端口
    // 建立 config 對象,指定協議類型、監聽的ip和端口
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="
akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
        "
"".stripMargin)
    // 先建立 ActorSystem
    val masterActorSystem = ActorSystem("Master", config)
    // 再建立 Master 的 Actor 和 ActorRef
    val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], "MasterActor-01")

    // 啓動 Master
    masterActorRef ! "start"
  }
}

WorkerActor.scala

package com.atguigu.akka.sparkmasterworker.worker

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo}
import com.typesafe.config.ConfigFactory

class WorkerActor(serverHost: String, serverPort: Int) extends Actor 
{
  // 定義一個 MasterActorRef
  var masterActorProxy: ActorSelection = _

  // 定義 Worker 的編號
  var id = java.util.UUID.randomUUID().toString

  // 在 Actor 中有一個方法 preStart 方法,它會在 Actor 運行前執行
  // 在 Akka 開發中,一般將初始化的工做,放在 preStart 方法中
  override def preStart(): Unit = {
    this.masterActorProxy = context.actorSelection(s"akka.tcp://Master@${serverHost}:${serverPort}/user/MasterActor-01")
    println("this.masterActorProxy=" + this.masterActorProxy)
  }

  override def receive = {
    case "start" => {
      println("Worker客戶端啓動運行")
      // 給服務器發送一個註冊信息
      masterActorProxy ! RegisterWorkerInfo(id, 1616 * 1024)
    }
    case RegisteredWorkerInfo => {
      println("WorkedId= " + id + " 註冊成功!")
    }
  }
}

object WorkerActorApp {
  def main(args: Array[String]): Unit = {
    val (host, port, serverHost, serverPort) = ("127.0.0.1"10001"127.0.0.1"10005)
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="
akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
        "
"".stripMargin)

    // 建立 ActorSystem
    val workerActorSystem = ActorSystem("Worker", config)
    // 建立 WorkerActor 的 Actor 和 ActorRef
    val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort)), "WorkerActor-01")

    // 啓動客戶端
    workerActorRef ! "start"
  }
}

MessageProtocol.scala

package com.atguigu.akka.sparkmasterworker.common

// 使用樣例類來構建協議

// Worker 註冊信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// 這個是 WorkerInfo,是保存在 Master 的 HashMap 中的,該 HashMap 用於管理 Worker
// 未來這個 WorkerInfo 會擴展,好比 增長 Worker 上一次的心跳時間
class WorkerInfo(val id: String, val cpu: Int, val ram: Int)

// 當 Worker 註冊成功,服務器返回一個 RegisteredWorkerInfo 對象
case object RegisteredWorkerInfo

16.9.5 實現功能 2-Worker 定時發送心跳

功能要求:Worker 定時發送心跳給 Master,Master 可以接收到,並更新 Worker 上一次心跳時間。


示例代碼以下:
MessageProtocol.scala 中增長代碼
package com.atguigu.akka.sparkmasterworker.common

// 使用樣例類來構建協議

// Worker 註冊信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

// 這個是 WorkerInfo,是保存在 Master 的 HashMap 中的,該 HashMap 用於管理 Worker
// 未來這個 WorkerInfo 會擴展,好比 增長 Worker 上一次的心跳時間
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) 
{
  // 新增屬性:心跳時間
  var lastHeartBeatTime: Long = _
}

// 當 Worker 註冊成功,服務器返回一個 RegisteredWorkerInfo 對象
case object RegisteredWorkerInfo

// 每隔必定時間定時器發送給 Master 一個心跳
case class HeartBeat(id: String)

// Worker 每隔必定時間定時器發送給 本身 一個消息
case object SendHeartBeat

MasterActor.scala 中增長代碼

    case HeartBeat(id) => {
      // 更新對應的 Worker 的心跳時間
      // 一、先從 Worker 中取出 WorkerInfo
      val workerInfo = workers(id)
      workerInfo.lastHeartBeatTime = System.currentTimeMillis()
      println("Master更新了 " + id + " 的心跳時間 ")
    }

WorkerActor.scala 中增長代碼

      // 當客戶端註冊成功後,就定義一個定時器,每隔必定時間,發送 SendHeartBeat 給本身
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)
    case SendHeartBeat => {
      println("WorkedId= " + id + " 給Master發送心跳")
      masterActorProxy ! HeartBeat(id)
    }

16.9.6 實現功能 3-Master 啓動定時任務,定時檢測註冊的 Worker

功能要求:Master 啓動定時任務,定時檢測註冊的 Worker 有哪些沒有更新心跳,已經超時的 Worker,將其從 HashMap 中刪除掉。


示例代碼以下:
MessageProtocol.scala 中增長代碼
// Master 給本身發送一個觸發檢查超時 Worker 的信息
case object StartTimeOutWorker

// Master 給本身發消息,檢測 Worker,對於心跳超時的
case object RemoveTimeOutWorker

MasterActor.scala 中增長代碼

    case "start" => {
      println("Master服務器啓動了...")
      // Master 啓動定時任務,定時檢測註冊的 Worker 有哪些沒有更新心跳,已經超時的 Worker,將其從 HashMap 中刪除掉。
      self ! StartTimeOutWorker
    }

    // 開啓定時器,每隔必定時間檢測是否有 Worker 的心跳超時
    case StartTimeOutWorker => {
      println("開啓了定時檢測Worker心跳的任務")
      import context.dispatcher // 使用調度器時候必須導入dispatcher
      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
    }

    // 判斷哪些 Worker 心跳超時(nowTime - lastHeartBeatTime),對已經超時的 Worker,將其從 HashMap 中刪除掉。
    case RemoveTimeOutWorker => {
      // 首先獲取全部 Workers 的全部 WorkerInfo
      val workerInfos = workers.values
      val nowTime = System.currentTimeMillis()
      // 過濾出全部超時的 workerInfo 並刪除便可
      workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeatTime) > 6000)
        .foreach(workerInfo => workers.remove(workerInfo.id))
      println("當前有 " + workers.size + " 個Worker存活")
    }

16.9.7 實現功能 4-Master,Worker 的啓動參數運行時指定

功能要求:Master,Worker 的啓動參數運行時指定,而不是固定寫在程序中的。


MasterActor.scala 中修改代碼
    if (args.length != 3) {
      println("請輸入參數 host port MasterActor的名字")
      sys.exit()
    }
    val host = args(0)  // 服務端ip地址
    val port = args(1)  // 端口
    val masterName = args(2)  // MasterActor的名字
    ......
    // 再建立 Master 的 Actor 和 ActorRef
    val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], s"${masterName}")

WorkerActor.scala 中增修改代碼

    if (args != 6) {
      println("請輸入參數 host port WorkerActor的名字 serverHost serverPort MasterActor的名字")
    }

    val host = args(0)
    val port = args(1)
    val workerName = args(2)

    val serverHost = args(3)
    val serverPort = args(4)
    val masterName = args(5)
    ......
    // 建立 WorkerActor 的 Actor 和 ActorRef
    val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort.toInt, masterName)), s"${workerName}")

Master 配置參數截圖:


Worker 配置參數截圖:

16.9.8 Master Worker 進行分佈式部署

Master Worker 進行分佈式部署:Linux 系統 -> 如何給 maven 項目打包 -> 上傳Linux
步驟以下:
步驟一:先給 MasterActor 打包,修改 pom.xml 文件的 <mainClass>xxx</mainClass> 節點,指定咱們程序 MasterActor 的主類
即修改爲以下:<mainClass>com.atguigu.akka.sparkmasterworker.master.MasterActorApp</mainClass>

步驟二:給 MasterActor 打包

步驟三:將打好的 jar 包拷貝至某個目錄裏,並修改 jar 包名稱爲 MasterActor.jar,等待上傳至 Linux 系統

步驟四:再給 WorkerActor 打包,修改 pom.xml 文件的 <mainClass>xxx</mainClass>節點,指定咱們程序 WorkerActor 的主類
即修改爲以下:<mainClass>com.atguigu.akka.sparkmasterworker.worker.WorkerActorApp</mainClass>

步驟五:給 WorkerActor 打包,操做同 步驟二,注意:打包前咱們須要先 clean 下

步驟六:將打好的 jar 包拷貝至某個目錄裏,並修改 jar 包名稱爲 WorkerActor.jar,等待上傳至 Linux 系統

步驟七:將兩個 jar 包上傳至 Linux

步驟八:測試運行,,命令以下:

java -jar MasterActor.jar 127.0.0.1 10005 MasterActor-01
java -jar WorkerActor.jar 127.0.0.1 10001 WorkerActor-01 127.0.0.1 10005 MasterActor-01
java -jar WorkerActor.jar 127.0.0.1 10002 WorkerActor-02 127.0.0.1 10005 MasterActor-01
相關文章
相關標籤/搜索