博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
如何使用观测者模式实现监控和推送
阅读量:6883 次
发布时间:2019-06-27

本文共 9554 字,大约阅读时间需要 31 分钟。

       观测者模式已在博客""中介绍,下面说下如何将观察者模式应用在实际工作中。

问题描述

       某业务系统会定期接收到传回来的数据,数据放在一个目录下。由于业务的需要,当有新的数据产生时,需要将数据上传到多台机器上。你如何设计这个业务逻辑呢?

功能设计

       放在目录下的数据时不断更新的,我们需要一个守护线程来监控目录下数据的变化,当有新数据时就通知观测者observers。这里的观测者是需要将数据上传到FTP服务器的对象,当有新数据产生时,就上传数据到FTP服务器。

        这里很适合用观测者模式来解决,其中subject的功能是监控目录变化,和通知观测者变化的数据。观测者的功能是上传新的数据到FTP服务器,这里有多个观测者,而且虽这业务的发展,观察者的数目是变化的。

采用观测者模式,可以在不修改代码的情况下,很容易的添加观测者。

详细设计

监控目录变化的subject:
import java.io.File;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Observable;import zyang.designPattern.observerPattern.Observer;import zyang.designPattern.observerPattern.Subject;/** * Fuction: * 一个守护线程,用户监控目录下是否有新数据 * 如果有新数据,通知监听对象observers * @author zhonghua  * @version 2013-3-20 下午9:25:56 * @since 1.0 */public class DirectoryMonitorSubject extends Observable implements Runnable {		// -------------------------------------------------	// properties	// -------------------------------------------------    /**     * Whether or not this thread is active.     */    private boolean active = false;    /**     * The interval in seconds to run this thread     */    private int interval = -1;    /**     * The name of this thread     */    private String threadName;    /**     * This instance's thread     */    private Thread runner;        /**     * 监控目录     */    private String directoryFullPath;        /**     * The map of last recorded files and their timestamps (String fileName => Long lastMod)     */    private Map prevDatas=new HashMap
(); /** * The map of current files and their timestamps (String fileName => Long lastMod) */ private Map currentDatas=new HashMap
(); /** * The map of new files and their timestamps (String fileName => Long lastMod) */ private Map newDatas=new HashMap
(); // ------------------------------------------------- // constructor // ------------------------------------------------- /** * Construct a new interval thread that will run on the given interval * with the given name. * @param threadName the name of the thread * @param directoryFullPath * @param interval the number of seconds to run the thread on */ public DirectoryMonitorSubject(String threadName,String directoryFullPath, int interval) { this.threadName=threadName; this.directoryFullPath=directoryFullPath; this.interval=interval; System.out.println("staring moditing direcotry "+directoryFullPath); } // ------------------------------------------------- // public method // ------------------------------------------------- /** * Start the thread on the specified interval. */ public void start() { active = true; //If we don't have a thread yet, make one and start it. if (runner == null && interval > 0) { runner = new Thread(this); runner.start(); } }//end start() /** * Stop the interval thread. */ public void stop() { active = false; } //end stop() public void run() { //Make this a relatively low level thread Thread.currentThread().setPriority(Thread.MIN_PRIORITY); //Pause this thread for the amount of the interval while(active){ try { setNewDatas(); Thread.sleep(interval); //监控时间间隔 } catch (InterruptedException e) { e.printStackTrace(); } }//end while }//end run public void direcotryChanged(){ setChanged(); notifyObservers(newDatas); //将新增加的数据传给observers } //end temperatureChanged() //监控目录下是否有新数据,如果有新数据就传给observers public void setNewDatas(){ if(checkNewDatas()){ //目录下有新数据 System.out.println("subject notice:have "+newDatas.size()+" data in "+directoryFullPath); direcotryChanged(); }//end if } //end setNewDatas() // ------------------------------------------------- // private method // ------------------------------------------------- /** * 检查目录下是否有新的数据(线程会反复调用该方法),并将新数据放入newDatas * @return 如果有新的数据返回ture,否则返回false */ private boolean checkNewDatas(){ boolean isHaveNewData=false; //清空先前的数据 prevDatas.clear(); newDatas.clear(); //将上次的数据先保存在 prevDatas prevDatas.putAll(currentDatas); currentDatas.clear(); //清空数据 //添加当前目录下的数据到currentDatas File direcotryFile=new File(directoryFullPath); File[] filesList=direcotryFile.listFiles(); for(File file:filesList){ currentDatas.put(file.getAbsolutePath(), new Long(file.lastModified())); }//end for //将当前目录下数据与先前目录下数据进行比较 Iterator currentIt=currentDatas.keySet().iterator(); while(currentIt.hasNext()){ String fileName=(String)currentIt.next(); Long lastModified = (Long) currentDatas.get(fileName); if(!prevDatas.containsKey(fileName)){ newDatas.put(fileName, lastModified); }//end if else if(prevDatas.containsKey(fileName)){ Long prevModified = (Long) prevDatas.get(fileName); if (prevModified.compareTo(lastModified) != 0){ newDatas.put(fileName, lastModified); }//end if }//end if }//end while if(newDatas.size()>0) isHaveNewData=true; return isHaveNewData; }//end checkNewDatas() }//end DirectoryWatcher
观测者类:上传数据到服务器
import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Observable;import java.util.Observer;/** * Fuction: *  * @author zhonghua  * @version 2013-3-20 下午9:38:54 * @since 1.0 */public class DataObserver implements Observer {	private Observable observable;	private Map newDatas=new HashMap
(); //新增加的数据 // ------------------------------------------------- public DataObserver(Observable observable){ this.observable=observable; observable.addObserver(this); } // ------------------------------------------------- //当得到subject的通知,做something,上传数据到FTP public void update(Observable obs, Object dataFromSubject) { if(obs instanceof DirectoryMonitorSubject){ DirectoryMonitorSubject dw=(DirectoryMonitorSubject)obs; newDatas.clear(); //先清空数据 newDatas.putAll((Map)dataFromSubject); uploadData(); }//end if }//end update() /** * 上传新数据到FTP */ private void uploadData(){ System.out.println("starting upload new data to ftp"); //这里就不写真正上传的代码了,直接输出 System.out.println("have upload "+newDatas.size()+" number data to ftp,they are:"); Iterator newDatasIt=newDatas.keySet().iterator(); while(newDatasIt.hasNext()){ String fileName=(String)newDatasIt.next(); Long lastModified=(Long)newDatas.get(fileName); System.out.println("fileName="+fileName+",lastModified="+lastModified.toString()); }//end while }//end loadNewData()} //end class FileListener
现在看下如何使用
import zyang.DirectoryMonitor.DataObserver;import zyang.DirectoryMonitor.DirectoryMonitorSubject;/**  * Fuction: * How to use subject and observers * @author   zhonghua * @since    1.0  */public class WatcheNewDataApp {	public static void main(String[] args) {		//subject		DirectoryMonitorSubject wp=new DirectoryMonitorSubject("moniteDirectory","E:\\temp", 2000);		//observers		DataObserver fl=new DataObserver(wp);		wp.start();	//开启监控守护线程	}//end main()}//end class WatcheNewDataApp
运行main函数,结果如下

观测者模式模版

       下面写了一个通用的观测者模式模版代码,用户只需要在对应地方加入自己的业务逻辑即可
sunbject类:只需要修改logicMethod方法中的业务逻辑即可。
public class YourSubject extends Observable {	// -------------------------------------------------	// constructor	// -------------------------------------------------	public YourSubject() { 	}	/**	 * Must have this method's content, this method is called by the logicMethod.	 * of course,you can change this method's name	 */	public void informationChanged(){		setChanged();		notifyObservers(); 		//该方法的参数用于subject和observers传递数据,向observers传递数据,observers在其update方法中使用传过来的数据//		notifyObservers(dataSendToObservers) 	} //end informationChanged()		/**	 * the subject only need do one thing: write your logic in this method	 */    public void logicMethod(){     	//write your logic code here    	//TODO    	System.out.println("subject notice:monitor information has changed, observers can do their things now."); //for example    	//end TODO    	    	informationChanged();    } //end logicMethod()    } //end class YourSubject
observer类:
只需要在
update方法中写入你的业务逻辑即可
public class OneObserver implements Observer {	private Observable observable;		// -------------------------------------------------	// constructor	// -------------------------------------------------	public OneObserver(Observable observable){		this.observable=observable;		observable.addObserver(this);	}		/**	 * the observer only need do one thing: write your logic in this method	 */	public void update(Observable obs, Object dataFromSubject) { 		if(obs instanceof YourSubject){			YourSubject yourSubject=(YourSubject)obs;			//write your logic code here			//TODO	    	System.out.println("observer:have upload to ftp"); //for example		}//end if	}//end update}//end class OneObserver
如何使用呢?见代码
/**  * Fuction: * the example shows how to use subject object and observers objects * @author zhonghua * @version 2013-8-26 下午9:25:56 * @since 1.0 */public class UseApp {	public static void main(String[] args) {		//subject		YourSubject wp=new YourSubject();		//observers		OneObserver clientA=new OneObserver(wp);		//notify observers,and observers do their things after have the notice		wp.logicMethod();	}//end main()}
上面的例子和模版已共享,。

推送功能

现在很多手机软件的推送功能,比如百度新闻,微信公众平台,其实很适合用观测者模式。发消息的服务端即时subject,接收消息的观测者observers即手机软件使用者。服务端监控消息,当有消息时通知多个观测者,并发送消息给观测者。
                

 

你可能感兴趣的文章
MEF简单学习笔记
查看>>
Srping - bean的依赖注入(Dependency injection)
查看>>
NSAutoreleasePool 用处
查看>>
import matplotlib.pyplot as plt出错
查看>>
常用集合与Dictionary用例
查看>>
MVC
查看>>
AI - TensorFlow - 张量(Tensor)
查看>>
js table 导出 Excel
查看>>
陶哲轩实分析 习题 12.2.4
查看>>
.Net Core库类项目跨项目读取配置文件
查看>>
ahjesus动态生成表达式树
查看>>
[摘录]华为绩效管理法
查看>>
AHSC DAY2总结
查看>>
java.lang.SecurityException: class "javax.servlet.FilterRegistration"(spark下maven)
查看>>
[Vue CLI 3] 配置解析之 css.extract
查看>>
Linux——信息采集(三)dmitry、路由跟踪命令tracerouter
查看>>
提取ipa里面的资源图片 png
查看>>
wxpython ItemContainer
查看>>
第一次冲刺--个人工作总结03
查看>>
Castle Windsor 使MVC Controller能够使用依赖注入
查看>>