| Steven's profile永远的第一天PhotosBlogLists | Help |
|
|
February 11 volatile实现控制运行一次以下是volatile实现控制运行一次的例子
if (!cInitialized) {
synchronized (ServiceManager.class) { if (!cInitialized) { cInitialized = true; } else { return; } } } else { return; } ...........
}
其中cInitialized为
private static volatile boolean cInitialized = false;
这种方法能够确保只运行一次,如果对于运行时间较长,但又未必需要立刻使用,可使用此方法。但注意使用与运行的时间(其他线程虽然判定其不需要初始化运行,但是实际初始化运行工作未必已经完成) February 07 State Pattern东东是在看portal中发现的,主要改模式主要用于表示状态,有很好的扩展性,又方便使用
package com.newtouch.portal.core.portlet;
import javax.portlet.PortletMode; public class NewtouchPortletMode extends PortletMode{ public final static PortletMode ABOUT = new PortletMode("about"); public final static PortletMode CONFIG = new PortletMode("config"); public final static PortletMode EDIT_DEFAULTS = new PortletMode("edit_defaults"); public final static PortletMode PREVIEW = new PortletMode("preview"); public final static PortletMode PRINT = new PortletMode("print"); public NewtouchPortletMode(String name) { super(name); } } 其中PortletMode为
package com.newtouch.portal.core.portlet; 在windowstate也是使用这样的模式public class PortletMode { private String name = null; public final static PortletMode VIEW = new PortletMode("view"); public PortletMode(String name){ this.name = name; } public boolean equals(PortletMode pm){ return pm.name.equals(this.name); } public String toString(){ return name; } } December 13 Ofbiz模式应用(下)JUNIT的简单测试用例
package com.p6spy.engine.sqltrigger.test;
import java.util.HashMap; public class EventTest extends TestCase { public EventTest(String arg0) { super(arg0); // TODO Auto-generated constructor stub } public void testEventFactory() { EventHandler handler = null; String handlerClass = "com.p6spy.engine.sqltrigger.event.JavaEventHandler"; try { handler = EventFactory.getInstance().getEventHandler(handlerClass); } catch (Exception ex) { ex.printStackTrace(); } Assert.assertTrue(handler.getClass().getName().equals(handlerClass)); } public void testEventHandle() { EventHandler handler = new JavaEventHandler(); Map map = new HashMap(); map.put("tablename", "emp"); try { String result = (String) handler.invoke( "com.p6spy.engine.sqltrigger.test.SimpleEvent", "EventTest", map); Assert.assertEquals("tablename:emp\n", result); } catch (Exception ex) { ex.printStackTrace(); } } public void testMultiThreadJavaEventHandle() { EventHandler handler = new MultiThreadJavaEventHandler(); try { String result = null; for (int i = 0; i < 50; i++) { Map map = new HashMap(); map.put("tablename", "Employee-" + i); result = (String) handler.invoke( "com.p6spy.engine.sqltrigger.test.SimpleEvent", "EventTest", map); } Assert.assertEquals("success", result); } catch (Exception ex) { ex.printStackTrace(); } } }
由于event在主线程中运行,所以当有大量event请求而且无须回应的事件时,此方法显然不够高效,所以我们采用前面所说的Work Thread Pattern创建一个MultiThreadJavaPattern package com.p6spy.engine.sqltrigger.event;
import org.apache.commons.logging.Log; public class MultiThreadJavaEventHandler implements EventHandler{ Channel channel = null; private static Log log = LogFactory.getLog(MultiThreadJavaEventHandler.class); public MultiThreadJavaEventHandler(){ channel = new Channel(3); channel.startProcessors(); } public synchronized Object invoke(String eventPath, String eventMethod, Object context) throws EventHandlerException { EventRequest request = new EventRequest(eventPath,eventMethod,context); channel.putRequest(request); return "success"; } } class EventRequest{ String eventPath = null; String eventMethod = null; Object context = null; private static Log log = LogFactory.getLog(EventRequest.class); private static final String DEFAULT_HANDLECLASS = "com.p6spy.engine.sqltrigger.event.JavaEventHandler"; public EventRequest(String eventPath, String eventMethod, Object context){ this.eventPath = eventPath; this.eventMethod = eventMethod; this.context = context; log.info("New Event Request"+this); } public void execute(){ try { EventFactory.getInstance() .getEventHandler(DEFAULT_HANDLECLASS) .invoke(eventPath, eventMethod, context); log.debug(Thread.currentThread().getName() + " executes " + this); } catch (EventHandlerException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //only for debug public String toString() { return "[ EventPath " + eventPath + " EventMethod." + eventMethod + " EventMethod." + eventMethod +" ]"; } } class Channel{ private static final int MAX_REQUEST = 100; private final EventRequest[] requestQueue; private int tail; // 下一个putRequest的地方 private int head; // 下一个takeRequest的地方 private int count; // Request的数量 private final EventProcessor[] threadPool; public Channel(int threads) { this.requestQueue = new EventRequest[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; threadPool = new EventProcessor[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new EventProcessor("Proccessor-" + i, this); } } public void startProcessors() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start(); } } public synchronized void putRequest(EventRequest request) { while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } public synchronized EventRequest takeRequest() { while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } EventRequest request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } } class EventProcessor extends Thread{ private final Channel channel; public EventProcessor(String name, Channel channel) { super(name); this.channel = channel; } public void run() { while (true) { EventRequest request = channel.takeRequest(); request.execute(); } } } 执行JUNIT的测试可见 - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ] - invoking com.p6spy.engine.sqltrigger.test.SimpleEvent method : EventTest 。。。。。。。。。。。。。。。。。。。。。。。。。。。。
PS:如果全部复制上面代码,只要加入相应的包,程序应该能够跑起来,以后也会尽肯能向这方面努力。可惜就是好象这个东东对字数有限制。。。就象这样要一拆二,以后真的不行就直接写在代码的注释里面了。如有问题,可随时加此MSN与本人交流 Ofbiz模式应用(上) ofibz有多种EventHandle来处理各类事物,如JAVA,SOAP,JMS,Bsf等等。
EventHandle接口只定义一个方法
package com.p6spy.engine.sqltrigger.event;
public interface EventHandler { public Object invoke(String eventPath, String eventMethod, Object context) throws EventHandlerException; } 对于EventHandle的生产ofbiz采用了Simple Factory+singlton+double check的处理方法。此类方法主要适用于需要单态,对象较大的情况,在ofbiz中非常常见(本示例代码为了满足自身需要稍微改动了些源码)。
package com.p6spy.engine.sqltrigger.event;
import java.util.HashMap; import java.util.Map; public class EventFactory { protected Map handlers = null; private static EventFactory eventFactory = null; private EventFactory() { handlers = new HashMap(); } //EventFactory 的单态方法 public static EventFactory getInstance() { if (eventFactory == null) { synchronized (EventFactory.class) { if (eventFactory == null) { eventFactory = new EventFactory(); } } } return eventFactory; } //EventHandler的处理方法()删减了处理方法和别名的映射 public EventHandler getEventHandler(String handlerClass) throws EventHandlerException { // attempt to get a pre-loaded handler EventHandler handler = (EventHandler) handlers.get(handlerClass); if (handler == null) { synchronized (EventHandler.class) { handler = (EventHandler) handlers.get(handlerClass); if (handler == null) { try { Class c = Thread.currentThread() .getContextClassLoader() .loadClass(handlerClass); handler = (EventHandler) c.newInstance(); handlers.put(handlerClass, handler); } catch (ClassNotFoundException cnf) { throw new EventHandlerException( "Cannot load handler class", cnf); } catch (InstantiationException ie) { throw new EventHandlerException( "Cannot get instance of the handler", ie); } catch (IllegalAccessException iae) { throw new EventHandlerException(iae.getMessage(), iae); } } } if (handler == null) throw new EventHandlerException("Invalid handler"); } return handler; } } JaveEventHandle处理简单java方法,主要采用java的反射机制
package com.p6spy.engine.sqltrigger.event;
import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class JavaEventHandler implements EventHandler { private Map eventClassMap = new HashMap(); private static Log log = LogFactory.getLog(JavaEventHandler.class); public Object invoke(String eventPath, String eventMethod, Object context) throws EventHandlerException { Class eventClass = (Class) this.eventClassMap.get(eventPath); //又是典型的单态+DOUBLE CHECK模式 if (eventClass == null) { synchronized (this) { eventClass = (Class) this.eventClassMap.get(eventPath); if (eventClass == null) { try { ClassLoader loader = Thread.currentThread() .getContextClassLoader(); eventClass = loader.loadClass(eventPath); } catch (ClassNotFoundException e) { log.error("Error loading class with name: " + eventPath + ", will not be able to run event..."); } if (eventClass != null) { eventClassMap.put(eventPath, eventClass); } } } } //原来主要处理Request 和 Response //现今改为简单的一个Object作为通用参数 Class[] paramTypes = new Class[] { Object.class }; Object[] params = new Object[] { context }; return invoke(eventPath, eventMethod, eventClass, paramTypes, params); } private Object invoke(String eventPath, String eventMethod, Class eventClass, Class[] paramTypes, Object[] params) throws EventHandlerException { if (eventClass == null) { throw new EventHandlerException("Error invoking event, the class " + eventPath + " was not found"); } if (eventPath == null || eventMethod == null) { throw new EventHandlerException( "Invalid event method or path; call initialize()"); } try { log.info("invoking " + eventClass.getName() + " method : " + eventMethod); for (int i = 0; i < paramTypes.length; i++) { log.info("injecting parameter type : " + paramTypes[i].getName() + " value : " + params[i].toString()); } Method m = eventClass.getMethod(eventMethod, paramTypes); Object objReturn = m.invoke(null, params); log.info("[Event Return]: " + objReturn); return objReturn.toString(); } catch (java.lang.reflect.InvocationTargetException e) { Throwable t = e.getTargetException(); if (t != null) { log.error("Problems Processing Event : " + e); throw new EventHandlerException("Problems processing event: " + t.toString(), t); } else { log.error("Problems Processing Event : " + e); throw new EventHandlerException("Problems processing event: " + e.toString(), e); } } catch (Exception e) { log.error("Problems Processing Event : " + e); throw new EventHandlerException("Problems processing event: " + e.toString(), e); } } } 我们现今写个简单的测试类
package com.p6spy.engine.sqltrigger.test;
import java.util.Iterator; public class SimpleEvent { private static Log log = LogFactory.getLog(SimpleEvent.class); public synchronized static String EventTest(Object obj) { String result = ""; //this is only for simple test if(obj instanceof java.util.Map){ log.info("object is instance of Map ."); Map context = (Map)obj; Set keySet = ((Map)context).keySet(); Iterator itr = keySet.iterator(); StringBuffer sb = new StringBuffer(result); while (itr.hasNext()) { String key = itr.next().toString(); sb.append(key + ":").append((String) ((Map)context).get(key)).append("\n"); } result = sb.toString(); } // generic speaking , this is useful if(obj instanceof Parser){ log.info("object is instance of Parser ."); Parser context = (Parser)obj; result = context.getTableName(); } return result; } } 注意:一般而言好象Event都不加同步限制,这个显然是线程不安全的,但是加的话可能影响性能,那该怎么办呢? December 12 线程设计模式之八(Work Thread Pattern) 今天决定开始正式写点东西了,希望能对学习JAVA的同道们有点帮助,和大家共同进步。这次设计模式主要是以一个实际项目而展开———P6SPY-EX,其主要完成功能是解析SQL语句,通过XML配置,触发相关的普通JAVA程序,JMS或SOAP等程序。涉及内容包括OFBIZ,HSQL,XSTREAM,P6SPY等,充分利用各个开源项目的优势,满足自己的需求,写出高效的代码。
当要同时触发大量EVENT时候,靠单线程显然是不行的,但是用Thread-Per-Message Pattern显然也是不行的(对每个请求开启个线程),所以我门需要采用Work Thread Pattern,一个类不断的提交Request,然后开启几个线程处理Request,处理结束就等待,直至再有Request来。
现在就举个比较典型的例子
一个发送请求的客户端(这里客户端也是多线程的,可以满足同时有多个客户提出Request)
import java.util.Random;
public class ClientThread extends Thread { private final Channel channel; private static final Random random = new Random(); public ClientThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { try { for (int i = 0; true; i++) { Request request = new Request(getName(), i); channel.putRequest(request); Thread.sleep(random.nextInt(1000)); } } catch (InterruptedException e) { } } }
一个是表示工作请求Request.java import java.util.Random;
public class Request { private final String name; // 委托者 private final int number; // 请求编号 private static final Random random = new Random(); public Request(String name, int number) { this.name = name; this.number = number; } public void execute() { System.out.println(Thread.currentThread().getName() + " executes " + this); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } public String toString() { return "[ Request from " + name + " No." + number + " ]"; } } 一个是用以存放请求的Channel,以FIFO方式
public class Channel {
private static final int MAX_REQUEST = 100; private final Request[] requestQueue; private int tail; // 下一个putRequest的地方 private int head; // 下一个takeRequest的地方 private int count; // Request的数量 private final WorkerThread[] threadPool; public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; threadPool = new WorkerThread[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new WorkerThread("Worker-" + i, this); } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start(); } } public synchronized void putRequest(Request request) { while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest() { while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } Request request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } }
最后一个是工作类WorkThread.java public class WorkerThread extends Thread {
private final Channel channel; public WorkerThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { while (true) { Request request = channel.takeRequest(); request.execute(); } } } 使用方法:Main.java
public class Main { public static void main(String[] args) { //开5个工人同时工作 Channel channel = new Channel(5); channel.startWorkers(); //三个人同时发出各自的请求 new ClientThread("Alice", channel).start(); new ClientThread("Bobby", channel).start(); new ClientThread("Chris", channel).start(); } } 显示结果如下:
Worker-1 executes [ Request from Alice No.0 ]
Worker-0 executes [ Request from Bobby No.0 ] Worker-4 executes [ Request from Chris No.0 ] Worker-2 executes [ Request from Alice No.1 ] Worker-3 executes [ Request from Bobby No.1 ] Worker-1 executes [ Request from Alice No.2 ] Worker-0 executes [ Request from Chris No.1 ] Worker-4 executes [ Request from Chris No.2 ] Worker-1 executes [ Request from Chris No.3 ] Worker-3 executes [ Request from Bobby No.2 ] ps:第一次写,对于这个咚咚怎么弄还不太熟悉 :-(
|
|
|