Osmosis 工具代码解读
功能
先将 Osmosis 的主要功能列出如下,大体包括了 Osm 文件服务常用的功能。
XML Tasks
- 3.3.1–read-xml (–rx)
- outPipe.0 生产出一个entity Stream
- 3.3.2–fast-read-xml (no short option available)
- 3.3.3–write-xml (–wx)
- inPipe.0 消费一个 Entity Stream
- 3.3.4–read-xml-change (–rxc)
- 3.3.5–write-xml-change (–wxc)
Area Filtering Tasks
- 3.4.1–bounding-box (–bb)
- inPipe.0 outPipe.0
- 3.4.2–bounding-polygon (–bp)
Changeset Derivation and Merging 提供 osc 和 osm 的分离合并
- 3.5.1–derive-change (–dc)
- 比较两份osm,产出一份 osc
- inPipe.0 – entity stream. inPipe.1-- entity stream outPipe.0 – change stream
- 3.5.2–apply-change (–ac)
- 将一份更改应用到一份 osm 上
Pipeline Control 控制Pipeline 的结构,不操作数据
- 3.6.1–write-null (–wn)
- 弃掉所有的输入数据
- inPipe.0
- 3.6.2–write-null-change (–wnc)
- 3.6.3–buffer (–b)
- 可将一个Pipeline分离到多个线程上。input Task 的线程会将数据传送到一个固定容量大小的缓冲区中,当缓冲区满,block。该任务还会产生一个新Thread从缓冲区中读取数据,缓冲区空,block
- inPipe.0 outPipe.0
- 3.6.4–buffer-change (–bc)
- 3.6.5–log-progress (–lp)
- 加入到Pipeline中,固定时间间隔记录下log
- 3.6.6–log-progress-change(–lpc)
- 3.6.7–tee (–t)
- 接受一个数据流,将其送到多个目的地。当读取一个数据源并要在上面应用多个操作时可用
- inPipe.0
- outPipe.0 ~ n-1
- 3.6.8–tee-change (–tc)
- 3.6.9–read-empty (–rem)
- 3.6.10–ready-empty-change (–remc)
Set Manipulation Tasks
- 3.7.1–sort (–s)
- 3.7.2–sort-change (–sc)
- 3.7.3–merge (–m)
- 3.7.3.1Bound entity processing
- 将两份文件合成一份,使用前要先按照 TypeThenId sort
- 3.7.4–merge-change (–mc)
- 3.7.5–append-change (–apc)
- 3.7.6–simplify-change (–simc)
- 3.7.7–convert-change-to-full-history (–cctfh)
Data Manipulation Tasks
- 3.8.1–node-key (–nk)
- 给定 KeyList,所有 node 中只有有至少一个List中的key的才会被留下来
- 3.8.2–node-key-value (–nkv)
- 3.8.3–way-key (–wk)
- 3.8.4–way-key-value (–wkv)
- 3.8.5–tag-filter (–tf)
- 3.8.6–used-node (–un)
- 只留下那些被用到的 nodes
- 3.8.7–used-way (–uw)
- 3.8.8–tag-transform (–tt)
例子
下面通过两个例子演示 Osmosis 工具的基本使用方法。
osmosis \--read-xml input.osm \--tf accept-ways highway=* \ --tf reject-ways highway=motorway,motorway_link \--tf reject-relations \--used-node \--write-xml output.osm
- 只保留有 highway 标签的所有 way
- 保留下的 way 中,剔除掉 highway 标签值为 motorway, motorway_link 的way
- 删除所有 Relation
- 只保留被用到的 nodes
- 写出
osmosis
--rx /Users/cshi/Desktop/Momenta/Data/map.osm
--tf reject-relations
--tf accept-nodes speed=-1
--tf reject-ways --rx /Users/cshi/Desktop/Momenta/Data/map.osm
--tf reject-relations
--tf accept-ways highway=motorway
--used-node
--merge --wx /Users/cshi/Desktop/out.osm
- 如果 merge 一个线程中的两个输入,Osmosis 会发生死锁。拆成两个 Thread 执行
问题:这些任务如何编排?如何分类?哪些任务代表单起了一个线程?
结构
源码整体结构
.
├── build-support
├── gradle
├── osmosis-apidb
├── osmosis-areafilter
├── osmosis-core
├── osmosis-dataset
├── osmosis-extract
├── osmosis-hstore-jdbc
├── osmosis-osm-binary
├── osmosis-pbf
├── osmosis-pbf2
├── osmosis-pgsimple
├── osmosis-pgsnapshot
├── osmosis-replication
├── osmosis-replication-http
├── osmosis-set
├── osmosis-tagfilter
├── osmosis-tagtransform
├── osmosis-testutil
├── osmosis-xml
└── package
如何将任务分类,从而实现任务编排?
任务分类
按照是否主动启动线程分类:
- ActiveTaskManager—— Runnable
- 这类任务对单独启动一个线程执行,后变可以跟一些 PassiveTask。
- 例如: read-xml
- PassiveTaskManager
- 这类任务挂靠在前边任务的 Sink 中执行,不会单独启动线程
按照输入输出分类:对于每一个 Task, 输入为 Sink, 输出为 Source
- 输入—— (Multi)SInk
- 输出——(Multi)Source
通过这两种定义,任务可分为如下所有:
任务分类的实现
代码中,任务相关类有 Task, TaskManager, TaskManagerFactory, TaskManagerFactoryRegister 四个类直接相关的。
使用的是典型的工厂模式。 TaskManagerFactory 是单例的生产工厂,负责生产相应的类型的 TaskManager, 每个 Task 对应一个 TaskManager,负责着 Task 的产生,连接
任务之间如何连接——pipeline设计模式
Core分析
pipeline
-
TaskManager
-
TaskManager 分为 Active 和 Passive 两类。Active 类是主动执行的 Runnable 任务,会单独起一个 Thread,Passive 会 connect 到其他 Task 之后。
-
TaskManager , Pipeline 中的所有任务实例都对应一个 TaskManager,TaskManager 不负责产生任务,但负责链接任务的输入输出,并管理任务的调用。
-
三个抽象方法:
public abstract void connect(PipeTasks pipeTaskks);
连接 Source Task 作为输入,Sink Task 作为输出public abstract void execute()
Source Task 会在此开始一个Thread,Sink Task 什么都不做public abstract boolean waitForCompletion()
知道所有 Tasks 完成,返回 True。目标是确认运行在多个 Thread 上的 Source Tasks 运行结束。Sink Tassk 什么都不做
-
-
ActiveTaskManager extends TaskManager
-
execute()
都相同-
public void execute() {LOG.fine("Launching task " + getTaskId() + " in a new thread.");if (thread != null) {throw new OsmosisRuntimeException("Task " + getTaskId()+ " is already running.");}thread = new TaskRunner(getTask(), "Thread-" + getTaskId());thread.start();
-
}
```其中, `TaskRunner`是一个继承自`Thread`的类,构造函数为 ```java public TaskRunner(Runnable task, String name) { super(task, name); // Thread(Runnable target, String name)successful = true; //...
}
````getTask()` 为一个抽象函数,交给实现类去实现,返回一个 Runnable 的Task
protected abstract Runnable getTask();
实现 Runnable 的类示例```java public class XmlReader implements RunnableSource {//RunnableSource 是实现了 Source 和 Runnable 两个接口的接口public void run() {//...}
}
```waitForCompletion()
都相同
public boolean waitForCompletion() {LOG.fine("Waiting for task " + getTaskId() + " to complete.");if (thread != null) {boolean successful;try {thread.join();} catch (InterruptedException e) {// We are already in an error condition so log and continue.LOG.log(Level.WARNING, "The wait for task completion was interrupted.", e);}successful = thread.isSuccessful();if (!successful) {LOG.log(Level.SEVERE, "Thread for task " + getTaskId() + " failed", thread.getException());}thread = null;return successful;}return true;}
-
connect()
Source 的 TaskManagerpublic void connect(PipeTasks pipeTasks) {// Register the task as an output. A source only has one output, this// corresponds to pipe index 0.setOutputTask(pipeTasks, task, 0); }
Sink 的
public void connect(PipeTasks pipeTasks) {Source source;// Get the input task. A sink only has one input, this corresponds to// pipe index 0.source = (Source) getInputTask(pipeTasks, 0, Source.class);// Cast the input feed to the correct type.// Connect the tasks.source.setSink(task);}
MultiSinkSource
public void connect(PipeTasks pipeTasks) {// A multi sink receives multiple streams of data, so we must connect// them up one by one.for (int i = 0; i < task.getSinkCount(); i++) {Sink sink;Source source;// Retrieve the next sink.sink = task.getSink(i);// Retrieve the appropriate source.source = (Source) getInputTask(pipeTasks, i, Source.class);// Connect the tasks.source.setSink(sink);}// Register the source as an output task.setOutputTask(pipeTasks, task, 0);}
-
-
TaskManagerFactory
-
负责根据命令行参数实例化 Task 和 TaskManager
-
单例模式,全局复用
-
抽象类,由各个类型任务去具体实现
-
属性:
ThreadLocal<Set<String>> accessedTaskOptions
,储存任务的选项,如果有没有用到的 option ,抛出拼写错误的异常。使用 ThreadLocal ,保证每个进程安全访问自己的数据。 -
方法: accessedTaskOptions的初始化?
public TaskManager createTaskManager(TaskConfiguration taskConfig) {TaskManager taskManager;// Create a new accessed task options store.accessedTaskOptions.set(new HashSet<String>()); //往 ThreadLocal 中放数据taskManager = createTaskManagerImpl(taskConfig); // 交与实现类去实现for (String argName : taskConfig.getConfigArgs().keySet()) {if (!accessedTaskOptions.get().contains(argName)) {throw new OsmosisRuntimeException("Argument " + argName + " for task " + taskConfig.getId() + " was not recognised.");}}// Clear the accessed task options.accessedTaskOptions.set(null);return taskManager;}
-
-
TaskManagerFactoryRegister
维护一个 task manager factories 的集合。取相应的 TaskManagerFactory 的入口
属性:Map<String, TaskManagerFactory> factoryMap
-
TaskRegistrar
提供为了注册所有的 task factories 的初始化逻辑。
内部有一个TaskManagerFactoryRegister
,利用配置文件,将所有 TaskManagerFactory 放入其中。private void loadPluginClass(final String pluginClassName, final ClassLoader classLoader) {Class<?> untypedPluginClass;PluginLoader pluginLoader;Map<String, TaskManagerFactory> pluginTasks;// Load the plugin class.try {untypedPluginClass = classLoader.loadClass(pluginClassName);} catch (ClassNotFoundException e) {throw new OsmosisRuntimeException("Unable to load plugin class (" + pluginClassName + ").", e);}// Verify that the plugin implements the plugin loader interface.if (!PluginLoader.class.isAssignableFrom(untypedPluginClass)) {throw new OsmosisRuntimeException("The class (" + pluginClassName + ") does not implement interface ("+ PluginLoader.class.getName() + "). Maybe it's not a plugin?");}Class<PluginLoader> pluginClass = (Class<PluginLoader>) untypedPluginClass;// Instantiate the plugin loader.try {pluginLoader = pluginClass.newInstance();} catch (InstantiationException e) {throw new IllegalArgumentException("Unable to instantiate plugin class (" + pluginClassName + ").", e);} catch (IllegalAccessException e) {throw new IllegalArgumentException("Unable to instantiate plugin class (" + pluginClassName + ").", e);}// Obtain the plugin task factories with their names.pluginTasks = pluginLoader.loadTaskFactories();// Register the plugin tasks.for (Entry<String, TaskManagerFactory> taskEntry : pluginTasks.entrySet()) {factoryRegister.register(taskEntry.getKey(), taskEntry.getValue());}}
-
Pipeline
域:
TaskManagerFactoryRegister factoryRegister
List<TaskMnanger> taskManagers
从命令行指令中管理一个处理的 Pipeline,实例化任务并且等待他们完成
三步:prepare
——分为buildTasks
connectTasks
两步execute
waitForCompletion
-
Connect
用 pipeTasks 作为容器,放入所有 Task, 并把他们和 pipe 名 对应
对每一个 TaskManager ,依次connect(pipeTasks)
- Task 之间的连接,靠着每个 Task 内的 sink 来串联,分别 sink.complete()
Store
接口 StoreClassRegister
将类的标识符写入储存,以便在再次读取存储数据时可以标示该类。这为将多个类写入一个存储位置提供了基础