package org.rcisoft.mqttclient; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.rcisoft.business.totaloriginal.entity.TotalOriginal; import org.rcisoft.business.totaloriginal.entity.TotalSensor; import org.rcisoft.business.totaloriginal.service.TotalOriginalService; import org.rcisoft.core.service.RcRedisService; import org.rcisoft.core.util.DateUtil; import org.rcisoft.core.util.ZhnyUtil; import java.util.*; public class MqttClient { RcRedisService rcRedisService; TotalOriginalService totalService; public MqttClient(RcRedisService redis, TotalOriginalService ts){ rcRedisService = redis; totalService = ts; } public void HandleReceive(String content){ Map<String,Object> referMap = null; String devStr = ""; try { JSONObject jb = JSONArray.parseObject(content.substring(1,content.length()-1)); Map<String,Object> map = new HashMap<>(); String jwnum = jb.getString("PHONE");//网关编号 map.put("pro_id",jwnum); referMap = totalService.queryBusParamRefer(map); devStr = totalService.queryDevByPro(jwnum); String time = jb.getString("TIME");//时间 JSONObject jbody = jb.getJSONObject("body");//数据体 //String time = DateUtil.getSimepleDate("yyyyMMddHHmmss",new Date()); if(DateUtil.getParseDate("yyyyMMddhhmmss",time)!=null){//时间格式正常 String minute = time.substring(10,12); String sec = time.substring(12,14); /* 10秒级的数据,存储缓存 * 缓存中始终保持最新两条网关数据 * 用作拓扑图点击设备查看实时参数及10秒级的能耗 */ if(Integer.parseInt(sec)%10==0){ rcRedisService.set(jwnum, jbody.toJSONString()); //rcRedisService.set("old","old"+new Date() + rcRedisService.get("new")); //rcRedisService.set("new","new"+new Date() + content); } System.out.println("++++++++++++++++++++++++++++++++++"+minute+"-----"+sec); if(Integer.parseInt(minute)%10==0 && sec.equalsIgnoreCase("00")){//10分钟级的数据,进行存储 List<String> list = new ArrayList<String>(); List<TotalOriginal> originalList = new ArrayList<>(); List<TotalSensor> sensorList = new ArrayList<>(); //遍历数据体内对象 for (Map.Entry<String, Object> entry : jbody.entrySet()) { String key = entry.getKey(); JSONObject body = ((JSONObject)entry.getValue()).getJSONObject("REG_VAL"); String dev_num = jwnum+"_"+key; //(【已配置的表具】/【设备】) if(referMap.get(dev_num)!=null || devStr.contains(dev_num)){ TotalOriginal original = new TotalOriginal(); original.setProId(jwnum); original.setTm(DateUtil.getParseDate("yyyyMMddHHmmss",time)); original.setDevNum(dev_num); original.setOriginalStr(body.toJSONString()); /** * 摘选水电气能耗,对于设备来说,都为null */ if(devStr.contains(dev_num)){//设备 original.setElecV(null); original.setWaterV(null); original.setGasV(null); }else{//表具 String own_param = referMap.get(dev_num).toString().split(",")[0]; String other_param = referMap.get(dev_num).toString().split(",")[1]; if(own_param.equalsIgnoreCase("power")){ original.setElecV(ZhnyUtil.parseFloat(body.get(other_param))); original.setWaterV(0f); original.setGasV(0f); }else if(own_param.equalsIgnoreCase("water")){ original.setElecV(0f); original.setWaterV(ZhnyUtil.parseFloat(body.get(other_param))); original.setGasV(0f); }else if(own_param.equalsIgnoreCase("gas")){ original.setElecV(0f); original.setWaterV(0f); original.setGasV(ZhnyUtil.parseFloat(body.get(other_param))); } } int i=4; for (Map.Entry<String, Object> bt : body.entrySet()) { if(i==4){ original.setP4(bt.getKey());original.setP4V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==5){ original.setP5(bt.getKey());original.setP5V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==6){ original.setP6(bt.getKey());original.setP6V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==7){ original.setP7(bt.getKey());original.setP7V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==8){ original.setP8(bt.getKey());original.setP8V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==9){ original.setP9(bt.getKey());original.setP9V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==10){ original.setP10(bt.getKey());original.setP10V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==11){ original.setP11(bt.getKey());original.setP11V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==12){ original.setP12(bt.getKey());original.setP12V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==13){ original.setP13(bt.getKey());original.setP13V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==14){ original.setP14(bt.getKey());original.setP14V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==15){ original.setP15(bt.getKey());original.setP15V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==16){ original.setP16(bt.getKey());original.setP16V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==17){ original.setP17(bt.getKey());original.setP17V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==18){ original.setP18(bt.getKey());original.setP18V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==19){ original.setP19(bt.getKey());original.setP19V(ZhnyUtil.parseFloat(bt.getValue())); } if(i==20){ original.setP20(bt.getKey());original.setP20V(ZhnyUtil.parseFloat(bt.getValue())); } i++; } originalList.add(original); }else{//【传感器】 TotalSensor sensor = new TotalSensor(); sensor.setTm(DateUtil.getParseDate("yyyyMMddHHmmss",time)); sensor.setSensorJson(body.toJSONString()); sensor.setProId(jwnum); sensorList.add(sensor); } } for(TotalOriginal t : originalList){ totalService.save(t); } for(TotalSensor s : sensorList){ totalService.saveSensor(s); } } } //System.out.println(jb); }catch(Exception e){ e.printStackTrace(); } } public MqttClient(){}; public boolean isReceive(String content){ try { if(!content.contains("jwnum")){ return false; } JSONObject jsonStr= JSONObject.parseObject(content); return true; } catch (Exception e) { return false; } } public void start(){ try { MQTT mqtt=new MQTT(); //MQTT设置说明 mqtt.setHost("tcp://172.21.0.6:1883"); mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成 mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待 mqtt.setUserName("admin");//服务器认证用户名 mqtt.setPassword("public");//服务器认证密码 mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息 mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息 mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true mqtt.setVersion("3.1.1"); //失败重连接设置说明 mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1 mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2 //Socket设置说明 mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k) mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k) mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输 //带宽限制设置说明 mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制 mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制 //选择消息分发队列 mqtt.setDispatchQueue(Dispatch.createQueue("UPDATA"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法 //设置跟踪器 mqtt.setTracer(new Tracer(){ @Override public void onReceive(MQTTFrame frame) { System.out.println("recv: "+frame); } @Override public void onSend(MQTTFrame frame) { System.out.println("send: "+frame); } @Override public void debug(String message, Object... args) { System.out.println(String.format("debug: "+message, args)); } }); //使用回调式API final CallbackConnection callbackConnection=mqtt.callbackConnection(); //连接监听 callbackConnection.listener(new Listener() { //接收订阅话题发布的消息 @Override public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) { System.out.println("=============receive msg================"+new String(payload.toByteArray())); String content = new String(payload.toByteArray()); HandleReceive(content); onComplete.run(); } //连接失败 @Override public void onFailure(Throwable value) { System.out.println("===========connect failure==========="); callbackConnection.disconnect(null); } //连接断开 @Override public void onDisconnected() { System.out.println("====mqtt disconnected====="); } //连接成功 @Override public void onConnected() { System.out.println("====mqtt connected====="); } }); //连接 callbackConnection.connect(new Callback<Void>() { //连接失败 public void onFailure(Throwable value) { System.out.println("============连接失败:"+value.getLocalizedMessage()+"============"); } // 连接成功 public void onSuccess(Void v) { //订阅主题 Topic[] topics = {new Topic("UPDATA", QoS.EXACTLY_ONCE)}; callbackConnection.subscribe(topics, new Callback<byte[]>() { //订阅主题成功 public void onSuccess(byte[] qoses) { System.out.println("========订阅成功======="); } //订阅主题失败 public void onFailure(Throwable value) { System.out.println("========订阅失败======="); callbackConnection.disconnect(null); } }); //发布消息 /*callbackConnection.publish("UPDATA", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() { public void onSuccess(Void v) { System.out.println("===========消息发布成功============"); } public void onFailure(Throwable value) { System.out.println("========消息发布失败======="); callbackConnection.disconnect(null); } });*/ } }); // while(true) // { // // } } catch (Exception e) { e.printStackTrace(); } } }