視頻監(jiān)控應(yīng)用中可伸縮的線程池設(shè)計(jì)
在撰寫本文前 我有選擇性的參考了目前網(wǎng)絡(luò)上對(duì)于線程池的介紹和實(shí)現(xiàn)的技術(shù)文章。結(jié)合自己實(shí)踐經(jīng)歷,做如下分條闡述。
一.依賴支持:基于通用線程基類。
基于公司內(nèi)部的線程基礎(chǔ)庫。線程基類中zui為關(guān)鍵的是幾個(gè)虛繼承函數(shù)。
//所有的線程都必須重載此方法!!!
virtual void RunBefor()=0; //行動(dòng)準(zhǔn)備
virtual void RunAfter()=0; //撤離準(zhǔn)備
virtual void Run()=0; //執(zhí)行體
其他由基類派生出來的子類 就可以而且必須實(shí)現(xiàn)這3個(gè)函數(shù)。
在繼承后的函數(shù)中 按照自己的功能要求來加以具體實(shí)現(xiàn)了。
二.架構(gòu)剖析:一個(gè)管理引擎線程,一群工作線程。
1 線程池要有個(gè)列表,來管理多個(gè)線程對(duì)象。
線程池只是一個(gè)架子,就像房子要有人才行。有人就會(huì)有事情產(chǎn)生和處理。
所謂的事情就是一個(gè)線程對(duì)象,具體線程的run函數(shù)就是事情的解決過程邏輯。
2 線程池要提供獲取空閑(IDLE)線程方法。
這個(gè)是非常必要的。一般獲取空閑可以用遍歷的法子。當(dāng)然在具體應(yīng)用中,“空閑”的概念也可以很豐富。比如用某某算法快速定位等等。這里就不展開了。
3 線程池中的線程,具體執(zhí)行的內(nèi)容,可自定義。
線程池只是一個(gè)架子,提供一種模板化的處理建議。
至于具體一個(gè)新分配的線程究竟做什么,就看程序的應(yīng)用場(chǎng)景了。
4 線程池中的線程,使用完畢后,還能被收回,供下次使用。
所謂線程池,可以理解為一種思想。
就像一個(gè)水池,多進(jìn)多出罷了。需要就取,用完歸還。
5 重復(fù)和克隆機(jī)制按需使用。
在視頻監(jiān)控的應(yīng)用場(chǎng)合中,重復(fù)和克隆都是很常見且非常有效的處理思維。
重復(fù)為了減少無謂的開銷。克隆為了及時(shí)的響應(yīng),否則在某一點(diǎn)接入時(shí)間點(diǎn)會(huì)導(dǎo)致無法接入的情況。相信寫服務(wù)器的朋友對(duì)此應(yīng)該都深有感觸把。
三.調(diào)度規(guī)則
1、線程的創(chuàng)建
一般當(dāng)服務(wù)器接收到客戶端的請(qǐng)求后,會(huì)調(diào)用線程POOL的一個(gè)外調(diào)接口函數(shù)。
外調(diào)函數(shù)根據(jù)條件判斷,遍歷綁定的隊(duì)列,看是否有相同的對(duì)象。如果有,則返回失敗。
如果沒有,則用new操作新建一個(gè)線程對(duì)象,賦值并Resume。然后線程對(duì)象就經(jīng)有runbefor到run循環(huán)中執(zhí)行特定的處理邏輯,zui后經(jīng)由runafter退出線程。
2、任務(wù)分配策略及執(zhí)行過程
一般在服務(wù)端的線程處理引擎中需要好幾個(gè)線程池合力協(xié)作。拿視頻監(jiān)控項(xiàng)目的實(shí)踐來說。
至少會(huì)需要接入POOL,處理POOL,呼出POOL。它們每個(gè)POOL本身就是一個(gè)線程引擎。而相互之間的一般通過標(biāo)記位的方式進(jìn)行合理的串聯(lián),以達(dá)到流水工序化作業(yè)的目的。
一個(gè)流程下來,就能非常好的體現(xiàn)出 任務(wù)分配的思想。有人負(fù)責(zé)接待,有人燒菜,有人端菜,有人負(fù)責(zé)中間協(xié)調(diào),有人收拾碗筷,有人送客。儼然一個(gè)飯店的流程。
3、線程的銷毀
一般線程在主循環(huán)run中執(zhí)行自己的處理邏輯。一旦退出條件被觸發(fā),那么置位退出標(biāo)記為istobeclosed,然后自己空兜線程,就是僅僅sleep(10)然后直接返回。這個(gè)時(shí)候 負(fù)責(zé)管理引擎的線程 在查找線程狀態(tài)時(shí)候 發(fā)現(xiàn)這個(gè)線程想要退出。就調(diào)用線程基類的SetStopFlag(),讓線程退出去,再等待IsOutRun返回TRUE,再就是delete線程對(duì)象。
四.附錄代碼
(1)線程基類
class CThread
{
public:
CThread();
CThread(bool suspend);
virtual ~CThread();
//以下是常見函數(shù)定義;
HANDLE GetHandle();
int GetID();
int GetGroupID();
int GetThreadID();
string GetNickName();
void SetID(int iID);
void SetGroupID(int iGroupID);
void SetNickName(const string& sName);
//zhangxf 2010-01-28補(bǔ)充;
void SetNoSleep();
void SetSleepTime(int ivalue);
void Resume(); //啟動(dòng)線程;
void Suspend(); //掛起線程;
bool Wait(long TimeToStop,bool bWaitAllTime=false); //等待線程退出主循環(huán)體
void SetStopFlag(bool IsStopFlag);//子類協(xié)同性退出
bool GetStopFlag();//獲取退出標(biāo)記
bool IsDead();//線程是否僵死
DWORD GetLastRunTime();//獲取上次循環(huán)體進(jìn)入時(shí)間
DWORD GetLastSuspendedTime();//獲取上次掛起時(shí)間
DWORD GetLastResumeTime();//獲取上次喚醒時(shí)間
bool IsSuspended();
bool IsOutOfRun();
void Terminate();//強(qiáng)制退出
//獲取錯(cuò)誤碼
int GetLastError() const {return m_ErrorCode;}
//所有的線程都必須重載此方法!!!
virtual void RunBefor()=0; //行動(dòng)準(zhǔn)備
virtual void RunAfter()=0; //撤離準(zhǔn)備
virtual void Run()=0; //執(zhí)行體
void MainRun();
protected:
static unsigned _stdcall ThreadEntry(LPVOID lpParam);
private:
HANDLE m_hThread; //線程句柄
int m_ID;
int m_ThreadID; //線程ID
int m_GroupID; //線程組別;
string m_NickName; //線程名稱;
DWORD m_LastRunTime; //上次循環(huán)體進(jìn)入時(shí)間
DWORD m_LastSuspendedTime;//上次掛起的時(shí)間
DWORD m_LastResumeTime; //上次喚醒的時(shí)間
bool m_IsSuspended; //目前線程的狀態(tài)是否掛起;
int m_ErrorCode;
bool m_IsTerminated; //線程是否終止
bool m_IsOutOfRun; //是否退出了主循環(huán)
int m_SleepTime; //設(shè)置sleep的秒數(shù);
};
(2)線程池參考范例
//視頻發(fā)送線程
class CVideoSend : public CThread
{
public:
CVideoSend(bool CreateSuspended);
~CVideoSend();
void RunBefor();
void RunAfter();
void Run();
public:
bool _RunProc(); //加返回值gzb 2010-09-08
void _RunBefor(); //內(nèi)部函數(shù)
public:
void Binding(CPublicInfo* _PublicInfo);
void Binding(CVideoSendPool* _VideoSendPool);
public:
CPublicInfo* m_Binding_PublicInfo; //由publicinfo綁定路由列表
CVideoSendPool* m_Binding_VideoSendPool; //綁定發(fā)送線程pool
public:
CVideoShare* m_Binding_VideoShare; //綁定 視頻共享內(nèi)存區(qū)指針。
CNetContext* m_Binding_TContext; //綁定請(qǐng)求的包文
CNetTCPClient* m_Package_NetClient; //包裝一個(gè)CNetTCPClient,沒有實(shí)質(zhì)的去新建。
int m_WriteIndex; //
DWORD m_WriteTime; //
CCmdContext* m_CmdContext; //請(qǐng)求包文指針-----------------------
SOCKET m_SocketID; //SocketID
string m_ClientIP; //客戶端IP
WORK_STATE m_WorkState; //狀態(tài)
string m_GUID; //*標(biāo)記。gzb 2010-08-26
};
typedef std::list<CVideoSend*> _CVideoSendList;
////
class CVideoSendPool : public _CVideoSendList,public CThread
{
public:
CVideoSendPool(bool CreateSuspended);
~CVideoSendPool();
void RunBefor();
void RunAfter();
void Run();
public:
void _RunProc();
public:
//取空閑線程;
void Binding(CPublicInfo* _PublicInfo);
void _FindNoLivingAndDelete(); //定期清理
void _PrintfPoolInfo();
public:
bool NL_StartSendVideo();
bool out_RequestForVideo();
public:
CPublicInfo* m_Binding_PublicInfo; //
};
銳虎科技 研發(fā)部
銳虎科技,專注安防監(jiān)控平臺(tái)軟件設(shè)計(jì)