您当前的位置: 首页 >  宝哥大数据

MyCollector02---AppManager

宝哥大数据 发布时间:2017-10-27 16:49:17 ,浏览量:4

AppManager

1.1、appManager 提供一个队列,方便从kafka中消费的数据, 有缓存的地方

    /********data*****/
    /**
     * 消息队列
     * 由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选
     */
    protected LinkedBlockingQueue   kafkaQueue;

1.2创建一个变量noDatasToKafkaQueue, 用于管理是否向队列中写入数据

    /**
     * 停止向kafka队列中压入数据
     */
    private AtomicBoolean noDatasToKafkaQueue;




    /**
     * 向队列中压入数据
     * @param record    消息
     * @param timeout   超时时间
     * @param unit      时间单位
     * @return  是否成功压入队列
     * @throws InterruptedException
     */
    public boolean offerDataToQueue(ConsumerRecord record, long timeout, TimeUnit unit) 
            throws InterruptedException {
        return !noDatasToKafkaQueue.get() && kafkaQueue.offer(record, timeout, unit);
    }

    /**
     * 从队列中获取数据
     * @param timeout 
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public ConsumerRecord pollRecords(long timeout, TimeUnit unit) 
            throws InterruptedException {
        return kafkaQueue.poll(timeout, unit);
    }

1.3、 任务池存储运行的job, 任务调度Scheduler管理Job


    /**
     * 任务池
     */
    protected Map jobpool;
    /**
     * 任务调度
     */
    private Scheduler scheduler;

1.3.1、注册Job, 添加job到任务池jobpool, 注册job到Scheduler的busypool

    /**
     * 通过调用scheduler将job 注册到
     * @param job
     * @return
     */
    public boolean registerJob(Job job) {
        if (!scheduler.registerJob(job)) {
            LogUtils.log(getClass(), "the job-register of scheduler is unsuccessful .", LogUtils.WARN);
            return false;
        }
        jobpool.put(job.getID(), job);
        LogUtils.log(getClass(), "register " + job.getID() + " to scheduler");
        return true;
    }

1.3.2、删除Job

    /**
     * remove job 
     * @param jobId
     */
    public synchronized void removeJob(String jobId) {
        LogUtils.log(MyScheduler.class, "Thread-" + jobId + " removed");
        scheduler.removeJob(jobId);
        jobpool.remove(jobId);
    }

1.3.3、释放job

    /**
     * 释放运行的job
     * release job
     */
    public synchronized void releaseJob() {
        Map tmp = new HashMap(jobpool);
        for (Job job:tmp.values()) {
            if (job.isRun()) {
                removeJob(job.getID());
            }
        }
        scheduler.balanceThread();
    }

2、AppManager业务

2.1、初始化

    /**
     * 初始化AppManager
     *    第一步:初始化Cache
     *    第二步:初始化Schedule, 任务调度
     */
    public boolean init() {

        // initialize cache,加载appdata.conf
        if (!initCache()) return false;
        // init scheduler
        if (!initScheduler()) return false;

        return true;
    }

2.1.1、初始化CacheManager

读取配置文件到缓存中, CacheManager中实际业务, 就是将配置文件中的内容写到一个Map

    /**
     * 加载appdata.conf内容
     * @return
     */
    private boolean loadAppData() {
        LogUtils.log(getClass(), "Load AppData ......");
        appData.putAll(Configuration.readAppdata());
        return true;
    }

2.2、初始化Scheduler


    /**
     * com.nokia.server.AppManager.initScheduler()
* 根据appdata.conf中的参数设置初始化线程数
* 以及初始化Cheduler
* 启动scheduler * @return */ private boolean initScheduler() { scheduler = new Scheduler(); // appdata.conf minThread=10 int min = CacheManager.getInstance().getMinThread(); // appdata.conf maxThread=100 int max = CacheManager.getInstance().getMaxThread(); //根据appdata.conf中的minThead,和maxThread设置初始化scheduler的线程数 if (!scheduler.init(min, max)) { LogUtils.log(getClass(), "the initializtion of scheduler is unsuccessful .", LogUtils.WARN); return false; } /* //initialize scheduler,将SystemJob 以及业务Job注册到jobpool中 if (!Initializtion.getInstance().init()) { LogUtils.log(getClass(), "the job-initializtion of scheduler is unsuccessful .", LogUtils.WARN); return false; }*/ initJob(); // startup scheduler if (!scheduler.start()) { LogUtils.log(getClass(), "the startup of scheduler is unsuccessful .", LogUtils.WARN); return false; } return true; }

2.3、shutdown

    /**
     * 停止scheduler
     */
    public void shtudown() {
        if (scheduler != null) {
            LogUtils.println(getClass(), "begin shutdown scheduler");

            scheduler.shutdown();
        }
        LogUtils.println(getClass(), "AppManager:" + appkey + " shutdown");
    }

关注
打赏
查看更多评论

宝哥大数据

暂无认证

  • 4浏览

    0关注

    985博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录