欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

java memcache 队列_基于memcache的java分布式队列实现。

发布时间:2023/12/1 编程问答 43 豆豆
生活随笔 收集整理的这篇文章主要介绍了 java memcache 队列_基于memcache的java分布式队列实现。 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

主要有两个类,一个队列类和一个job的抽象类。

保证队列类中的key的唯一性,就可以用spring配置多个实例。水平有限,欢迎吐槽。

上代码:

1、队列类

import net.spy.memcached.MemcachedClient;

import net.spy.memcached.internal.OperationFuture;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import com.izx.services.common.Constant;

/**

*

* @ClassName: MemCacheQueue

* @Description: 基于memcache的消息队列的实现

* @author hai.zhu

* @date 2016-3-31 下午3:29:00

*

*/

public class MemCacheQueue implements InitializingBean, DisposableBean,ApplicationContextAware {

private static final Log log = LogFactory.getLog(MemCacheQueue.class);

/**

* 队列名

*/

private String key;

/**

* 队列锁失效分钟

*/

private Integer lockExpireMinite = 3;

private MemcachedClient memcachedClient;

private ApplicationContext applicationContext;

ListenerThread listenerThread = new ListenerThread();

public void setKey(String key) {

this.key = key;

}

public void setMemcachedClient(MemcachedClient memcachedClient) {

this.memcachedClient = memcachedClient;

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@Override

public void destroy() throws Exception {

try {

this.sign = false;

listenerThread.interrupt();

} catch (Exception e) {

log.error(e);

}

}

@Override

public void afterPropertiesSet() throws Exception {

//初始化队列,用add防止重启覆盖

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 0, "0");

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 0, "0");

//设置任务线程

listenerThread.setDaemon(true);

listenerThread.start();

}

/**

*

* @Title: push

* @Description: 唯一对外方法,放入要执行的任务

* @param @param value

* @param @throws Exception    设定文件

* @return void    返回类型

* @throws

*/

public synchronized void push(MemCacheQueueJobAdaptor value) throws Exception {

//分布加锁

queuelock();

//放入队列

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 1);

Object keyorder = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorder, 0, value);

//分布解锁

queueUnLock();

}

/**

*

* @Title: pop

* @Description: 取出要执行的任务

* @param @return

* @param @throws Exception    设定文件

* @return MemCacheQueueJobAdaptor    返回类型

* @throws

*/

private synchronized MemCacheQueueJobAdaptor pop() throws Exception {

Object keyorderstart = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key);

Object keyorderend = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

if(keyorderstart.equals(keyorderend)){

return null;

}

MemCacheQueueJobAdaptor adaptor = (MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 1);

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

return adaptor;

}

/**

*

* @Title: queuelock

* @Description: 加锁

* @param @throws InterruptedException    设定文件

* @return void    返回类型

* @throws

*/

private void queuelock() throws Exception {

do {

OperationFuture sign = memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key, lockExpireMinite * 60, key);

if(sign.get()){

return;

} else {

log.debug("key: " + key + " locked by another business");

}

Thread.sleep(300);

} while (true);

}

/**

*

* @Title: queueUnLock

* @Description: 解锁

* @param     设定文件

* @return void    返回类型

* @throws

*/

private void queueUnLock() {

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key);

}

private boolean sign = true;

private long THREAD_SLEEP = 10;

class ListenerThread extends Thread {

@Override

public void run(){

log.error("队列["+key+"]开始执行");

while(sign){

try {

Thread.sleep(THREAD_SLEEP);

dojob();

} catch (Exception e) {

log.error(e);

}

}

}

private void dojob(){

try{

queuelock();

MemCacheQueueJobAdaptor adaptor = pop();

//逐个执行

if(adaptor != null){

THREAD_SLEEP = 10;

try {

adaptor.setApplicationContext(applicationContext);

adaptor.onMessage();

} catch (Exception e) {

log.error(e);

}

}else{

THREAD_SLEEP = 5000;

}

}catch(Exception e){

log.error(e);

}finally{

queueUnLock();

}

}

}

}[/code]

2、job抽象类

import org.springframework.context.ApplicationContext;

import java.io.Serializable;

/**

*

* @ClassName: MemCacheQueueJobAdaptor

* @Description: 基于memcache队列的任务适配器

* @author hai.zhu

* @date 2015-12-11 上午11:48:26

* @param 

*/

public abstract class MemCacheQueueJobAdaptor implements Serializable{

private static final long serialVersionUID = -5071415952097756327L;

private ApplicationContext applicationContext;

public ApplicationContext getApplicationContext() {

return applicationContext;

}

public void setApplicationContext(ApplicationContext applicationContext) {

this.applicationContext = applicationContext;

}

/**

*

* @Title: onMessage

* @Description: 异步执行任务接口

* @author hai.zhu

* @param @param value 设定文件

* @return void 返回类型

* @throws

*/

public abstract void onMessage();

}[/code]

3、部分放在constant的常量

/**

* 基于memcache的队列存放前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_VARIABLE = "MEMCACHE_GLOBAL_QUEUE_VARIABLE_";

/**

* 基于memcache的队列锁的前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_LOCK = "MEMCACHE_GLOBAL_QUEUE_LOCK_";

/**

* 基于memcache的队列锁的开始元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_STARTKEY = "MEMCACHE_GLOBAL_QUEUE_STARTKEY_";

/**

* 基于memcache的队列锁的结束元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_ENDKEY = "MEMCACHE_GLOBAL_QUEUE_ENDKEY_";[/code]

4、spring配置,保证队列名的唯一性就可以配置多个队列

转载于:https://my.oschina.net/zhuxuan/blog/650935

总结

以上是生活随笔为你收集整理的java memcache 队列_基于memcache的java分布式队列实现。的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。