package org.rcisoft.mqttclient;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.rcisoft.business.totaloriginal.entity.TotalOriginal;
import org.rcisoft.business.totaloriginal.entity.TotalSensor;
import org.rcisoft.business.totaloriginal.service.TotalOriginalService;
import org.rcisoft.core.service.RcRedisService;
import org.rcisoft.core.util.DateUtil;
import org.rcisoft.core.util.ZhnyUtil;

import java.util.*;

public class MqttClient {

    RcRedisService rcRedisService;
    TotalOriginalService totalService;

    public MqttClient(RcRedisService redis, TotalOriginalService ts){
        rcRedisService = redis;
        totalService = ts;
    }

    public void HandleReceive(String content){

        Map<String,Object> referMap = null;
        String devStr = "";
        try {
            JSONObject jb = JSONArray.parseObject(content.substring(1,content.length()-1));
            Map<String,Object> map = new HashMap<>();
            String jwnum = jb.getString("PHONE");//网关编号
            map.put("pro_id",jwnum);
            referMap = totalService.queryBusParamRefer(map);
            devStr = totalService.queryDevByPro(jwnum);
            String time = jb.getString("TIME");//时间
            JSONObject jbody = jb.getJSONObject("body");//数据体
            //String time = DateUtil.getSimepleDate("yyyyMMddHHmmss",new Date());
            if(DateUtil.getParseDate("yyyyMMddhhmmss",time)!=null){//时间格式正常
                String minute = time.substring(10,12);
                String sec = time.substring(12,14);
                /* 10秒级的数据,存储缓存
                 * 缓存中始终保持最新两条网关数据
                 * 用作拓扑图点击设备查看实时参数及10秒级的能耗
                 */
                if(Integer.parseInt(sec)%10==0){
                    rcRedisService.set(jwnum, jbody.toJSONString());
                    //rcRedisService.set("old","old"+new Date() + rcRedisService.get("new"));
                    //rcRedisService.set("new","new"+new Date() + content);
                }
                System.out.println("++++++++++++++++++++++++++++++++++"+minute+"-----"+sec);
                if(Integer.parseInt(minute)%10==0 && sec.equalsIgnoreCase("00")){//10分钟级的数据,进行存储
                    List<String> list = new ArrayList<String>();
                    List<TotalOriginal> originalList = new ArrayList<>();
                    List<TotalSensor> sensorList = new ArrayList<>();
                    //遍历数据体内对象
                    for (Map.Entry<String, Object> entry : jbody.entrySet()) {
                        String key = entry.getKey();
                        JSONObject body = ((JSONObject)entry.getValue()).getJSONObject("REG_VAL");
                        String dev_num = jwnum+"_"+key;
                        //(【已配置的表具】/【设备】)
                        if(referMap.get(dev_num)!=null || devStr.contains(dev_num)){
                            TotalOriginal original = new TotalOriginal();
                            original.setProId(jwnum);
                            original.setTm(DateUtil.getParseDate("yyyyMMddHHmmss",time));
                            original.setDevNum(dev_num);
                            original.setOriginalStr(body.toJSONString());
                            /**
                             * 摘选水电气能耗,对于设备来说,都为null
                             */
                            if(devStr.contains(dev_num)){//设备
                                original.setElecV(null);
                                original.setWaterV(null);
                                original.setGasV(null);
                            }else{//表具
                                String own_param = referMap.get(dev_num).toString().split(",")[0];
                                String other_param = referMap.get(dev_num).toString().split(",")[1];
                                if(own_param.equalsIgnoreCase("power")){
                                    original.setElecV(ZhnyUtil.parseFloat(body.get(other_param)));
                                    original.setWaterV(0f);
                                    original.setGasV(0f);
                                }else if(own_param.equalsIgnoreCase("water")){
                                    original.setElecV(0f);
                                    original.setWaterV(ZhnyUtil.parseFloat(body.get(other_param)));
                                    original.setGasV(0f);
                                }else if(own_param.equalsIgnoreCase("gas")){
                                    original.setElecV(0f);
                                    original.setWaterV(0f);
                                    original.setGasV(ZhnyUtil.parseFloat(body.get(other_param)));
                                }
                            }
                            int i=4;
                            for (Map.Entry<String, Object> bt : body.entrySet()) {
                                if(i==4){ original.setP4(bt.getKey());original.setP4V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==5){ original.setP5(bt.getKey());original.setP5V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==6){ original.setP6(bt.getKey());original.setP6V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==7){ original.setP7(bt.getKey());original.setP7V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==8){ original.setP8(bt.getKey());original.setP8V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==9){ original.setP9(bt.getKey());original.setP9V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==10){ original.setP10(bt.getKey());original.setP10V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==11){ original.setP11(bt.getKey());original.setP11V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==12){ original.setP12(bt.getKey());original.setP12V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==13){ original.setP13(bt.getKey());original.setP13V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==14){ original.setP14(bt.getKey());original.setP14V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==15){ original.setP15(bt.getKey());original.setP15V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==16){ original.setP16(bt.getKey());original.setP16V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==17){ original.setP17(bt.getKey());original.setP17V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==18){ original.setP18(bt.getKey());original.setP18V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==19){ original.setP19(bt.getKey());original.setP19V(ZhnyUtil.parseFloat(bt.getValue())); }
                                if(i==20){ original.setP20(bt.getKey());original.setP20V(ZhnyUtil.parseFloat(bt.getValue())); }
                                i++;
                            }
                            originalList.add(original);
                        }else{//【传感器】
                            TotalSensor sensor = new TotalSensor();
                            sensor.setTm(DateUtil.getParseDate("yyyyMMddHHmmss",time));
                            sensor.setSensorJson(body.toJSONString());
                            sensor.setProId(jwnum);
                            sensorList.add(sensor);
                        }

                    }
                    for(TotalOriginal t : originalList){
                        totalService.save(t);
                    }
                    for(TotalSensor s : sensorList){
                        totalService.saveSensor(s);
                    }
                }
            }


            //System.out.println(jb);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public MqttClient(){};
    public boolean isReceive(String content){
        try {
            if(!content.contains("jwnum")){
                return false;
            }
            JSONObject jsonStr= JSONObject.parseObject(content);
            return  true;
        } catch (Exception e) {
            return false;
        }
    }

    public void start(){
        try {
            MQTT mqtt=new MQTT();

            //MQTT设置说明
            mqtt.setHost("tcp://172.21.0.6:1883");
            mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
            mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
            mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
            mqtt.setUserName("admin");//服务器认证用户名
            mqtt.setPassword("public");//服务器认证密码

            mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
            mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息
            mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
            mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true
            mqtt.setVersion("3.1.1");

            //失败重连接设置说明
            mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
            mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
            mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms
            mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms
            mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2

            //Socket设置说明
            mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k)
            mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k)
            mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输

            //带宽限制设置说明
            mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
            mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制

            //选择消息分发队列
            mqtt.setDispatchQueue(Dispatch.createQueue("UPDATA"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法

            //设置跟踪器
            mqtt.setTracer(new Tracer(){
                @Override
                public void onReceive(MQTTFrame frame) {
                    System.out.println("recv: "+frame);
                }

                @Override
                public void onSend(MQTTFrame frame) {
                    System.out.println("send: "+frame);
                }

                @Override
                public void debug(String message, Object... args) {
                    System.out.println(String.format("debug: "+message, args));
                }
            });



            //使用回调式API
            final CallbackConnection callbackConnection=mqtt.callbackConnection();

            //连接监听
            callbackConnection.listener(new Listener() {

                //接收订阅话题发布的消息
                @Override
                public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
                    System.out.println("=============receive msg================"+new String(payload.toByteArray()));
                    String content = new String(payload.toByteArray());
                    HandleReceive(content);
                    onComplete.run();
                }

                //连接失败
                @Override
                public void onFailure(Throwable value) {
                    System.out.println("===========connect failure===========");
                    callbackConnection.disconnect(null);
                }

                //连接断开
                @Override
                public void onDisconnected() {
                    System.out.println("====mqtt disconnected=====");

                }

                //连接成功
                @Override
                public void onConnected() {
                    System.out.println("====mqtt connected=====");

                }
            });



            //连接
            callbackConnection.connect(new Callback<Void>() {

                //连接失败
                public void onFailure(Throwable value) {
                    System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
                }

                // 连接成功
                public void onSuccess(Void v) {

                    //订阅主题
                    Topic[] topics = {new Topic("UPDATA", QoS.EXACTLY_ONCE)};
                    callbackConnection.subscribe(topics, new Callback<byte[]>() {
                        //订阅主题成功
                        public void onSuccess(byte[] qoses) {
                            System.out.println("========订阅成功=======");
                        }
                        //订阅主题失败
                        public void onFailure(Throwable value) {
                            System.out.println("========订阅失败=======");
                            callbackConnection.disconnect(null);
                        }
                    });


                    //发布消息
                    /*callbackConnection.publish("UPDATA", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() {
                        public void onSuccess(Void v) {
                            System.out.println("===========消息发布成功============");
                        }
                        public void onFailure(Throwable value) {
                            System.out.println("========消息发布失败=======");
                            callbackConnection.disconnect(null);
                        }
                    });*/


                }
            });



//            while(true)
//            {
//
//            }


        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}