package com.upgradeSocket;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
/**
 * Client-side sending channel.
 *
 * <p>Reads lines typed on the console and writes every non-empty line to the
 * server through the socket's output stream.
 *
 * <ol>
 *   <li>The constructor wraps the output stream of the given socket.</li>
 *   <li>{@link #run()} keeps reading the console and sending until the console
 *       reaches end of input or an I/O error occurs.</li>
 * </ol>
 */
public class SendChannel implements Runnable {
    /** Console reader; wraps {@code System.in} as it is when this object is created. */
    private final BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    private final Socket clientChannel;
    /** Output stream to the server; stays {@code null} if the constructor could not open it. */
    private DataOutputStream dos;
    /** Loop flag; volatile because the public {@link #closeAll()} may be called from another thread. */
    private volatile boolean isRunning = true;

    /**
     * Creates a channel that sends console input through {@code socket}.
     * If the output stream cannot be obtained the channel is closed right away
     * and {@link #run()} returns immediately.
     *
     * @param socket socket connected to the server
     */
    public SendChannel(Socket socket) {
        this.clientChannel = socket;
        try {
            dos = new DataOutputStream(clientChannel.getOutputStream());
        } catch (IOException e) {
            closeAll();
        }
    }

    /** Writes one message to the server; on failure the channel is stopped and closed. */
    private void send(String msg) {
        try {
            dos.writeUTF(msg);
        } catch (IOException e) {
            closeAll();
        }
    }

    /**
     * Reads one line from the console.
     *
     * @return the line, or {@code null} when the console reached end of input or
     *         reading failed (in both cases the channel has been stopped and closed)
     */
    private String getStrByConsole() {
        String msg = null;
        try {
            msg = br.readLine();
            if (msg == null) {
                // readLine() returns null at end of input: nothing more to send.
                closeAll();
            }
        } catch (IOException e) {
            closeAll();
        }
        return msg;
    }

    /**
     * Stops the send loop and closes the console reader, the output stream and
     * the socket. Each resource is closed independently so that one failing
     * close does not leave the others open.
     */
    public void closeAll() {
        isRunning = false;
        try {
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // dos is null when the constructor failed before the stream was created.
        if (dos != null) {
            try {
                dos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Forwards every non-empty console line to the server until the channel stops. */
    @Override
    public void run() {
        while (isRunning) {
            String msg = getStrByConsole();
            // null means end of input or a read error; the flag is already cleared.
            if (msg != null && !msg.isEmpty()) {
                send(msg);
            }
        }
    }
}
package com.upgradeSocket;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
/**
 * Client-side receiving channel.
 *
 * <p>Wraps the input stream of the given socket, reads the messages sent by the
 * server and prints every non-empty message to standard output.
 */
public class ReceiveChannel implements Runnable {
    private final Socket receiveSocket;
    /** Input stream from the server; stays {@code null} if the constructor could not open it. */
    private DataInputStream dis;
    /** Loop flag; volatile because the public {@link #closeAll()} may be called from another thread. */
    private volatile boolean isRunning = true;

    /**
     * Creates a channel that reads from {@code socket}. If the input stream
     * cannot be obtained the channel is closed right away and {@link #run()}
     * returns immediately.
     *
     * @param socket socket connected to the server
     */
    public ReceiveChannel(Socket socket) {
        this.receiveSocket = socket;
        try {
            dis = new DataInputStream(socket.getInputStream());
        } catch (IOException e) {
            closeAll();
        }
    }

    /**
     * Reads one message sent by the server.
     *
     * @return the message, or the empty string when the stream is unavailable,
     *         has ended or failed (in the latter cases the channel is stopped and closed)
     */
    public String receive() {
        String msg = "";
        if (dis == null) {
            // The constructor failed; the channel is already stopped.
            return msg;
        }
        try {
            msg = dis.readUTF();
        } catch (IOException e) {
            closeAll();
        }
        return msg;
    }

    /**
     * Stops the receive loop and closes the input stream and the socket, each
     * independently of the other.
     */
    public void closeAll() {
        isRunning = false;
        // dis is null when the constructor failed before the stream was created.
        if (dis != null) {
            try {
                dis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            receiveSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints every non-empty message received from the server until the channel stops. */
    @Override
    public void run() {
        while (isRunning) {
            String msg = receive();
            if (!msg.isEmpty()) {
                System.out.println(msg);
            }
        }
    }
}
package com.upgradeSocket;
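/**
*
* Server-side channel
* 1. Obtains the input and output streams from the socket
* 2. Reads the data sent by the client
* 3. Forwards each message received from the client to the other connected clients
*
*/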
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Server-side handler for one client connection.
 *
 * <p>Reads messages from its own client and forwards each non-empty message to
 * every other channel registered in the shared list. A channel that is closed
 * removes itself from that list.
 */
public class ServerChannel implements Runnable {
    private final Socket serverSocket;
    /** Input stream from the client; {@code null} if the constructor could not open it. */
    private DataInputStream dis;
    /** Output stream to the client; {@code null} if the constructor could not open it. */
    private DataOutputStream dos;
    /** Loop flag; volatile because other channels' threads stop this channel when a write to it fails. */
    private volatile boolean isRunning = true;
    /** Shared list of all channels, owned by the caller. */
    private final CopyOnWriteArrayList<ServerChannel> cos;

    /**
     * Creates a channel for {@code socket}. If either stream cannot be opened
     * the channel is closed right away and {@link #run()} returns immediately.
     *
     * @param socket accepted client connection
     * @param cos    shared list of all channels used for forwarding
     */
    public ServerChannel(Socket socket, CopyOnWriteArrayList<ServerChannel> cos) {
        this.cos = cos;
        this.serverSocket = socket;
        try {
            dis = new DataInputStream(serverSocket.getInputStream());
            dos = new DataOutputStream(serverSocket.getOutputStream());
        } catch (IOException e) {
            closeAll();
        }
    }

    /**
     * Reads one message sent by this channel's client.
     *
     * @return the message, or the empty string when reading failed or the
     *         stream ended (the channel is then stopped and closed)
     */
    private String readClient() {
        String msg = "";
        try {
            msg = dis.readUTF();
        } catch (IOException e) {
            closeAll();
            e.printStackTrace();
        }
        return msg;
    }

    /**
     * Writes one message to this channel's client. A failure stops and closes
     * this channel only; the caller is not affected.
     */
    private void send(String msg) {
        DataOutputStream out = dos;
        if (!isRunning || out == null) {
            // Stopped or never opened: make sure it is no longer a forwarding target.
            if (cos != null) {
                cos.remove(this);
            }
            return;
        }
        try {
            // Several channel threads may forward to the same client at once and
            // DataOutputStream is not safe for concurrent writers.
            synchronized (out) {
                out.writeUTF(msg);
            }
        } catch (IOException e) {
            closeAll();
            e.printStackTrace();
        }
    }

    /** Forwards a message to every registered channel except this one. */
    private void sendClient(String msg) {
        for (ServerChannel peer : cos) {
            // Do not echo the message back to its sender.
            if (peer == this) {
                continue;
            }
            peer.send(msg);
        }
    }

    /** Reads messages from the client and forwards the non-empty ones until the channel stops. */
    @Override
    public void run() {
        while (isRunning) {
            String msg = readClient();
            if (!msg.isEmpty()) {
                sendClient(msg);
            }
        }
    }

    /**
     * Stops the channel, removes it from the shared list and closes both
     * streams and the socket, each independently of the others.
     */
    public void closeAll() {
        isRunning = false;
        if (cos != null) {
            cos.remove(this);
        }
        if (dis != null) {
            try {
                dis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (dos != null) {
            try {
                dos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "ServerChannel [serverSocket=" + serverSocket + ", dis=" + dis + ", dos=" + dos + ", isRunning="
                + isRunning + "]";
    }
}
package com.upgradeSocket;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
/**
 * Chat client entry point: connects to the server on localhost:9999 and starts
 * one thread that sends console input and one thread that prints received messages.
 */
public class Client {
    /**
     * Opens a single connection and shares it between the sending and the
     * receiving channel.
     *
     * @param args ignored
     * @throws UnknownHostException if the host name cannot be resolved
     * @throws IOException          if the connection cannot be established
     */
    public static void main(String[] args) throws UnknownHostException, IOException {
        // One socket for both directions: with two separate connections the server
        // treats this client as two different clients and echoes its own messages
        // back to it. Sharing the socket also means closing one channel ends the other.
        Socket socket = new Socket("localhost", 9999);
        new Thread(new SendChannel(socket)).start();
        new Thread(new ReceiveChannel(socket)).start();
    }
}
package com.upgradeSocket;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Chat server entry point: accepts connections on port 9999 and runs one
 * {@code ServerChannel} thread per connection.
 */
public class Server {
    // Concurrent container shared by all channel threads; a plain ArrayList that is
    // read and modified by several threads at once could lose updates.
    private static final CopyOnWriteArrayList<ServerChannel> cos = new CopyOnWriteArrayList<ServerChannel>();

    /**
     * Accepts clients forever, registering each channel before its thread starts.
     *
     * @param args ignored
     * @throws IOException if the server socket cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocket ssk = new ServerSocket(9999);
        try {
            while (true) {
                Socket s = ssk.accept();
                ServerChannel channel = new ServerChannel(s, cos);
                // Register first so the channel is in the shared list before its thread runs.
                cos.add(channel);
                new Thread(channel).start();
            }
        } finally {
            // Reached only when accept() fails; release the listening port.
            ssk.close();
        }
    }
}