大众点评CAT开源监控系统剖析
?參考文檔:
大眾點評的實時監(jiān)控系統分析(一)
CAT_source_analyze
透過CAT,來看分布式實時監(jiān)控系統的設計與實現
深度剖析開源分布式監(jiān)控CAT
[分布式監(jiān)控CAT] Client端源碼解析
大眾點評Cat--架構分析
大眾點評Cat--Server模塊架構分析
Plexus,Spring之外的IoC容器
plexus使用(一)
Spring Cloud Sleuth使用簡介
Spring Cloud Sleuth 整合Zipkin、RabbitMQ 和 (Mysql或Elasticsearch)
Cat監(jiān)控Druid數據庫連接池
1. 介紹
1.1 概述
CAT(Central Application Tracking)基于Java開發(fā)的實時監(jiān)控平臺,主要包括移動端監(jiān)控,應用側監(jiān)控,核心網絡層監(jiān)控,系統層監(jiān)控等。
CAT是一個提供實時監(jiān)控報警,應用性能分析診斷的工具。
1.2 CAT能做什么
在此之前,先來想一想對于線上應用我們希望能監(jiān)控些什么?可能有如下這些:
- 機器狀態(tài)信息。CPU負載、內存信息、磁盤使用率這些是必需的,另外可能還希望收集Java進程的數據,例如線程棧、堆、垃圾回收等信息,以幫助出現問題時快速debug。
- 請求訪問情況。例如請求個數、響應時間、處理狀態(tài),如果有處理過程中的時間分析那就更完美了。
- 異常情況。譬如緩存服務時不時出現無響應,我們希望能夠監(jiān)控到這種異常,從而做進一步的處理。
- 業(yè)務情況。例如訂單量統計,銷售額等等。
CAT支持的監(jiān)控消息類型包括:
- Transaction 適合記錄跨越系統邊界的程序訪問行為,比如遠程調用,數據庫調用,也適合執(zhí)行時間較長的業(yè)務邏輯監(jiān)控,Transaction用來記錄一段代碼的執(zhí)行時間和次數。
- Event 用來記錄一件事發(fā)生的次數,比如記錄系統異常,它和transaction相比缺少了時間的統計,開銷比transaction要小。
- Heartbeat 表示程序內定期產生的統計信息, 如CPU%, MEM%, 連接池狀態(tài), 系統負載等。
- Metric 用于記錄業(yè)務指標、指標可能包含對一個指標記錄次數、記錄平均值、記錄總和,業(yè)務指標最低統計粒度為1分鐘。
- Trace 用于記錄基本的trace信息,類似于log4j的info信息,這些信息僅用于查看一些相關信息
在一個請求處理中可能產生有多種消息,CAT將其組織成消息樹的形式。
在處理開始時,默認開始一個類型為URL的Transaction,在這個Transaction中業(yè)務本身可以產生子消息。例如,產生一個數據庫訪問的子Transaction或者一個訂單統計的Metric。結構如下所示:
?
1.3 分布式監(jiān)控系統要求
- 方便安裝
- 要求輕量
- 界面盡可能友好
- 監(jiān)控策略豐富,監(jiān)控元素多樣化
- 可以嵌套開發(fā)
- 占用服務器資源小,使用時不過多占用機器硬件方面資源,對實際業(yè)務影響較小
1.4 CAT使用特點
- 異步化傳輸數據,不太影響正常業(yè)務
- 實時監(jiān)控
- 輕量,部署簡單
- 嵌入簡單
- 有問題跟蹤報表
- 消息樹形化
- 日志不落地本地磁盤,較少IO,但很消耗網絡資源
- 監(jiān)控消息,按照分業(yè)務傳輸數據,如業(yè)務場景,時間等要求傳輸數據
- 有報警機制
- 可能復雜的消息存儲和消息ID查詢看起來麻煩,需要建立查詢索引(目前不考慮這個東東)
- 消息隊列異步化發(fā)送
- 開源(這個最重要)
2. CAT設計
2.1 整體設計
2.2 客戶端設計
2.3 服務端設計
?
2.4 領域建模
?
3. 模塊劃分
?
3.1 模塊說明
3.1.1 client端
cat-client 提供給業(yè)務以及中間層埋點的底層SDK。
3.1.2 server端
cat-consumer 用于實時分析從客戶端提供的數據。
cat-home 作為用戶給用戶提供展示的控制端?,并且cat-home做展示時,通過對cat-consumer的調用獲取其他節(jié)點的數據,將所有數據匯總展示。
consumer、home以及路由中心都是部署在一起的,每個服務端節(jié)點都可以充當任何一個角色。
CAT服務端在整個實時處理中,基本上實現了全異步化處理:
- 消息消費基于Netty的NIO實現(Netty-Server);
- 消息消費到服務端就存放內存隊列,然后程序開啟一個線程會消費這個消息做消息分發(fā)(異步消費處理);
- 每個消息都會有一批線程并發(fā)消費各自隊列的數據,以做到消息處理的隔離。(每報表每線程,分別按照自己的規(guī)則解析消費這個消息,并且可以動態(tài)控制對某種報表類型的處理線程個數);
- 消息(原始的消息logView)存儲是先存入本地磁盤,然后異步上傳到HDFS文件,這也避免了強依賴HDFS;
4. 設計原理
4.1 cat-client設計
作為一個日志上報的通用客戶端,考慮點至少有如下這些:
- 為了盡可能減少對業(yè)務的影響,需要對消息進行異步處理。即業(yè)務線程將消息交給CAT客戶端與CAT客戶端上報這兩個過程需要異步。
- 為了達到實時的目的以及適應高并發(fā)的情況,客戶端上報應該基于TCP而非HTTP開發(fā)。
- 在線程安全的前提下盡可能的資源低消耗以及低延時。我們知道,線程競爭的情況是由于資源共享造成的,要達到線程安全通常需要減少資源共享或者加鎖,而這兩點則會導致系統資源冗余和高延時。
CAT客戶端實現并不復雜,但這些點都考慮到了。它的架構如下所示:
大概步驟為:
- 業(yè)務線程產生消息,交給消息Producer,消息Producer將消息存放在該業(yè)務線程消息棧中;
- 業(yè)務線程通知消息Producer消息結束時,消息Producer根據其消息棧產生消息樹放置在同步消息隊列中;
- 消息上報線程監(jiān)聽消息隊列,根據消息樹產生最終的消息報文上報CAT服務端。
4.1.1 cat-client包結構
└─com├─dianping│ └─cat│ ├─build│ ├─configuration│ ├─log4j│ ├─message│ │ ├─internal│ │ ├─io│ │ └─spi│ │ ├─codec│ │ └─internal│ ├─servlet│ └─status└─site├─helper└─lookup└─util?
4.1.2?com.dianping.cat.message包介紹
包結構如下:
com.dianping.cat.message中主要包含了internal、io、spi這三個目錄:
- internal目錄包含主要的CAT客戶端內部實現類;
- io目錄包含建立服務端連接、重連、消息隊列監(jiān)聽、上報等io實現類;
- spi目錄為上報消息工具包,包含消息二進制編解碼、轉義等實現類。
其uml圖如下所示(可以放大看):
類的功能如下:
- Message為所有上報消息的抽象,它的子類實現有Transaction、Metric、Event、HeartBeat、Trace這五種。
- MessageProducer封裝了所有接口,業(yè)務在使用CAT時只需要通過MessageProducer來操作。
- MessageManager為CAT客戶端核心類,相當于MVC中的Controller。
- Context類保存消息上下文。
- TransportManager提供發(fā)送消息的sender,具體實現有DefaultTransportManager,調用其getSender接口返回一個TcpSocketSender。
- TcpSocketSender類負責發(fā)送消息。
1)Message
上面說到,Message有五類,分別為Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,保存的數據都為一個字符串;而Transaction則保存一個Message列表。換句話說,Transaction的結構為一個遞歸包含的結構,其他結構則為原子性結構。
下面為DefaultTransaction的關鍵數據成員及操作:
public class DefaultTransaction extends AbstractMessage implements Transaction {private List<Message> m_children;private MessageManager m_manager;...//添加子消息public DefaultTransaction addChild(Message message) {...}//Transaction結束時調用此方法public void complete() {...m_manager.end(this); //調用MessageManager來結束Transaction ...}?
值得一提的是,Transaction(或者其他的Message)在創(chuàng)建時自動開始,消息結束時需要業(yè)務方調用complete方法,而在complete方法內部則調用MessageManager來完成消息。
2)MessageProducer
MessageProducer對業(yè)務方封裝了CAT內部的所有細節(jié),它的主要方法如下:
public void logError(String message, Throwable cause); public void logEvent(String type, String name, String status, String nameValuePairs); public void logHeartbeat(String type, String name, String status, String nameValuePairs); public void logMetric(String name, String status, String nameValuePairs); public void logTrace(String type, String name, String status, String nameValuePairs); ... public Event newEvent(String type, String name); public Event newEvent(Transaction parent, String type, String name); public Heartbeat newHeartbeat(String type, String name); public Metric newMetric(String type, String name); public Transaction newTransaction(String type, String name); public Trace newTrace(String type, String name);?
logXXX方法為方法糖(造詞小能手呵呵),這些方法在調用時需要傳入消息數據,方法結束后消息自動結束。
newXXX方法返回相應的Message,業(yè)務方需要調用Message方法設置數據,并最終調用Message.complete()方法結束消息。
MessageProducer只是接口封裝,消息處理主要實現依賴于MessageManager這個類。
3)MessageManager
MessageManager為CAT的核心類,但它只是定義了接口,具體實現為DefaultMessageManager。DefaultMessageManager這個類里面主要包含了兩個功能類,Context和TransportManager,分別用于保存上下文和消息傳輸。TransportManager運行期間為單例對象,而Context則包裝成ThreadLocal為每個線程保存上下文。
我們通過接口來了解DefaultMessageManager的主要功能:
public void add(Message message); public void start(Transaction transaction, boolean forked); public void end(Transaction transaction);public void flush(MessageTree tree);?
add()方法用來添加原子性的Message,也就是Metric、Event、HeartBeat、Trace。
start()和end()方法用來開始和結束Transaction這種消息。
flush()方法用來將當前業(yè)務線程的所有消息刷新到CAT服務端,當然,是異步的。
4)Context
Context用來保存消息上下文,我們可以通過它的主要接口來了解它功能:
public void add(Message message) {if (m_stack.isEmpty()) {MessageTree tree = m_tree.copy();tree.setMessage(message);flush(tree);} else {Transaction parent = m_stack.peek();addTransactionChild(message, parent);}}add方法主要添加原子性消息,它先判斷該消息是否有上文消息(即判斷是否處于一個Transaction中)。如果有則m_stack不為空并且將該消息添加到上文Transaction的子消息隊列中;否則直接調用flush來將此原子性消息刷新到服務端。
public void start(Transaction transaction, boolean forked) {if (!m_stack.isEmpty()) {...Transaction parent = m_stack.peek();addTransactionChild(transaction, parent);} else {m_tree.setMessage(transaction);}if (!forked) {m_stack.push(transaction);} }start方法用來開始Transaction(Transaction是消息里比較特殊的一種),如果當前消息棧為空則證明該Transaction為第一個Transaction,使用消息樹保存該消息,同時將該消息壓棧;否則將當前Transaction保存到上文Transaction的子消息隊列中,同時將該消息壓棧。
public boolean end(DefaultMessageManager manager, Transaction transaction) { if (!m_stack.isEmpty()) {Transaction current = m_stack.pop();...if (m_stack.isEmpty()) {MessageTree tree = m_tree.copy();m_tree.setMessageId(null);m_tree.setMessage(null);...manager.flush(tree); //刷新消息到CAT服務端return true;}}return false; }end方法用來結束Transaction,每次調用都會pop消息棧,如果棧為空則調用flush來刷新消息到CAT服務端。
綜上,Context的m_stack的結構如下:
Transaction之間是有引用的,因此在end方法中只需要將第一個Transaction(封裝在MessageTree中)通過MessageManager來flush,在拼接消息時可以根據這個引用關系來找到所有的Transaction :)。
5)TransportManager和TcpSocketSender
這兩個類用來發(fā)送消息到服務端。MessageManager通過TransportManager獲取到MessageSender,調用sender.send()方法來發(fā)送消息。 TransportManager和MessageSender關系如下:
TCPSocketSender為MessageSender的具體子類,它里面主要的數據成員為:
private MessageCodec m_codec; private MessageQueue m_queue = new DefaultMessageQueue(SIZE); private ChannelManager m_manager;-
MessageCodec:CAT基于TCP傳輸消息,因此在發(fā)送消息時需要對字符消息編碼成字節(jié)流,這個編碼的工作由MessageCodec負責實現。
-
MessageQueue:還記得剛才說業(yè)務方在添加消息時,CAT異步發(fā)送到服務端嗎?在添加消息時,消息會被放置在TCPSocketSender的m_queue中,如果超出queue大小則拋棄消息。
-
ChannelManager:CAT底層使用netty來實現TCP消息傳輸,ChannelManager負責維護通信Channel。通俗的說,維護連接。
TCPSocketSender主要方法為initialize、send和run,分別介紹如下:
public void initialize() {m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);Threads.forGroup("cat").start(this);Threads.forGroup("cat").start(m_manager);... }initialize方法為初始化方法,在執(zhí)行時主要創(chuàng)建兩個線程,一個用來運行自身run方法(TCPSocketSender實現了Runnable接口)監(jiān)聽消息隊列;另一個則用來執(zhí)行ChannelManager維護通信Channel。
public void send(MessageTree tree) {if (isAtomicMessage(tree)) {boolean result = m_atomicTrees.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}} else {boolean result = m_queue.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}} }send方法被MessageManager調用,把消息放置在消息隊列中。
public void run() {m_active = true;while (m_active) {ChannelFuture channel = m_manager.channel();if (channel != null && checkWritable(channel)) {try {MessageTree tree = m_queue.poll();if (tree != null) {sendInternal(tree);tree.setMessage(null);}} catch (Throwable t) {m_logger.error("Error when sending message over TCP socket!", t);}} else {try {Thread.sleep(5);} catch (Exception e) {// ignore itm_active = false;}}} }private void sendInternal(MessageTree tree) {ChannelFuture future = m_manager.channel();ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K m_codec.encode(tree, buf);int size = buf.readableBytes();Channel channel = future.channel();channel.writeAndFlush(buf);if (m_statistics != null) {m_statistics.onBytes(size);} }run方法會一直執(zhí)行直到進程退出,在循環(huán)里先獲取通信Channel,然后發(fā)送消息。值得注意的是,sendInternal方法在執(zhí)行時調用m_codec.encode(tree, buf),參數為消息樹和緩沖區(qū)。消息樹里面其實只保存了一個消息,還記得剛才說的Transaction上下文引用嗎?m_codec在encode的時候會判斷消息類型是否為Transaction,如果為Transaction則會遞歸獲取子Transaction,否則直接將該消息編碼。具體實現可以參考源代碼的PlainTextMessageCodec類的encode方法,此處不再贅述。
4.1.3 cat-client 主要類介紹
cat-client的主要入口是cat-client包中的Cat類
Cat類以及Cat的依賴類層級結構如下:
接口層
Cat類以及MessageProducer類。主要功能是為外部提供api,Cat主要作用是與plexus框架做集成,MessageProducer是處理api的主要類
PS:額外說一下,Cat這個項目很【有特色】地用了plexus作為管理容器,初次接觸的時候真是讓人頭大,plexus的基本功能和spring可以說別無二致,但是很多地方的注入竟然都需要手動處理,真是讓人尷尬,雖然作者說spring太重了,plexus的作用已經足夠
消息處理層
MessageManager以及其內部類Context。主要功能是管理消息的發(fā)送,Transaction類消息的歸集,等消息的管理工作。在MessageManager中,使用了ThreadLocal類型作為當前線程消息管理的上下文,通過這個對象線程安全地實現消息的添加,合并,發(fā)送等等。
PS:MessageManager管理的消息Message是基于Cat的監(jiān)控模型創(chuàng)建的,其中最主要的區(qū)別是Transaction類和其他消息不太一樣,Transaction消息是一個鏈表的模型,每一個消息后面都鏈接著下一個消息,所以MessageManager對Transaction的處理也不同,別的消息都是放到Context中直接從消息處理層flush到下一層,Transaction是放到Context的棧中,直到過了預定時間,或者消息達到規(guī)定的最大長度才flush到下一層。
消息傳輸層
TransportManager以及TcpSocketSender以及ChannelManager。主要功能是把消息管理層發(fā)下來的消息進行發(fā)送,對于與多個發(fā)送的目的服務器進行Channel管理,保證有可用服務器能接受消息。TransportManager主要功能是根據配置文件初始化TcpSocketSender,TcpSocketSender主要實現把Message進行編碼(如果是Transaction還會進行合并)并放置到待發(fā)送隊列中,再同時由ChannelManager消費隊列中的消息,將消息發(fā)送給狀態(tài)為active的server端
PS:暫存消息的隊列用的是LinkedBlockingQueue,實際上LinkedBlockingQueue屬于生產消費者隊列的標配了,因為這個類對于添加和移除的消耗小,線程安全,而且達到隊列容量時會成為blocking狀態(tài),所以基本上都會用這個類,或者基于這個類進行擴展來實現相關需求。相對來說還有ConcurrentLinkedQueue可以用,和blockingqueue的主要區(qū)別是,Concurrent超過主要容量會直接返回false,不會block,所以如果想馬上就返回的可以用Concurrent隊列。
4.1.4 Cat入口類
1)測試用例
//靜態(tài)方法獲取Transaction對象Transaction t=Cat.newTransaction("logTransaction", "logTransaction");TimeUnit.SECONDS.sleep(30);t.setStatus("0");t.complete();2)Cat源碼
private static Cat s_instance = new Cat();private static volatile boolean s_init = false;private static void checkAndInitialize() {if (!s_init) {synchronized (s_instance) {if (!s_init) {initialize(new File(getCatHome(), "client.xml"));log("WARN", "Cat is lazy initialized!");s_init = true;}}}}private Cat() {}public static MessageProducer getProducer() {checkAndInitialize();return s_instance.m_producer;}Cat lazy Init
可以看到類加載時已經完成了Cat對象的初始化,內存中有且僅有一個Cat Object(static Cat s_instance = new Cat();),但是包含配置信息的完整的Cat對象并沒有完全初始化完成。調用Cat時會先嘗試獲取producer對象,并在獲取之前檢查客戶端配置是否加載完畢(checkAndInitialize)。
checkAndInitialize()通過使用doublecheck來對Cat相關配置填充的單次初始化加載。
cat-client首先會使用plexus(一個比較老的IOC容器)加載配置文件/META-INF/plexus/plexus.xml,完成IOC容器的初始化。
接著使用../../client.xml文件完成cat對象的配置信息填充初始化。并且啟動這四個daemon線程,后文詳細說明:
- cat-StatusUpdateTask 用來每秒鐘上報客戶端基本信息(JVM等信息)
- cat-merge-atomic-task(消息合并檢查)
- cat-TcpSocketSender-ChannelManager(NIO 連接服務端檢查)
- cat-TcpSocketSender(消息發(fā)送服務端)
4.1.5?CatClientModule
由于Cat用了十分low的plexus作為容器,所以在加載Cat類的時候會從靜態(tài)方法中加載各個Module,CatClientModule就是Cat client工程中首要Module
public class CatClientModule extends AbstractModule {public static final String ID = "cat-client";@Overrideprotected void execute(final ModuleContext ctx) throws Exception {ctx.info("Current working directory is " + System.getProperty("user.dir"));// initialize milli-second resolution level timer MilliSecondTimer.initialize();// tracking thread start/stop,此處增加經典的hook,用于線程池關閉的清理工作。Threads.addListener(new CatThreadListener(ctx));// warm up Cat Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());// bring up TransportManagerctx.lookup(TransportManager.class);ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);if (clientConfigManager.isCatEnabled()) {// start status update taskStatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);Threads.forGroup("cat").start(statusUpdateTask);LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms// MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);// Threads.forGroup("cat").start(mmapReaderTask); }}這里plexusIOC的具體的初始化加載邏輯在org\unidal\framework\foundation-service\2.5.0\foundation-service-2.5.0.jar中,有興趣可以仔細查看。?
當準備工作做完之后,會執(zhí)行具體的消息構造:
DefaultMessageProducer.newTransaction(String type, String name)
@Overridepublic Transaction newTransaction(String type, String name) {// this enable CAT client logging cat message without explicit setupif (!m_manager.hasContext()) {//詳細可見下文源碼,此處就是用ThreadLocal存儲一個Context對象:ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); m_manager.setup();}if (m_manager.isMessageEnabled()) {DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);//向Context中填充構造的消息體:Context.m_tree;Context.m_stack;稍后看看Context這個對象m_manager.start(transaction, false);return transaction;} else {return NullMessage.TRANSACTION;}}DefaultMessageManager.start(Transaction transaction, boolean forked)
@Overridepublic void start(Transaction transaction, boolean forked) {Context ctx = getContext();//這里獲取上文中說到的ThreadLocal中構造的Context對象if (ctx != null) {ctx.start(transaction, forked);if (transaction instanceof TaggedTransaction) {TaggedTransaction tt = (TaggedTransaction) transaction;m_taggedTransactions.put(tt.getTag(), tt);}} else if (m_firstMessage) {m_firstMessage = false;m_logger.warn("CAT client is not enabled because it's not initialized yet");}}DefaultMessageManager.Context.start(Transaction transaction, boolean forked)
public void start(Transaction transaction, boolean forked) {if (!m_stack.isEmpty()) {// {Transaction parent = m_stack.peek();addTransactionChild(transaction, parent);}} else {m_tree.setMessage(transaction);//在這里把返回的transaction放在tree上,如果有嵌套結構,后邊繼續(xù)在tree上添枝加葉 }if (!forked) {m_stack.push(transaction);}}這部分代碼可以看出,?
通過ThreadLocal<Context.>,使Context中實際的消息的構造保證了線程安全。
如果當前Context的棧m_stack不為空,那么接著之前的消息后邊,將當前消息構造為一個孩子結點。如果當前消息之前沒有其他消息,放入m_stack中,并setMessage.也就是當前消息時父節(jié)點。
至此,消息體構造完畢。?
這里需要看一下Context類,是DefaultMessageManager包私有的內部類。
Context.java
class Context {private MessageTree m_tree;//初始化的時候構建一個MessageTreeprivate Stack<Transaction> m_stack;private int m_length;private boolean m_traceMode;private long m_totalDurationInMicros; // for truncate messageprivate Set<Throwable> m_knownExceptions;public Context(String domain, String hostName, String ipAddress) {m_tree = new DefaultMessageTree();m_stack = new Stack<Transaction>();Thread thread = Thread.currentThread();String groupName = thread.getThreadGroup().getName();m_tree.setThreadGroupName(groupName);m_tree.setThreadId(String.valueOf(thread.getId()));m_tree.setThreadName(thread.getName());m_tree.setDomain(domain);m_tree.setHostName(hostName);m_tree.setIpAddress(ipAddress);m_length = 1;m_knownExceptions = new HashSet<Throwable>();}每個線程通過使用ThreadLocal構造一個Context對象并存儲。Context主要包含當前的消息體m_tree,和多個嵌套消息體填充的棧:m_stack :
再回到我們原來的UnitTest代碼,?
Transaction t=Cat.newTransaction("logTransaction", "logTransaction");這行代碼完成了客戶端plexusIOC容器的初始化,cat-client的加載初始化、啟動了四個daemon線程,并返回了Transaction對象。
t.setStatus("0");//很簡單,就是這是一個屬性值 t.complete();消息完成后,將消息放入一個隊列中,從而保證異步上報。
transaction.complete();的具體代碼如下:
........public void complete() {try {if (isCompleted()) {// complete() was called more than onceDefaultEvent event = new DefaultEvent("cat", "BadInstrument");event.setStatus("TransactionAlreadyCompleted");event.complete();addChild(event);} else {m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;setCompleted(true);if (m_manager != null) {m_manager.end(this);}}} catch (Exception e) {// ignore }} ........@Overridepublic void end(Transaction transaction) {Context ctx = getContext();if (ctx != null && transaction.isStandalone()) {if (ctx.end(this, transaction)) {m_context.remove();}}} ........public boolean end(DefaultMessageManager manager, Transaction transaction) {if (!m_stack.isEmpty()) {Transaction current = m_stack.pop();//Context的成員變量m_stack彈出棧頂元素,LIFO當然是最新的current元素。if (transaction == current) {m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);} else {while (transaction != current && !m_stack.empty()) {m_validator.validate(m_stack.peek(), current);current = m_stack.pop();}}if (m_stack.isEmpty()) {//如果當前線程存儲的Context中m_stack無元素MessageTree tree = m_tree.copy();m_tree.setMessageId(null);//清理m_treem_tree.setMessage(null);if (m_totalDurationInMicros > 0) {adjustForTruncatedTransaction((Transaction) tree.getMessage());}manager.flush(tree);//將消息放入消費隊列中return true;}}return false;} ........public void flush(MessageTree tree) {if (tree.getMessageId() == null) {tree.setMessageId(nextMessageId());//為消息體生產全局唯一ID,詳見snowflate算法 }MessageSender sender = m_transportManager.getSender();if (sender != null && isMessageEnabled()) {sender.send(tree);reset();//ThreadLocal中存儲的Context清理} else {m_throttleTimes++;if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);}}} ........private Context getContext() {if (Cat.isInitialized()) {Context ctx = m_context.get();//ThreadLocal存儲一個Context對象if (ctx != null) {return ctx;} else {if (m_domain != null) {ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());} else {ctx = new Context("Unknown", m_hostName, "");}m_context.set(ctx);return ctx;}}return null;}//TcpSocketSender.send(MessageTree tree)private MessageQueue m_queue = new DefaultMessageQueue(SIZE);private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);@Overridepublic void send(MessageTree tree) {if (isAtomicMessage(tree)) {boolean result = m_atomicTrees.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}} else {boolean result = m_queue.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}}}至此,構造的消息體放入了阻塞隊列中等待上傳。
總結:?我們可以看到Cat-SDK通過ThreadLocal對消息進行收集,?
收集進來按照時間以及類型構造為Tree結構,在compele()方法中將這個構造的消息放入一個內存隊列中,等待TcpSockekSender這個Daemon線程異步上報給服務端。
?4.1.6 cat-TcpSocketSender
消息上傳服務端,會有一個線程cat-TcpSocketSender監(jiān)聽消費隊列,并消費(上傳服務端)。
通信上報服務端使用了Netty-Client,并且自定義了消息協議。
@Overridepublic void run() {m_active = true;while (m_active) {ChannelFuture channel = m_manager.channel();if (channel != null && checkWritable(channel)) {try {MessageTree tree = m_queue.poll();if (tree != null) {sendInternal(tree);//netty NIO編碼后TCP發(fā)送到服務端。tree.setMessage(null);}} catch (Throwable t) {m_logger.error("Error when sending message over TCP socket!", t);}} else {long current = System.currentTimeMillis();long oldTimestamp = current - HOUR;while (true) {try {MessageTree tree = m_queue.peek();if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {MessageTree discradTree = m_queue.poll();if (discradTree != null) {m_statistics.onOverflowed(discradTree);}} else {break;}} catch (Exception e) {m_logger.error(e.getMessage(), e);break;}}try {Thread.sleep(5);} catch (Exception e) {// ignore itm_active = false;}}}}private void sendInternal(MessageTree tree) {ChannelFuture future = m_manager.channel();ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K System.out.println(tree);m_codec.encode(tree, buf);//編碼后發(fā)送int size = buf.readableBytes();Channel channel = future.channel();channel.writeAndFlush(buf);if (m_statistics != null) {m_statistics.onBytes(size);}}?
4.1.7?cat-merge-atomic-task
符合如下邏輯判斷的atomicMessage會放入m_atomicTrees消息隊列,然后由這個后臺線程監(jiān)聽并消費。?
具體代碼如下:
TcpSocketSender.java
private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);......private boolean isAtomicMessage(MessageTree tree) {Message message = tree.getMessage();//從tree上拿去messageif (message instanceof Transaction) {//如果這個message實現了Transaction接口,也就是Transaction類型的消息String type = message.getType();if (type.startsWith("Cache.") || "SQL".equals(type)) {//如果以Cache.,SQL開頭的則返回Truereturn true;} else {return false;}} else {return true;}//看到這里,也就是說,"Cache","SQL"開頭的Transaction消息,或者非Transaction消息,認為是atomicMessage. }......public void send(MessageTree tree) {if (isAtomicMessage(tree)) {//如果符合atomicMessageboolean result = m_atomicTrees.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);//隊列溢出處理 }} else {boolean result = m_queue.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}}} ......?
?
public class DefaultMessageQueue implements MessageQueue {private BlockingQueue<MessageTree> m_queue;private AtomicInteger m_count = new AtomicInteger();public DefaultMessageQueue(int size) {m_queue = new LinkedBlockingQueue<MessageTree>(size);}@Overridepublic boolean offer(MessageTree tree) {return m_queue.offer(tree);}@Overridepublic boolean offer(MessageTree tree, double sampleRatio) {if (tree.isSample() && sampleRatio < 1.0) {//如果這個消息是sample,并且sampleRation大于1if (sampleRatio > 0) {//這段邏輯就是按采樣率去剔除一些消息,只選取其中一部分進行后續(xù)的消費上傳。int count = m_count.incrementAndGet();if (count % (1 / sampleRatio) == 0) {return offer(tree);}}return false;} else {//不做采樣過濾,放入隊列return offer(tree);}}@Overridepublic MessageTree peek() {return m_queue.peek();}@Overridepublic MessageTree poll() {try {return m_queue.poll(5, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {return null;}}@Overridepublic int size() {return m_queue.size();} }?
這個后臺進程的消費動作:
......private boolean shouldMerge(MessageQueue trees) {MessageTree tree = trees.peek();//獲取對頭元素,非移除if (tree != null) {long firstTime = tree.getMessage().getTimestamp();int maxDuration = 1000 * 30;//消息在30s內生成,或者隊列擠壓消息超過200,則需要mergeif (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {return true;}}return false;}......@Overridepublic void run() {while (true) {if (shouldMerge(m_atomicTrees)) {MessageTree tree = mergeTree(m_atomicTrees);//把m_atomicTrees隊列中的消息merge為一條消息樹boolean result = m_queue.offer(tree);//放入m_queue隊列,等待cat-TcpSocketSender線程正常消費if (!result) {logQueueFullInfo(tree);}} else {try {Thread.sleep(5);} catch (InterruptedException e) {break;}}}}.....private MessageTree mergeTree(MessageQueue trees) {int max = MAX_CHILD_NUMBER;DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);//增加merge處理埋點MessageTree first = trees.poll();//從隊列頭部移除 tran.setStatus(Transaction.SUCCESS);tran.setCompleted(true);tran.addChild(first.getMessage());tran.setTimestamp(first.getMessage().getTimestamp());long lastTimestamp = 0;long lastDuration = 0;//這段邏輯就是不停從這個m_atomicTrees隊列頭部拿去messsage,并使用同一個messageId,把隊列中所有的消息合并為一條Transaction消息。while (max >= 0) {MessageTree tree = trees.poll();//接著 從隊列頭部移除if (tree == null) {tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);break;}lastTimestamp = tree.getMessage().getTimestamp();if(tree.getMessage() instanceof DefaultTransaction){lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();} else {lastDuration = 0;}tran.addChild(tree.getMessage());m_factory.reuse(tree.getMessageId());max--;}((DefaultMessageTree) first).setMessage(tran);return first;}4.1.8?TcpSocketSender-ChannelManager 后臺線程
這個線程是通過服務端配置的路由ip,10s輪詢一次,當滿足自旋n(n = m_count % 30)次,去檢查路由服務端ip是否變動,并保證連接正常。典型的拉取配置信息機制。
1)客戶端跟服務端連接建立,分兩步:
- 初始ChannelMananger的時候 ;
- ChannelManager異步線程,每隔10秒做一次檢查。
初始ChannelMananger的時候
實例化ChannelManager的時候,根據配置的第一個server,從遠程服務器讀取服務器列表,如果能讀取到,則順序建立連接,直到建立成功為止;如果不能讀到,則根據本地配置的列表,逐個建立連接,直到成功為止。
ChannelMananger異步線程,每隔10秒做一次檢查
- 檢查Server列表是否變更
每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執(zhí)行檢查。檢查的邏輯是:比較本地server列表跟遠程服務提供的列表是否相等,不相等則根據遠程服務提供的server列表順序的重新建立第一個能用的ChannelFuture
- 查看當前客戶端是否有積壓,或者ChannelFuture是否被關閉
如果有積壓,或者關閉掉了,則關閉當前連接,將activeIndex=-1,表示當前連接不可用。
- 重連默認Server
從0到activeIndex中找一個能連接的server,中心建立一個連接。如果activeIndex為-1,則從整個的server列表中順序的找一個可用的連接建立連接。
2)ChannelManager實例化,建立Netty連接邏輯
客戶端實例化DefaultTransportManager對象時,會按照如下流程先實例化m_tcpSocketSender,接著實例化ChannelManager。ChannelManager管理對服務端的netty連接。 實例化流程如下:
?ChannelManager通過ChannelHolder把netty的ChannnelFuture封裝起來。ChannelHolder結構如下:
public static class ChannelHolder {/*** 當前活躍的channelFuture*/private ChannelFuture m_activeFuture;/*** 當前server在m_serverAddresses中的第幾個*/private int m_activeIndex = -1;/*** 當前活躍的ChannelFuture對應的配置*/private String m_activeServerConfig;/*** 從配置文件中讀取的服務端列表*/private List<InetSocketAddress> m_serverAddresses;/*** 當前活躍的ChannelFutre對應的ip*/private String m_ip; /** * 連接從第一次初始化開始,是否發(fā)生過變更 */ private boolean m_connectChanged;//省略其它的代碼 }
3)ChannelManager內部異步線程,動態(tài)切換Netty連接邏輯。
ChannelManager內部每隔10秒鐘,檢查netty連接。這部分代碼如下:
public void run() {while (m_active) {/*** make save message id index asyc* 本地存儲index,和 時間戳,防止重啟,導致本地的消息id重了*/m_idfactory.saveMark();/*** 檢查本地初始化的服務列表跟遠程的服務列表是否有差異,如果有差異,則取遠程第一個能建立連接的server,建立一個新的連接,關閉舊的連接*/checkServerChanged();ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 檢查當前channelFuture是否有消息積壓(本地隊列長度超過4990),或者 channelFuture不是開的 */ doubleCheckActiveServer(activeFuture); /** * 從serverAddresses列表里面,重新順序選一個,重新連接 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }?總結:服務端沒有做到負載均衡,連接會慢慢連接到server列表里面第一個可用的server上。
4.1.9 StatusUpdateTask
CatClientModule在加載過程中會從StatusUpdateTask中啟動一個線程來每隔一段時間發(fā)送一個HeartBeatMessage,其中包括了客戶端能拿到的各種信息,包括CPU,Memory,Disk等等,開發(fā)者也可以通過實現StatusExtension接口的方式來實現對于HeartBeatMessage發(fā)送內容的擴展。
這個線程很簡單,類似傳統的agent,每分鐘上報關于應用的各種信息(OS、MXBean信息等等)。而且,在每次線程啟動時上報一個Reboot消息表示重啟動。
其中比較重要的實現信息收集的是這行代碼
StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars); status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));m_statistics包含的是已經發(fā)送過信息的容量,m_jars是通過classLoader加載的jar包名稱,StatusInfoCollector通過大量訪問者模式的代碼實現了將各種指標set到status中的功能,之后將status封裝到HeartBeatMessage中,按照一般對于message的處理流程,flush到消息傳輸層中
4.1.10?MessageId的設計
CAT消息的Message-ID格式applicationName-0a010680-375030-2,CAT消息一共分為四段:?
第一段是應用名applicationName。?
第二段是當前這臺機器的IP的16進制格式:
if (m_ipAddress == null) {String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();List<String> items = Splitters.by(".").noEmptyItem().split(ip);byte[] bytes = new byte[4];for (int i = 0; i < 4; i++) {bytes[i] = (byte) Integer.parseInt(items.get(i));}StringBuilder sb = new StringBuilder(bytes.length / 2);for (byte b : bytes) {//1.一個byte 8位//2.先獲取高4位的16進制字符//3.在獲取低4位的16進制數 sb.append(Integer.toHexString((b >> 4) & 0x0F));//通常使用0x0f來與一個整數進行&運算,來獲取該整數的最低4個bit位sb.append(Integer.toHexString(b & 0x0F));}m_ipAddress = sb.toString();第三段的375030,是系統當前時間除以小時得到的整點數。?
第四段的2,是表示當前這個客戶端在當前小時的順序遞增號(AtomicInteger自增,每小時結束后重置)。
?
總之,同一個小時內、同一個domain、同一個ip , messageId的唯一性需要 AtomicInteger保證。
4.2 cat-home設計
4.2.1 服務端初始化
1)Servlet容器加載、啟動
CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假設使用tomcat容器)中的方式部署啟動。?
熟悉servlet容器的同學應該知道,容器啟動時會讀取每個Context(可理解為web工程)中的web.xml然后啟動Servlet等其他組件。
在cat-home模塊中的web.xml中可以看到,除了容器默認的Servlet之外,tomcat啟動時會啟動CatServlet、MVC這兩個Servlet(因為load-on-startup>0,也就是會調用init方法初始化):
<web-app><filter>...</filter><servlet><servlet-name>cat-servlet</servlet-name><servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class><load-on-startup>1</load-on-startup></servlet><servlet><servlet-name>mvc-servlet</servlet-name><servlet-class>org.unidal.web.MVC</servlet-class><init-param><param-name>cat-client-xml</param-name><param-value>client.xml</param-value></init-param><init-param><param-name>init-modules</param-name><param-value>false</param-value></init-param><load-on-startup>2</load-on-startup></servlet><filter-mapping>...</filter-mapping> <servlet-mapping>...</servlet-mapping> <jsp-config>...</jsp-config></web-app>?
2)com.dianping.cat.servlet.CatServlet
按照web.xml中Servlet的加載順序CatServlet會優(yōu)先于MVC完成初始化。?
CatServlet的邏輯基本可以概括為如下兩條線:
CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) ——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)——>TCPSocketReceiver(netty服務器)CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) ——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各個模塊的初始化)?
com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)
public void init(ServletConfig config) throws ServletException {super.init(config);try {//1.plexus IOC容器初始化(根據components.xml的設定完成IOC初始化)if (m_container == null) {m_container = ContainerLoader.getDefaultContainer();}//2.用來打印日志的m_logger對象實例化(根據plexus.xml設定完成實例化)m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(getClass().getName());//3.初始化CAT-Server必備的組件模塊:cat-home\cat-consumer\cat-core initComponents(config);} catch (Exception e) {if (m_logger != null) {m_logger.error("Servlet initializing failed. " + e, e);} else {System.out.println("Servlet initializing failed. " + e);e.printStackTrace(System.out);}throw new ServletException("Servlet initializing failed. " + e, e);}}進入initComponents(config); 我們繼續(xù)看下為了啟動server服務,各個cat-*模塊如何初始化。
com.dianping.cat.servlet.CatServlet.initComponents(ServletConfig servletConfig)
@Overrideprotected void initComponents(ServletConfig servletConfig) throws ServletException {try {//ModuleContext ctx這個對象里主要作用://1.持有 plexus IOC 容器的引用;//2.持有 logger對象引用,用來打日志。//3.持有 需要使用到的配置文件路徑。//比如:cat-server-config-file=\data\appdatas\cat\server.xml //cat-client-config-file=\data\appdatas\cat\client.xml ModuleContext ctx = new DefaultModuleContext(getContainer());ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");ctx.setAttribute("cat-client-config-file", clientXmlFile);ctx.setAttribute("cat-server-config-file", serverXmlFile);//通過查找啟動cat-home必要的模塊,然后依次初始化各個模塊。 initializer.execute(ctx);} catch (Exception e) {m_exception = e;System.err.println(e);throw new ServletException(e);}}org.unidal.initialization.DefaultModuleInitializer.execute(…). 執(zhí)行各個模塊的初始化
@Overridepublic void execute(ModuleContext ctx) {//我們的topLevelModule是cat-home模塊,通過這個模塊去查找需要依賴的其他模塊并初始化他們。Module[] modules = m_manager.getTopLevelModules();execute(ctx, modules);}@Overridepublic void execute(ModuleContext ctx, Module... modules) {Set<Module> all = new LinkedHashSet<Module>();info(ctx, "Initializing top level modules:");for (Module module : modules) {info(ctx, " " + module.getClass().getName());}try {//1.根據頂層Module獲取到下層所有依賴到的modules,并分別調用他們的setup方法 expandAll(ctx, modules, all);//2.依次調用module實現類的execute方法for (Module module : all) {if (!module.isInitialized()) {executeModule(ctx, module, m_index++);}}} catch (Exception e) {throw new RuntimeException("Error when initializing modules! Exception: " + e, e);}}private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {if (modules != null) {for (Module module : modules) {expandAll(ctx, module.getDependencies(ctx), all);if (!all.contains(module)) {if (module instanceof AbstractModule) {((AbstractModule) module).setup(ctx);//調用各個module實現類的setup }//all 最終元素以及順序://CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule all.add(module);}}}}我們看到cat-home模塊是一個頂層模塊,接著根據這個模塊找到其他依賴模塊?(CatClientModule\CatConsumerModule\CatCoreModule),并且依次調用setup方法,解析依次調用模塊的execute方法完成初始化。
Modules之間的設計使用了典型的模板模式。?
模塊依賴關系:?
null<——CatClientModule<——CatClientModule<——CatCoreModule<——CatConsumerModule<——CatHomeModule
接著著重看一下子類 CatHomeModule的setup的實現。注意除了這個子類,Module的子類steup()方法為空?。
com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
@Overrideprotected void setup(ModuleContext ctx) throws Exception {File serverConfigFile = ctx.getAttribute("cat-server-config-file");//獲取server.xml文件的路徑//通過 plexus IOC 初始化一個 ServerConfigManager beanServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);//通過 plexus IOC 初始化一個 TcpSocketReceiver beanfinal TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);//加載\...\server.xml中的配置 serverConfigManager.initialize(serverConfigFile);//啟動TCPSocketReceiver,就是一個典型的 netty 事件驅動服務器,用來接收客戶端的TCP長連接請求 messageReceiver.init();//增加一個進程觀察者,在這個JVM關閉時回調Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {messageReceiver.destory();}});}?
各個模塊的啟動,executeModule?
各個模塊setup就說到這里,setup完成后,會依次調用module.execute(…)用來完成各個模塊的啟動。
依次調用:?
CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule.其中只有CatClientModule、CatHomeModule實現了有效的execute方法。
com.dianping.cat.CatClientModule.execute(ModuleContext ctx)?
注意:這里的客戶端是用來監(jiān)控服務端的,具體client的解析可以參考cat-client設計
@Overrideprotected void execute(final ModuleContext ctx) throws Exception {ctx.info("Current working directory is " + System.getProperty("user.dir"));// initialize milli-second resolution level timer MilliSecondTimer.initialize();// tracking thread start/stop// Threads用來對線程做管理的類。這里默認給每個新建的線程加上監(jiān)聽器或者說是觀察者Threads.addListener(new CatThreadListener(ctx));// warm up Cat: setContainer Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());// bring up TransportManager:實例化這個類ctx.lookup(TransportManager.class);//ClientConfigManager對象是加載了client.xml的客戶端配置管理對象。//客戶端的解析不進行展開,請看之前寫的《分布式監(jiān)控CAT源碼解析——cat-client》ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);if (clientConfigManager.isCatEnabled()) {StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);Threads.forGroup("cat").start(statusUpdateTask);LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms }}com.dianping.cat.CatHomeModule.execute(ModuleContext ctx)?
CatHomeModule涉及很多可說的,此處暫時不做展開,繼續(xù)按照Servlet啟動的流程講解。
至此,CatServlet初始化完成了,接下來會初始化org.unidal.web.MVC這個Servlet。?
我們接著看一下另外一個Servlet:mvc-servlet
3)org.unidal.web.MVC
MVC這個Servlet繼承了AbstractContainerServlet,與CatServlet非常類似,均是AbstractContainerServlet 的實現類。這個Servlet顧名思義就是用來處理請求的,類似Spring中的DispatcherServlet,集中分配進入的請求到對應的Controller。
public void init(ServletConfig config) throws ServletException {…}?
與CatServelet一樣,均繼承自父類:
public void init(ServletConfig config) throws ServletException {super.init(config);try {if (m_container == null) {//DefaultPlexusContainer m_container 是單例對象,在CATServlet中已經完成初始化了m_container = ContainerLoader.getDefaultContainer();}m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(getClass().getName());initComponents(config);} ......?
org.unidal.web.MVC.initComponents(ServletConfig config) throws Exception
@Overrideprotected void initComponents(ServletConfig config) throws Exception {// /catString contextPath = config.getServletContext().getContextPath();// /catString path = contextPath == null || contextPath.length() == 0 ? "/" : contextPath;getLogger().info("MVC is starting at " + path); //使用client.xml初始化代表CATClient的com.dianping.cat.Cat對象(如果CAT未被初始化)。 initializeCat(config);initializeModules(config);m_handler = lookup(RequestLifecycle.class, "mvc");m_handler.setServletContext(config.getServletContext());config.getServletContext().setAttribute(ID, this);getLogger().info("MVC started at " + path);}至此,容器啟動成功,http://localhost:2281/cat/r?進入頁面。
4.2.2 報表展示
對于實時報表,直接通過HTTP請求分發(fā)到相應消費機,待結果返回后聚合展示(對分區(qū)數據做聚合);歷史報表則直接取數據庫并展示。
?
4.3 cat-core設計
Server的主要入口是cat-core包中的RealTimeConsumer類
RealTimeConsumer類以及RealTimeConsumer的依賴類層級結構如下:
Cat Server功能為解碼消息,解碼后按照固定時間間隔分片,將消息分發(fā)到各個Analyzer的消費隊列中,然后由各自的Analyzer進行消費。
TCPSocketReceiver,DefaultMessageHandler
TCPSocketReceiver主要負責使用netty建立server端,接受到tcp請求后將其解碼,通過DefaultMessageHandler將Message交由RealTimeConsumer消費
RealTimeConsumer
在內部初始化PeriodManager,并啟動periodManager的線程,該線程會不斷根據時間間隔生成新的Period對象,并啟動Period對象內的多個PeriodTask線程,PeriodTask線程會根據持有的Anaylyzer和MessageQueue進行消費
當RealTimeConsumer終止時會調用doCheckPoint方法
PeriodManager,PeriodStrategy
PeriodManager主要是以時間切片作為策略來拆分整體數據的,所以PeriodManager中包含的List類型是根據PeriodStrategy中的時間策略獲得的。PeriodManager實現Task接口,他的主要任務是在規(guī)定的存活期內,每隔一段固定的時間都會創(chuàng)建新的Period對象,并啟動Period對象內的多個消費線程
Period
Period中主要包含了一個類型為Map < String, List < PeriodTask > >的屬性,該屬性根據MessageAnalyzerManager構建。Map < String, List < PeriodTask > >屬性是一個在該Period時間片內,不同類型的Analyzer與各個PeriodTask之間的對應關系,因為偶爾有同一個Analyzer會有多個PeriodTask一同消費,根據Hash進行分配的情況,所以value的類型為List。
PeriodTask是消費Message的消費單元,每個PeriodTask中包含了一個queue,一個analyzer,PeriodTask會一直從queue中取出Message讓analyzer進行消費
distribute方法實現了將Message分發(fā)到該Period中所有PeriodTask中的功能
start方法啟動各個PeriodTask線程,對各個PeriodTask的queue中的Message開始消費
finish方法調用各個PeriodTask的finish方法
MessageAnalyzerManager
持有Map < Long, Map < String, List < MessageAnalyzer > > >,該屬性包含了各個Analyzer的實例,每個實例可以通過——時間片——analyzer類型/名字來獲得,analyzer的數量由各個MessageAnalyzer中getAnalyzerCount獲得
PeriodTask
PeriodTask實現Task接口,每個PeriodTask會持有自己專屬的analyzer和queue,在線程啟動后會調用analyzer的consume方法來消費queue。在調用finish時會調用checkPoint方法,執(zhí)行analyzer實現的檢查點方法
4.3.1?CAT服務端接收MessageTree
消息通過netty發(fā)送到服務端,經過MessageDecoder將字節(jié)流轉換成文本,PlainTextMessageCode將文本消息轉換成一棵消息樹,DefaultMessageHandler調用RealtimeConsumer實時消費消息樹,RealtimeConsumer調用Period(沒有就生成),將消息樹分發(fā)對應的PeriodTask的隊列里面,供對應的Analyzer處理。
4.3.2?EventAnalyzer介紹
Cat server中,以PeriodTask為消費單元,使用MessageAnalyzer進行消息消費,本篇介紹一下EventAnalyzer的功能,并捎帶介紹一下MessageAnalyzer的實現
MessageAnalyzer接口實現結構如下
4.3.3?DumpAnalyzer介紹
?CatServer中,可以定時把消息存儲到hdfs中,dumpAnalyzer就是用來支持這種功能的
LocalMessageBucketManager
ConcurrentHashMap <String, LocalMessageBucket > m_buckets主要根據持久化的日志路徑保存LocalMessageBucket對象
BlockingQueue < MessageBlock > m_messageBlocks 保存MessageItem經過gzip壓縮的block
ConcurrentHashMap < Integer, LinkedBlockingQueue < MessageItem > > m_messageQueues 在內存中持有各個gzip執(zhí)行線程壓縮隊列對象,根據線程的index作為索引
BlockDumper負責將gzip壓縮過的block持久化到本地文件
MessageGzip負責定時壓縮MessageItem
LogviewUploader負責上傳logview到hdfs
- archive 把傳入時間范圍內的,將bucket已經壓縮到block,但是沒有flush的MessageBlock放入m_messageBlocks消費隊列中,供BlockDumper,LogviewUploader消費
- initialize 啟動blockDumper線程,啟動LogviewUploader線程,啟動若干個MessageGzip線程,各自干活
- loadMessage 從文件中將消息加載出來
- storeMessage 根據domain,ip等的hash將MessageItem放入MessageGzip線程的消費隊列,供其壓縮生成MessageBlock
LocalMessageBucket
對Message進行讀寫,壓縮,解壓縮的處理單元,LocalMessageBucket使用basePath/{date,yyyyMMdd}/{date,HH}/{name}的路徑生成壓縮文件,對消息的各種讀寫壓縮操作大多都與該文件有關。
- storeMessage 通過傳入的MessageItem的ByteBuf生成壓縮過的MessageBlock對象
- findById 根據MessageId加載該Bucket對應文件中的MessageTree對象
MessageBlock
壓縮后的消息信息的持有者,index是Message的id,size是對應message的size
MessageGzip
MessageItem的消費者,消費存儲在ConcurrentHashMap <Integer, LinkedBlockingQueue > m_messageQueues中的MessageItem,將MessageItem壓縮成MessageBlock,每個MessageGzip線程都有一個自己的Queue,Integer是每個線程對應queue的索引
BlockDumper
MessageBlock的消費者,消費存儲在BlockingQueue m_messageBlocks中的MessageBlock,將其通過LocalMessageBucket持久化到本地
##LogViewUploader
上傳logView到hdfs
4.3.4?TaskConsumer介紹
后臺的Analyzer在歸檔時會生成Task的記錄到數據庫中,server在CatConsumerModule中初始化過程中啟動了TaskConsumer線程來處理數據庫中的記錄
ReportFacade
此類會在初始化時,將注冊在plexus的所有builder加載到m_reportBuilders中,在執(zhí)行builderReport時,根據傳入的task的reportName查找對應的TaskBuilder,根據時間片,domain以及reportName查詢已經入庫的report基本記錄,再將report的基本記錄合并,并將合并后的report,生成的graph入庫。
在生成聚合report的過程中,會根據層級樹自上至下歸并遞歸生成,比如月的根據周,周根據日,日根據小時生成。
EventReportBuilder
基于時間片,調用EventService實現入庫聚合報表,入庫聚合graph的功能
EventService
基于domain實現報表插入,報表查詢等功能,是業(yè)務執(zhí)行的基本單元
4.3.5?TcpSocketReceiver
在CAT-Server啟動時會啟動Netty的Nio 多線程Reactor模塊來接收客戶端的請求:
- 一個Accept線程池(Main Reactor Thread Pool )用來處理連接操作(通常還可以在這各Accept中加入權限驗證、名單過濾邏輯);
- 接著Accept連接成功的socket請求被轉發(fā)到 專門處理IO操作的線程池(Sub Reactor Thread Pool ,實現異步);在這里做了消息的解碼處理;
- 再接著,解碼處理后,將消息發(fā)送到每個報表解析器內置的內存隊列中。消息將被異步分發(fā)給各個解析器單獨處理(不存在數據競爭)。
消息的接受是在這個類TcpSocketReceiver.java完成的:
// 在CatHomeModule啟動時被調用public void init() {try {startServer(m_port);} catch (Throwable e) {m_logger.error(e.getMessage(), e);}}/*** 啟動一個netty服務端* @param port* @throws InterruptedException*/public synchronized void startServer(int port) throws InterruptedException {boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");int threads = 24;ServerBootstrap bootstrap = new ServerBootstrap();//linux走epoll的事件驅動模型m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用來做為接受請求的線程池 master線程m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用來做為處理請求的線程池 slave線程 bootstrap.group(m_bossGroup, m_workerGroup);bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//channel初始化設置 @Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decode", new MessageDecoder());//增加消息解碼器 }});// 設置channel的參數bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);bootstrap.childOption(ChannelOption.TCP_NODELAY, true);bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);try {m_future = bootstrap.bind(port).sync();//綁定監(jiān)聽端口,并同步等待啟動完成m_logger.info("start netty server!");} catch (Exception e) {m_logger.error("Started Netty Server Failed:" + port, e);}}啟動netty,對每個客戶端上報的消息都會做解碼處理,從字節(jié)流轉換為消息樹MessageTree tree,接著交給DefaultMessageHandler處理。
public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, LogEnabled {/** MessageConsumer按每個period(整小時一個period)組合了多個解析器,用來解析生產多個報表(如:Transaction、* Event、Problem等等)。一個解析器對象-一個有界隊列-一個整小時時間組合了一個PeriodTask,輪詢的處理這個有界隊列中的消息*/@Injectprivate MessageConsumer m_consumer;private Logger m_logger;@Overridepublic void enableLogging(Logger logger) {m_logger = logger;}@Overridepublic void handle(MessageTree tree) {if (m_consumer == null) {m_consumer = lookup(MessageConsumer.class);//從容器中加載MessageConsumer實例 }try {m_consumer.consume(tree);//消息消費} catch (Throwable e) {m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);}} }?
OMS設計是按照每小時去匯總數據,為什么要使用一個小時的粒度呢??
這個是一個trade-off,實時內存數據處理的復雜度與內存的開銷方面的折中方案。?
在這個小時結束后將生成的Transaction\Event\Problean報表存入Mysql、File(機器根目錄俠)。然而為了實時性,當前小時的報表是保存在內存中的。
PeriodManager 用來管理 OMS單位小時內的各種類型的解析器,包括將上報的客戶端數據派發(fā)給不同的解析器(這種派發(fā)可以理解為訂閱\發(fā)布)。每個解析器,將收到的消息存入內置隊列,并且用單獨的線程去獲取消息并處理。
com.dianping.cat.analysis.PeriodManager
public class PeriodManager implements Task {public void init() {long startTime = m_strategy.next(System.currentTimeMillis());//當前小時的起始時間 startPeriod(startTime);}@Overridepublic void run() {// 1s檢查一下當前小時的Period對象是否需要創(chuàng)建(一般都是新的小時需要創(chuàng)建一個Period代表當前小時)while (m_active) {try {long now = System.currentTimeMillis();//value>0表示當前小時的Period不存在,需要創(chuàng)建一個//如果當前線小時的Period存在,那么Value==0long value = m_strategy.next(now);if (value > 0) {startPeriod(value);} else if (value < 0) {// //當這個小時結束后,會異步的調用endPeriod(..),將過期的Period對象移除,help GCThreads.forGroup("cat").start(new EndTaskThread(-value));}} catch (Throwable e) {Cat.logError(e);}try {Thread.sleep(1000L);} catch (InterruptedException e) {break;}}}//當這個小時結束后,會異步的調用這個方法,將過期的Period對象移除,help GCprivate void endPeriod(long startTime) {int len = m_periods.size();for (int i = 0; i < len; i++) {Period period = m_periods.get(i);if (period.isIn(startTime)) {period.finish();m_periods.remove(i);break;}}}...... }?
消息消費是由MessageConsumer的實現類RealtimeConsumer處理
com.dianping.cat.analysis.RealtimeConsumer.consume(MessageTree tree)
@Overridepublic void consume(MessageTree tree) {String domain = tree.getDomain();String ip = tree.getIpAddress();if (!m_blackListManager.isBlack(domain, ip)) {// 全局黑名單 按domain-iplong timestamp = tree.getMessage().getTimestamp();//PeriodManager用來管理、啟動periodTask,可以理解為每小時的解析器。Period period = m_periodManager.findPeriod(timestamp);//根據消息產生的時間,查找這個小時所屬的對應Periodif (period != null) {period.distribute(tree);//將解碼后的tree消息依次分發(fā)給所有類型解析器} else {m_serverStateManager.addNetworkTimeError(1);}} else {m_black++;if (m_black % CatConstants.SUCCESS_COUNT == 0) {Cat.logEvent("Discard", domain);}}}?
分發(fā)消息給各個解析器(類似向訂閱者發(fā)布消息)?
void com.dianping.cat.analysis.Period.distribute(MessageTree tree)
?
PeriodTask?
每個periodTask對應一個線程,m_analyzer對應解析器處理m_queue中的消息
?
4.3.6?AbstractMessageAnalyzer
@Overridepublic void analyze(MessageQueue queue) {// 解析器在當前小時內自旋,不停從隊列中拿取消息,然后處理while (!isTimeout() && isActive()) {// timeOut:當前時間>小時的開始時間+一小時+三分鐘;// isActive默認為true,調用shutdown后為falseMessageTree tree = queue.poll();// 非阻塞式獲取消息if (tree != null) {try {process(tree);// 解析器實現類 override} catch (Throwable e) {m_errors++;if (m_errors == 1 || m_errors % 10000 == 0) {Cat.logError(e);}}}}// 如果當前解析器以及超時,那么處理完對應隊列內的消息后返回。while (true) {MessageTree tree = queue.poll();if (tree != null) {try {process(tree);} catch (Throwable e) {m_errors++;if (m_errors == 1 || m_errors % 10000 == 0) {Cat.logError(e);}}} else {break;}}}?消費流程圖:
?
總結:
消息發(fā)送到服務端,服務端解碼為 MessageTree準備消費。期間存在一個demon線程,1s檢查一下當前小時的Period對象是否需要創(chuàng)建(一般都是新的小時需要創(chuàng)建一個Period代表當前小時)。
如果當前小時的Period存在,那么我們的MessageTree會被分發(fā)給各個PeriodTask,這里其實就是把消息發(fā)送到每個PeriodTask中的內存隊列里,然后每個Task異步去消費。就是通過使用Queue實現了解耦與延遲異步消費。
每個PeriodTask持有MessageAnalyzer analyzer(Transaction\Event\Problean…每種報表都對應一個解析器的實現類)、MessageQueue queue對象,PeriodTask會不停地解析被分發(fā)進來的MessageTree,形成這個解析器所代表的報表。
當前時間進入下個小時,會創(chuàng)建一個新的當前小時的Period,并且異步的remove之前的Period。
注意,這里有個比較坑的地方是,作者沒有使用線程池,每小時各個解析器的線程并沒有池化,而是直接銷毀后再次創(chuàng)建!
?
4.4 cat-consumer設計
4.4.1?報表部分數值計算公式
1)數據結構
TRANSACTIONREPORT報表的數據結構如下:
?
CROSSREPORT內存報表數據結構:
?
數據示例:
<?xml version="1.0" encoding="utf-8"?> <cross-report domain="monitor-cat" startTime="2017-12-28 18:00:00" endTime="2017-12-28 18:59:59"><domain>monitor-cat</domain><domain>monitor-dubbo</domain><ip>10.15.83.181</ip><local id="10.15.83.181"><remote id="10.15.83.181(monitor-cat):Pigeon.Client" role="Pigeon.Client" app="monitor-cat" ip="10.15.83.181(monitor-cat)"><type id="PigeonService" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="28.89" tps="0.00"><name id="DubboProviderService.getProviderServiceName" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="28.89" tps="0.00"/></type></remote><remote id="10.15.83.181(monitor-dubbo):Pigeon.Server" role="Pigeon.Server" app="monitor-dubbo" ip="10.15.83.181(monitor-dubbo)"><type id="PigeonCall" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="461.26" tps="0.00"><name id="DubboProviderService.getProviderServiceName" totalCount="19" failCount="0" failPercent="0.00" avg="0.00" sum="461.26" tps="0.00"/></type></remote></local> </cross-report>?
2)計算95線、99線
/*** 統計95線,99.9線。* 思路:* 求請求的總數,假設是100,求線余下的數目,如果是95線,那余下是5,* 將之前統計的durations放進treeMap里面,倒序* 從前向后遍歷,找到第5個元素,拿到對應的值** 基本上是按照95線的定義來取數的* @param durations* @param percent* @return*/private double computeLineValue(Map<Integer, AllDuration> durations, double percent) {int totalCount = 0;Map<Integer, AllDuration> sorted = new TreeMap<Integer, AllDuration>(TransactionComparator.DESC);sorted.putAll(durations);for (AllDuration duration : durations.values()) {totalCount += duration.getCount();}int remaining = (int) (totalCount * (100 - percent) / 100);for (Entry<Integer, AllDuration> entry : sorted.entrySet()) {remaining -= entry.getValue().getCount();if (remaining <= 0) {return entry.getKey();}}return 0.0;}3)求方差
/*** 求方差 value = 求和(xi * xi)/count - 均值*均值* value = 開方(value)* @param count* @param avg* @param sum2* @param max* @return*/double std(long count, double avg, double sum2, double max) {double value = sum2 / count - avg * avg;if (value <= 0 || count <= 1) {return 0;} else if (count == 2) {return max - avg;} else {return Math.sqrt(value);}}?4.5 存儲設計(cat-hadoop)
存儲主要分成兩類:一個是 報表(Transaction、Event、Problem….),一個是logview,也是就是原始的MessageTree。
所有原始消息會先存儲在本地文件系統,然后上傳到HDFS中保存;而對于報表,因其遠比原始日志小,則以K/V的方式保存在MySQL中。
報表存儲:在每個小時結束后,將內存中的各個XML報表 保存到MySQL、File(/data/appdatas/cat/bucket/report…)中
logView的保存有后臺線程(默認20個,Daemon Thread [cat-Message-Gzip-n])輪詢處理:會間隔一段時間后從消息隊列中拿取MessageTree,并進行編碼壓縮,保存到\data\appdatas\cat\bucket\dump\年月\日\domain-ip1-ip2-ipn目錄下。
com.dianping.cat.consumer.dump.LocalMessageBucketManager.MessageGzip.run()
@Overridepublic void run() {try {while (true) {MessageItem item = m_messageQueue.poll(5, TimeUnit.MILLISECONDS);if (item != null) {m_count++;if (m_count % (10000) == 0) {gzipMessageWithMonitor(item);//數量達到10000的整數倍,通過上報埋點記錄監(jiān)控一下} else {gzipMessage(item);}}}} catch (InterruptedException e) {// ignore it }}private void gzipMessage(MessageItem item) {try {MessageId id = item.getMessageId();String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp;String path = m_pathBuilder.getLogviewPath(new Date(id.getTimestamp()), name);LocalMessageBucket bucket = m_buckets.get(path);if (bucket == null) {synchronized (m_buckets) {bucket = m_buckets.get(path);if (bucket == null) {bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);bucket.setBaseDir(m_baseDir);bucket.initialize(path);m_buckets.put(path, bucket);}}}DefaultMessageTree tree = (DefaultMessageTree) item.getTree();ByteBuf buf = tree.getBuffer();MessageBlock block = bucket.storeMessage(buf, id);if (block != null) {if (!m_messageBlocks.offer(block)) {m_serverStateManager.addBlockLoss(1);Cat.logEvent("DumpError", tree.getDomain());}}} catch (Throwable e) {Cat.logError(e);}}public MessageBlock storeMessage(final ByteBuf buf, final MessageId id) throws IOException {synchronized (this) {int size = buf.readableBytes();m_dirty.set(true);m_lastAccessTime = System.currentTimeMillis();m_blockSize += size;m_block.addIndex(id.getIndex(), size);buf.getBytes(0, m_out, size); // write buffer and compress itif (m_blockSize >= MAX_BLOCK_SIZE) {return flushBlock();} else {return null;}}}?
4.5.1 logView的文件存儲設計
6. 客戶端接入?
6.1 mybatis接入
使用ORM插件-MybatisPlugin
效果:
6.2 log4j日志接入
Throwable(及其子類)異常上報,使用log日志框架的appender機制
<root level="INFO"><appender-ref ref="STDOUT" /><appender-ref ref="CatAppender" /></root>?
效果:
?
6.3?URL請求監(jiān)控
使用Filter機制實現,并實現URL聚合等其他功能。
6.4?代碼級別監(jiān)控
aop 做監(jiān)控,內部封裝,將aop-expression暴露出來給用戶配置(填寫需要監(jiān)控的實現類范圍)。字節(jié)碼織入技術。
?
6.5?接口抽象\靜態(tài)綁定
為了向后兼容,輕松替換埋點的實現。比如切換為zipkin或者其他產品的API。從而減少對業(yè)務線的影響。?
仿照slf4j的設計,使用了靜態(tài)綁定。
6.6?分布式調用鏈
CatCrossHttpClient作為 httpClient的代理類暴露給用戶使用。
@Component public class CatCrossHttpClientextends HttpClientProxy {private String serverToCall;public void setServerToCall(String serverToCall){this.serverToCall = serverToCall;}public String execute(HttpRequestBase request, int socketTimeout, int connectTimeout)throws Exception{Transaction t = Cat.newTransaction("PigeonCall", request.getURI().getPath());createCrossReport(request, this.serverToCall);Cat.Context context = new CatContext();Cat.logRemoteCallClient(context);request.setHeader("_catRootMessageId", context.getProperty("_catRootMessageId"));request.setHeader("_catParentMessageId", context.getProperty("_catParentMessageId"));request.setHeader("_catChildMessageId", context.getProperty("_catChildMessageId"));request.setHeader("ClientApplication.Name", Cat.getManager().getDomain());try{String ret = super.execute(request, socketTimeout, connectTimeout);t.setStatus("0");return ret;}catch (Exception e){Cat.logEvent("HTTP_REST_CAT_ERROR", request.getURI().toString(), e.getMessage(), " ");t.setStatus(e);throw e;}finally{t.complete();}}private void createCrossReport(HttpRequestBase request, String serverToCall)throws Exception{Cat.logEvent("PigeonCall.app", serverToCall, "0", " ");Cat.logEvent("PigeonCall.server", request.getURI().getHost(), "0", " ");Cat.logEvent("PigeonCall.port", String.valueOf(request.getURI().getPort()), "0", " ");MessageTree tree = Cat.getManager().getThreadLocalMessageTree();((DefaultMessageTree)tree).setDomain(Cat.getManager().getDomain());((DefaultMessageTree)tree).setIpAddress(InetAddress.getLocalHost().getHostAddress());} }?
當有外部請求進來,通過如下的Filter判斷請求中是否帶有分布式調用鏈埋點,如果有就繼續(xù)構造這個鏈條。當然分布式調用鏈得以使用的前提是被監(jiān)控的應用中有引入這兩個功能!
public class HttpCatCrossFilterimplements Filter {private static final Logger logger = LoggerFactory.getLogger(HttpCatCrossFilter.class);public void doFilter(ServletRequest req, ServletResponse resp, FilterChain filterChain)throws IOException, ServletException{HttpServletRequest request = (HttpServletRequest)req;if (isCatTracing(request)){String requestURI = request.getRequestURI();Transaction t = Cat.newTransaction("PigeonService", requestURI);try{Cat.Context context = new CatContext();context.addProperty("_catRootMessageId", request.getHeader("_catRootMessageId"));context.addProperty("_catParentMessageId", request.getHeader("_catParentMessageId"));context.addProperty("_catChildMessageId", request.getHeader("_catChildMessageId"));Cat.logRemoteCallServer(context);createCrossReport(request, t);filterChain.doFilter(req, resp);t.setStatus("0");}catch (Exception e){logger.error("Get cat msgtree error :" + e);Cat.logEvent("HTTP_REST_CAT_ERROR", request.getRequestURL().toString(), e.getMessage(), " ");t.setStatus(e);}finally{t.complete();}}else{filterChain.doFilter(req, resp);}}private void createCrossReport(HttpServletRequest request, Transaction t)throws Exception{Cat.logEvent("PigeonService.app", request.getHeader("ClientApplication.Name"), "0", " ");Cat.logEvent("PigeonService.client", request.getRemoteAddr(), "0", " ");MessageTree tree = Cat.getManager().getThreadLocalMessageTree();((DefaultMessageTree)tree).setDomain(Cat.getManager().getDomain());((DefaultMessageTree)tree).setIpAddress(InetAddress.getLocalHost().getHostName());}public void init(FilterConfig arg0)throws ServletException{}public void destroy() {}private boolean isCatTracing(HttpServletRequest request){return (null != request.getHeader("_catRootMessageId")) && (null != request.getHeader("_catParentMessageId")) && (null != request.getHeader("_catChildMessageId"));} }?
效果:
7.?各開源監(jiān)控對比
zipkin:
優(yōu)點:分布式調用鏈理論的實現系統。最大的特點是分布式調用鏈。Spring Cloud Sleuth 可以方便的對zipkin元數據進行采集。?
缺點:功能單一,監(jiān)控維度、監(jiān)控信息不夠豐富。沒有告警功能。
pinpoint:
優(yōu)點:使用字節(jié)碼織入技術,對用戶完全透明,實現自動埋點。可展示代碼級別監(jiān)控。?
缺點:? 功能不足夠豐富。對于其他非java程序,實現客戶端難度大。
Cat:??
優(yōu)點:功能豐富,多模型報表展示。可展示代碼級別監(jiān)控。以及特殊業(yè)務數據監(jiān)控。支持多語言客戶端。多數情況可以替代日志的查看。?
缺點:? 手動埋點,需要改造才能減少埋點的侵入性。
?
8. 技術代碼賞析
8.1?MilliSecondTimer
/*** This timer provides milli-second precise system time.*/ public class MilliSecondTimer {private static long m_baseTime;private static long m_startNanoTime;private static boolean m_isWindows = false;public static long currentTimeMillis() {if (m_isWindows) {if (m_baseTime == 0) {initialize();}long elipsed = (long) ((System.nanoTime() - m_startNanoTime) / 1e6);return m_baseTime + elipsed;} else {return System.currentTimeMillis();}}public static void initialize() {String os = System.getProperty("os.name");if (os.startsWith("Windows")) {m_isWindows = true;m_baseTime = System.currentTimeMillis();while (true) {LockSupport.parkNanos(100000); // 0.1 mslong millis = System.currentTimeMillis();if (millis != m_baseTime) {m_baseTime = millis;m_startNanoTime = System.nanoTime();break;}}} else {m_baseTime = System.currentTimeMillis();m_startNanoTime = System.nanoTime();}} }System.currentTimeMillis()返回的毫秒,這個毫秒其實就是自1970年1月1日0時起的毫秒數。
System.nanoTime()返回的是納秒,nanoTime而返回的可能是任意時間,甚至可能是負數。
System.currentTimeMillis調用的是native方法,使用的是系統的時間,每個JVM對應的應該是相同的,但因為具體的取值依賴于操作系統的實現,不同JVM間可能會有略微的差異。
System.nanoTime每個JVM維護一份,和系統時間無關,可用于計算時間間隔,比System.currentTimeMillis的精度要高。
修改了系統時間會對System.currentTimeMillis造成影響,而對System.nanoTime沒有影響。修改系統時間后會有如下效果:Timmer有影響,Thread.sleep有影響,ScheduledThreadPoolExecutor無影響,可以查看方法的實現調用的是System.currentTimeMillis還是System.nanoTime。
java修改系統時間:
- windows環(huán)境下:
- linux環(huán)境下:
Linux上獲取的時間不正確,總是相差幾小時考慮時差的問題,修改/etc/sysconfig/clock。
?
9. Q&A
9.1 為什么基于ThreadLocal收集消息?
CAT客戶端在收集端數據方面使用ThreadLocal(線程局部變量),是線程本地變量。保證了線程安全。
業(yè)務方在處理業(yè)務邏輯時基本都是在一個線程內部調用后端服務、數據庫、緩存等,將這些數據拿回來再進行業(yè)務邏輯封裝,最后將結果展示給用戶。所以將監(jiān)控請求作為一個監(jiān)控上下文存入線程變量就非常合適。
9.2?為什么要使用TCP協議?
AT使用了TCP協議上報消息(引入了netty框架)。那么為什么不適用http協議上報呢?
選擇TCP的理由:對于客戶端的數據采集盡量降低性能損耗,TCP協議比HTTP協議更加輕量級(比如TCP不需要header等額外的損耗),在高qps的場景下具備明顯的性能優(yōu)勢。
另外,CAT的設計也不需要保留一個?Http鏈接供外部調用,這樣的埋點方式效率低下,并不考慮。
?
10. 自己實現小工具
10.1 采集阿里鷹眼的數據,轉換成CAT消息樹并展示
需求來源:
由于某種原因,現有采用HSF+淘寶TDDL+Diamond+ONS消息+Tair+Search的技術選型,但是缺乏阿里鷹眼的監(jiān)控系統。
解決方案:
上述技術選型,會進行阿里鷹眼需要的數據跟蹤鏈進行打點,記錄日志。
把相關日志通過Flume或者ELK進行采集,投遞到Kafka中,實現一個eagleeye-over-cat的springboot應用,并監(jiān)聽kafka消息。
可參考Spring Cloud?Sleuth兼容方案。
10.2? 自定義小工具,解析應用服務器的CAT dump文件
需求來源:
本地dump文件為壓縮文件,無法直觀查看,排查問題時對用戶為黑盒。
解決方案:
使用MessageBlockReader進行文件讀取,使用PlainTextMessageCodec把字節(jié)解碼為MessageTree。
?
轉載于:https://www.cnblogs.com/yeahwell/p/cat.html
總結
以上是生活随笔為你收集整理的大众点评CAT开源监控系统剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FFmpeg+SDL视频播放器
- 下一篇: java信息管理系统总结_java实现科