- 2023-09-19 09:00:17
- 4851 热度
- 0 评论
前两天发了一篇“UDP 上传文件”(http://www.javacui.com/netcode/207.html )的文章,但是当时只是把功能实现,后续做成多线程的,也修正了跳出时机的问题。
用到多线程,也不得不了解下线程池的内容,你可以参考“Java四种线程池的使用”(http://www.javacui.com/Theory/151.html )。
下面直接来看下代码即可:
package com.dlwx.net; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * UDP服务类 */ public class LogFileSer { public static DatagramSocket ds = null; public static int sendLen = 1024 * 10 + 4 + 4 + 4 + 4; // 长度 索引 名称 包长度 public static String filePath = "C:\\"; public static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100); public static void main(String[] args) throws Exception { String serverHost = "0.0.0.0"; int serverPort = 3344; LogFileSer udpServerSocket = new LogFileSer(serverHost, serverPort); while (true) { DatagramPacket packet = udpServerSocket.receive(); fixedThreadPool.execute(new LogFileSerChild(packet)); } } /** * 构造函数,绑定主机和端口 */ private LogFileSer(String host, int port) throws Exception { InetSocketAddress socketAddress = new InetSocketAddress(host, port); ds = new DatagramSocket(socketAddress); System.out.println("服务端启动 端口" + 3344); } /** * 接收数据包,该方法会造成线程阻塞 */ private final DatagramPacket receive() throws IOException { byte[] buffer = new byte[sendLen]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); ds.receive(packet); return packet; } /** * 将响应包发送给请求端 */ public final static void response(DatagramPacket packet, byte[] info) throws Exception { byte[] buffer = new byte[LogFileSer.sendLen]; DatagramPacket dp = new DatagramPacket(buffer, buffer.length, packet.getAddress(), packet.getPort()); dp.setData(info); LogFileSer.ds.send(dp); } }
注意,这里索引是分块传输的索引,后面还加了一个每次传递多少,用于跳过文件写入。
package com.dlwx.net; import java.io.File; import java.io.RandomAccessFile; import java.net.DatagramPacket; import java.nio.ByteBuffer; import java.util.Arrays; import com.dlwx.util.StreamTool; /** * 处理UDP包 */ public class LogFileSerChild implements Runnable{ private DatagramPacket packet; public LogFileSerChild(DatagramPacket packet){ this.packet = packet; } public void run() { try { if(null != packet && packet.getLength() > 15){ byte[] re = new byte[packet.getLength()]; System.arraycopy(packet.getData(), 0, re, 0, packet.getLength()); System.out.println("收到内容:" + re.length + "-->" + Arrays.toString(re)); byte[] btTemp = new byte[]{re[0], re[1], re[2], re[3]}; int len = StreamTool.bytesToInt(btTemp); if(len == re.length){ // 标记的长度是否正确 btTemp = new byte[]{re[4], re[5], re[6], re[7]}; int index = StreamTool.bytesToInt(btTemp); if(index >= 0 && index <= 1024 * 10){ // 标记的索引正确,最大10M btTemp = new byte[]{re[8], re[9], re[10], re[11]}; Integer name = StreamTool.bytesToInt(btTemp); btTemp = new byte[]{re[12], re[13], re[14], re[15]}; int bloc = StreamTool.bytesToInt(btTemp); if(bloc > 0 && bloc < 1024 * 10){ ByteBuffer bf = ByteBuffer.allocate(16); bf.put(StreamTool.intToByte(16)); // 总长度 bf.put(StreamTool.intToByte(index)); // 索引 bf.put(StreamTool.intToByte(name)); // 名称 bf.put(StreamTool.intToByte(1)); // 成功 LogFileSer.response(packet, bf.array()); byte[] btFile = new byte[re.length - 16]; System.arraycopy(re, 16, btFile, 0, re.length - 16); String nameStr = name.toString(); File file = new File(LogFileSer.filePath + nameStr + ".txt"); if(!file.exists()) file.createNewFile(); // 不存在就创建新文件 RandomAccessFile fdf = new RandomAccessFile(LogFileSer.filePath + nameStr + ".txt", "rw"); fdf.seek(index * bloc); // 跳过索引部分 fdf.write(btFile); fdf.close(); } } } } } catch (Exception e) { } } }
实现类要做一些限制,这里我就简单做了一些判断,实际应用根据具体场景来做。
客户端代码很简单,分块和重发:
package com.dlwx.net; import java.io.FileInputStream; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Arrays; import com.dlwx.util.StreamTool; /** * UDP客户端程序,用于对服务端发送数据,并接收服务端的回应信息 */ public class LogFileClient { private static int sendLen = 1024 * 1; private static DatagramSocket ds = null; private static int name = 456789; // 订单号 /** * 测试客户端发包和接收回应信息的方法 */ public static void main(String[] args) throws Exception { LogFileClient client = new LogFileClient(); String serverHost = "127.0.0.1"; int serverPort = 3344; FileInputStream fi = new FileInputStream("C:\\log.txt"); byte[] fbt = StreamTool.inputStreamToByte(fi); if(null != fbt && fbt.length > 0){ int pk = fbt.length / sendLen; // 需要发多少包 if(fbt.length % sendLen > 0) pk++; if(pk == 1){ // 只有一个包 D:for(int t=0;t<5;t++){ // 尝试 5 次 try { ByteBuffer bf = ByteBuffer.allocate(fbt.length); bf.put(StreamTool.intToByte(fbt.length + 16)); // 总长度 bf.put(StreamTool.intToByte(0)); // 索引 bf.put(StreamTool.intToByte(name)); // 名称 bf.put(StreamTool.intToByte(sendLen)); // 每次发送多少 bf.put(fbt); client.send(serverHost, serverPort, bf.array()); byte[] bt = client.receive(); if(null != bt && bt.length == 16){ break D; // 发送成功 } } catch (Exception e) { } if(t==4){ // 发送5次不成功 break D; } try {Thread.sleep(1000);} catch (Exception e) {} } }else{ A:for(int i = 0;i < pk;i++){ int len = sendLen; if(i == pk - 1 && fbt.length % sendLen > 0){ // 最后一个包,且不满足一个整包 len = fbt.length % sendLen; } byte[] sd = new byte[len]; System.arraycopy(fbt, i * sendLen, sd, 0, len); // 数组源,数组源拷贝的开始位子,目标,目标填写的开始位子,拷贝的长度 ByteBuffer bf = ByteBuffer.allocate(len + 16); bf.put(StreamTool.intToByte(len + 16)); // 总长度 bf.put(StreamTool.intToByte(i)); // 索引 bf.put(StreamTool.intToByte(name)); // 名称 bf.put(StreamTool.intToByte(sendLen)); // 每次发送多少 bf.put(sd); byte[] bySd = bf.array(); B:for(int t=0;t<5;t++){ // 尝试 5 次 try { System.out.println("发送次数:" + t + " 发送内容:" + bySd.length + "-->" + Arrays.toString(bySd)); client.send(serverHost, serverPort, bySd); byte[] bt = client.receive(); if(null != bt && bt.length == 16){ break B; // 发送成功,继续发送下一个包 } } catch (Exception e) { } if(t==4){ // 发送5次不成功 break A; } } } } } ds.close(); // 关闭连接 } /** * 构造函数,创建UDP客户端 */ public LogFileClient() throws Exception { ds = new DatagramSocket(); // 邦定本地端口作为客户端,这里不邦定 } /** * 向指定的服务端发送数据信息 */ public final void send(final String host, final int port, final byte[] bytes) throws IOException { DatagramPacket dp = new DatagramPacket(bytes, bytes.length, InetAddress.getByName(host), port); ds.send(dp); } /** * 接收从指定的服务端发回的数据 */ public final byte[] receive() throws Exception { byte[] buffer = new byte[16]; DatagramPacket dp = new DatagramPacket(buffer, buffer.length); ds.setSoTimeout(30 * 1000); // 超时时间 ds.receive(dp); byte[] data = new byte[dp.getLength()]; System.arraycopy(dp.getData(), 0, data, 0, dp.getLength()); return data; } }
这里有几个代码技术点可以参考,其实并不是很难的东西,就当hello world参考下即可。
0 评论
留下评论
热门标签
- Spring(403)
- Boot(208)
- Spring Boot(187)
- Java(82)
- Cloud(82)
- Spring Cloud(82)
- Security(60)
- Spring Security(54)
- Boot2(51)
- Spring Boot2(51)
- Redis(31)
- SQL(29)
- Mysql(25)
- IDE(24)
- Dalston(24)
- JDBC(22)
- IDEA(22)
- mongoDB(22)
- MVC(22)
- Web(21)
- CLI(20)
- Alibaba(19)
- SpringMVC(19)
- SpringBoot(17)
- Docker(17)
- Git(16)
- Eclipse(16)
- Vue(16)
- ORA(15)
- JPA(15)
- Apache(15)
- Mybatis(14)
- Oracle(14)
- jdk(14)
- Tomcat(14)
- Linux(14)
- HTTP(14)
- XML(13)
- JdbcTemplate(13)
- OAuth(13)
- Nacos(13)
- Pro(13)
- Data(12)
- JSON(12)
- OAuth2(12)
- stream(11)
- int(11)
- Myeclipse(11)
- not(10)
- Bug(10)
- maven(9)
- Map(9)
- Hystrix(9)
- ast(9)
- APP(8)
- Bit(8)
- API(8)
- session(8)
- Window(8)
- Swagger(8)
- Github(7)
- JavaMail(7)
- Cache(7)
- File(7)
- mail(7)
- IntelliJ(7)
- windows(7)
- too(7)
- HTML(7)
- RabbitMQ(6)
- star(6)
- and(6)
- Excel(6)
- Log4J(6)
- pushlet(6)
- apt(6)
- read(6)
- Freemarker(6)
- WebFlux(6)
- JSP(6)
- Bean(6)
- error(6)
- nginx(6)
- Server(6)
- ueditor(6)
- jar(6)
- ehcache(6)
- UDP(6)
- rdquo(5)
- PHP(5)
- Struts(5)
- string(5)
- Syntaxhighlighter(5)
- script(5)
- Tool(5)
- Controller(5)
- swagger2(5)
- ldquo(5)
- input(5)
- Servlet(5)