Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@
import io.cdap.cdap.api.security.store.SecureStore;
import io.cdap.cdap.app.guice.AuditLogWriterModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.encryption.guice.UserCredentialAeadEncryptionModule;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.IOModule;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
import io.cdap.cdap.common.guice.preview.PreviewDiscoveryRuntimeModule;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.common.utils.Networks;
import io.cdap.cdap.config.guice.ConfigStoreModule;
import io.cdap.cdap.data.runtime.DataSetServiceModules;
Expand All @@ -48,6 +50,8 @@
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
import io.cdap.cdap.data2.metadata.writer.MetadataServiceClient;
import io.cdap.cdap.data2.metadata.writer.NoOpMetadataServiceClient;
import io.cdap.cdap.internal.app.preview.PreviewRequestPollerInfoProvider;
import io.cdap.cdap.internal.app.preview.PreviewRunnerHttpHandlerInternal;
import io.cdap.cdap.internal.app.preview.PreviewRunnerService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
import io.cdap.cdap.logging.appender.LogAppender;
Expand All @@ -56,18 +60,22 @@
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule;
import io.cdap.cdap.security.guice.preview.PreviewSecureStoreModule;
import io.cdap.http.ChannelPipelineModifier;
import io.cdap.http.NettyHttpService;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContentDecompressor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Threads;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -88,22 +96,25 @@ public class DefaultPreviewRunnerManager extends AbstractIdleService implements
private final SecureStore secureStore;
private final TransactionSystemClient transactionSystemClient;
private final PreviewRunnerModule previewRunnerModule;
private final Set<PreviewRunnerService> previewRunnerServices;
// private final Set<PreviewRunnerService> previewRunnerServices;
private final LevelDBTableService previewLevelDBTableService;
private final PreviewRunnerServiceFactory previewRunnerServiceFactory;
private final DiscoveryService discoveryService;
private NettyHttpService httpService;
private Cancellable cancelDiscovery;
private PreviewRunner runner;
private final PreviewRequestPollerInfoProvider pollerInfoProvider;

@Inject
DefaultPreviewRunnerManager(@Named(PreviewConfigModule.PREVIEW_CCONF) CConfiguration previewCConf,
@Named(PreviewConfigModule.PREVIEW_HCONF) Configuration previewHConf,
@Named(PreviewConfigModule.PREVIEW_SCONF) SConfiguration previewSConf,
SecureStore secureStore,
DiscoveryServiceClient discoveryServiceClient,
SecureStore secureStore, DiscoveryServiceClient discoveryServiceClient,
@Named(DataSetsModules.BASE_DATASET_FRAMEWORK) DatasetFramework datasetFramework,
TransactionSystemClient transactionSystemClient,
PreviewRunnerModule previewRunnerModule,
TransactionSystemClient transactionSystemClient, PreviewRunnerModule previewRunnerModule,
@Named(PreviewConfigModule.PREVIEW_LEVEL_DB) LevelDBTableService previewLevelDBService,
PreviewRunnerServiceFactory previewRunnerServiceFactory) {
PreviewRunnerServiceFactory previewRunnerServiceFactory, DiscoveryService discoveryService,
PreviewRequestPollerInfoProvider pollerInfoProvider) {
this.previewCConf = previewCConf;
this.previewHConf = previewHConf;
this.previewSConf = previewSConf;
Expand All @@ -112,10 +123,12 @@ public class DefaultPreviewRunnerManager extends AbstractIdleService implements
this.discoveryServiceClient = discoveryServiceClient;
this.transactionSystemClient = transactionSystemClient;
this.maxConcurrentPreviews = previewCConf.getInt(Constants.Preview.POLLER_COUNT);
this.previewRunnerServices = ConcurrentHashMap.newKeySet();
// this.previewRunnerServices = ConcurrentHashMap.newKeySet();
this.previewRunnerModule = previewRunnerModule;
this.previewLevelDBTableService = previewLevelDBService;
this.previewRunnerServiceFactory = previewRunnerServiceFactory;
this.discoveryService = discoveryService;
this.pollerInfoProvider = pollerInfoProvider;
}

@Override
Expand All @@ -127,19 +140,50 @@ protected void startUp() throws Exception {
((Service) runner).startAndWait();
}

// Create and start the preview poller services.
for (int i = 0; i < maxConcurrentPreviews; i++) {
createPreviewRunnerService().startAndWait();
NettyHttpService.Builder builder = NettyHttpService.builder(
Constants.Service.PREVIEW_RUNNER_HTTP).setHost("0.0.0.0").setPort(44317)
.setExecThreadPoolSize(previewCConf.getInt(Constants.Preview.EXEC_THREADS))
.setBossThreadPoolSize(previewCConf.getInt(Constants.Preview.BOSS_THREADS))
.setWorkerThreadPoolSize(previewCConf.getInt(Constants.Preview.WORKER_THREADS))
.setChannelPipelineModifier(new ChannelPipelineModifier() {
@Override
public void modify(ChannelPipeline pipeline) {
pipeline.addAfter("compressor", "decompressor", new HttpContentDecompressor());
}
}).setHttpHandlers(
new PreviewRunnerHttpHandlerInternal(previewCConf, runner, this::stopPreview,
pollerInfoProvider));

if (previewCConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) {
new HttpsEnabler().configureKeyStore(previewCConf, previewSConf).enable(builder);
}
this.httpService = builder.build();

LOG.debug("sidhdirenge - Starting PreviewRunnerHttpService");
httpService.start();
cancelDiscovery = discoveryService.register(ResolvingDiscoverable.of(
URIScheme.createDiscoverable(Constants.Service.PREVIEW_RUNNER_HTTP, httpService)));
LOG.debug("sidhdirenge - Starting PreviewRunnerHttpService has completed");
}

private void stopPreview(ProgramId s) {
LOG.info("Stop called for preview {}", s.getApplication());
stop();
}

@Override
protected void shutDown() throws Exception {
// Should stop the polling service, hence individual preview runs, before stopping the top level preview runner.
previewRunnerServices.forEach(this::stopQuietly);
// previewRunnerServices.forEach(this::stopQuietly);
if (runner instanceof Service) {
stopQuietly((Service) runner);
}
LOG.debug("sidhdirenge - Shutting down PreviewRunnerHttpService");
if (httpService != null) {
httpService.stop(1, 2, TimeUnit.SECONDS);
}
cancelDiscovery.cancel();
LOG.debug("sidhdirenge - Shutting down PreviewRunnerHttpService has completed");
}

private void stopQuietly(Service service) {
Expand All @@ -152,63 +196,56 @@ private void stopQuietly(Service service) {

@Override
public void stop(ApplicationId preview) throws Exception {
PreviewRunnerService runnerService = previewRunnerServices.stream()
.filter(r -> r.getPreviewApplication().filter(preview::equals).isPresent())
.findFirst()
.orElse(null);

if (runnerService == null) {
throw new NotFoundException(
"Preview run cannot be stopped. Please try stopping again or start new preview run.");
}
// PreviewRunnerService runnerService = previewRunnerServices.stream()
// .filter(r -> r.getPreviewApplication().filter(preview::equals).isPresent()).findFirst()
// .orElse(null);
//
// if (runnerService == null) {
// throw new NotFoundException(
// "Preview run cannot be stopped. Please try stopping again or start new preview run.");
// }

PreviewRunnerService newRunnerService = createPreviewRunnerService();
runnerService.stopAndWait();
newRunnerService.startAndWait();
// PreviewRunnerService newRunnerService = createPreviewRunnerService();
// runnerService.stopAndWait();
// newRunnerService.startAndWait();
LOG.info("This stop was called");
stop();
}

/**
* Create injector for the given application id.
*/
@VisibleForTesting
public Injector createPreviewInjector() {
return Guice.createInjector(
new ConfigModule(previewCConf, previewHConf, previewSConf),
new IOModule(),
RemoteAuthenticatorModules.getDefaultModule(),
return Guice.createInjector(new ConfigModule(previewCConf, previewHConf, previewSConf),
new IOModule(), RemoteAuthenticatorModules.getDefaultModule(),
new CoreSecurityRuntimeModule().getInMemoryModules(),
new AuditLogWriterModule(previewCConf).getInMemoryModules(),
new UserCredentialAeadEncryptionModule(),
new AuthenticationContextModules().getMasterWorkerModule(),
new PreviewSecureStoreModule(secureStore),
new PreviewDiscoveryRuntimeModule(discoveryServiceClient),
new ConfigStoreModule(),
previewRunnerModule,
new ProgramRunnerRuntimeModule().getStandaloneModules(),
new PreviewDiscoveryRuntimeModule(discoveryServiceClient), new ConfigStoreModule(),
previewRunnerModule, new ProgramRunnerRuntimeModule().getStandaloneModules(),
new PreviewDataModules().getDataFabricModule(transactionSystemClient,
previewLevelDBTableService),
new PreviewDataModules().getDataSetsModule(datasetFramework),
new DataSetServiceModules().getStandaloneModules(),
// Use the in-memory module for metrics collection, which metrics still get persisted to dataset, but
// save threads for reading metrics from TMS, as there won't be metrics in TMS.
new MetricsClientRuntimeModule().getInMemoryModules(),
new AbstractModule() {
new MetricsClientRuntimeModule().getInMemoryModules(), new AbstractModule() {
@Override
protected void configure() {
bind(LogAppender.class).to(PreviewTMSLogAppender.class).in(Scopes.SINGLETON);
}
},
new MessagingServerRuntimeModule().getInMemoryModules(),
}, new MessagingServerRuntimeModule().getInMemoryModules(),
Modules.override(new MetadataReaderWriterModules().getInMemoryModules())
.with(new AbstractModule() {
@Override
protected void configure() {
// we don't start a metadata service in preview, so don't attempt to create any metadata
bind(MetadataServiceClient.class).to(NoOpMetadataServiceClient.class);
}
}),
new ProvisionerModule(),
new AbstractModule() {
}), new ProvisionerModule(), new AbstractModule() {
@Override
protected void configure() {
}
Expand All @@ -220,34 +257,7 @@ public InetAddress providesHostname(CConfiguration cConf) {
String address = cConf.get(Constants.Preview.ADDRESS);
return Networks.resolve(address, new InetSocketAddress("localhost", 0).getAddress());
}
}
);
});
}

/**
* Creates a {@link PreviewRunnerService}. It will automatically added to and removed from the
* {@link #previewRunnerServices} set.
*/
private PreviewRunnerService createPreviewRunnerService() {
PreviewRunnerService previewRunnerService = previewRunnerServiceFactory.create(runner);

previewRunnerService.addListener(new ServiceListenerAdapter() {

@Override
public void terminated(State from) {
previewRunnerServices.remove(previewRunnerService);
if (previewRunnerServices.isEmpty()) {
try {
stop();
} catch (Exception e) {
// should not happen
LOG.error("Failed to shutdown the preview runner manager service.", e);
}
}
}
}, Threads.SAME_THREAD_EXECUTOR);

previewRunnerServices.add(previewRunnerService);
return previewRunnerService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ Map<String, List<JsonElement>> getData(ApplicationId applicationId, String trace
* @param pollerInfo information about the poller
* @return {@code PreviewRequest} if such request is available in the queue
*/
@Deprecated
Optional<PreviewRequest> poll(@Nullable byte[] pollerInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface PreviewRequestQueue {
*/
Optional<PreviewRequest> poll(@Nullable byte[] pollerInfo);

Optional<PreviewRequest> poll();

/**
* Add a preview request in the queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ protected void configure() {
.annotatedWith(Names.named(DataSetsModules.BASE_DATASET_FRAMEWORK))
.to(RemoteDatasetFramework.class);

// TODO(sidhdirenge) - this can be removed.
bind(PreviewRequestFetcher.class).to(RemotePreviewRequestFetcher.class)
.in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.app.preview.PreviewManager;
import io.cdap.cdap.app.preview.PreviewRequest;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.http.AbstractHttpHandler;
import io.cdap.http.HttpHandler;
Expand Down Expand Up @@ -52,15 +50,8 @@ public class PreviewHttpHandlerInternal extends AbstractHttpHandler {
@POST
@Path("/requests/pull")
public void poll(FullHttpRequest request, HttpResponder responder) {
byte[] pollerInfo = Bytes.toBytes(request.content().nioBuffer());
PreviewRequest previewRequest = previewManager.poll(pollerInfo).orElse(null);

if (previewRequest != null) {
LOG.debug("Send preview request {} to poller {}", previewRequest.getProgram(),
Bytes.toString(pollerInfo));
responder.sendString(HttpResponseStatus.OK, GSON.toJson(previewRequest));
} else {
responder.sendStatus(HttpResponseStatus.OK);
}
// TODO(sidhdirenge): Deprecate.
LOG.info("sidhdirenge - this should not be called");
responder.sendStatus(HttpResponseStatus.OK);
}
}
Loading
Loading