Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Z
zhny
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
王夏晖
zhny
Commits
4c55cbbc
Commit
4c55cbbc
authored
Apr 11, 2018
by
Administrator
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
mqtt客户端及redis缓存
parent
c512907c
Changes
8
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
510 additions
and
1 deletion
+510
-1
pom.xml
pom.xml
+5
-1
ZhnyLineRunner.java
src/main/java/org/rcisoft/ZhnyLineRunner.java
+14
-0
RedisConfig.java
src/main/java/org/rcisoft/config/RedisConfig.java
+61
-0
RcRedisConfigBean.java
src/main/java/org/rcisoft/core/bean/RcRedisConfigBean.java
+41
-0
RcRedisService.java
src/main/java/org/rcisoft/core/service/RcRedisService.java
+88
-0
RcRedisServiceImpl.java
...ava/org/rcisoft/core/service/impl/RcRedisServiceImpl.java
+196
-0
ClientMQTT.java
src/main/java/org/rcisoft/mqttclient/ClientMQTT.java
+63
-0
PushCallback.java
src/main/java/org/rcisoft/mqttclient/PushCallback.java
+42
-0
No files found.
pom.xml
View file @
4c55cbbc
...
...
@@ -310,7 +310,11 @@
<!-- 1.10 需要jdk1.8-->
</dependency>
<dependency>
<groupId>
org.eclipse.paho
</groupId>
<artifactId>
org.eclipse.paho.client.mqttv3
</artifactId>
<version>
1.1.1
</version>
</dependency>
</dependencies>
...
...
src/main/java/org/rcisoft/ZhnyLineRunner.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
;
import
org.rcisoft.mqttclient.ClientMQTT
;
import
org.springframework.boot.CommandLineRunner
;
import
org.springframework.stereotype.Component
;
@Component
public
class
ZhnyLineRunner
implements
CommandLineRunner
{
@Override
public
void
run
(
String
...
var1
)
throws
Exception
{
System
.
out
.
println
(
"MQTT 客户端启动++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
);
ClientMQTT
mqclient
=
new
ClientMQTT
();
mqclient
.
start
();
}
}
src/main/java/org/rcisoft/config/RedisConfig.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
config
;
import
lombok.extern.slf4j.Slf4j
;
import
org.rcisoft.core.bean.RcRedisConfigBean
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
redis.clients.jedis.JedisPool
;
import
redis.clients.jedis.JedisPoolConfig
;
/**
* Created by lcy on 18/1/8.
*/
@Configuration
@Slf4j
public
class
RedisConfig
{
@Autowired
private
RcRedisConfigBean
redisConfigBean
;
@Bean
public
JedisPool
getJedisPool
(){
JedisPoolConfig
config
=
new
JedisPoolConfig
();
//连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
config
.
setBlockWhenExhausted
(
true
);
//设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
config
.
setEvictionPolicyClassName
(
"org.apache.commons.pool2.impl.DefaultEvictionPolicy"
);
//是否启用pool的jmx管理功能, 默认true
config
.
setJmxEnabled
(
true
);
//MBean ObjectName = new ObjectName("org.apache.commons.pool2:type=GenericObjectPool,name=" + "pool" + i); 默认为"pool", JMX不熟,具体不知道是干啥的...默认就好.
config
.
setJmxNamePrefix
(
"pool"
);
//是否启用后进先出, 默认true
config
.
setLifo
(
false
);
//最大空闲连接数, 默认8个
config
.
setMaxIdle
(
redisConfigBean
.
getMaxIdle
());
//最大连接数, 默认8个
config
.
setMaxTotal
(
redisConfigBean
.
getMaxActive
());
//获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
config
.
setMaxWaitMillis
(
redisConfigBean
.
getMaxWait
());
//逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
config
.
setMinEvictableIdleTimeMillis
(
1800000
);
//最小空闲连接数, 默认0
config
.
setMinIdle
(
0
);
//每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
config
.
setNumTestsPerEvictionRun
(
3
);
//对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
config
.
setSoftMinEvictableIdleTimeMillis
(
1800000
);
//在获取连接的时候检查有效性, 默认false
config
.
setTestOnBorrow
(
false
);
//在空闲时检查有效性, 默认false
config
.
setTestWhileIdle
(
false
);
//逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
config
.
setTimeBetweenEvictionRunsMillis
(-
1
);
JedisPool
pool
=
new
JedisPool
(
config
,
redisConfigBean
.
getHost
(),
redisConfigBean
.
getPort
(),
redisConfigBean
.
getTimeout
(),
null
,
redisConfigBean
.
getDatabase
());
log
.
info
(
"init JedisPool ..."
);
return
pool
;
}
}
src/main/java/org/rcisoft/core/bean/RcRedisConfigBean.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
core
.
bean
;
import
lombok.Data
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.stereotype.Component
;
/**
* Created by lcy on 18/1/8.
*/
@Component
@ConfigurationProperties
(
prefix
=
"spring.redis"
)
@Data
public
class
RcRedisConfigBean
{
private
String
host
;
private
int
port
;
private
String
password
;
private
int
timeout
;
private
int
database
;
@Value
(
"${spring.redis.pool.max-idle:100}"
)
private
int
maxIdle
;
@Value
(
"${spring.redis.pool.max-active:100}"
)
private
int
maxActive
;
@Value
(
"${spring.redis.pool.min-idle:1}"
)
private
int
minIdle
;
@Value
(
"${spring.redis.pool.max-wait:-1}"
)
private
int
maxWait
;
}
src/main/java/org/rcisoft/core/service/RcRedisService.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
core
.
service
;
import
redis.clients.jedis.Jedis
;
import
java.util.List
;
/**
* Created by lcy on 18/1/8.
*/
public
interface
RcRedisService
{
/**
* 获取jedis
* @return
*/
public
Jedis
getResource
();
/**
* 归还jedis
* @param jedis
*/
public
void
returnResource
(
Jedis
jedis
);
/**
* 赋值
* @param key
* @param value
*/
public
void
set
(
String
key
,
String
value
);
/**
* 赋值字节数组 设置过期
* @param key
* @param value
*/
public
void
set
(
String
key
,
String
value
,
int
expire
);
/**
* 赋值字节数组
* @param key
* @param value
*/
public
void
setBytes
(
String
key
,
byte
[]
value
);
/**
* 赋值字节数组 设置过期
* @param key
* @param value
*/
public
void
setBytes
(
String
key
,
byte
[]
value
,
int
expire
);
/**
* 取值
* @param key
* @return
*/
public
String
get
(
String
key
);
/**
* 取值数组
* @param key
* @return
*/
public
byte
[]
getBytes
(
String
key
);
/**
* 清除
* @param key
*/
public
void
remove
(
String
key
);
/**
* 获取list
* @param key
*/
public
List
getList
(
String
key
);
/**
* set list
* @param key
* @param value
*/
public
void
setList
(
String
key
,
String
value
);
/**
* 从 list 中 删除
* @param key
*/
public
void
removeList
(
String
key
,
String
value
);
}
src/main/java/org/rcisoft/core/service/impl/RcRedisServiceImpl.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
core
.
service
.
impl
;
import
lombok.extern.slf4j.Slf4j
;
import
org.rcisoft.core.service.RcRedisService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
redis.clients.jedis.Jedis
;
import
redis.clients.jedis.JedisPool
;
import
java.util.List
;
/**
* Created by lcy on 18/1/8.
*/
@Service
@Slf4j
public
class
RcRedisServiceImpl
implements
RcRedisService
{
@Autowired
private
JedisPool
jedisPool
;
@Override
public
Jedis
getResource
()
{
return
jedisPool
.
getResource
();
}
@SuppressWarnings
(
"deprecation"
)
@Override
public
void
returnResource
(
Jedis
jedis
)
{
if
(
jedis
!=
null
){
jedisPool
.
returnResourceObject
(
jedis
);
}
}
@Override
public
void
set
(
String
key
,
String
value
,
int
expire
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
set
(
key
,
value
);
if
(
expire
>
0
)
jedis
.
expire
(
key
,
expire
);
log
.
info
(
"Redis set success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
void
set
(
String
key
,
String
value
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
set
(
key
,
value
);
log
.
info
(
"Redis set success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
void
setBytes
(
String
key
,
byte
[]
value
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
set
(
key
.
getBytes
(),
value
);
log
.
info
(
"Redis set success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
void
setBytes
(
String
key
,
byte
[]
value
,
int
expire
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
set
(
key
.
getBytes
(),
value
);
if
(
expire
>
0
)
jedis
.
expire
(
key
,
expire
);
log
.
info
(
"Redis set success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
String
get
(
String
key
)
{
String
result
=
null
;
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
result
=
jedis
.
get
(
key
);
log
.
info
(
"Redis get success - "
+
key
+
", value:"
+
result
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
result
);
}
finally
{
returnResource
(
jedis
);
}
return
result
;
}
@Override
public
byte
[]
getBytes
(
String
key
)
{
byte
[]
result
=
null
;
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
result
=
jedis
.
get
(
key
.
getBytes
());
log
.
info
(
"Redis get success - "
+
key
+
", value:"
+
result
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
result
);
}
finally
{
returnResource
(
jedis
);
}
return
result
;
}
@Override
public
void
remove
(
String
key
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
del
(
key
);
log
.
info
(
"Redis del success - "
+
key
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
List
getList
(
String
key
)
{
Jedis
jedis
=
null
;
List
result
=
null
;
try
{
jedis
=
getResource
();
result
=
jedis
.
lrange
(
key
,
0
,
-
1
);
log
.
info
(
"Redis get list success - "
+
key
+
", value:"
+
result
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
);
}
finally
{
returnResource
(
jedis
);
}
return
result
;
}
@Override
public
void
setList
(
String
key
,
String
value
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
rpush
(
key
,
value
);
log
.
info
(
"Redis set list success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
@Override
public
void
removeList
(
String
key
,
String
value
)
{
Jedis
jedis
=
null
;
try
{
jedis
=
getResource
();
jedis
.
lrem
(
key
,
1
,
value
);
log
.
info
(
"Redis remove list success - "
+
key
+
", value:"
+
value
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"Redis set error: "
+
e
.
getMessage
()
+
" - "
+
key
+
", value:"
+
value
);
}
finally
{
returnResource
(
jedis
);
}
}
}
src/main/java/org/rcisoft/mqttclient/ClientMQTT.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
mqttclient
;
import
java.util.concurrent.ScheduledExecutorService
;
import
org.eclipse.paho.client.mqttv3.MqttClient
;
import
org.eclipse.paho.client.mqttv3.MqttConnectOptions
;
import
org.eclipse.paho.client.mqttv3.MqttException
;
import
org.eclipse.paho.client.mqttv3.MqttTopic
;
import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
;
/**
* 模拟一个客户端接收消息
* @author rao
*
*/
public
class
ClientMQTT
{
public
static
final
String
HOST
=
"tcp://106.2.4.91:61613"
;
public
static
final
String
TOPIC1
=
"UPDATA"
;
private
static
final
String
clientid
=
"mqttjs_c3567e038c"
;
private
MqttClient
client
;
private
MqttConnectOptions
options
;
private
String
userName
=
"admin"
;
//非必须
private
String
passWord
=
"password"
;
//非必须
@SuppressWarnings
(
"unused"
)
private
ScheduledExecutorService
scheduler
;
public
void
start
()
{
try
{
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client
=
new
MqttClient
(
HOST
,
clientid
,
new
MemoryPersistence
());
// MQTT的连接设置
options
=
new
MqttConnectOptions
();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
options
.
setCleanSession
(
false
);
// 设置连接的用户名
options
.
setUserName
(
userName
);
// 设置连接的密码
options
.
setPassword
(
passWord
.
toCharArray
());
// 设置超时时间 单位为秒
options
.
setConnectionTimeout
(
10
);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options
.
setKeepAliveInterval
(
60
);
// 设置回调
client
.
setCallback
(
new
PushCallback
());
MqttTopic
topic
=
client
.
getTopic
(
TOPIC1
);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//遗嘱 options.setWill(topic, "close".getBytes(), 2, true);
client
.
connect
(
options
);
//订阅消息
int
[]
Qos
=
{
1
};
String
[]
topic1
=
{
TOPIC1
};
client
.
subscribe
(
topic1
,
Qos
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
public
static
void
main
(
String
[]
args
)
throws
MqttException
{
ClientMQTT
client
=
new
ClientMQTT
();
client
.
start
();
}
}
src/main/java/org/rcisoft/mqttclient/PushCallback.java
0 → 100644
View file @
4c55cbbc
package
org
.
rcisoft
.
mqttclient
;
import
org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
;
import
org.eclipse.paho.client.mqttv3.MqttCallback
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
public
class
PushCallback
implements
MqttCallback
{
public
void
connectionLost
(
Throwable
cause
)
{
//mqtt.connect();
// 连接丢失后,一般在这里面进行重连
System
.
out
.
println
(
"连接断开,可以做重连"
);
}
public
void
deliveryComplete
(
IMqttDeliveryToken
token
)
{
System
.
out
.
println
(
"deliveryComplete---------"
+
token
.
isComplete
());
}
public
void
messageArrived
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
// subscribe后得到的消息会执行到这里面
System
.
out
.
println
(
"接收消息主题 : "
+
topic
);
System
.
out
.
println
(
"接收消息Qos : "
+
message
.
getQos
());
System
.
out
.
println
(
"接收消息内容 : "
+
new
String
(
message
.
getPayload
()));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment