日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

發布時間:2023/10/12 编程问答 181 如意码农
生活随笔 收集整理的這篇文章主要介紹了 11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

11. 用Rust手把手編寫一個wmproxy(代理,內網穿透等), 實現健康檢查

項目 ++wmproxy++

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

健康檢查的意義

健康檢查維持著系統的穩定運行, 極大的加速著服務的響應時間, 并保證服務器不會把消息包轉發到不能響應的服務器上, 從而使系統快速穩定的運轉

在LINUX系統中,系統默認TCP建立連接超時時間為127秒。通常網絡不可達或者網絡連接被拒絕或者網絡連接超時需要耗時的時長較長。此時會超成服務器的響應時間變長很多,而且重復發起不可達的連接嘗試也在耗著大量的IO資源。

當健康檢查介入后,如果短時間內多次建立連接失敗,則暫時判定該地址不可達,狀態設置為不可達。如果此時接收到該地址的請求時直接返回錯誤。大大提高了響應的時間。

所以健康檢查是必不可少的存在。

如何實現

由于健康狀態需要調用的地方可能在任意處需要發起連接的地方,如果通過參數透傳也會涉及到多線程的數據共用,如Arc<Mutex<Data>>,取用的時候也是要通過鎖共用,且編碼的復雜度和理解成本急劇升高,所以此處健康檢查選用的是多線程共用的靜態處理變量。

Rust中的靜態變量

在Rust中,全局變量可以分為兩種:

  • 編譯期初始化的全局變量
  • 運行期初始化的全局變量

編譯期初始化的全局變量有:

const創建的常量,如 const MAX_ID:usize=usize::MAX/2;
static創建的靜態變量,如 static mut REQUEST_RECV:usize=0;

運行期初始化的全局變量有lazy_static用于懶初始化。例如:

lazy_static! {
static ref HEALTH_CHECK: RwLock<HealthCheck> = RwLock::new(HealthCheck::new(60, 3, 2));
}

此外還有

  • 實現你自己的運行時初始化:std::sync::Once + static mut T
  • 單線程運行時初始化的特殊情況:thread_local

我們此處維持一個HealthCheck的全局變量,因為程序是多線程,用thread_local,無法共用其它線程的檢測,不條例預期,所以此處用讀寫鎖來保證全局變量的正確性,讀寫鎖的特點是允許存在多個讀,但如果獲取寫必須保證唯一。

源碼解析,暫時不做主動性的健康檢查

接下來我們看HealthCheck的定義

pub struct HealthCheck {
/// 健康檢查的重置時間, 失敗超過該時間會重新檢查, 統一單位秒
fail_timeout: usize,
/// 最大失敗次數, 一定時間內超過該次數認為不可訪問
max_fails: usize,
/// 最小上線次數, 到達這個次數被認為存活
min_rises: usize,
/// 記錄跟地址相關的信息
health_map: HashMap<SocketAddr, HealthRecord>,
} /// 每個SocketAddr的記錄值
struct HealthRecord {
/// 最后的記錄時間
last_record: Instant,
/// 失敗的恢復時間
fail_timeout: Duration,
/// 當前連續失敗的次數
fall_times: usize,
/// 當前連續成功的次數
rise_times: usize,
/// 當前的狀態
failed: bool,
}

主要有最后記錄時間,失敗次數,成功次數,最大失敗懲罰時間等元素組成

我們通過函數is_fall_down判定是否是異常狀態,未檢查前默認為正常狀態,超出一定時間后,解除異常狀態。

/// 檢測狀態是否能連接
pub fn is_fall_down(addr: &SocketAddr) -> bool {
// 只讀,獲取讀鎖
if let Ok(h) = HEALTH_CHECK.read() {
if !h.health_map.contains_key(addr) {
return false;
}
let value = h.health_map.get(&addr).unwrap();
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
return false;
}
h.health_map[addr].failed
} else {
false
}
}

如果連接TCP失敗則調用add_fall_down將該地址失敗連接次數+1,如果失敗次數達到最大失敗次數將狀態置為不可用。

/// 失敗時調用
pub fn add_fall_down(addr: SocketAddr) {
// 需要寫入,獲取寫入鎖
if let Ok(mut h) = HEALTH_CHECK.write() {
if !h.health_map.contains_key(&addr) {
let mut health = HealthRecord::new(h.fail_timeout);
health.fall_times = 1;
h.health_map.insert(addr, health);
} else {
let max_fails = h.max_fails;
let value = h.health_map.get_mut(&addr).unwrap();
// 超出最大的失敗時長,重新計算狀態
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
value.clear_status();
}
value.last_record = Instant::now();
value.fall_times += 1;
value.rise_times = 0; if value.fall_times >= max_fails {
value.failed = true;
}
}
}
}

如果連接TCP成功則調用add_rise_up將該地址成功連接次數+1,如果成功次數達到最小次數將狀態置為不可用。

/// 成功時調用
pub fn add_rise_up(addr: SocketAddr) {
// 需要寫入,獲取寫入鎖
if let Ok(mut h) = HEALTH_CHECK.write() {
if !h.health_map.contains_key(&addr) {
let mut health = HealthRecord::new(h.fail_timeout);
health.rise_times = 1;
h.health_map.insert(addr, health);
} else {
let min_rises = h.min_rises;
let value = h.health_map.get_mut(&addr).unwrap();
// 超出最大的失敗時長,重新計算狀態
if Instant::now().duration_since(value.last_record) > value.fail_timeout {
value.clear_status();
}
value.last_record = Instant::now();
value.rise_times += 1;
value.fall_times = 0; if value.rise_times >= min_rises {
value.failed = false;
}
}
}
}

接下來我們將TcpStream::connect函數統一替換成HealthCheck::connect外部修改幾乎為0,可實現開啟健康檢查,后續還會有主動式的健康檢查。

pub async fn connect<A>(addr: &A) -> io::Result<TcpStream>
where
A: ToSocketAddrs,
{
let addrs = addr.to_socket_addrs()?;
let mut last_err = None; for addr in addrs {
// 健康檢查失敗,直接返回錯誤
if Self::is_fall_down(&addr) {
last_err = Some(io::Error::new(io::ErrorKind::Other, "health check falldown"));
} else {
match TcpStream::connect(&addr).await {
Ok(stream) =>
{
Self::add_rise_up(addr);
return Ok(stream)
},
Err(e) => {
Self::add_fall_down(addr);
last_err = Some(e)
},
}
}
} Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}

效果

在前三次請求的時候,將花費5秒左右才拋出拒絕鏈接的錯誤

connect server Err(Os { code: 10061, kind: ConnectionRefused, message: "由于目標計算機積極拒絕,無
法連接。" })

可以發現三次之后,將會快速的拋出錯誤,達成健康檢查的目標

connect server Err(Custom { kind: Other, error: "health check falldown" })

此時被動式的健康檢查已完成,后續按需要的話將按需看是否實現主動式的健康檢查。

總結

以上是生活随笔為你收集整理的11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。