Osmosis 解析

2023-09-25 13 0

Osmosis 工具代码解读

功能

先将 Osmosis 的主要功能列出如下,大体包括了 Osm 文件服务常用的功能。

XML Tasks

  • 3.3.1–read-xml (–rx)
    • outPipe.0 生产出一个entity Stream
  • 3.3.2–fast-read-xml (no short option available)
  • 3.3.3–write-xml (–wx)
    • inPipe.0 消费一个 Entity Stream
  • 3.3.4–read-xml-change (–rxc)
  • 3.3.5–write-xml-change (–wxc)

Area Filtering Tasks

  • 3.4.1–bounding-box (–bb)
    • inPipe.0 outPipe.0
  • 3.4.2–bounding-polygon (–bp)

Changeset Derivation and Merging 提供 osc 和 osm 的分离合并

  • 3.5.1–derive-change (–dc)
    • 比较两份osm,产出一份 osc
    • inPipe.0 – entity stream. inPipe.1-- entity stream outPipe.0 – change stream
  • 3.5.2–apply-change (–ac)
    • 将一份更改应用到一份 osm 上

Pipeline Control 控制Pipeline 的结构,不操作数据

  • 3.6.1–write-null (–wn)
    • 弃掉所有的输入数据
    • inPipe.0
  • 3.6.2–write-null-change (–wnc)
  • 3.6.3–buffer (–b)
    • 可将一个Pipeline分离到多个线程上。input Task 的线程会将数据传送到一个固定容量大小的缓冲区中,当缓冲区满,block。该任务还会产生一个新Thread从缓冲区中读取数据,缓冲区空,block
    • inPipe.0 outPipe.0
  • 3.6.4–buffer-change (–bc)
  • 3.6.5–log-progress (–lp)
    • 加入到Pipeline中,固定时间间隔记录下log
  • 3.6.6–log-progress-change(–lpc)
  • 3.6.7–tee (–t)
    • 接受一个数据流,将其送到多个目的地。当读取一个数据源并要在上面应用多个操作时可用
    • inPipe.0
    • outPipe.0 ~ n-1
  • 3.6.8–tee-change (–tc)
  • 3.6.9–read-empty (–rem)
  • 3.6.10–ready-empty-change (–remc)

Set Manipulation Tasks

  • 3.7.1–sort (–s)
  • 3.7.2–sort-change (–sc)
  • 3.7.3–merge (–m)
    • 3.7.3.1Bound entity processing
    • 将两份文件合成一份,使用前要先按照 TypeThenId sort
  • 3.7.4–merge-change (–mc)
  • 3.7.5–append-change (–apc)
  • 3.7.6–simplify-change (–simc)
  • 3.7.7–convert-change-to-full-history (–cctfh)

Data Manipulation Tasks

  • 3.8.1–node-key (–nk)
    • 给定 KeyList,所有 node 中只有有至少一个List中的key的才会被留下来
  • 3.8.2–node-key-value (–nkv)
  • 3.8.3–way-key (–wk)
  • 3.8.4–way-key-value (–wkv)
  • 3.8.5–tag-filter (–tf)
  • 3.8.6–used-node (–un)
    • 只留下那些被用到的 nodes
  • 3.8.7–used-way (–uw)
  • 3.8.8–tag-transform (–tt)

例子

下面通过两个例子演示 Osmosis 工具的基本使用方法。

osmosis \--read-xml input.osm \--tf accept-ways highway=* \ --tf reject-ways highway=motorway,motorway_link \--tf reject-relations \--used-node \--write-xml output.osm
  • 只保留有 highway 标签的所有 way
  • 保留下的 way 中,剔除掉 highway 标签值为 motorway, motorway_link 的way
  • 删除所有 Relation
  • 只保留被用到的 nodes
  • 写出
osmosis 
--rx /Users/cshi/Desktop/Momenta/Data/map.osm 
--tf reject-relations 
--tf accept-nodes speed=-1  
--tf reject-ways --rx /Users/cshi/Desktop/Momenta/Data/map.osm  
--tf reject-relations 
--tf accept-ways highway=motorway 
--used-node 
--merge --wx /Users/cshi/Desktop/out.osm
  • 如果 merge 一个线程中的两个输入,Osmosis 会发生死锁。拆成两个 Thread 执行

问题:这些任务如何编排?如何分类?哪些任务代表单起了一个线程?

结构

源码整体结构
.
├── build-support
├── gradle
├── osmosis-apidb
├── osmosis-areafilter
├── osmosis-core
├── osmosis-dataset
├── osmosis-extract
├── osmosis-hstore-jdbc
├── osmosis-osm-binary
├── osmosis-pbf
├── osmosis-pbf2
├── osmosis-pgsimple
├── osmosis-pgsnapshot
├── osmosis-replication
├── osmosis-replication-http
├── osmosis-set
├── osmosis-tagfilter
├── osmosis-tagtransform
├── osmosis-testutil
├── osmosis-xml
└── package

如何将任务分类,从而实现任务编排?

任务分类

按照是否主动启动线程分类:

  • ActiveTaskManager—— Runnable
    • 这类任务对单独启动一个线程执行,后变可以跟一些 PassiveTask。
    • 例如: read-xml
  • PassiveTaskManager
    • 这类任务挂靠在前边任务的 Sink 中执行,不会单独启动线程

按照输入输出分类:对于每一个 Task, 输入为 Sink, 输出为 Source

  • 输入—— (Multi)SInk
  • 输出——(Multi)Source

通过这两种定义,任务可分为如下所有:
在这里插入图片描述

任务分类的实现

代码中,任务相关类有 Task, TaskManager, TaskManagerFactory, TaskManagerFactoryRegister 四个类直接相关的。
使用的是典型的工厂模式。 TaskManagerFactory 是单例的生产工厂,负责生产相应的类型的 TaskManager, 每个 Task 对应一个 TaskManager,负责着 Task 的产生,连接

任务之间如何连接——pipeline设计模式

Core分析

pipeline

  • TaskManager

    • TaskManager 分为 Active 和 Passive 两类。Active 类是主动执行的 Runnable 任务,会单独起一个 Thread,Passive 会 connect 到其他 Task 之后。

    • TaskManager , Pipeline 中的所有任务实例都对应一个 TaskManager,TaskManager 不负责产生任务,但负责链接任务的输入输出,并管理任务的调用。

    • 三个抽象方法:

      • public abstract void connect(PipeTasks pipeTaskks);
        连接 Source Task 作为输入,Sink Task 作为输出
      • public abstract void execute()
        Source Task 会在此开始一个Thread,Sink Task 什么都不做
      • public abstract boolean waitForCompletion()
        知道所有 Tasks 完成,返回 True。目标是确认运行在多个 Thread 上的 Source Tasks 运行结束。Sink Tassk 什么都不做
  • ActiveTaskManager extends TaskManager

    • execute() 都相同

      • public void execute() {LOG.fine("Launching task " + getTaskId() + " in a new thread.");if (thread != null) {throw new OsmosisRuntimeException("Task " + getTaskId()+ " is already running.");}thread = new TaskRunner(getTask(), "Thread-" + getTaskId());thread.start();
        

    }
    ```

    其中, `TaskRunner`是一个继承自`Thread`的类,构造函数为 ```java
    public TaskRunner(Runnable task, String name) {		super(task, name);	// Thread(Runnable target, String name)successful = true;
    //...
    

    }
    ```

    `getTask()` 为一个抽象函数,交给实现类去实现,返回一个 Runnable 的Task
    
    protected abstract Runnable getTask();
    
    实现 Runnable 的类示例```java
    public class XmlReader implements RunnableSource {//RunnableSource 是实现了 Source 和 Runnable 两个接口的接口public void run() {//...}
    

    }
    ```

    • waitForCompletion() 都相同
    	public boolean waitForCompletion() {LOG.fine("Waiting for task " + getTaskId() + " to complete.");if (thread != null) {boolean successful;try {thread.join();} catch (InterruptedException e) {// We are already in an error condition so log and continue.LOG.log(Level.WARNING, "The wait for task completion was interrupted.", e);}successful = thread.isSuccessful();if (!successful) {LOG.log(Level.SEVERE, "Thread for task " + getTaskId() + " failed", thread.getException());}thread = null;return successful;}return true;}
    
    • connect()
      Source 的 TaskManager

      public void connect(PipeTasks pipeTasks) {// Register the task as an output. A source only has one output, this// corresponds to pipe index 0.setOutputTask(pipeTasks, task, 0);
      }
      

      Sink 的

      	public void connect(PipeTasks pipeTasks) {Source source;// Get the input task. A sink only has one input, this corresponds to// pipe index 0.source = (Source) getInputTask(pipeTasks, 0, Source.class);// Cast the input feed to the correct type.// Connect the tasks.source.setSink(task);}
      

      MultiSinkSource

      	public void connect(PipeTasks pipeTasks) {// A multi sink receives multiple streams of data, so we must connect// them up one by one.for (int i = 0; i < task.getSinkCount(); i++) {Sink sink;Source source;// Retrieve the next sink.sink = task.getSink(i);// Retrieve the appropriate source.source = (Source) getInputTask(pipeTasks, i, Source.class);// Connect the tasks.source.setSink(sink);}// Register the source as an output task.setOutputTask(pipeTasks, task, 0);}
      
  • TaskManagerFactory

    • 负责根据命令行参数实例化 Task 和 TaskManager

    • 单例模式,全局复用

    • 抽象类,由各个类型任务去具体实现

    • 属性:ThreadLocal<Set<String>> accessedTaskOptions,储存任务的选项,如果有没有用到的 option ,抛出拼写错误的异常。使用 ThreadLocal ,保证每个进程安全访问自己的数据。

    • 方法: accessedTaskOptions的初始化?

      	public TaskManager createTaskManager(TaskConfiguration taskConfig) {TaskManager taskManager;// Create a new accessed task options store.accessedTaskOptions.set(new HashSet<String>());	//往 ThreadLocal 中放数据taskManager = createTaskManagerImpl(taskConfig); // 交与实现类去实现for (String argName : taskConfig.getConfigArgs().keySet()) {if (!accessedTaskOptions.get().contains(argName)) {throw new OsmosisRuntimeException("Argument " + argName + " for task " + taskConfig.getId() + " was not recognised.");}}// Clear the accessed task options.accessedTaskOptions.set(null);return taskManager;}
      
  • TaskManagerFactoryRegister
    维护一个 task manager factories 的集合。取相应的 TaskManagerFactory 的入口
    属性: Map<String, TaskManagerFactory> factoryMap

  • TaskRegistrar
    提供为了注册所有的 task factories 的初始化逻辑。
    内部有一个 TaskManagerFactoryRegister ,利用配置文件,将所有 TaskManagerFactory 放入其中。

    	private void loadPluginClass(final String pluginClassName, final ClassLoader classLoader) {Class<?> untypedPluginClass;PluginLoader pluginLoader;Map<String, TaskManagerFactory> pluginTasks;// Load the plugin class.try {untypedPluginClass = classLoader.loadClass(pluginClassName);} catch (ClassNotFoundException e) {throw new OsmosisRuntimeException("Unable to load plugin class (" + pluginClassName + ").", e);}// Verify that the plugin implements the plugin loader interface.if (!PluginLoader.class.isAssignableFrom(untypedPluginClass)) {throw new OsmosisRuntimeException("The class (" + pluginClassName + ") does not implement interface ("+ PluginLoader.class.getName() + "). Maybe it's not a plugin?");}Class<PluginLoader> pluginClass = (Class<PluginLoader>) untypedPluginClass;// Instantiate the plugin loader.try {pluginLoader = pluginClass.newInstance();} catch (InstantiationException e) {throw new IllegalArgumentException("Unable to instantiate plugin class (" + pluginClassName + ").", e);} catch (IllegalAccessException e) {throw new IllegalArgumentException("Unable to instantiate plugin class (" + pluginClassName + ").", e);}// Obtain the plugin task factories with their names.pluginTasks = pluginLoader.loadTaskFactories();// Register the plugin tasks.for (Entry<String, TaskManagerFactory> taskEntry : pluginTasks.entrySet()) {factoryRegister.register(taskEntry.getKey(), taskEntry.getValue());}}
    
  • Pipeline

    域:

    • TaskManagerFactoryRegister factoryRegister
    • List<TaskMnanger> taskManagers

    从命令行指令中管理一个处理的 Pipeline,实例化任务并且等待他们完成
    三步:

    • prepare——分为 buildTasks connectTasks 两步
    • execute
    • waitForCompletion
  • Connect
    用 pipeTasks 作为容器,放入所有 Task, 并把他们和 pipe 名 对应
    对每一个 TaskManager ,依次 connect(pipeTasks)

    • Task 之间的连接,靠着每个 Task 内的 sink 来串联,分别 sink.complete()

Store

接口 StoreClassRegister

将类的标识符写入储存,以便在再次读取存储数据时可以标示该类。这为将多个类写入一个存储位置提供了基础

代码编程
赞赏

相关文章

不伤原图电脑在线去水印网站
在线视频转换器工具
图片如何在线无痕去除水印
图片带满屏水印怎么去除?
超详细Hyperledger Fabric2.3.3开发教程
入门卷积神经网络必备,基础、理论、实战一网打尽!