本文首發於 泊浮目的簡書: https://www.jianshu.com/u/204...
版本 | 日期 | 備註 |
---|---|---|
1.0 | 2020.9.13 | 文章首發 |
Clean Architecture
是Bob大叔在2012年提出的一個架構模型。其根據過去幾十年中的一系列架構提煉而成:java
根據這些架構設計出來的系統,每每具備如下特色:算法
關於Clean Architecture
的介紹到此爲止,有興趣的同窗能夠自行查閱google。spring
最近寫了不少業務代碼,由於每一個組件都是分佈式部署的,致使手動測試時很是的痛苦,耗時耗力。因而我便開始思考針對業務的自動化測試方案。數據庫
目前業務中一部分的代碼使用了Storm
這個框架,咱們挑一個方便理解的用例,這裏大概涉及三個組件:編程
DispatcherBolt的核心代碼大體以下:segmentfault
@Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { super.prepare(conf, context, collector); try { init(); } catch (Exception e) { collector.reportError(e); throw new RuntimeException(e); } } @Override public void execute(Tuple dataTuple) { this.input = dataTuple; try { Object obj = dataTuple.getValueByField(EmitFields.MESSAGE); String key = (String) dataTuple.getValueByField(EmitFields.GROUP_FIELD); List<MessageEntry> messageEntries = (List<MessageEntry>) obj; emitMessageEntry(key, messageEntries); this.collector.ack(dataTuple); } catch (Exception e) { logger.info("Dispatcher Execute error: ", e); this.collector.reportError(e); this.collector.fail(dataTuple); } } private void emitMessageEntry(String key, List<MessageEntry> messageEntries) throws Exception { long lastPos = 0L, uniquePos = 0L, payloadSize = 0L; UmsMessageBuilder builder = null; // 處理任一schema 分組的表數據 String tableName = messageEntries.get(0).getEntryHeader().getTableName(); for (MessageEntry msgEntry : messageEntries) { EntryHeader header = msgEntry.getEntryHeader(); header.setLastPosition(lastPos); // 若使用schema 進行分組,則同一組數據中可能會出現多張表的情形,須要處理表名出現切換的狀況 if (StringUtils.isEmpty(tableName) || (getExtractorConfig().getGroupType() == GroupType.SCHEMA && !StringUtils.equalsIgnoreCase(tableName, header.getTableName()))) { emitBuilderMessage(builder, key); builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(), MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType())); payloadSize = 0; } // DDL處理 if (msgEntry.isDdl()) { emitBuilderMessage(builder, key); executeDdlEvent(msgEntry); emitDDLMessage(key, msgEntry); builder = null; continue; } if (builder != null && msgEntry.getEntryHeader().getHeader().getSourceType().equalsIgnoreCase(MediaType.DataSourceType.ORACLE.getName())) { emitBoltMessage(key, builder.getMessage()); builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(), MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType())); payloadSize = 0; } // DML處理 if (builder == null) { builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(), MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType())); payloadSize = 0; } for (CanalEntry.RowData rowData : msgEntry.getRowDataLst()) { lastPos = Long.parseLong(header.getPosition()) + (++uniquePos); // 對UPDATE類型的增量數據特殊處理 if (header.isUpdate()) { if (getExtractorConfig().getOutputBeforeUpdateFlg()) { payloadSize += appendUpdateBefore2Builder(builder, header, rowData, EventType.BEFORE.getValue().toLowerCase()); } if (ExtractorHelper.isPkUpdate(rowData.getAfterColumnsList())) { payloadSize += appendUpdateBefore2Builder(builder, header, rowData, getEventTypeForUMS(CanalEntry.EventType.DELETE)); } } List<Object> payloads = new ArrayList<>(); payloadSize += appendRowData2Builder(payloads, builder, header, rowData); builder.appendPayload(payloads.toArray()); } } } // 最後一批數據發送 emitBuilderMessage(builder, key); }
注意,這裏的兩個方法prepare
和execute
都是框架暴露出來的接口,用於初始化時得到strom的上下文以及strom下發的對象。若是開發者使用不當,則會致使業務代碼和框架耦合。設計模式
這個方案在早期的時候作過嘗試,簡單的來講就是將中間那段emitMessageEntry
相關的代碼抽象成一個對象,並用接口表示。然而經過spring這種IOC框架注入進來,相似於:架構
override fun prepare(topoConf: MutableMap<String, Any>, context: TopologyContext, collector: OutputCollector) { super.prepare(topoConf, context, collector) try { init() this.dispatcherServer = IOCUtil.getBean(DispatcherServer::class.java).init(collector) } catch (e: Exception) { collector.reportError(e) throw RuntimeException(e) } } override fun execute(input: Tuple) { val obj = dataTuple.getValueByField(EmitFields.MESSAGE) val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String val messageEntries = obj as List<MessageEntry> dispatcherService.dispatcherLogical(messageEntries,key) }
這樣咱們在單元測試裏能夠直接將dispatcherService
類注入進來,並本身實現一個OutputCollector
用於收集分發的數據。而後將mock的參數填入,並斷言結果是否符合咱們的期待。app
但因爲storm會涉及到分發相關事宜(如序列化),這會讓業務代碼有點變扭:框架
dispatcherService
成員在Bolt裏聲明爲Transient
能夠看到,咱們爲了測試,居然不得不修改業務代碼來加入可有可無的邏輯,這顯然不是一個好的方案。
Mockito實現的方案對業務沒有任何入侵性,直接寫測試代碼便可,寫出來的代碼相似於:
@RunWith(PowerMockRunner::class) @PowerMockIgnore("javax.management.*") class DispatcherBoltTest { private lateinit var config: AbstractSinkConfig private lateinit var outputCollector: OutputCollector private lateinit var tuple: Tuple @Before fun atBefore() { config = PowerMockito.mock(AbstractSinkConfig::class.java) outputCollector = PowerMockito.mock(OutputCollector::class.java) tuple = PowerMockito.mock(Tuple::class.java) } private fun init(dispatcherBoltImpl: DispatcherBoltImpl) { reset(config) reset(outputCollector) reset(tuple) dispatcherBoltImpl.prepare(mutableMapOf(), PowerMockito.mock(TopologyContext::class.java), outputCollector) } @Test fun testSingleUms() { //定義mock對象的一些行爲 `when`(config.configProps).thenReturn(Properties()) //將須要測試的類實例化 val dispatcherBoltImpl = DispatcherBoltImpl(config) init(dispatcherBoltImpl) val umsMap = generateSingleUmsBo() val boMap = getBoMap(intArrayOf(1)) //定義mock對象的一些行爲 `when`(tuple.getValueByField(EmitFields.MESSAGE)).thenReturn(umsMap.messages) `when`(tuple.getValueByField(EmitFields.GROUP_FIELD)).thenReturn(umsMap.dispatchKey) `when`(tuple.getValueByField(EmitFields.EX_BO)).thenReturn(boMap) dispatcherBoltImpl.handleDataTuple(tuple) // 結果驗證 Mockito.verify(outputCollector, Mockito.times(1)) .emit(EmitFields.DATA_MSG, tuple, Values(umsMap.dispatchKey, umsMap.messages, boMap, EmitFields.EMIT_TO_BOLT)) } }
邏輯很清晰易懂:先選擇須要mock的對象,並定義其被mock的行爲,而後把數據填裝進去便可,最後根據結果校驗——本質上將業務和框架的行爲一塊兒測試了進去。
但若是把視野放高點看,有兩個潛在的問題須要考慮:
根據前面提到的,咱們要作的第一件事就是剝離業務和框架的耦合。那麼該如何剝離呢?咱們直接拿出答案:
/** * 剝離與任何流處理框架的耦合,僅關注UMS分發的服務 * */ interface DispatcherServer { fun dispatcherMessageEntry(key: String, messageEntries: List<MessageEntry>, destination: String, tableToDispatchColumn: HashMap<String, Set<String>>, resultConsumer: (group: MutableMap<Int, UmsMessageBuilder>, key: String) -> Unit, executeDdlEventBlock: (messageEntry: MessageEntry) -> Unit, ddlMessageConsumer: (key: String, messageEntry: MessageEntry) -> Unit) }
咱們定義了三個函數型參數。利用這種方式,咱們能夠輕易的將業務和框架隔離開來。因而代碼調用起來就像這樣:
override fun execute(dataTuple: Tuple) { input = dataTuple try { val obj = dataTuple.getValueByField(EmitFields.MESSAGE) val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String val messageEntries = obj as List<MessageEntry> dispatcherServer.dispatcherMessageEntry(key, messageEntries, destination, tableToDispatchColumn, resultConsumer = { builder, innerKey -> emitBuilderMessage(builder, innerKey) }, executeDdlEventBlock = { entry -> executeDdlEvent(entry) }, ddlMessageConsumer = { innerKey, msgEntry -> emitDDLMessage(innerKey, msgEntry) } ) collector.ack(dataTuple) } catch (e: Exception) { logger.info("Dispatcher Execute error: ", e) collector.reportError(e) collector.fail(dataTuple) } }
emitBuilderMessage
、executeDdlEvent
、emitDDLMessage
只是DispatcherBolt中的一個私有方法,裏面會將傳入的數據經過collector按照必定規則下發下去。這樣,咱們就將框架相關的代碼放在了DispatcherBolt裏。
而和框架無關的業務代碼,咱們則能夠將它放到DispatcherServer
的實現中去。
測試的代碼也能夠專一在測試業務邏輯上:
@Test fun testUpdateRecords() { val originNamespace = "my_schema.my_table" val mockData = listOf(getUpdate1Data()) val config = getMockConfig(extractorConfigJsonFile) config.outputBeforeUpdateFlg = false config.outputExtraValueFlg = false config.payloadType = PayloadType.SIZE config.maxPayloadSize = 10240 val dispatcherServer = DispatcherServerImpl(config) val resultMap = mutableMapOf<Int, UmsMessageBuilder>() dispatcherServer.dispatcherMessageEntry(originNamespace, mockData, "M26", hashMapOf(), resultConsumer = { builder, innerKey -> resultMap.putAll(builder) Assert.assertEquals(innerKey, originNamespace) }, executeDdlEventBlock = { throw RuntimeException("這堆數據中不該該出現DDL事件") }, ddlMessageConsumer = { _, _ -> throw RuntimeException("這堆數據中不該該出現DDL相關的結果") }) assertEquals(1, resultMap.keys.toSet().size, "當前數據中,應該被分爲3組——根據主鍵分發原則,他們來自於不一樣的主鍵") assertEquals(1, resultMap.size, "當前數據中,應該被分爲3組——根據主鍵分發原則,他們來自於不一樣的主鍵") val umsList = resultMap.values.map { it.message } umsList.forEach { Assert.assertEquals("m.M26.my_schema.my_table", it.schema.namespace) Assert.assertEquals(1, it.payloads.size) assertEquals(9, it.schema.fields.size, "5個擴展字段+4個schema字段應該爲9") Assert.assertEquals("inc", it.protocol.type) Assert.assertEquals("2", it.protocol.version) assertEquals(MediaType.DataSourceType.MYSQL, KafkaKeyUtils.getDataSourceType(it)) } }
看完了效果,咱們再來談談上面所用到技巧。其實這很像面向對象中的Strategy模式——定義一個算法接口,並將每一種算法都在這個接口下實現其邏輯,令同一個類型的算法可以互換使用。這樣作的好處是算法的變化不影響使用方,也不受使用方的影響。而若是函數是一等公民的話,則會讓創建和操縱各類策略的工做變得十分簡單。
那麼怎樣是不簡單的呢?若是用java的話,咱們得先定義一個專門的接口,聲明一個方法,在使用時用匿名內部實現將它傳入,但這其實沒什麼必要,由於咱們僅僅想傳一個函數進去,而不是對象。典型的代碼能夠見:
ZStack源碼剖析之設計模式鑑賞——策略模式: https://segmentfault.com/a/11...
設計模式要作的事不外乎減小代碼冗餘度,提升代碼複用性。而在函數式語言中,複用主要表現爲經過參數來傳遞做爲第一等語言成分的函數,各類函數式編程庫都頻繁地運用了這種手法。與面嚮對象語言相比(以類型爲單位),函數式語言的重用發生於較粗的粒度級別上(以行爲爲單位),着眼於提取一些共通的運做機制,並參數化地調整其行爲。