久久久久久久视色,久久电影免费精品,中文亚洲欧美乱码在线观看,在线免费播放AV片

<center id="vfaef"><input id="vfaef"><table id="vfaef"></table></input></center>

    <p id="vfaef"><kbd id="vfaef"></kbd></p>

    
    
    <pre id="vfaef"><u id="vfaef"></u></pre>

      <thead id="vfaef"><input id="vfaef"></input></thead>

    1. 站長(zhǎng)資訊網(wǎng)
      最全最豐富的資訊網(wǎng)站

      基于Java NIO的即時(shí)聊天服務(wù)器模型

      折騰了一個(gè)周,終于搞出來(lái)了一個(gè)雛形,相比于xmpp的xml,本人更喜歡json的簡(jiǎn)潔,為了防止客戶端異常斷開等,準(zhǔn)備采用心跳檢測(cè)的機(jī)制來(lái)判斷用戶是否在線,另外還有一種方法是學(xué)習(xí)例如Tomcat等Servlet中間件的方式,設(shè)置Session周期,定時(shí)清除過(guò)期Session。

        前不久自己動(dòng)手寫了一個(gè)Android的聊天工具,跟服務(wù)器的交互還是基于HTTP方式的,在一般通訊上還算湊活,但是在即時(shí)聊天的時(shí)候就有點(diǎn)惡心了,客戶端開啟Service每隔3秒去詢問服務(wù)器是否有自己的新消息(當(dāng)然3秒有點(diǎn)太快了),在心疼性能和流量的前提下,只能自己動(dòng)手寫個(gè)服務(wù)器,傳統(tǒng)的Socket是阻塞的,這樣的話服務(wù)器對(duì)每個(gè)Socket都需要建立一個(gè)線程來(lái)操作,資源開銷很大,而且線程多了直接會(huì)影響服務(wù)端的性能(曾經(jīng)測(cè)試開了3000多個(gè)線程就不讓創(chuàng)建了,所以并發(fā)數(shù)目也是有限制的),聽說(shuō)從JDK1.5就多了個(gè)New
        IO,灰常不錯(cuò)的樣子,找了找相關(guān)的資料,網(wǎng)上竟然全都是最最最簡(jiǎn)單的一個(gè)demo,然后去CSDN發(fā)帖,基本上都是建議直接使用MINA框架的,這樣一來(lái)根本達(dá)不到學(xué)習(xí)NIO的目的,而且現(xiàn)在的技術(shù)也太快餐了,只知道使用前輩留下的東西,知其然不知其所以然。

        折騰了一個(gè)周,終于搞出來(lái)了一個(gè)雛形,相比于xmpp的xml,本人更喜歡json的簡(jiǎn)潔,為了防止客戶端異常斷開等,準(zhǔn)備采用心跳檢測(cè)的機(jī)制來(lái)判斷用戶是否在線,另外還有一種方法是學(xué)習(xí)例如Tomcat等Servlet中間件的方式,設(shè)置Session周期,定時(shí)清除過(guò)期Session。本Demo暫時(shí)實(shí)現(xiàn)了Session過(guò)期檢測(cè),心跳檢測(cè)有空再搞,如果本例子在使用過(guò)程中有性能漏洞或者什么bug請(qǐng)及時(shí)通知我,謝謝。

        廢話不多說(shuō),關(guān)于NIO的SelectionKey、Selector、Channel網(wǎng)上的介紹例子都很多,直接上代碼:

        JsonParser

        Json的解析類,隨便封裝了下,使用的最近比較火的fastjson

                         
        1. public class JsonParser {
        2. private static JSONObject mJson;
        3. public synchronized static String get(String json,String key) {
        4. mJson = JSON.parseObject(json);
        5. return mJson.getString(key);
        6. }
        7. }
         

        Main

        入口,不解釋

                                                                                                         
        1. public class Main {
        2. public static void main(String… args) {
        3. new SeekServer().start();
        4. }
        5. }
         

        Log

                                                                                                                                                                 
        1. public class Log {
        2. public static void i(Object obj) {
        3. System.out.println(obj);
        4. }
        5. public static void e(Object e) {
        6. System.err.println(e);
        7. }
        8. }
         

        SeekServer:

        服務(wù)器端的入口,請(qǐng)求的封裝和接收都在此類,端口暫時(shí)寫死在了代碼里,mSelector.select(TIME_OUT) > 0
        目的是為了當(dāng)服務(wù)器空閑的時(shí)候(沒有任何讀寫甚至請(qǐng)求斷開事件),循環(huán)時(shí)有個(gè)間隔時(shí)間,不然基本上相當(dāng)于while(true){//nothing}了,你懂的。

                                                                                                                                                                                                                                                 
        1. public class SeekServer extends Thread{
        2. private final int ACCPET_PORT = 55555;
        3. private final int TIME_OUT = 1000;
        4. private Selector mSelector = null;
        5. private ServerSocketChannel mSocketChannel = null;
        6. private ServerSocket mServerSocket = null;
        7. private InetSocketAddress mAddress = null;
        8. public SeekServer() {
        9. long sign = System.currentTimeMillis();
        10. try {
        11. mSocketChannel = ServerSocketChannel.open();
        12. if(mSocketChannel == null) {
        13. System.out.println(“can’t open server socket channel”);
        14. }
        15. mServerSocket = mSocketChannel.socket();
        16. mAddress = new InetSocketAddress(ACCPET_PORT);
        17. mServerSocket.bind(mAddress);
        18. Log.i(“server bind port is “ + ACCPET_PORT);
        19. mSelector = Selector.open();
        20. mSocketChannel.configureBlocking(false);
        21. SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
        22. key.attach(new Acceptor());
        23. //檢測(cè)Session狀態(tài)
        24. Looper.getInstance().loop();
        25. //開始處理Session
        26. SessionProcessor.start();
        27. Log.i(“Seek server startup in “ + (System.currentTimeMillis() – sign) + “ms!”);
        28. } catch (ClosedChannelException e) {
        29. Log.e(e.getMessage());
        30. } catch (IOException e) {
        31. Log.e(e.getMessage());
        32. }
        33. }
        34. public void run() {
        35. Log.i(“server is listening…”);
        36. while(!Thread.interrupted()) {
        37. try {
        38. if(mSelector.select(TIME_OUT) > 0) {
        39. Set<SelectionKey> keys = mSelector.selectedKeys();
        40. Iterator<SelectionKey> iterator = keys.iterator();
        41. SelectionKey key = null;
        42. while(iterator.hasNext()) {
        43. key = iterator.next();
        44. Handler at = (Handler) key.attachment();
        45. if(at != null) {
        46. at.exec();
        47. }
        48. iterator.remove();
        49. }
        50. }
        51. } catch (IOException e) {
        52. Log.e(e.getMessage());
        53. }
        54. }
        55. }
        56. class Acceptor extends Handler{
        57. public void exec(){
        58. try {
        59. SocketChannel sc = mSocketChannel.accept();
        60. new Session(sc, mSelector);
        61. } catch (ClosedChannelException e) {
        62. Log.e(e);
        63. } catch (IOException e) {
        64. Log.e(e);
        65. }
        66. }
        67. }
        68. }
         

        Handler:

        只有一個(gè)抽象方法exec,Session將會(huì)繼承它。


        1. public abstract class Handler {
        2. public abstract void exec();
        3. }
         

        Session:

        封裝了用戶的請(qǐng)求和SelectionKey和SocketChannel,每次接收到新的請(qǐng)求時(shí)都重置它的最后活動(dòng)時(shí)間,通過(guò)狀態(tài)mState=READING
        or SENDING 去執(zhí)行消息的接收與發(fā)送,當(dāng)客戶端異常斷開時(shí)則從SessionManager清除該會(huì)話。


        1. public class Session extends Handler{
        2. private SocketChannel mChannel;
        3. private SelectionKey mKey;
        4. private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
        5. private Charset charset = Charset.forName(“UTF-8”);
        6. private CharsetDecoder mDecoder = charset.newDecoder();
        7. private CharsetEncoder mEncoder = charset.newEncoder();
        8. private long lastPant;//最后活動(dòng)時(shí)間
        9. private final int TIME_OUT = 1000 * 60 * 5; //Session超時(shí)時(shí)間
        10. private String key;
        11. private String sendData = “”;
        12. private String receiveData = null;
        13. public static final int READING = 0,SENDING = 1;
        14. int mState = READING;
        15. public Session(SocketChannel socket, Selector selector) throws IOException {
        16. this.mChannel = socket;
        17. mChannel = socket;
        18. mChannel.configureBlocking(false);
        19. mKey = mChannel.register(selector, 0);
        20. mKey.attach(this);
        21. mKey.interestOps(SelectionKey.OP_READ);
        22. selector.wakeup();
        23. lastPant = Calendar.getInstance().getTimeInMillis();
        24. }
        25. public String getReceiveData() {
        26. return receiveData;
        27. }
        28. public void clear() {
        29. receiveData = null;
        30. }
        31. public void setSendData(String sendData) {
        32. mState = SENDING;
        33. mKey.interestOps(SelectionKey.OP_WRITE);
        34. this.sendData = sendData + “n”;
        35. }
        36. public boolean isKeekAlive() {
        37. return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
        38. }
        39. public void setAlive() {
        40. lastPant = Calendar.getInstance().getTimeInMillis();
        41. }
        42. /**
        43. * 注銷當(dāng)前Session
        44. */
        45. public void distroy() {
        46. try {
        47. mChannel.close();
        48. mKey.cancel();
        49. } catch (IOException e) {}
        50. }
        51. @Override
        52. public synchronized void exec() {
        53. try {
        54. if(mState == READING) {
        55. read();
        56. }else if(mState == SENDING) {
        57. write();
        58. }
        59. } catch (IOException e) {
        60. SessionManager.remove(key);
        61. try {
        62. mChannel.close();
        63. } catch (IOException e1) {
        64. Log.e(e1);
        65. }
        66. mKey.cancel();
        67. }
        68. }
        69. public void read() throws IOException{
        70. mRreceiveBuffer.clear();
        71. int sign = mChannel.read(mRreceiveBuffer);
        72. if(sign == –1) { //客戶端連接關(guān)閉
        73. mChannel.close();
        74. mKey.cancel();
        75. }
        76. if(sign > 0) {
        77. mRreceiveBuffer.flip();
        78. receiveData = mDecoder.decode(mRreceiveBuffer).toString();
        79. setAlive();
        80. setSign();
        81. SessionManager.addSession(key, this);
        82. }
        83. }
        84. private void setSign() {
        85. //設(shè)置當(dāng)前Session的Key
        86. key = JsonParser.get(receiveData,“imei”);
        87. //檢測(cè)消息類型是否為心跳包
        88. // String type = jo.getString(“type”);
        89. // if(type.equals(“HEART_BEAT”)) {
        90. // setAlive();
        91. // }
        92. }
        93. /**
        94. * 寫消息
        95. */
        96. public void write() {
        97. try {
        98. mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
        99. sendData = null;
        100. mState = READING;
        101. mKey.interestOps(SelectionKey.OP_READ);
        102. } catch (CharacterCodingException e) {
        103. e.printStackTrace();
        104. } catch (IOException e) {
        105. try {
        106. mChannel.close();
        107. } catch (IOException e1) {
        108. Log.e(e1);
        109. }
        110. }
        111. }
        112. }
         

        SessionManager:

        將所有Session存放到ConcurrentHashMap,這里使用手機(jī)用戶的imei做key,ConcurrentHashMap因?yàn)槭蔷€程安全的,所以能很大程度上避免自己去實(shí)現(xiàn)同步的過(guò)程,
        封裝了一些操作Session的方法例如get,remove等。


        1. public class SessionManager {
        2. private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
        3. public static void addSession(String key,Session session) {
        4. sessions.put(key, session);
        5. }
        6. public static Session getSession(String key) {
        7. return sessions.get(key);
        8. }
        9. public static Set<String> getSessionKeys() {
        10. return sessions.keySet();
        11. }
        12. public static int getSessionCount() {
        13. return sessions.size();
        14. }
        15. public static void remove(String[] keys) {
        16. for(String key:keys) {
        17. if(sessions.containsKey(key)) {
        18. sessions.get(key).distroy();
        19. sessions.remove(key);
        20. }
        21. }
        22. }
        23. public static void remove(String key) {
        24. if(sessions.containsKey(key)) {
        25. sessions.get(key).distroy();
        26. sessions.remove(key);
        27. }
        28. }
        29. }
         

        SessionProcessor

        里面使用了JDK自帶的線程池,用來(lái)分發(fā)處理所有Session中當(dāng)前需要處理的請(qǐng)求(線程池的初始化參數(shù)不是太熟,望有了解的童鞋能告訴我),內(nèi)部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點(diǎn)熟悉的感覺,對(duì)沒錯(cuò),JavaWeb里到處都是request和response)。


        1. public class SessionProcessor implements Runnable{
        2. private static Runnable processor = new SessionProcessor();
        3. private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
        4. public static void start() {
        5. new Thread(processor).start();
        6. }
        7. @Override
        8. public void run() {
        9. while(true) {
        10. Session tmp = null;
        11. for(String key:SessionManager.getSessionKeys()) {
        12. tmp = SessionManager.getSession(key);
        13. //處理Session未處理的請(qǐng)求
        14. if(tmp.getReceiveData() != null) {
        15. pool.execute(new Process(tmp));
        16. }
        17. }
        18. try {
        19. Thread.sleep(10);
        20. } catch (InterruptedException e) {
        21. Log.e(e);
        22. }
        23. }
        24. }
        25. class Process implements Runnable {
        26. private SocketRequest request;
        27. private SocketResponse response;
        28. public Process(Session session) {
        29. //將Session封裝成Request和Response
        30. request = new SocketRequest(session);
        31. response = new SocketResponse(session);
        32. }
        33. @Override
        34. public void run() {
        35. new RequestTransform().transfer(request, response);
        36. }
        37. }
        38. }
         

        RequestTransform里的transfer方法利用反射對(duì)請(qǐng)求參數(shù)中的請(qǐng)求類別和請(qǐng)求動(dòng)作來(lái)調(diào)用不同類的不同方法(UserHandler和MessageHandler)

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
        1. public class RequestTransform {
        2. public void transfer(SocketRequest request,SocketResponse response) {
        3. String action = request.getValue(“action”);
        4. String handlerName = request.getValue(“handler”);
        5. //根據(jù)Session的請(qǐng)求類型,讓不同的類方法去處理
        6. try {
        7. Class<?> c= Class.forName(“com.seek.server.handler.” + handlerName);
        8. Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
        9. Method method=c.getMethod(action,arg);
        10. method.invoke(c.newInstance(), new Object[]{request,response});
        11. } catch (Exception e) {
        12. e.printStackTrace();
        13. }
        14. }
        15. }
         

        SocketRequest和SocketResponse


        1. public class SocketRequest {
        2. private Session mSession;
        3. private String mReceive;
        4. public SocketRequest(Session session) {
        5. mSession = session;
        6. mReceive = session.getReceiveData();
        7. mSession.clear();
        8. }
        9. public String getValue(String key) {
        10. return JsonParser.get(mReceive, key);
        11. }
        12. public String getQueryString() {
        13. return mReceive;
        14. }
        15. }
         

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
        1. public class SocketResponse {
        2. private Session mSession;
        3. public SocketResponse(Session session) {
        4. mSession = session;
        5. }
        6. public void write(String msg) {
        7. mSession.setSendData(msg);
        8. }
        9. }
         

        最后則是兩個(gè)處理請(qǐng)求的Handler


        1. public class UserHandler {
        2. public void login(SocketRequest request,SocketResponse response) {
        3. System.out.println(request.getQueryString());
        4. //TODO: 處理用戶登錄
        5. response.write(“你肯定收到消息了”);
        6. }
        7. }
         


        1. public class MessageHandler {
        2. public void send(SocketRequest request,SocketResponse response) {
        3. System.out.println(request.getQueryString());
        4. //消息發(fā)送
        5. String key = request.getValue(“imei”);
        6. Session session = SessionManager.getSession(key);
        7. new SocketResponse(session).write(request.getValue(“sms”));
        8. }
        9. }
         

        還有個(gè)監(jiān)測(cè)是否超時(shí)的類Looper,定期去刪除Session


        1. public class Looper extends Thread{
        2. private static Looper looper = new Looper();
        3. private static boolean isStart = false;
        4. private final int INTERVAL = 1000 * 60 * 5;
        5. private Looper(){}
        6. public static Looper getInstance() {
        7. return looper;
        8. }
        9. public void loop() {
        10. if(!isStart) {
        11. isStart = true;
        12. this.start();
        13. }
        14. }
        15. public void run() {
        16. Task task = new Task();
        17. while(true) {
        18. //Session過(guò)期檢測(cè)
        19. task.checkState();
        20. //心跳包檢測(cè)
        21. //task.sendAck();
        22. try {
        23. Thread.sleep(INTERVAL);
        24. } catch (InterruptedException e) {
        25. Log.e(e);
        26. }
        27. }
        28. }
        29. }
         


        1. public class Task {
        2. public void checkState() {
        3. Set<String> keys = SessionManager.getSessionKeys();
        4. if(keys.size() == 0) {
        5. return;
        6. }
        7. List<String> removes = new ArrayList<String>();
        8. Iterator<String> iterator = keys.iterator();
        9. String key = null;
        10. while(iterator.hasNext()) {
        11. key = iterator.next();
        12. if(!SessionManager.getSession(key).isKeekAlive()) {
        13. removes.add(key);
        14. }
        15. }
        16. if(removes.size() > 0) {
        17. Log.i(“sessions is time out,remove “ + removes.size() + “session”);
        18. }
        19. SessionManager.remove(removes.toArray(new String[removes.size()]));
        20. }
        21. public void sendAck() {
        22. Set<String> keys = SessionManager.getSessionKeys();
        23. if(keys.size() == 0) {
        24. return;
        25. }
        26. Iterator<String> iterator = keys.iterator();
        27. while(iterator.hasNext()) {
        28. iterator.next();
        29. //TODO 發(fā)送心跳包
        30. }
        31. }
        32. }
         

        注意,在Task和SessionProcessor類里都有對(duì)SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來(lái)獲取Key和Value,因?yàn)橐恢痹贘avaWeb上折騰,所以會(huì)的童鞋看到Request和Response會(huì)挺親切,這個(gè)例子沒有經(jīng)過(guò)任何安全和性能測(cè)試,如果需要放到生產(chǎn)環(huán)境上得話請(qǐng)先自行做測(cè)試-
        -!

        客戶端請(qǐng)求時(shí)的數(shù)據(jù)內(nèi)容例如{handler:”UserHandler”,action:”login”,imei:”2364656512636″…….},這些約定就自己來(lái)定了。

        贊(0)
        分享到: 更多 (0)
        網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)