Steven's profile永远的第一天PhotosBlogLists Tools Help

Blog


    February 11

    volatile实现控制运行一次

    以下是volatile实现控制运行一次的例子
     
    if (!cInitialized) {
                
    synchronized (ServiceManager.class) {
                    
    if (!cInitialized) {
                        
    cInitialized = true;
                    
    } else {
                        
    return;
                    
    }
                
    }
            
    } else {
                
    return;
            
    }
    ...........
    }
    其中cInitialized
     private static volatile boolean cInitialized = false;
     
    这种方法能够确保只运行一次,如果对于运行时间较长,但又未必需要立刻使用,可使用此方法。但注意使用与运行的时间(其他线程虽然判定其不需要初始化运行,但是实际初始化运行工作未必已经完成)
    February 07

    State Pattern

    东东是在看portal中发现的,主要改模式主要用于表示状态,有很好的扩展性,又方便使用
     
    package com.newtouch.portal.core.portlet;

    import javax.portlet.PortletMode;


    public class NewtouchPortletMode extends PortletMode{


        
    public final static PortletMode ABOUT = new PortletMode("about");


        
    public final static PortletMode CONFIG = new PortletMode("config");


        
    public final static PortletMode EDIT_DEFAULTS =

            
    new PortletMode("edit_defaults");


        
    public final static PortletMode PREVIEW = new PortletMode("preview");


        
    public final static PortletMode PRINT = new PortletMode("print");


        
    public NewtouchPortletMode(String name) {

            
    super(name);

        
    }

        

    }

    其中PortletMode为
    package com.newtouch.portal.core.portlet;

    public class PortletMode {

        
        
    private String name = null;

        
        
    public final static PortletMode VIEW = new PortletMode("view");

        
        
    public PortletMode(String name){

            
    this.name = name;

        
    }

        
        
    public boolean equals(PortletMode pm){

            
            
    return pm.name.equals(this.name);

            
        
    }

        
        
    public String toString(){

            
    return name;

        
    }

        
    }

    在windowstate也是使用这样的模式
    December 13

    Ofbiz模式应用(下)

    JUNIT的简单测试用例
    package com.p6spy.engine.sqltrigger.test;

    import java.util.HashMap;


    public class EventTest extends TestCase {

        
    public EventTest(String arg0) {

            
    super(arg0);

            
    // TODO
     Auto-generated constructor stub
        }


        
    public void testEventFactory() {

            
    EventHandler handler = null;

            
    String handlerClass = "com.p6spy.engine.sqltrigger.event.JavaEventHandler";

            
    try {

                
    handler = EventFactory.getInstance().getEventHandler(handlerClass);

            
    } catch (Exception ex) {

                
    ex.printStackTrace();

            
    }

            
    Assert.assertTrue(handler.getClass().getName().equals(handlerClass));

        
    }


        
    public void testEventHandle() {

            
    EventHandler handler = new JavaEventHandler();

            
    Map map = new HashMap();

            
    map.put("tablename", "emp");

            
    try {

                
    String result = (String) handler.invoke(

                        
    "com.p6spy.engine.sqltrigger.test.SimpleEvent",

                        
    "EventTest", map);

                
    Assert.assertEquals("tablename:emp\n", result);

            
    } catch (Exception ex) {

                
    ex.printStackTrace();

            
    }

        
    }


        
    public void testMultiThreadJavaEventHandle() {

            
    EventHandler handler = new MultiThreadJavaEventHandler();

            
    try {

                
    String result = null;

                
    for (int i = 0; i < 50; i++) {

                    
    Map map = new HashMap();

                    
    map.put("tablename", "Employee-" + i);

                    
    result = (String) handler.invoke(

                            
    "com.p6spy.engine.sqltrigger.test.SimpleEvent",

                            
    "EventTest", map);

                
    }

                
    Assert.assertEquals("success", result);

            
    } catch (Exception ex) {

                
    ex.printStackTrace();

            
    }

        
    }

    }

     

    由于event在主线程中运行,所以当有大量event请求而且无须回应的事件时,此方法显然不够高效,所以我们采用前面所说的Work Thread Pattern创建一个MultiThreadJavaPattern

    package com.p6spy.engine.sqltrigger.event;

    import org.apache.commons.logging.Log;



    public class MultiThreadJavaEventHandler implements EventHandler{

        
        
    Channel channel = null;

        
        
    private static Log log = LogFactory.getLog(MultiThreadJavaEventHandler.class);

        
        
    public MultiThreadJavaEventHandler(){

             
    channel = new Channel(3);
     
             
    channel.startProcessors();
     
        
    }


        
    public synchronized Object invoke(String eventPath, String eventMethod, Object context) throws EventHandlerException {

            
    EventRequest request = new EventRequest(eventPath,eventMethod,context);

            
    channel.putRequest(request);

            
    return "success";

        
    }

        
    }


    class EventRequest{

        
    String eventPath = null;

        
    String eventMethod = null;

        
    Object context = null;

        
    private static Log log = LogFactory.getLog(EventRequest.class);

        
    private static final String DEFAULT_HANDLECLASS = "com.p6spy.engine.sqltrigger.event.JavaEventHandler";

        
    public EventRequest(String eventPath, String eventMethod, Object context){

            
    this.eventPath = eventPath;

            
    this.eventMethod = eventMethod;

            
    this.context = context;

            
    log.info("New Event Request"+this);

        
    }

        
        
    public void execute(){

            
    try {

                
    EventFactory.getInstance()

                
    .getEventHandler(DEFAULT_HANDLECLASS)

                
    .invoke(eventPath,

                        
    eventMethod,

                        
    context);

                
    log.debug(Thread.currentThread().getName() + " executes " + this);

            
    } catch (EventHandlerException e) {

                
    // TODO
     Auto-generated catch block
                e.printStackTrace();

            
    }

        
    }

        
        
    //only for debug
         public String toString() {
     
                
    return "[ EventPath " + eventPath +
     
                
    " EventMethod." + eventMethod
     
                
    + " EventMethod." + eventMethod
     
                
    +" ]";
     
            
    }
     
        
    }


    class Channel{

         
    private static final int MAX_REQUEST = 100;
     
            
    private final EventRequest[] requestQueue;
     
            
    private int tail;  
    // 下一个putRequest的地方 
            private int head;  
    // 下一个takeRequest的地方 
            private int count; 
    // Request的数量 

            
    private final EventProcessor[] threadPool; 

            
    public Channel(int threads) {
     
                
    this.requestQueue = new EventRequest[MAX_REQUEST];
     
                
    this.head = 0;
     
                
    this.tail = 0;
     
                
    this.count = 0;
     

                
    threadPool = new EventProcessor[threads];
     
                
    for (int i = 0; i < threadPool.length; i++) {
     
                    
    threadPool[i] = new EventProcessor("Proccessor-" + i, this);
     
                
    }
     
            
    }
     
            
    public void startProcessors() {
     
                
    for (int i = 0; i < threadPool.length; i++) {
     
                    
    threadPool[i].start();
     
                
    }
     
            
    }
     
            
    public synchronized void putRequest(EventRequest request) {
     
                
    while (count >= requestQueue.length) {
     
                    
    try {
     
                        
    wait();
     
                    
    } catch (InterruptedException e) {
     
                    
    }
     
                
    }
     
                
    requestQueue[tail] = request;
     
                
    tail = (tail + 1) % requestQueue.length;
     
                
    count++;
     
                
    notifyAll();
     
            
    }
     
            
    public synchronized EventRequest takeRequest() {
     
                
    while (count <= 0) {
     
                    
    try {
     
                        
    wait();
     
                    
    } catch (InterruptedException e) {
     
                    
    }
     
                
    }
     
                
    EventRequest request = requestQueue[head];
     
                
    head = (head + 1) % requestQueue.length;
     
                
    count--;
     
                
    notifyAll();
     
                
    return request;
     
            
    }
     
    }


    class EventProcessor extends Thread{

        
    private final Channel channel;
     
        
    public EventProcessor(String name, Channel channel) {
     
            
    super(name);
     
            
    this.channel = channel;
     
        
    }
     
        
    public void run() {
     
            
    while (true) {
     
                
    EventRequest request = channel.takeRequest();
     
                
    request.execute();
     
            
    }
     
        
    }
     
    }

    执行JUNIT的测试可见

    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - invoking com.p6spy.engine.sqltrigger.test.SimpleEvent method : EventTest
    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - invoking com.p6spy.engine.sqltrigger.test.SimpleEvent method : EventTest
    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - injecting parameter type : java.lang.Object value : {tablename=Employee-1}
    - injecting parameter type : java.lang.Object value : {tablename=Employee-0}
    - invoking com.p6spy.engine.sqltrigger.test.SimpleEvent method : EventTest
    - injecting parameter type : java.lang.Object value : {tablename=Employee-2}
    - object is instance of Map .
    - [Event Return]: tablename:Employee-1

    - invoking com.p6spy.engine.sqltrigger.test.SimpleEvent method : EventTest
    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - object is instance of Map .
    - injecting parameter type : java.lang.Object value : {tablename=Employee-3}
    - object is instance of Map .
    - New Event Request[ EventPath com.p6spy.engine.sqltrigger.test.SimpleEvent EventMethod.EventTest EventMethod.EventTest ]
    - [Event Return]: tablename:Employee-0

    。。。。。。。。。。。。。。。。。。。。。。。。。。。。

     

    PS:如果全部复制上面代码,只要加入相应的包,程序应该能够跑起来,以后也会尽肯能向这方面努力。可惜就是好象这个东东对字数有限制。。。就象这样要一拆二,以后真的不行就直接写在代码的注释里面了。如有问题,可随时加此MSN与本人交流

    Ofbiz模式应用(上)

      ofibz有多种EventHandle来处理各类事物,如JAVA,SOAP,JMS,Bsf等等。
      EventHandle接口只定义一个方法
    package com.p6spy.engine.sqltrigger.event;

    public interface EventHandler {

        
        
        
    public Object invoke(String eventPath, String eventMethod, Object context)

                
    throws EventHandlerException;

    }

    对于EventHandle的生产ofbiz采用了Simple Factory+singlton+double check的处理方法。此类方法主要适用于需要单态,对象较大的情况,在ofbiz中非常常见(本示例代码为了满足自身需要稍微改动了些源码)。
    package com.p6spy.engine.sqltrigger.event;

    import java.util.HashMap;

    import java.util.Map;


    public class EventFactory {


        
    protected Map handlers = null;


        
    private static EventFactory eventFactory = null;


        
    private EventFactory() {

            
    handlers = new HashMap();

        
    }

        
        
    //EventFactory 的单态方法
        public static EventFactory getInstance() {

            
    if (eventFactory == null) {

                
    synchronized (EventFactory.class) {

                    
    if (eventFactory == null) {

                        
    eventFactory = new EventFactory();

                    
    }

                
    }

            
    }

            
    return eventFactory;

        
    }

        
        
    //EventHandler的处理方法()删减了处理方法和别名的映射
        public EventHandler getEventHandler(String handlerClass)

                
    throws EventHandlerException {

            
    // attempt to get a pre-loaded handler
            EventHandler handler = (EventHandler) handlers.get(handlerClass);


            
    if (handler == null) {

                
    synchronized (EventHandler.class) {

                    
    handler = (EventHandler) handlers.get(handlerClass);

                    
    if (handler == null) {


                        
    try {

                            
    Class c = Thread.currentThread()

                                    
    .getContextClassLoader()

                                    
    .loadClass(handlerClass);

                            
    handler = (EventHandler) c.newInstance();

                            
    handlers.put(handlerClass, handler);


                        
    } catch (ClassNotFoundException cnf) {

                            
    throw new EventHandlerException(

                                    
    "Cannot load handler class", cnf);

                        
    } catch (InstantiationException ie) {

                            
    throw new EventHandlerException(

                                    
    "Cannot get instance of the handler", ie);

                        
    } catch (IllegalAccessException iae) {

                            
    throw new EventHandlerException(iae.getMessage(), iae);

                        
    }

                    
    }

                
    }

                
    if (handler == null)

                    
    throw new EventHandlerException("Invalid handler");

            
    }

            
    return handler;

        
    }


    }

    JaveEventHandle处理简单java方法,主要采用java的反射机制
    package com.p6spy.engine.sqltrigger.event;

    import java.lang.reflect.Method;

    import java.util.HashMap;

    import java.util.Map;


    import org.apache.commons.logging.Log;

    import org.apache.commons.logging.LogFactory;


    public class JavaEventHandler implements EventHandler {


        
    private Map eventClassMap = new HashMap();


        
    private static Log log = LogFactory.getLog(JavaEventHandler.class);


        
    public Object invoke(String eventPath, String eventMethod, Object context)

                
    throws EventHandlerException {

            
    Class eventClass = (Class) this.eventClassMap.get(eventPath);

            
    //又是典型的单态+DOUBLE CHECK模式
            if (eventClass == null) {

                
    synchronized (this) {

                    
    eventClass = (Class) this.eventClassMap.get(eventPath);

                    
    if (eventClass == null) {

                        
    try {

                            
    ClassLoader loader = Thread.currentThread()

                                    
    .getContextClassLoader();

                            
    eventClass = loader.loadClass(eventPath);

                        
    } catch (ClassNotFoundException e) {

                            
    log.error("Error loading class with name: " + eventPath

                                    
    + ", will not be able to run event...");

                        
    }

                        
    if (eventClass != null) {

                            
    eventClassMap.put(eventPath, eventClass);

                        
    }

                    
    }

                
    }

            
    }


            
    //原来主要处理Request 和 Response
            
    //现今改为简单的一个Object作为通用参数
            Class[] paramTypes = new Class[] { Object.class };


            
    Object[] params = new Object[] { context };


            
    return invoke(eventPath, eventMethod, eventClass, paramTypes, params);

        
    }


        
    private Object invoke(String eventPath, String eventMethod,

                
    Class eventClass, Class[] paramTypes, Object[] params)

                
    throws EventHandlerException {

            
    if (eventClass == null) {

                
    throw new EventHandlerException("Error invoking event, the class "

                        
    + eventPath + " was not found");

            
    }

            
    if (eventPath == null || eventMethod == null) {

                
    throw new EventHandlerException(

                        
    "Invalid event method or path; call initialize()");

            
    }


            
    try {

                
    log.info("invoking " + eventClass.getName() + " method : "

                        
    + eventMethod);

                
    for (int i = 0; i < paramTypes.length; i++) {

                    
    log.info("injecting parameter type : "

                            
    + paramTypes[i].getName() + " value : "

                            
    + params[i].toString());

                
    }


                
    Method m = eventClass.getMethod(eventMethod, paramTypes);

                
    Object objReturn = m.invoke(null, params);


                
    log.info("[Event Return]: " + objReturn);

                
    return objReturn.toString();

            
    } catch (java.lang.reflect.InvocationTargetException e) {

                
    Throwable t = e.getTargetException();


                
    if (t != null) {

                    
    log.error("Problems Processing Event : " + e);

                    
    throw new EventHandlerException("Problems processing event: "

                            
    + t.toString(), t);

                
    } else {

                    
    log.error("Problems Processing Event : " + e);

                    
    throw new EventHandlerException("Problems processing event: "

                            
    + e.toString(), e);

                
    }

            
    } catch (Exception e) {

                
    log.error("Problems Processing Event : " + e);

                
    throw new EventHandlerException("Problems processing event: "

                        
    + e.toString(), e);

            
    }

        
    }

    }

    我们现今写个简单的测试类
    package com.p6spy.engine.sqltrigger.test;

    import java.util.Iterator;


    public class SimpleEvent {

        
        
    private static Log log = LogFactory.getLog(SimpleEvent.class);

        
        
    public synchronized static String EventTest(Object obj) {

            
            
    String result = "";

            
            
    //this is only for simple test
            if(obj instanceof java.util.Map){

                
                
    log.info("object is instance of Map .");

                
                
    Map context = (Map)obj;

                
    Set keySet = ((Map)context).keySet();

                
    Iterator itr = keySet.iterator();

                
                
    StringBuffer sb = new StringBuffer(result);

                
    while (itr.hasNext()) {

                    
    String key = itr.next().toString();

                    
    sb.append(key + ":").append((String) ((Map)context).get(key)).append("\n");

                
    }

                
    result = sb.toString();

            
    }

            
            
    // generic speaking , this is useful
            if(obj instanceof Parser){

                
    log.info("object is instance of Parser .");

                
    Parser context = (Parser)obj;

                
    result = context.getTableName();

            
    }

            
            
    return result;

        
    }

    }

    注意:一般而言好象Event都不加同步限制,这个显然是线程不安全的,但是加的话可能影响性能,那该怎么办呢?
    December 12

    线程设计模式之八(Work Thread Pattern)

      今天决定开始正式写点东西了,希望能对学习JAVA的同道们有点帮助,和大家共同进步。这次设计模式主要是以一个实际项目而展开———P6SPY-EX,其主要完成功能是解析SQL语句,通过XML配置,触发相关的普通JAVA程序,JMS或SOAP等程序。涉及内容包括OFBIZ,HSQL,XSTREAM,P6SPY等,充分利用各个开源项目的优势,满足自己的需求,写出高效的代码。
     
      当要同时触发大量EVENT时候,靠单线程显然是不行的,但是用Thread-Per-Message Pattern显然也是不行的(对每个请求开启个线程),所以我门需要采用Work Thread Pattern,一个类不断的提交Request,然后开启几个线程处理Request,处理结束就等待,直至再有Request来。
     
      现在就举个比较典型的例子 
      一个发送请求的客户端(这里客户端也是多线程的,可以满足同时有多个客户提出Request)
    import java.util.Random;

    public class ClientThread extends Thread {

        
    private final Channel channel;

        
    private static final Random random = new Random();

        
    public ClientThread(String name, Channel channel) {

            
    super(name);

            
    this.channel = channel;

        
    }

        
    public void run() {

            
    try {

                
    for (int i = 0; true; i++) {

                    
    Request request = new Request(getName(), i);

                    
    channel.putRequest(request);

                    
    Thread.sleep(random.nextInt(1000));

                
    }

            
    } catch (InterruptedException e) {

            
    }

        
    }

    }

     

    一个是表示工作请求Request.java

    import java.util.Random;

    public class Request {

        
    private final String name; 
    //  委托者
        private final int number;  
    // 请求编号
        private static final Random random = new Random();

        
    public Request(String name, int number) {

            
    this.name = name;

            
    this.number = number;

        
    }

        
    public void execute() {

            
    System.out.println(Thread.currentThread().getName() + " executes " + this);

            
    try {

                
    Thread.sleep(random.nextInt(1000));

            
    } catch (InterruptedException e) {

            
    }

        
    }

        
    public String toString() {

            
    return "[ Request from " + name + " No." + number + " ]";

        
    }

    }

     
    一个是用以存放请求的Channel,以FIFO方式
    public class Channel {
        
    private static final int MAX_REQUEST = 100;

        
    private final Request[] requestQueue;

        
    private int tail;  
    // 下一个putRequest的地方
        private int head;  
    // 下一个takeRequest的地方
        private int count; 
    // Request的数量

        
    private final WorkerThread[] threadPool;

        
    public Channel(int threads) {

            
    this.requestQueue = new Request[MAX_REQUEST];

            
    this.head = 0;

            
    this.tail = 0;

            
    this.count = 0;


            
    threadPool = new WorkerThread[threads];

            
    for (int i = 0; i < threadPool.length; i++) {

                
    threadPool[i] = new WorkerThread("Worker-" + i, this);

            
    }

        
    }

        
    public void startWorkers() {

            
    for (int i = 0; i < threadPool.length; i++) {

                
    threadPool[i].start();

            
    }

        
    }

        
    public synchronized void putRequest(Request request) {

            
    while (count >= requestQueue.length) {

                
    try {

                    
    wait();

                
    } catch (InterruptedException e) {

                
    }

            
    }

            
    requestQueue[tail] = request;

            
    tail = (tail + 1) % requestQueue.length;

            
    count++;

            
    notifyAll();

        
    }

        
    public synchronized Request takeRequest() {

            
    while (count <= 0) {

                
    try {

                    
    wait();

                
    } catch (InterruptedException e) {

                
    }

            
    }

            
    Request request = requestQueue[head];

            
    head = (head + 1) % requestQueue.length;

            
    count--;

            
    notifyAll();

            
    return request;

        
    }

    }

     

    最后一个是工作类WorkThread.java

    public class WorkerThread extends Thread {
        
    private final Channel channel;

        
    public WorkerThread(String name, Channel channel) {

            
    super(name);

            
    this.channel = channel;

        
    }

        
    public void run() {

            
    while (true) {

                
    Request request = channel.takeRequest();

                
    request.execute();

            
    }

        
    }

    }

     
    使用方法:Main.java
    public class Main {
        
        
    public static void main(String[] args) {

            
            
    //开5个工人同时工作
            Channel channel = new Channel(5);

            
    channel.startWorkers();

            
            
    //三个人同时发出各自的请求
            new ClientThread("Alice", channel).start();

            
    new ClientThread("Bobby", channel).start();

            
    new ClientThread("Chris", channel).start();

        
    }

        
    }

     
    显示结果如下:
    Worker-1 executes [ Request from Alice No.0 ]
    Worker-0 executes [ Request from Bobby No.0 ]
    Worker-4 executes [ Request from Chris No.0 ]
    Worker-2 executes [ Request from Alice No.1 ]
    Worker-3 executes [ Request from Bobby No.1 ]
    Worker-1 executes [ Request from Alice No.2 ]
    Worker-0 executes [ Request from Chris No.1 ]
    Worker-4 executes [ Request from Chris No.2 ]
    Worker-1 executes [ Request from Chris No.3 ]
    Worker-3 executes [ Request from Bobby No.2 ]
     
     
    ps:第一次写,对于这个咚咚怎么弄还不太熟悉 :-(